J'essaie d'exécuter plusieurs demandes d'API en parallèle avec le multiprocessing.Process et les demandes. Je mets des URL à analyser dans l'instance JoinableQueue et remets le contenu dans l'instance Queue. J'ai remarqué que le fait de mettre response.content dans la file d'attente empêche en quelque sorte le processus de se terminer.

Voici un exemple simplifié avec un seul processus (Python 3.5):

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, q, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())

Après avoir exécuté le code, j'obtiens:

La boucle infinie se termine

ChildProcess-1 True

Veuillez aider à comprendre pourquoi le processus ne se termine pas après la fin de la fonction d'exécution.

Mise à jour: ajout d'une instruction d'impression pour montrer que la boucle se termine

1
oldPadavan 23 mai 2018 à 11:34

3 réponses

Meilleure réponse

Ajoutez un appel à w.terminate () ci-dessus le message d'impression.


Concernant pourquoi le processus ne se termine pas lui-même; votre code de fonction est une boucle infinie, il ne revient donc jamais. L'appel à terminate signale au processus de se tuer.

0
Chen A. 4 juin 2018 à 10:24

Il est un peu difficile de comprendre cela en se basant sur le Queue documentation - J'ai eu du mal avec le même problème.

Le concept clé ici est qu'avant la fin d'un thread producteur, il joint toutes les files d'attente dans lesquelles il contient des données put; cette jointure se bloque ensuite jusqu'à la fin du thread d'arrière-plan de la file d'attente, qui ne se produit que lorsque la file d'attente est vide . Donc, fondamentalement, avant que votre ChildProcess puisse quitter, quelqu'un doit consommer tout ce qu'il put dans la file d'attente!

Il existe de la documentation sur la fonction Queue.cancel_join_thread, qui est censée contourner ce problème, mais je n'ai pas pu lui donner d'effet - je ne l'utilise peut-être pas correctement.

Voici un exemple de modification que vous pouvez apporter pour résoudre le problème:

if __name__ == '__main__':
    qin = mp.JoinableQueue()
    qout = mp.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    while True:
        try:
            qout.get(True, 0.1)     # Throw away remaining stuff in qout (or process it or whatever,
                                    # just get it out of the queue so the queue background process
                                    # can terminate, so your ChildProcess can terminate.
        except queue.Empty:
            break
    w.join()                # Wait for your ChildProcess to finish up.
    # time.sleep(1)         # Not necessary since we've joined the ChildProcess
    print(w.name, w.is_alive())
1
Brionius 5 sept. 2019 à 18:48

Comme indiqué dans la documentation Pipes and Queues

si un processus enfant a placé des éléments dans une file d'attente (et qu'il n'a pas utilisé JoinableQueue.cancel_join_thread), ce processus ne se terminera pas tant que tous les éléments tamponnés n'auront pas été vidés dans le canal.

Cela signifie que si vous essayez de rejoindre ce processus, vous pouvez obtenir un blocage sauf si vous êtes sûr que tous les éléments qui ont été placés dans la file d'attente ont été consommés.

...

Notez qu'une file d'attente créée à l'aide d'un gestionnaire ne présente pas ce problème.

Si vous basculez vers une file d'attente de gestionnaire, le processus se termine avec succès:

import multiprocessing as mp
import queue
import requests
import time


class ChildProcess(mp.Process):
    def __init__(self, q, qout):
        super().__init__()
        self.qin = qin
        self.qout = qout
        self.daemon = True

    def run(self):
        while True:
            try:
                url = self.qin.get(block=False)
                r = requests.get(url, verify=False)
                self.qout.put(r.content)
                self.qin.task_done()
            except queue.Empty:
                break
            except requests.exceptions.RequestException as e:
                print(self.name, e)
                self.qin.task_done()
        print("Infinite loop terminates")


if __name__ == '__main__':
    manager = mp.Manager()
    qin = mp.JoinableQueue()
    qout = manager.Queue()
    for _ in range(5):
        qin.put('http://en.wikipedia.org')
    w = ChildProcess(qin, qout)
    w.start()
    qin.join()
    time.sleep(1)
    print(w.name, w.is_alive())
0
Kim 12 nov. 2019 à 09:49