Jak uruchamiać funkcje równolegle?
Zbadałem pierwszy i nie mogłem znaleźć odpowiedzi na moje pytanie. Próbuję uruchomić wiele funkcji równolegle w Pythonie.
Mam coś takiego:
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
Chcę wywołać func1 i func2 i uruchomić je w tym samym czasie. Funkcje nie oddziałują ze sobą ani na tym samym obiekcie. W tej chwili muszę poczekać na zakończenie func1 przed rozpoczęciem func2. Jak zrobić coś takiego jak poniżej:
process.py
from files import func1, func2
runBothFunc(func1(), func2())
Chcę móc tworzyć zarówno katalogi całkiem zbliżone do tego samego czasu, ponieważ co minutę zliczam ile plików jest tworzonych. Jeśli katalogu tam nie ma, to odrzuci moje wyczucie czasu.
7 answers
You could use threading
lub multiprocessing
.
Ze względu na specyfikę CPython, threading
jest mało prawdopodobne, aby osiągnąć prawdziwą równoległość. Z tego powodu, multiprocessing
jest ogólnie lepszym zakładem.
Oto pełny przykład:
from multiprocessing import Process
def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'
def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'
if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
Mechanika uruchamiania / łączenia procesów potomnych może być łatwo zamknięta w funkcji wzdłuż linii runBothFunc
:
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-08-26 16:08:58
Jeśli twoje funkcje wykonują głównie I / O (i mniej pracy CPU) i masz Pythona 3.2+, możesz użyć ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
Jeśli twoje funkcje wykonują głównie pracę CPU (i mniej pracy we / wy) i masz Pythona 2.6+, możesz użyć modułu multiprocessing:
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
run_cpu_tasks_in_parallel([
lambda: print('CPU task 1 running!'),
lambda: print('CPU task 2 running!'),
])
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-05-14 21:24:29
Można to zrobić elegancko za pomocą Ray , systemu, który pozwala na łatwą równoległość i dystrybucję kodu Pythona.
Aby zrównoleglować swój przykład, musisz zdefiniować swoje funkcje za pomocą dekoratora @ray.remote
, a następnie wywołać je za pomocą .remote
.
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
Jeśli przekazujesz ten sam argument obu funkcjom i argument jest duży, bardziej efektywnym sposobem na to jest użycie ray.put()
. Pozwala to uniknąć podwójnego serializacji dużego argumentu i utworzyć dwie kopie pamięci it:
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
Ważne - Jeśli func1()
i func2()
zwrócą wyniki, należy przepisać kod w następujący sposób:
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
Istnieje wiele zalet używania Ray w stosunku do modułumultiprocessing . W szczególności, ten sam kod będzie działał zarówno na pojedynczej maszynie, jak i na klastrze maszyn. Więcej zalet Ray zobacz ten powiązany post .
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-09-22 08:04:42
Wygląda na to, że masz jedną funkcję, którą musisz wywołać na dwóch różnych parametrach. Można to zrobić elegancko, używając kombinacji concurrent.futures
i map
z Pythonem 3.2 +
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
def sleep_secs(seconds):
time.sleep(seconds)
print(f'{seconds} has been processed')
secs_list = [2,4, 6, 8, 10, 12]
Teraz, jeśli Twoja operacja jest związana z IO, możesz użyć ThreadPoolExecutor
jako takiego:
with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
Zauważ, jak map
jest tutaj użyte do map
twojej funkcji do listy argumentów.
Teraz, jeśli twoja funkcja jest związana z CPU, możesz użyć ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
Jeśli nie jesteś pewien, możesz po prostu wypróbuj oba i zobacz, który z nich daje lepsze wyniki.
Wreszcie, jeśli chcesz wydrukować swoje wyniki, możesz po prostu to zrobić:
with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
for result in results:
print(result)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-03-24 13:43:33
Jeśli jesteś użytkownikiem Windows i używasz Pythona 3, ten post pomoże Ci w programowaniu równoległym w Pythonie.gdy uruchomisz zwykłe programowanie puli bibliotek wieloprocesorowych, pojawi się błąd dotyczący głównej funkcji w programie. Dzieje się tak dlatego, że windows nie posiada funkcji fork (). Poniższy post daje rozwiązanie wspomnianego problemu .
Http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html
Odkąd byłem używając Pythona 3, zmieniłem program trochę tak:
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__ # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited
return _applicable, args, kwargs
def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)
def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__ # edited
code = marshal.dumps(f.__code__) # edited
defs = marshal.dumps(f.__defaults__) # edited
clsr = marshal.dumps(f.__closure__) # edited
fdct = marshal.dumps(f.__dict__) # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
Po tej funkcji powyższy kod problemu jest również zmieniany trochę tak:
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def cube(x):
return x**3
if __name__ == "__main__":
pool = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])
I mam wyjście jako:
[1, 8, 27, 64, 125, 216]
Myślę, że ten post może być przydatny dla niektórych użytkowników windows.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-10 14:50:46
Nie ma sposobu, aby zagwarantować, że dwie funkcje będą działać zsynchronizowane ze sobą, co wydaje się być tym, co chcesz zrobić.
Najlepsze, co możesz zrobić, to podzielić funkcję na kilka kroków, a następnie poczekać, aż oba zakończą się w krytycznych punktach synchronizacji za pomocą Process.join
, jak wspomina odpowiedź @aix.
To jest lepsze niż time.sleep(10)
, ponieważ nie możesz zagwarantować dokładnych czasów. Z explicitly waiting, mówisz, że funkcje muszą być wykonane wykonując ten krok przed przechodząc do następnego, zamiast zakładać, że zostanie to zrobione w ciągu 10ms, co nie jest gwarantowane na podstawie tego, co jeszcze dzieje się na maszynie.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-08-26 17:09:47
W 2021 roku najprostszym sposobem jest użycie asyncio:
import asyncio, time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.create_task(
say_after(4, 'hello'))
task2 = asyncio.create_task(
say_after(3, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Referencje:
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-12 21:53:46