Jak efektywnie wykonywać wiele zadań "trochę później" w Pythonie?

Mam proces, który musi wykonać kilka czynności "później" (zwykle po 10-60 sekundach). Problem polega na tym ,że te "późniejsze" działania mogą być dużo (1000s), więc użycie Thread Na zadanie nie jest wykonalne. Wiem o istnieniu narzędzi takich jak gevent i eventlet , ale jednym z problemów jest to, że proces używa zeromq do komunikacji, więc potrzebowałbym trochę integracji (eventlet już ją ma).

Zastanawiam się jakie są moje opcje? więc, sugestie są mile widziane, w liniach bibliotek (jeśli korzystałeś z którejś z wymienionych, podziel się swoimi doświadczeniami), techniki (Obsługa Pythona "coroutine" , Użyj jednego wątku, który śpi przez chwilę i sprawdza kolejkę), jak użyć poll lub eventloop zeromq do wykonania zadania, lub coś innego.

Author: Emil Ivanov, 2011-07-14

10 answers

Rozważ użycie kolejki priorytetów z jednym lub kilkoma wątkami roboczymi do obsługi zadań. Główny wątek może dodać pracę do kolejki, ze znacznikiem czasu najwcześniej, kiedy powinna być serwisowana. Wątki robocze wyłączają pracę z kolejki, zasypiają do czasu osiągnięcia wartości priorytetu, wykonują pracę, a następnie wyłączają kolejny element z kolejki.

Co powiesz na bardziej rozbudowaną odpowiedź? mklauber ma rację. Jeśli jest szansa, że wszyscy Twoi pracownicy mogą spać, gdy masz Nowa, pilniejsza praca, wtedy queue.PriorityQueue nie jest tak naprawdę rozwiązaniem, chociaż" Kolejka priorytetów " jest nadal techniką do użycia, która jest dostępna z modułu heapq. Zamiast tego użyjemy innej prymitywnej synchronizacji; zmiennej warunkowej, która w Pythonie jest pisana threading.Condition.

Podejście jest dość proste, Zerknij na stertę, a jeśli praca jest aktualna, wyłącz ją i wykonaj tę pracę. Jeśli była praca, ale jest zaplanowana na przyszłość, po prostu poczekaj na warunek, aż wtedy, lub jeśli nie ma pracy w ogóle, spać na zawsze.

Producent robi sprawiedliwą część pracy; za każdym razem, gdy dodaje nową pracę, powiadamia o warunku, więc jeśli są śpiący pracownicy, obudzą się i ponownie sprawdzą kolejkę do nowszej pracy.

import heapq, time, threading

START_TIME = time.time()
SERIALIZE_STDOUT = threading.Lock()
def consumer(message):
    """the actual work function.  nevermind the locks here, this just keeps
       the output nicely formatted.  a real work function probably won't need
       it, or might need quite different synchronization"""
    SERIALIZE_STDOUT.acquire()
    print time.time() - START_TIME, message
    SERIALIZE_STDOUT.release()

def produce(work_queue, condition, timeout, message):
    """called to put a single item onto the work queue."""
    prio = time.time() + float(timeout)
    condition.acquire()
    heapq.heappush(work_queue, (prio, message))
    condition.notify()
    condition.release()

def worker(work_queue, condition):
    condition.acquire()
    stopped = False
    while not stopped:
        now = time.time()
        if work_queue:
            prio, data = work_queue[0]
            if data == 'stop':
                stopped = True
                continue
            if prio < now:
                heapq.heappop(work_queue)
                condition.release()
                # do some work!
                consumer(data)
                condition.acquire()
            else:
                condition.wait(prio - now)
        else:
            # the queue is empty, wait until notified
            condition.wait()
    condition.release()

if __name__ == '__main__':
    # first set up the work queue and worker pool
    work_queue = []
    cond = threading.Condition()
    pool = [threading.Thread(target=worker, args=(work_queue, cond))
            for _ignored in range(4)]
    map(threading.Thread.start, pool)

    # now add some work
    produce(work_queue, cond, 10, 'Grumpy')
    produce(work_queue, cond, 10, 'Sneezy')
    produce(work_queue, cond, 5, 'Happy')
    produce(work_queue, cond, 10, 'Dopey')
    produce(work_queue, cond, 15, 'Bashful')
    time.sleep(5)
    produce(work_queue, cond, 5, 'Sleepy')
    produce(work_queue, cond, 10, 'Doc')

    # and just to make the example a bit more friendly, tell the threads to stop after all
    # the work is done
    produce(work_queue, cond, float('inf'), 'stop')
    map(threading.Thread.join, pool)
 20
Author: SingleNegationElimination,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-25 21:56:26

