Jak skonfigurować seler, aby wywoływał niestandardową funkcję inicjalizacji przed uruchomieniem zadań?
Mam projekt Django i próbuję użyć selera do wysyłania zadań do przetwarzania w tle ( http://ask.github.com/celery/introduction.html ). Selery dobrze integruje się z Django i byłem w stanie przesłać moje niestandardowe zadania i odzyskać wyniki.
Jedyny problem polega na tym, że nie mogę znaleźć rozsądnego sposobu wykonywania niestandardowej inicjalizacji w procesie demona. Muszę zadzwonić do drogiej funkcji, która ładuje dużo pamięci, zanim zacznę przetwarzać zadania, a ja nie mogę sobie pozwolić na wywoływanie tej funkcji za każdym razem.
Czy ktoś już miał ten problem? Jakieś pomysły, jak obejść to bez modyfikowania kodu źródłowego selera?
Thanks
1 answers
Możesz albo napisać własny loader, albo użyć sygnałów.
Loadery mają metodę on_task_init
, która jest wywoływana, gdy zadanie ma być wykonane,
i on_worker_init
, który jest wywoływany przez główny proces seler+celerybeat.
Używanie sygnałów jest prawdopodobnie najłatwiejsze, dostępne sygnały to:
0.8.x:
-
task_prerun(task_id, task, args, kwargs)
Wysyłane, gdy zadanie ma być wykonane przez pracownika (lub lokalnie jeśli ustawiono
apply
/lubCELERY_ALWAYS_EAGER
). task_postrun(task_id, task, args, kwargs, retval)
Wysyłane po wykonaniu zadania w takich samych warunkach jak powyżej.-
task_sent(task_id, task, args, kwargs, eta, taskset)
Wywołane, gdy zadanie jest stosowane (nie jest dobre dla długotrwałych operacji)
Dodatkowe sygnały dostępne w 0.9.x (obecna gałąź master na GitHubie):
-
worker_init()
Wywołane po uruchomieniu celeryd (przed zainicjowaniem zadania, więc jeśli na Obsługa systemu
fork
, wszelkie zmiany pamięci byłyby skopiowane do dziecka procesów pracowniczych). -
worker_ready()
Wywoływane, gdy celeryd jest w stanie odbierać zadania.
-
worker_shutdown()
Wywołane, gdy celeryd się wyłącza.
Oto przykład wstępnego obliczenia czegoś przy pierwszym uruchomieniu zadania w procesie:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Jeśli chcesz, aby funkcja była uruchomiona dla wszystkich zadań, po prostu pomiń argument sender
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-01-27 09:07:57