Jak wykonać równoległą pętlę Pythona?

Jest to prawdopodobnie trywialne pytanie, ale jak paralelizować następującą pętlę w Pythonie?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

Wiem, jak uruchamiać pojedyncze wątki w Pythonie, ale nie wiem, jak "zbierać" wyniki.

Wiele procesów też byłoby w porządku - cokolwiek jest najłatwiejsze w tym przypadku. Używam obecnie Linuksa, ale kod powinien działać na Windows i Mac-również.

Jaki jest najprostszy sposób na równoległe zestawianie tego kodu?

Author: Aaron Hall, 2012-03-20

9 answers

Używanie wielu wątków na Cpythonie nie da ci lepszej wydajności kodu Pythona ze względu na globalną blokadę interpretera (Gil). Proponuję skorzystać z multiprocessing moduł zamiast:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Zauważ, że nie będzie to działać w interaktywnym interpreterze.

Aby uniknąć zwykłego FUD wokół GIL: nie byłoby żadnej korzyści z używania wątków dla tego przykładu. Ty chcesz używać tutaj procesów, a nie wątków, ponieważ unikają One całej masy problemy.

 113
Author: Sven Marnach,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-17 17:14:32

Aby zrównoleglować prostą pętlę for, joblib wnosi wiele wartości do surowego wykorzystania wieloprocesora. Nie tylko krótka składnia, ale także rzeczy takie jak przezroczyste Wiązanie iteracji, gdy są one bardzo szybkie (aby usunąć nagłówek) lub przechwytywanie śledzenia procesu potomnego, aby mieć lepsze raportowanie błędów.

Zastrzeżenie: jestem oryginalnym autorem joblib.

 33
Author: Gael Varoquaux,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-08 19:01:03

Jaki jest najprostszy sposób na równoległe połączenie tego kodu?

Bardzo mi się podoba concurrent.futures za to, dostępne w Python3 od wersji 3.2 - i przez backport do 2.6 i 2.7 na PyPi.

Możesz używać wątków lub Procesów i używać dokładnie tego samego interfejsu.

Multiprocessing

Włóż to do pliku - futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

A oto wyjście:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Wielowątkowość

Teraz zmień ProcessPoolExecutor na ThreadPoolExecutor i uruchom moduł ponownie:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Teraz wykonałeś zarówno wielowątkowość,jak i wielowątkowość!

Uwaga na wydajność i używanie obu razem.

Pobieranie próbek jest zbyt małe, aby porównać wyniki.

Podejrzewam jednak, że wielowątkowość będzie szybsza niż wielowątkowość w ogóle, szczególnie w systemie Windows, ponieważ Windows nie obsługuje forkingu, więc każdy nowy proces musi zająć trochę czasu, aby go uruchomić. Na Linuksie lub Macu pewnie będą bliżej.

Możesz zagnieżdżać wiele wątki wewnątrz wielu procesów, ale nie zaleca się używania wielu wątków do spin off wielu procesów.

 20
Author: Aaron Hall,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-05 16:28:38

Dlaczego nie używasz wątków i jednego mutexu do ochrony jednej globalnej listy?

import os
import re
import time
import sys
import thread

from threading import Thread

class thread_it(Thread):
    def __init__ (self,param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        mutex.acquire()
        output.append(calc_stuff(self.param))
        mutex.release()   


threads = []
output = []
mutex = thread.allocate_lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

Pamiętaj, że będziesz tak szybki jak Twój najwolniejszy wątek

 3
Author: jackdoe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-03-20 11:54:59

Istnieje wiele zalet korzystania z Ray :

  • można równoległe na wielu maszynach oprócz wielu rdzeni (z tym samym kodem).
  • efektywna obsługa danych numerycznych poprzez pamięć współdzieloną (i serializację zero-copy).
  • wysoka przepustowość zadań z rozproszonym harmonogramem.
  • tolerancja błędów.

W Twoim przypadku możesz uruchomić Ray i zdefiniować funkcję zdalną

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

A następnie przywołać go w parallel

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

Aby uruchomić ten sam przykład na klastrze, jedyną linią, która się zmieni, będzie wywołanie do Raya.init(). Odpowiednią dokumentację można znaleźć tutaj .

Zauważ, że pomagam rozwijać Raya.
 2
Author: Robert Nishihara,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-30 06:49:54

Może to być przydatne przy implementacji obliczeń wieloprocesorowych i równoległych / rozproszonych w Pythonie.

Tutorial YouTube na temat korzystania z pakietu techila

Techila jest rozproszonym oprogramowaniem pośredniczącym, które integruje się bezpośrednio z Pythonem za pomocą pakietu techila. Funkcja peach w pakiecie może być przydatna w równoległych strukturach pętli. (Poniższy fragment kodu pochodzi z Techila Community Forums)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )
 1
Author: TEe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-22 11:30:45
from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

Powyższe działa pięknie na moim komputerze (Ubuntu, pakiet joblib był preinstalowany, ale można go zainstalować przez pip install joblib).

Wzięte z https://blog.dominodatalab.com/simple-parallelization/

 1
Author: tyrex,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-06-19 10:31:00

Bardzo prostym przykładem przetwarzania równoległego jest

from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()

def yourfunction():

    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter = parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
 0
Author: Adil Warsi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-10 06:56:17

Spójrz na to;

Http://docs.python.org/library/queue.html

To może nie jest dobry sposób, ale zrobiłbym coś takiego;

Rzeczywisty kod;

from multiprocessing import Process, JoinableQueue as Queue 

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
            while True:
                try:
                    value = self.input.get()
                    #value modifier
                    temp1,temp2,temp3 = self.calc_stuff(value)
                    self.out1.put(temp1)
                    self.out2.put(temp2)
                    self.out3.put(temp3)
                    self.input.task_done()
                except Queue.Empty:
                    return
                   #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()
Mam nadzieję, że to pomoże.
 -1
Author: MerreM,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-03-20 12:16:20