Martwy prosty przykład użycia kolejki wieloprocesorowej, Puli i blokowania

Próbowałem przeczytać dokumentację na ale wciąż zmagam się z kolejką wieloprocesorową, bilardem i blokowaniem. I na razie udało mi się zbudować poniższy przykład.

Jeśli chodzi o kolejkę i bilard, nie jestem pewien, czy dobrze zrozumiałem tę koncepcję, więc popraw mnie, jeśli się mylę. To, co staram się osiągnąć, to przetwarzaj 2 żądania w czasie (lista danych ma 8 w tym przykładzie ), więc czego powinienem użyć? Pool to create 2 procesy, które mogą obsługiwać dwie różne kolejki ( maksymalnie 2), Czy powinienem używać Queue do przetwarzania 2 wejść za każdym razem? Blokada polegałaby na poprawnym wydrukowaniu wyjść.

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']

def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))

def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
Author: Seanny123, 2014-01-02

5 answers

Najlepszym rozwiązaniem Twojego problemu jest użycie Pool. Używanie Queue s i posiadanie osobnej funkcji "queue feeding" jest prawdopodobnie przesadą.

Oto nieco przearanżowana wersja twojego programu, tym razem z tylko 2 procesami umieszczonymi w Pool. Wydaje mi się, że jest to najprostszy sposób, z minimalnymi zmianami w oryginalnym kodzie:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2), data)

if __name__ == '__main__':

Zauważ, że mp_worker() funkcja akceptuje teraz pojedynczy argument (krotkę dwóch poprzednich argumentów), ponieważ map() funkcja dzieli Dane wejściowe na sublisty, każda sublista podawana jako pojedynczy argument funkcji worker.


Processs a  Waiting 2 seconds
Processs b  Waiting 4 seconds
Process a   DONE
Processs c  Waiting 6 seconds
Process b   DONE
Processs d  Waiting 8 seconds
Process c   DONE
Processs e  Waiting 1 seconds
Process e   DONE
Processs f  Waiting 3 seconds
Process d   DONE
Processs g  Waiting 5 seconds
Process f   DONE
Processs h  Waiting 7 seconds
Process g   DONE
Process h   DONE

Edit as per @ Thales comment below:

Jeśli chcesz "blokadę dla każdego limitu puli", aby procesy działały w parach tandemowych, ala:

A waiting B waiting / a done, B done / C waiting, d waiting | C done, d done/...

Następnie zmień funkcję obsługi na uruchamianie puli (2 procesów) dla każdej pary z danych:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2), (task1, task2))

Teraz Twoje wyjście to:

 Processs a Waiting 2 seconds
 Processs b Waiting 4 seconds
 Process a  DONE
 Process b  DONE
 Processs c Waiting 6 seconds
 Processs d Waiting 8 seconds
 Process c  DONE
 Process d  DONE
 Processs e Waiting 1 seconds
 Processs f Waiting 3 seconds
 Process e  DONE
 Process f  DONE
 Processs g Waiting 5 seconds
 Processs h Waiting 7 seconds
 Process g  DONE
 Process h  DONE
Author: Velimir Mlaker,
2014-12-22 16:45:13

To może nie jest w 100% związane z pytaniem, ale na moim poszukiwaniu przykładu użycia multiprocessingu z kolejką to pojawia się najpierw w google.

Jest to podstawowa klasa przykładowa, którą można utworzyć i umieścić elementy w kolejce i poczekać aż Kolejka zostanie zakończona. To wszystko czego potrzebowałem.

from multiprocessing import JoinableQueue
from multiprocessing.context import Process

class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:

    def render(self, item):

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:

            # process your item here


    def terminate(self):
        """ wait until queue is empty and terminate processes """
        for p in self.processes:

r = Renderer()
Author: linqu,
2016-02-23 09:47:32

Oto moje osobiste goto na ten temat:

Gist tutaj, (pull requests welcome!):

import multiprocessing
import sys


# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()

def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only Portion

def main(argp=None):
    """Multiprocessing Spawn Example
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',), 
        ('Goodbye', 'World',), 

        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('User Interupt\n')

if __name__ == '__main__':
Author: ThorSummoner,
2016-04-24 06:33:29

Dla każdego używającego edytorów takich jak Komodo Edit (win10) dodaj sys.stdout.flush() do:

def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    print " Process %s\tDONE" % inputs

Lub jako pierwsza linia do:

    if __name__ == '__main__':

To pomaga zobaczyć, co dzieje się podczas uruchamiania skryptu; zamiast patrzeć na czarne pole wiersza poleceń.

Author: ZF007,
2017-11-14 12:32:55

Oto przykład z mojego kodu (dla puli wątków, ale po prostu zmień nazwę klasy i będziesz miał pulę procesów):

def execute_run(rp): 
   ... do something 

pool = ThreadPoolExecutor(6)
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            pool.submit(execute_run, rp)

W zasadzie:

  • pool = ThreadPoolExecutor(6) tworzy pulę dla 6 wątków
  • Następnie masz kilka for, które dodają zadania do puli
  • pool.submit(execute_run, rp) dodaje zadanie do puli, pierwszy arogument jest funkcją wywołaną w wątku / procesie, reszta argumentów jest przekazywana do wywołanej funkcji.
  • pool.join czeka aż wszystkie zadania zostaną wykonane.
Author: jb.,
2014-01-02 17:19:05