[11]}ta odpowiedź ma właściwie dwie sugestie - moją pierwszą i drugą, którą odkryłem po pierwszej.

Sched

Podejrzewam, że szukasz sched moduł .

EDIT: moja sugestia wydawała się mało pomocna po przeczytaniu. Postanowiłem więc przetestować moduł sched, aby sprawdzić, czy może on działać zgodnie z moimi sugestiami. Oto mój test: użyłbym go z pojedynczym gwintem, mniej więcej w ten sposób: {]}

class SchedulingThread(threading.Thread):

    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

    def run(self):
        self.scheduler.run()

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Najpierw stworzyłbym Klasa wątku, która miałaby własny scheduler i kolejkę. Co najmniej jedno zdarzenie zostanie zarejestrowane w harmonogramie: jedno do wywołania metody planowania zdarzeń z kolejki.

class SchedulingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.queue = []
        self.queue_lock = threading.Lock()
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Metoda planowania zdarzeń z kolejki będzie blokować kolejkę, planować każde zdarzenie, opróżniać kolejkę i planować się ponownie, w celu szukania nowych zdarzeń w przyszłości. Pamiętaj, że okres poszukiwania nowych wydarzeń jest krótki( jedna sekunda), możesz zmienić it:

    def _schedule_in_scheduler(self):
        with self.queue_lock:
            for event in self.queue:
                self.scheduler.enter(*event)
                print "Registerd event", event
            self.queue = []
        self.scheduler.enter(1, 1, self._schedule_in_scheduler, ())

Klasa powinna mieć również metodę planowania zdarzeń użytkownika. Oczywiście ta metoda powinna zablokować kolejkę podczas aktualizacji:

    def schedule(self, function, delay):
        with self.queue_lock:
            self.queue.append((delay, 1, function, ()))

Wreszcie, klasa powinna wywołać główną metodę schedulera:

    def run(self):
        self.scheduler.run()

Oto przykład użycia:

def print_time():
    print "scheduled:", time.time()


if __name__ == "__main__":
    st = SchedulingThread()
    st.start()          
    st.schedule(print_time, 10)

    while True:
        print "main thread:", time.time()
        time.sleep(5)

    st.join()

Jego wyjście w mojej maszynie to:

$ python schedthread.py
main thread: 1311089765.77
Registerd event (10, 1, <function print_time at 0x2f4bb0>, ())
main thread: 1311089770.77
main thread: 1311089775.77
scheduled: 1311089776.77
main thread: 1311089780.77
main thread: 1311089785.77

Ten kod to tylko szybki przykład, może wymagać trochę pracy. Muszę jednak przyznać, że jestem trochę zafascynowany modułem sched, więc sugerowałem to. Możesz też poszukać innych propozycji:) [12]}

APScheduler

Szukając w Google rozwiązań takich jak ten, który napisałem, znalazłem ten niesamowity moduł APScheduler . To jest tak praktyczne i użyteczne, że założę się, że jest Twoim rozwiązaniem. Mój poprzedni przykład byłby o wiele prostszy z tym modułem:

from apscheduler.scheduler import Scheduler
import time

sch = Scheduler()
sch.start()

@sch.interval_schedule(seconds=10)

def print_time():
    print "scheduled:", time.time()
    sch.unschedule_func(print_time)

while True:
    print "main thread:", time.time()
    time.sleep(5)

(niestety nie znalazłem sposobu na zaplanowanie zdarzenia do wykonania tylko raz, więc zdarzenie funkcji powinno samo się wyłączyć. I założę się, że da się to rozwiązać jakimś dekoratorem.)

 11
Author: brandizzi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-19 23:34:47

Jeśli masz kilka zadań, które muszą zostać wykonane później, i chcesz, aby utrzymywały się, nawet jeśli zamkniesz program wywołujący lub Twoich pracowników, powinieneś naprawdę spojrzeć na selery , co sprawia, że bardzo łatwo jest tworzyć nowe zadania, wykonywać je na dowolnej maszynie, i czekać na wyniki.

Ze strony selera "jest to proste zadanie dodawanie dwóch liczb:"

from celery.task import task

@task
def add(x, y):
    return x + y

Możesz wykonać zadanie w tle, lub poczekać na zakończenie:

>>> result = add.delay(8, 8)
>>> result.wait() # wait for and return the result
16
 7
Author: cce,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-25 23:41:01

Napisałeś:

Jednym z problemów jest to, że proces używa zeromq do komunikacji, więc potrzebuję trochę integracji (eventlet już to ma)

Wydaje się, że twój wybór będzie miał duży wpływ na te szczegóły, które są nieco niejasne-w jaki sposób zeromq jest używany do komunikacji, ile zasobów będzie wymagać integracja, a jakie są Twoje wymagania i dostępne zasoby.


