0MQ: jak używać ZeroMQ w sposób threadsafe?

Przeczytałem poradnik ZeroMq i natknąłem się na:

Nie wolno dzielić gniazd ØMQ pomiędzy nici. Gniazda ØMQ nie są threadsafe. Technicznie jest to możliwe aby to zrobić, ale wymaga semaforów, zamki lub muteksy. To sprawi, że Twoje aplikacja powolna i delikatna. Jedyny miejsce, gdzie jest zdalnie zdrowy na współdzielenie gniazd między wątkami jest w powiązania językowe, które trzeba zrobić magia jak śmieciarka na gniazda.

I później:

Pamiętaj: nie używaj ani nie zamykaj gniazd z wyjątkiem wątku, który je utworzył.

Zrozumiałem również, że ZeroMQ Context jest threadsafe.

Jeśli Klasa zarejestruje Zdarzenie innej klasy w. Net, zdarzenie to może być wywołane z innego wątku niż wątek, na którym został utworzony listener.

Myślę, że są tylko dwie opcje, aby móc wysłać coś przez ZeroMQ-Sockets z w obrębie parzystokopytnych:

    W związku z tym, że nie jest to możliwe, nie jest to możliwe, ponieważ nie jest to możliwe.]}
  • Utwórz nowy ZeroMQ - Socket / uzyskaj exisiting ZeroMQ - Socket dla wątku w programie parzystym, używając threadsafe ZeroMQ - Context

Wydaje się, że 0MQ-Guide zniechęca do pierwszego i nie sądzę, aby tworzenie nowego gniazda ZeroMq dla każdego wątku było wykonalne / droga do zrobienia.

My Pytanie :
Jaki jest prawidłowy wzór (sposób, w jaki ma być) do publikowania wiadomości przez 0MQ z wewnątrz eventhandler?

Czy autorzy przewodnika mieli na uwadze powiązanie ZeroMQ dla. Net, gdy pisali:]}

The only miejsce, gdzie jest zdalnie zdrowy na współdzielenie gniazd między wątkami jest w powiązania językowe, które trzeba zrobić magia jak śmieciarka na gniazda. ?

Oto jakiś samplecode, aby podkreślić mój problem/pytanie:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
Author: tobsen, 2011-04-30

3 answers

W. NET framework v4 i nowszych możesz użyć współbieżnej kolekcji, aby rozwiązać ten problem. Mianowicie wzór Producent-konsument. Wiele wątków (programów obsługi) może wysyłać dane do kolejki bezpiecznej dla wątków, a tylko jeden wątek zużywa dane z kolejki i wysyła je za pomocą gniazda.

Oto pomysł:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
 7
Author: Vadim Chekan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-11-30 20:58:42

Możesz utworzyć wiele gniazd 0MQ, z pewnością tyle, ile masz wątków. Jeśli utworzysz gniazdo w jednym wątku, a użyjesz go w innym, musisz wykonać pełną barierę pamięci między tymi dwoma operacjami. Wszystko inne spowoduje dziwne losowe awarie w libzmq, ponieważ obiekty socket nie są threadsafe.

Jest kilka konwencjonalnych wzorców, choć Nie wiem jak te mapują konkretnie do. NET:

  1. Utwórz gniazda w wątkach, które ich używają, kropka. Podziel się konteksty między wątkami, które są ściśle związane w jeden proces i tworzyć oddzielne treści w wątkach, które nie są ściśle związane. W interfejsie wysokiego poziomu C API (czmq) są one nazywane wątkami załączonymi i odłączonymi.
  2. tworzy gniazdo w wątku nadrzędnym i przekazuje w czasie tworzenia wątku do dołączonego wątku. Wywołanie tworzenia wątku wykona pełną barierę pamięci. Od tego momentu należy używać tylko gniazda w wątku potomnym. "użyj" oznacza recv, send, setsockopt, getsockopt, i blisko.
  3. Utwórz gniazdo w jednym wątku i użyj w innym, wykonując własną pełną barierę pamięci między każdym użyciem. Jest to niezwykle delikatne i jeśli nie wiesz, czym jest "pełna bariera pamięci", nie powinieneś tego robić.
 24
Author: Pieter Hintjens,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-04-30 14:05:01

Nie zapomnij rzucić okiem na transport inproc. Przydatne może być użycie inproc: / / sockets do komunikacji interthread i mieć jeden wątek, który otwiera sockety, aby rozmawiać z innymi procesami / serwerami.

Nadal potrzebujesz co najmniej jednego gniazda na wątek, ale te inproc w ogóle nie obejmują warstwy Sieci IP.

 2
Author: Michael Dillon,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-07-01 01:47:30