Używanie ZeroMQ razem z Boost:: ASIO

Mam aplikację C++, która używa ZeroMQ do wysyłania wiadomości. Ale musi również zapewnić połączenie SGCI dla usługi internetowej opartej na AJAX / Comet.

Do tego potrzebuję normalnego gniazda TCP. Mógłbym to zrobić przez zwykłe gniazda Posix, ale aby pozostać cross platform portable i ułatwić sobie życie (mam nadzieję...) Myślałem o użyciu Boost:: ASIO.

Ale teraz mam zderzenie ZMQ chcącego używać własnego zmq_poll() i ASIO to io_service.run()...

Czy jest sposób aby ASIO współpracować z 0MQ zmq_poll()?

A może jest inny zalecany sposób, aby osiągnąć taką konfigurację?

Uwaga: mógłbym to rozwiązać używając wielu wątków - ale to tylko mały pojedynczy rdzeń / procesor, który uruchomi ten program z bardzo małą ilością ruchu SCGI, więc wielowątkowość byłaby stratą zasobów...

Author: ildjarn, 2012-10-11

4 answers

Po zapoznaniu się z dokumentacją tutaj i tutaj , a konkretnie ten akapit

ZMQ_FD: pobranie deskryptora pliku powiązanego z gniazdem ZMQ_FD opcja pobiera deskryptor pliku powiązany z określone Gniazdo. Zwracany deskryptor pliku może być użyty do Zintegruj gniazdo z istniejącą pętlą zdarzeń; biblioteka ØMQ sygnalizuje wszelkie oczekujące zdarzenia na gnieździe w wyzwalanym krawędziowo Moda poprzez wykonanie deskryptora pliku przygotuj się do czytania.

Myślę, że możesz użyć null_buffers dla każdego zmq_pollitem_t i odkłada pętlę zdarzeń na io_service, całkowicie pomijając zmq_poll(). Wydaje się jednak, że istnieją pewne zastrzeżenia w wyżej wymienionej dokumentacji, w szczególności

Możliwość odczytu z zwracanego deskryptora pliku nie koniecznie wskazują, że wiadomości są dostępne do odczytu, lub mogą być zapisywane do gniazda bazowego; aplikacje muszą pobierać rzeczywista stan zdarzenia z późniejszym pobraniem ZMQ_EVENTS opcja.

Więc kiedy Obsługa jednego z gniazd zmq zostanie uruchomiona, będziesz musiał wykonać trochę więcej pracy przed obsługą zdarzenia. Nieskomplikowany pseudo-kod znajduje się poniżej

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
    boost::asio::null_buffers(),
    [=](const boost::system::error_code& error)
    {
       if (!error) {
           // handle data ready to be read
       }
     }
);

Uwaga nie musisz tutaj używać lambda, boost::bind aby funkcja member była wystarczająca.

 14
Author: Sam Miller,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:25:29

W końcu doszedłem do wniosku, że są dwa możliwe rozwiązania:

  • Sam Miller używa pętli zdarzeń ASIO
  • pętla zdarzeń ZeroMQ poprzez pobranie deskryptorów plików ASIO za pomocą metod .native() acceptor i socket i wstawienie ich do tablicy zmq_pollitem_t

Zaakceptowałem odpowiedź sama Millera, ponieważ jest to dla mnie najlepsze rozwiązanie w przypadku SCGI, gdzie tworzone są i kończone stale nowe połączenia. Obsługa każdej zmiany zmq_pollitem_t tablica jest dużym kłopotem, którego można uniknąć używając pętli zdarzeń ASIO.

 2
Author: Chris,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-10-14 16:09:49

Uzyskanie gniazda do ZeroMQ jest najmniejszą częścią bitwy. ZeroMQ opiera się na protokole , który jest warstwowy nad TCP, więc będziesz musiał ponownie zaimplementować ZeroMQ w niestandardowym Boost.Asio io_service jeśli jedziesz tą trasą. Ten sam problem napotkałem podczas tworzenia usługi asynchronicznej ENet przy użyciu Boost.Asio najpierw po prostu próbuje złapać ruch z klienta ENet za pomocą Boost.Serwis ASIO UDP. ENet jest protokołem TCP podobnym do UDP, więc wszystko co osiągnąłem w tym punktem było łapanie pakietów w praktycznie bezużytecznym stanie.

Boost.Asio jest oparte na szablonach, a wbudowane io_service używają szablonów, aby zasadniczo owinąć bibliotekę gniazd systemowych w celu utworzenia usługi TCP i UDP. Moim ostatecznym rozwiązaniem było stworzenie niestandardowej usługi io_service, która owijała bibliotekę ENet, a nie bibliotekę gniazd systemowych, pozwalając jej na korzystanie z funkcji transportowych ENet, a nie na ich ponowne implementowanie przy użyciu wbudowanego transportu UDP.

To samo można zrobić dla ZeroMQ, ale ZeroMQ jest już bardzo wydajną biblioteką sieciową, która sama w sobie zapewnia asynchroniczne wejścia/Wyjścia.myślę, że możesz stworzyć realne rozwiązanie, odbierając wiadomości za pomocą istniejącego API ZeroMQ i przekazując je do puli wątków io_service. W ten sposób wiadomości / zadania będą nadal obsługiwane asynchronicznie za pomocą Boost.Schemat reaktora Asio bez konieczności ponownego pisania czegokolwiek. ZeroMQ zapewni asynchroniczne I / O, Boost.Asio dostarczy zadanie asynchroniczne opiekunowie/pracownicy.

Istniejący io_service może być nadal podłączony do istniejącego gniazda TCP, umożliwiając threadpool obsługę zarówno TCP (w Twoim przypadku HTTP), jak i ZeroMQ. Jest to całkowicie możliwe w takiej konfiguracji, aby procedury obsługi zadań ZeroMQ miały dostęp do obiektów sesji usług TCP, co pozwala na wysyłanie wyników wiadomości/zadania ZeroMQ z powrotem do klienta TCP.

Poniżej znajduje się tylko ilustracja tego pojęcia.

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
    boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
}

while (1) {
    char buffer [10];
    zmq_recv (responder_, buffer, 10, 0);
    io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}
 2
Author: JSON,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-10 06:06:29

Rozwiązaniem jest również sprawdzenie io_service zamiast run ().

Sprawdź To rozwiązaniedla jakiegoś poll() info.

Użycie poll zamiast run pozwoli na sprawdzenie połączeń zmq bez problemów z blokowaniem.

 0
Author: g19fanatic,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:25:29