Jest projekt o nazwie django-ztask który używa zeromq i zapewnia task dekorator podobny do selera. Jednakże, jest to (oczywiście) specyficzne dla Django, więc może nie być odpowiednie w Twoim przypadku. Nie używałem, wolę seler siebie.

Używałem selera dla kilku projektów (te są hostowane na ep.io PaaS hosting, który zapewnia łatwy sposób korzystania z niego).

Seler wygląda na dość elastyczne rozwiązanie, pozwalające opóźniać zadania, oddzwaniać, wygaśnięcie i ponowna próba zadania, ograniczenie szybkości wykonywania zadań itp. Może być używany z Redis, Beanstalk, CouchDB, MongoDB lub bazą danych SQL.

Przykładowy kod (definicja zadania i asynchroniczne wykonanie po opóźnieniu):

from celery.decorators import task

@task
def my_task(arg1, arg2):
    pass # Do something

result = my_task.apply_async(
    args=[sth1, sth2], # Arguments that will be passed to `my_task()` function.
    countdown=3, # Time in seconds to wait before queueing the task.
)

Zobacz także sekcję w seler docs .

 3
Author: Tony,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-26 09:44:51

Spojrzałeś na moduł multiprocessing? Jest standardowo wyposażony w Python. Jest podobny do modułu threading, ale uruchamia każde zadanie w procesie. Można użyć obiektu Pool() do skonfigurowania puli roboczej, a następnie użyć metody .map() do wywołania funkcji z różnymi kolejkowymi argumentami zadania.

 2
Author: steveha,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-20 02:54:11

Pyzmq posiada implementację ioloop z podobnym api jak tornado ioloop. Realizuje on DelayedCallback co może Ci pomóc.

 1
Author: Rob Cowie,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-24 10:03:39

Zakładając, że twój proces ma pętlę run, która może odbierać sygnały, a czas każdej akcji mieści się w granicach operacji sekwencyjnych, użyj sygnałów i POSIX alarm ()

    signal.alarm(time)
If time is non-zero, this function requests that a 
SIGALRM signal be sent to the process in time seconds. 

To zależy od tego, co rozumiesz przez "te" późniejsze "działania mogą być dużo" i czy twój proces już używa sygnałów. Ze względu na sformułowanie pytania nie jest jasne, dlaczego zewnętrzny pakiet Pythona byłby potrzebny.

 0
Author: Jonathan Cline IEEE,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-20 22:17:07

Inną opcją jest użycie wiązań Phyton GLib , w szczególności jego funkcji timeout.

Jest to dobry wybór, o ile nie chcesz używać wielu rdzeni i o ile zależność od GLib nie stanowi problemu. Obsługuje wszystkie zdarzenia w tym samym wątku, co zapobiega problemom z synchronizacją. Dodatkowo jego ramka zdarzeń może być również używana do obserwacji i obsługi zdarzeń opartych na IO (np. socketach).

UPDATE:

Oto sesja na żywo z wykorzystaniem GLib: {]}

>>> import time
>>> import glib
>>> 
>>> def workon(thing):
...     print("%s: working on %s" % (time.time(), thing))
...     return True # use True for repetitive and False for one-time tasks
... 
>>> ml = glib.MainLoop()
>>> 
>>> glib.timeout_add(1000, workon, "this")
2
>>> glib.timeout_add(2000, workon, "that")
3
>>> 
>>> ml.run()
1311343177.61: working on this
1311343178.61: working on that
1311343178.61: working on this
1311343179.61: working on this
1311343180.61: working on this
1311343180.61: working on that
1311343181.61: working on this
1311343182.61: working on this
1311343182.61: working on that
1311343183.61: working on this
 0
Author: Oben Sonne,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-22 14:02:31

Moim zdaniem przydałoby się coś o nazwie "multitasking kooperacyjny". To pokręcona rzecz i jest naprawdę fajna. Wystarczy spojrzeć na prezentację PyCon z 2010 roku: http://blip.tv/pycon-us-videos-2009-2010-2011/pycon-2010-cooperative-multitasking-with-twisted-getting-things-done-concurrently-11-3352182

Cóż, będziesz potrzebował kolejki transportowej, aby to zrobić...

 0
Author: kkszysiu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-25 14:49:43

Proste. Możesz dziedziczyć swoją klasę z wątku i tworzyć instancję swojej klasy z param jak timeout, więc dla każdej instancji twojej klasy możesz powiedzieć timeout, który sprawi, że Twój wątek będzie czekał na ten czas

 0
Author: Jedi Shadow,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-05-31 08:51:56