Jak wykonać równoległą pętlę Pythona?
Jest to prawdopodobnie trywialne pytanie, ale jak paralelizować następującą pętlę w Pythonie?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Wiem, jak uruchamiać pojedyncze wątki w Pythonie, ale nie wiem, jak "zbierać" wyniki.
Wiele procesów też byłoby w porządku - cokolwiek jest najłatwiejsze w tym przypadku. Używam obecnie Linuksa, ale kod powinien działać na Windows i Mac-również.
Jaki jest najprostszy sposób na równoległe zestawianie tego kodu?
9 answers
Używanie wielu wątków na Cpythonie nie da ci lepszej wydajności kodu Pythona ze względu na globalną blokadę interpretera (Gil). Proponuję skorzystać z multiprocessing
moduł zamiast:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Zauważ, że nie będzie to działać w interaktywnym interpreterze.
Aby uniknąć zwykłego FUD wokół GIL: nie byłoby żadnej korzyści z używania wątków dla tego przykładu. Ty chcesz używać tutaj procesów, a nie wątków, ponieważ unikają One całej masy problemy.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-17 17:14:32
Aby zrównoleglować prostą pętlę for, joblib wnosi wiele wartości do surowego wykorzystania wieloprocesora. Nie tylko krótka składnia, ale także rzeczy takie jak przezroczyste Wiązanie iteracji, gdy są one bardzo szybkie (aby usunąć nagłówek) lub przechwytywanie śledzenia procesu potomnego, aby mieć lepsze raportowanie błędów.
Zastrzeżenie: jestem oryginalnym autorem joblib.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-08 19:01:03
Jaki jest najprostszy sposób na równoległe połączenie tego kodu?
Bardzo mi się podoba concurrent.futures
za to, dostępne w Python3 od wersji 3.2 - i przez backport do 2.6 i 2.7 na PyPi.
Możesz używać wątków lub Procesów i używać dokładnie tego samego interfejsu.
Multiprocessing
Włóż to do pliku - futuretest.py:
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let's see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
A oto wyjście:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Wielowątkowość
Teraz zmień ProcessPoolExecutor
na ThreadPoolExecutor
i uruchom moduł ponownie:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
Teraz wykonałeś zarówno wielowątkowość,jak i wielowątkowość!
Uwaga na wydajność i używanie obu razem.
Pobieranie próbek jest zbyt małe, aby porównać wyniki.
Podejrzewam jednak, że wielowątkowość będzie szybsza niż wielowątkowość w ogóle, szczególnie w systemie Windows, ponieważ Windows nie obsługuje forkingu, więc każdy nowy proces musi zająć trochę czasu, aby go uruchomić. Na Linuksie lub Macu pewnie będą bliżej.
Możesz zagnieżdżać wiele wątki wewnątrz wielu procesów, ale nie zaleca się używania wielu wątków do spin off wielu procesów.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-05 16:28:38
Dlaczego nie używasz wątków i jednego mutexu do ochrony jednej globalnej listy?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
Pamiętaj, że będziesz tak szybki jak Twój najwolniejszy wątek
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-03-20 11:54:59
Istnieje wiele zalet korzystania z Ray :
- można równoległe na wielu maszynach oprócz wielu rdzeni (z tym samym kodem).
- efektywna obsługa danych numerycznych poprzez pamięć współdzieloną (i serializację zero-copy).
- wysoka przepustowość zadań z rozproszonym harmonogramem.
- tolerancja błędów.
W Twoim przypadku możesz uruchomić Ray i zdefiniować funkcję zdalną
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
A następnie przywołać go w parallel
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
Aby uruchomić ten sam przykład na klastrze, jedyną linią, która się zmieni, będzie wywołanie do Raya.init(). Odpowiednią dokumentację można znaleźć tutaj .
Zauważ, że pomagam rozwijać Raya.Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-30 06:49:54
Może to być przydatne przy implementacji obliczeń wieloprocesorowych i równoległych / rozproszonych w Pythonie.
Tutorial YouTube na temat korzystania z pakietu techila
Techila jest rozproszonym oprogramowaniem pośredniczącym, które integruje się bezpośrednio z Pythonem za pomocą pakietu techila. Funkcja peach w pakiecie może być przydatna w równoległych strukturach pętli. (Poniższy fragment kodu pochodzi z Techila Community Forums)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-22 11:30:45
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
Powyższe działa pięknie na moim komputerze (Ubuntu, pakiet joblib był preinstalowany, ale można go zainstalować przez pip install joblib
).
Wzięte z https://blog.dominodatalab.com/simple-parallelization/
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-19 10:31:00
Bardzo prostym przykładem przetwarzania równoległego jest
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-10 06:56:17
Spójrz na to;
Http://docs.python.org/library/queue.html
To może nie jest dobry sposób, ale zrobiłbym coś takiego;
Rzeczywisty kod;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
Mam nadzieję, że to pomoże.Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-03-20 12:16:20