J'ai une fonction pool_map qui peut être utilisée pour limiter le nombre de fonctions exécutées simultanément.

L'idée est d'avoir une fonction coroutine acceptant un seul paramètre qui est mappé à une liste de paramètres possibles, mais pour envelopper également tous les appels de fonction dans une acquisition de sémaphore, sur quoi seul un nombre limité s'exécute à la fois:

from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore

A = TypeVar('A')
V = TypeVar('V')

async def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int=10
) -> Generator[Awaitable[V], None, None]:
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)

J'ai modifié et n'ai pas testé le code ci-dessus à titre d'exemple, mais ma variante fonctionne bien. Par exemple. vous pouvez l'utiliser comme ceci:

from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing

URLS = [...]

async def run_all(awaitables):
    for a in as_completed(awaitables):
        result = await a
        print('got result', result)

async def download(url): ...


if __name__ != '__main__':
    pool = pool_map(download, URLS)

    with closing(get_event_loop()) as loop:
        loop.run_until_complete(run_all(pool))

Mais un problème survient s'il y a une exception lancée en attendant un futur. Je ne vois pas comment annuler toutes les tâches planifiées ou en cours d’exécution, ni celles qui attendent toujours l’acquisition du sémaphore.

Y a-t-il une bibliothèque ou un élément de base élégant pour cela que je ne connais pas, ou dois-je construire toutes les pièces moi-même? (c'est-à-dire un Semaphore avec accès à ses serveurs, un as_finished qui donne accès à sa file d'attente de tâches en cours d'exécution,…)

1
flying sheep 16 janv. 2017 à 16:23

2 réponses

Meilleure réponse

Utilisez ensure_future pour obtenir un Task au lieu d'une coroutine:

import asyncio
from contextlib import closing


def pool_map(func, args, size=10):
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    tasks = [asyncio.ensure_future(sub(x)) for x in args]

    return tasks


async def f(n):
    print(">>> start", n)

    if n == 7:
        raise Exception("boom!")

    await asyncio.sleep(n / 10)

    print("<<< end", n)
    return n


async def run_all(tasks):
    exc = None
    for a in asyncio.as_completed(tasks):
        try:
            result = await a
            print('=== result', result)
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            for t in tasks:
                t.cancel()
            exc = e
    if exc:
        raise exc


pool = pool_map(f, range(1, 20), 3)

with closing(asyncio.get_event_loop()) as loop:
    loop.run_until_complete(run_all(pool))
2
Udi 17 janv. 2017 à 10:20

Voici une solution naïve, basée sur le fait que cancel est un no-op si la tâche est déjà terminée:

async def run_all(awaitables):
    futures = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in as_completed(futures):
            result = await fut
            print('got result', result)
    except:
        for future in futures:
            future.cancel()
        await asyncio.wait(futures)
1
Vincent 17 janv. 2017 à 09:40