zeromq: jak zapobiec nieskończonemu czekaniu?

Właśnie zacząłem z ZMQ. Projektuję aplikację, której przepływ pracy to:

  1. jeden z wielu klientów (którzy mają losowe adresy PULL) wysyła żądanie do serwera pod numerem 5555
  2. Serwer zawsze czeka na naciśnięcia klienta. Kiedy przychodzi, proces pracownika jest zrodzony dla tego konkretnego żądania. Tak, procesy robocze mogą istnieć jednocześnie.
  3. Kiedy ten proces zakończy swoje zadanie, Wypycha wynik do klienta.

Zakładam, że Odpowiednia jest do tego Architektura PUSH/PULL. Proszę, popraw mnie w tej sprawie.


Ale jak poradzić sobie z tymi scenariuszami?

  1. client_receiver.recv() będzie czekać nieskończony czas, gdy serwer nie odpowie.
  2. klient może wysłać żądanie, ale nie powiedzie się natychmiast po tym, stąd proces worker pozostanie zablokowany w server_sender.send () forever.

Więc jak ustawić coś takiego jak timeout W PUSH/PULL modelka?


EDIT: dzięki sugestiom user938949, dostałem działającą odpowiedź i dzielę się nią dla potomności.

Author: Jesvin Jose, 2011-09-24

4 answers

Jeśli używasz zeromq >= 3.0, możesz ustawić opcję gniazda RCVTIMEO:

client_receiver.RCVTIMEO = 1000 # in milliseconds

Ale ogólnie można użyć ankieterów:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

I poller.poll() trwa limit czasu:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts będzie pusta lista, jeśli nie ma nic do odebrania.

Możesz przeprowadzić ankietę za pomocą zmq.POLLOUT, aby sprawdzić, czy wysłanie się powiedzie.

Lub, aby zająć się sprawą rówieśnika, który mógł się nie udać, a:

worker.send(msg, zmq.NOBLOCK)

Może wystarczyć, który zawsze powróci natychmiast-podniesienie ZMQError(zmq.EAGAIN) jeśli wysyłanie nie może się zakończyć.

 81
Author: minrk,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-09-24 16:33:21

To był szybki hack zrobiłem po skierowaniu odpowiedzi user938949 i http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq /. Jeśli zrobisz to lepiej, napisz swoją odpowiedź, polecę Twoją odpowiedź .

Dla tych, którzy chcą trwałych rozwiązań dotyczących niezawodności, zapoznaj się http://zguide.zeromq.org/page:all#toc64

Wersja 3.0 zeromq (beta ATM) obsługuje timeout w ZMQ_RCVTIMEO i ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Serwer

Zmq.NOBLOCK zapewnia, że gdy klient nie istnieje, send() nie blokuje.
import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Klient

Obiekt poller może nasłuchiwać na wielu odbiorczych gniazdach (patrz" Python Multiprocessing with ZeroMQ", link powyżej. Połączyłem go tylko na work_receiver . W pętli nieskończonej, klient wykonuje ankiety z interwałem 1000ms. obiekt socks zwraca pusty, jeśli żadna wiadomość nie ma otrzymano w tym czasie.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
 16
Author: Jesvin Jose,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-06-21 09:12:05

Jeśli użyjesz ZMQ_NOBLOCK, wyślij blokadę, ale jeśli spróbujesz zamknąć gniazdo i kontekst, ten krok zablokuje wyjście programu..

Powodem jest to, że gniazdo czeka na dowolnego peera, aby wiadomości wychodzące były ustawione w kolejce.. Aby natychmiast zamknąć gniazdo i wypłukać wychodzące wiadomości z bufora, użyj ZMQ_LINGER i ustaw go na 0..

 9
Author: Adobri,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-06-01 07:50:15

Jeśli czekasz tylko na jedno gniazdo, zamiast tworzyć Poller, możesz to zrobić:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Możesz tego użyć, jeśli twój limit czasu zmienia się w zależności od sytuacji, zamiast Ustawienia work_receiver.RCVTIMEO.

 3
Author: Mathieu Longtin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-05-02 13:41:55