Jak skonfigurować seler, aby wywoływał niestandardową funkcję inicjalizacji przed uruchomieniem zadań?

Mam projekt Django i próbuję użyć selera do wysyłania zadań do przetwarzania w tle ( http://ask.github.com/celery/introduction.html ). Selery dobrze integruje się z Django i byłem w stanie przesłać moje niestandardowe zadania i odzyskać wyniki.

Jedyny problem polega na tym, że nie mogę znaleźć rozsądnego sposobu wykonywania niestandardowej inicjalizacji w procesie demona. Muszę zadzwonić do drogiej funkcji, która ładuje dużo pamięci, zanim zacznę przetwarzać zadania, a ja nie mogę sobie pozwolić na wywoływanie tej funkcji za każdym razem.

Czy ktoś już miał ten problem? Jakieś pomysły, jak obejść to bez modyfikowania kodu źródłowego selera?

Thanks

Author: xelk, 2010-01-25

1 answers

Możesz albo napisać własny loader, albo użyć sygnałów.

Loadery mają metodę on_task_init, która jest wywoływana, gdy zadanie ma być wykonane, i on_worker_init, który jest wywoływany przez główny proces seler+celerybeat.

Używanie sygnałów jest prawdopodobnie najłatwiejsze, dostępne sygnały to:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    Wysyłane, gdy zadanie ma być wykonane przez pracownika (lub lokalnie jeśli ustawiono apply/lub CELERY_ALWAYS_EAGER).

  • task_postrun(task_id, task, args, kwargs, retval) Wysyłane po wykonaniu zadania w takich samych warunkach jak powyżej.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Wywołane, gdy zadanie jest stosowane (nie jest dobre dla długotrwałych operacji)

Dodatkowe sygnały dostępne w 0.9.x (obecna gałąź master na GitHubie):

  • worker_init()

    Wywołane po uruchomieniu celeryd (przed zainicjowaniem zadania, więc jeśli na Obsługa systemu fork, wszelkie zmiany pamięci byłyby skopiowane do dziecka procesów pracowniczych).

  • worker_ready()

    Wywoływane, gdy celeryd jest w stanie odbierać zadania.

  • worker_shutdown()

    Wywołane, gdy celeryd się wyłącza.

Oto przykład wstępnego obliczenia czegoś przy pierwszym uruchomieniu zadania w procesie:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

Jeśli chcesz, aby funkcja była uruchomiona dla wszystkich zadań, po prostu pomiń argument sender.

 15
Author: asksol,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2010-01-27 09:07:57