Jak uruchamiać funkcje równolegle?

Zbadałem pierwszy i nie mogłem znaleźć odpowiedzi na moje pytanie. Próbuję uruchomić wiele funkcji równolegle w Pythonie.

Mam coś takiego:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

Chcę wywołać func1 i func2 i uruchomić je w tym samym czasie. Funkcje nie oddziałują ze sobą ani na tym samym obiekcie. W tej chwili muszę poczekać na zakończenie func1 przed rozpoczęciem func2. Jak zrobić coś takiego jak poniżej:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Chcę móc tworzyć zarówno katalogi całkiem zbliżone do tego samego czasu, ponieważ co minutę zliczam ile plików jest tworzonych. Jeśli katalogu tam nie ma, to odrzuci moje wyczucie czasu.

Author: martineau, 2011-08-26

7 answers

You could use threading lub multiprocessing.

Ze względu na specyfikę CPython, threading jest mało prawdopodobne, aby osiągnąć prawdziwą równoległość. Z tego powodu, multiprocessing jest ogólnie lepszym zakładem.

Oto pełny przykład:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

Mechanika uruchamiania / łączenia procesów potomnych może być łatwo zamknięta w funkcji wzdłuż linii runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)
 189
Author: NPE,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-08-26 16:08:58

Jeśli twoje funkcje wykonują głównie I / O (i mniej pracy CPU) i masz Pythona 3.2+, możesz użyć ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

Jeśli twoje funkcje wykonują głównie pracę CPU (i mniej pracy we / wy) i masz Pythona 2.6+, możesz użyć modułu multiprocessing:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])
 25
Author: David Foster,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-05-14 21:24:29

Można to zrobić elegancko za pomocą Ray , systemu, który pozwala na łatwą równoległość i dystrybucję kodu Pythona.

Aby zrównoleglować swój przykład, musisz zdefiniować swoje funkcje za pomocą dekoratora @ray.remote, a następnie wywołać je za pomocą .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

Jeśli przekazujesz ten sam argument obu funkcjom i argument jest duży, bardziej efektywnym sposobem na to jest użycie ray.put(). Pozwala to uniknąć podwójnego serializacji dużego argumentu i utworzyć dwie kopie pamięci it:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Ważne - Jeśli func1() i func2() zwrócą wyniki, należy przepisać kod w następujący sposób:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Istnieje wiele zalet używania Ray w stosunku do modułumultiprocessing . W szczególności, ten sam kod będzie działał zarówno na pojedynczej maszynie, jak i na klastrze maszyn. Więcej zalet Ray zobacz ten powiązany post .

 21
Author: Ion Stoica,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-09-22 08:04:42

Wygląda na to, że masz jedną funkcję, którą musisz wywołać na dwóch różnych parametrach. Można to zrobić elegancko, używając kombinacji concurrent.futures i map z Pythonem 3.2 +

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Teraz, jeśli Twoja operacja jest związana z IO, możesz użyć ThreadPoolExecutor jako takiego:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Zauważ, jak map jest tutaj użyte do map twojej funkcji do listy argumentów.

Teraz, jeśli twoja funkcja jest związana z CPU, możesz użyć ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Jeśli nie jesteś pewien, możesz po prostu wypróbuj oba i zobacz, który z nich daje lepsze wyniki.

Wreszcie, jeśli chcesz wydrukować swoje wyniki, możesz po prostu to zrobić:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)
 9
Author: BICube,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-03-24 13:43:33

Jeśli jesteś użytkownikiem Windows i używasz Pythona 3, ten post pomoże Ci w programowaniu równoległym w Pythonie.gdy uruchomisz zwykłe programowanie puli bibliotek wieloprocesorowych, pojawi się błąd dotyczący głównej funkcji w programie. Dzieje się tak dlatego, że windows nie posiada funkcji fork (). Poniższy post daje rozwiązanie wspomnianego problemu .

Http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Odkąd byłem używając Pythona 3, zmieniłem program trochę tak:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

Po tej funkcji powyższy kod problemu jest również zmieniany trochę tak:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

I mam wyjście jako:

[1, 8, 27, 64, 125, 216]

Myślę, że ten post może być przydatny dla niektórych użytkowników windows.

 5
Author: Arun Sooraj,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-10 14:50:46

Nie ma sposobu, aby zagwarantować, że dwie funkcje będą działać zsynchronizowane ze sobą, co wydaje się być tym, co chcesz zrobić.

Najlepsze, co możesz zrobić, to podzielić funkcję na kilka kroków, a następnie poczekać, aż oba zakończą się w krytycznych punktach synchronizacji za pomocą Process.join, jak wspomina odpowiedź @aix.

To jest lepsze niż time.sleep(10), ponieważ nie możesz zagwarantować dokładnych czasów. Z explicitly waiting, mówisz, że funkcje muszą być wykonane wykonując ten krok przed przechodząc do następnego, zamiast zakładać, że zostanie to zrobione w ciągu 10ms, co nie jest gwarantowane na podstawie tego, co jeszcze dzieje się na maszynie.

 4
Author: Davy8,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-08-26 17:09:47

W 2021 roku najprostszym sposobem jest użycie asyncio:

import asyncio, time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():

    task1 = asyncio.create_task(
        say_after(4, 'hello'))

    task2 = asyncio.create_task(
        say_after(3, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")


asyncio.run(main())

Referencje:

[1] https://docs.python.org/3/library/asyncio-task.html

 0
Author: bruziuz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-12 21:53:46