Martwy prosty przykład użycia kolejki wieloprocesorowej, Puli i blokowania
Próbowałem przeczytać dokumentację na http://docs.python.org/dev/library/multiprocessing.html ale wciąż zmagam się z kolejką wieloprocesorową, bilardem i blokowaniem. I na razie udało mi się zbudować poniższy przykład.
Jeśli chodzi o kolejkę i bilard, nie jestem pewien, czy dobrze zrozumiałem tę koncepcję, więc popraw mnie, jeśli się mylę. To, co staram się osiągnąć, to przetwarzaj 2 żądania w czasie (lista danych ma 8 w tym przykładzie ), więc czego powinienem użyć? Pool to create 2 procesy, które mogą obsługiwać dwie różne kolejki ( maksymalnie 2), Czy powinienem używać Queue do przetwarzania 2 wejść za każdym razem? Blokada polegałaby na poprawnym wydrukowaniu wyjść.
import multiprocessing
import time
data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_handler(var1):
for indata in var1:
p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
p.start()
def mp_worker(inputs, the_time):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
if __name__ == '__main__':
mp_handler(data)
5 answers
Najlepszym rozwiązaniem Twojego problemu jest użycie Pool
. Używanie Queue
s i posiadanie osobnej funkcji "queue feeding" jest prawdopodobnie przesadą.
Oto nieco przearanżowana wersja twojego programu, tym razem z tylko 2 procesami umieszczonymi w Pool
. Wydaje mi się, że jest to najprostszy sposób, z minimalnymi zmianami w oryginalnym kodzie:
import multiprocessing
import time
data = (
['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)
def mp_worker((inputs, the_time)):
print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
def mp_handler():
p = multiprocessing.Pool(2)
p.map(mp_worker, data)
if __name__ == '__main__':
mp_handler()
Zauważ, że mp_worker()
funkcja akceptuje teraz pojedynczy argument (krotkę dwóch poprzednich argumentów), ponieważ map()
funkcja dzieli Dane wejściowe na sublisty, każda sublista podawana jako pojedynczy argument funkcji worker.
Wyjście:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Processs c Waiting 6 seconds
Process b DONE
Processs d Waiting 8 seconds
Process c DONE
Processs e Waiting 1 seconds
Process e DONE
Processs f Waiting 3 seconds
Process d DONE
Processs g Waiting 5 seconds
Process f DONE
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
Edit as per @ Thales comment below:
Jeśli chcesz "blokadę dla każdego limitu puli", aby procesy działały w parach tandemowych, ala:
A waiting B waiting / a done, B done / C waiting, d waiting | C done, d done/...
Następnie zmień funkcję obsługi na uruchamianie puli (2 procesów) dla każdej pary z danych:
def mp_handler():
subdata = zip(data[0::2], data[1::2])
for task1, task2 in subdata:
p = multiprocessing.Pool(2)
p.map(mp_worker, (task1, task2))
Teraz Twoje wyjście to:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Process b DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c DONE
Process d DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e DONE
Process f DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-12-22 16:45:13
To może nie jest w 100% związane z pytaniem, ale na moim poszukiwaniu przykładu użycia multiprocessingu z kolejką to pojawia się najpierw w google.
Jest to podstawowa klasa przykładowa, którą można utworzyć i umieścić elementy w kolejce i poczekać aż Kolejka zostanie zakończona. To wszystko czego potrzebowałem.
from multiprocessing import JoinableQueue
from multiprocessing.context import Process
class Renderer:
queue = None
def __init__(self, nb_workers=2):
self.queue = JoinableQueue()
self.processes = [Process(target=self.upload) for i in range(nb_workers)]
for p in self.processes:
p.start()
def render(self, item):
self.queue.put(item)
def upload(self):
while True:
item = self.queue.get()
if item is None:
break
# process your item here
self.queue.task_done()
def terminate(self):
""" wait until queue is empty and terminate processes """
self.queue.join()
for p in self.processes:
p.terminate()
r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-23 09:47:32
Oto moje osobiste goto na ten temat:
Gist tutaj, (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing
import sys
THREADS = 3
# Used to prevent multiple threads from mixing thier output
GLOBALLOCK = multiprocessing.Lock()
def func_worker(args):
"""This function will be called by each thread.
This function can not be a class method.
"""
# Expand list of args into named args.
str1, str2 = args
del args
# Work
# ...
# Serial-only Portion
GLOBALLOCK.acquire()
print(str1)
print(str2)
GLOBALLOCK.release()
def main(argp=None):
"""Multiprocessing Spawn Example
"""
# Create the number of threads you want
pool = multiprocessing.Pool(THREADS)
# Define two jobs, each with two args.
func_args = [
('Hello', 'World',),
('Goodbye', 'World',),
]
try:
# Spawn up to 9999999 jobs, I think this is the maximum possible.
# I do not know what happens if you exceed this.
pool.map_async(func_worker, func_args).get(9999999)
except KeyboardInterrupt:
# Allow ^C to interrupt from any thread.
sys.stdout.write('\033[0m')
sys.stdout.write('User Interupt\n')
pool.close()
if __name__ == '__main__':
main()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-04-24 06:33:29
Dla każdego używającego edytorów takich jak Komodo Edit (win10) dodaj sys.stdout.flush()
do:
def mp_worker((inputs, the_time)):
print " Process %s\tWaiting %s seconds" % (inputs, the_time)
time.sleep(int(the_time))
print " Process %s\tDONE" % inputs
sys.stdout.flush()
Lub jako pierwsza linia do:
if __name__ == '__main__':
sys.stdout.flush()
To pomaga zobaczyć, co dzieje się podczas uruchamiania skryptu; zamiast patrzeć na czarne pole wiersza poleceń.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-14 12:32:55
Oto przykład z mojego kodu (dla puli wątków, ale po prostu zmień nazwę klasy i będziesz miał pulę procesów):
def execute_run(rp):
... do something
pool = ThreadPoolExecutor(6)
for mat in TESTED_MATERIAL:
for en in TESTED_ENERGIES:
for ecut in TESTED_E_CUT:
rp = RunParams(
simulations, DEST_DIR,
PARTICLE, mat, 960, 0.125, ecut, en
)
pool.submit(execute_run, rp)
pool.join()
W zasadzie:
-
pool = ThreadPoolExecutor(6)
tworzy pulę dla 6 wątków - Następnie masz kilka for, które dodają zadania do puli
-
pool.submit(execute_run, rp)
dodaje zadanie do puli, pierwszy arogument jest funkcją wywołaną w wątku / procesie, reszta argumentów jest przekazywana do wywołanej funkcji. -
pool.join
czeka aż wszystkie zadania zostaną wykonane.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-01-02 17:19:05