import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Utilisation de Python 3.6.

J'essaie d'écrire un gestionnaire d'événements qui écoute constamment les messages dans le queue et les traite (les imprime dans ce cas). Mais il doit être asynchrone - je dois pouvoir l'exécuter dans un terminal (IPython) et alimenter manuellement queue (au moins initialement, pour les tests).

Ce code ne fonctionne pas - il bloque pour toujours.

Comment puis-je faire en sorte que cela s'exécute pour toujours tout en reprenant le contrôle après chaque itération de la boucle while ?

Merci.

remarque : Pour faire fonctionner la boucle d'événement avec IPython (version 7.2), j'utilise ce code de la bibliothèque ib_insync, j'utilise cette bibliothèque pour le problème du monde réel dans l'exemple ci-dessus.

5
Josh D 14 mars 2019 à 00:42

2 réponses

Meilleure réponse

Vous devez faire de votre file d'attente un asyncio.Queue et ajouter des éléments à la file d'attente de manière thread-safe. Par exemple:

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

Votre autre thread doit mettre des éléments dans la file d'attente comme ceci :

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe garantira un verrouillage correct et également que la boucle d'événement est réveillée lorsqu'un nouvel élément de file d'attente est prêt.

3
user4815162342 14 mars 2019 à 14:12

Ce n'est pas une file d'attente asynchrone. Vous devez utiliser asyncio.Queue

qq = queue.Queue()

Async est une boucle d'événement. Vous appelez la boucle qui lui transfère le contrôle et elle boucle jusqu'à ce que votre fonction soit terminée, ce qui n'arrive jamais :

loop.run_until_complete(a.listen_for_orders())

Vous avez commenté:

J'ai un autre thread qui interroge une ressource réseau externe pour les données (intensives en E/S) et vide les messages entrants dans ce thread.

Écrivez ce code de manière asynchrone - vous auriez donc :

async def run():
    while 1:
        item = await get_item_from_network()
        process_item(item)

loop = asyncio.get_event_loop()
loop.run_until_complete( run() )

Si vous ne voulez pas faire cela, vous pouvez parcourir la boucle même si vous ne voulez pas le faire.

import asyncio


def run_once(loop):
    loop.call_soon(loop.stop)
    loop.run_forever()


loop = asyncio.get_event_loop()

for x in range(100):
    print(x)
    run_once(loop)

Ensuite, vous appelez simplement votre fonction asynchrone et chaque fois que vous appelez run_once, elle vérifiera votre (file d'attente asyncio) et passera le contrôle à votre fonction d'écoute des commandes si la file d'attente contient un élément.

2
MarkReedZ 14 mars 2019 à 00:08