RabbitMQ / AMQP: jedna kolejka, wielu konsumentów dla tej samej wiadomości?

Zaczynam używać RabbitMQ i AMQP w ogóle.

  • mam kolejkę wiadomości
  • mam wielu konsumentów, które chciałbym zrobić różne rzeczy z tej samej wiadomości .

Większość dokumentacji RabbitMQ wydaje się skupiać na round-robin, tj. gdzie pojedyncza wiadomość jest konsumowana przez jednego konsumenta, a obciążenie jest rozłożone między każdego konsumenta. To jest zachowanie, którego jestem świadkiem.

Przykład: producent posiada pojedynczą kolejkę i wysyła wiadomości co 2 sek:
var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

A oto konsument:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

Jeśli uruchomię konsumenta dwa razy, widzę, że każdy konsument konsumuje alternatywne wiadomości w zachowaniu round-robin. Np. będę widział wiadomości 1, 3, 5 w jednym terminalu, 2, 4, 6 w drugim.

Moje pytanie brzmi:

  • Czy każdy konsument może otrzymywać te same wiadomości? Ie, obaj konsumenci otrzymują wiadomość 1, 2, 3, 4, 5, 6? Jak to się nazywa w AMQP / RabbitMQ mówić? Jak jest normalnie skonfigurowany?

  • Czy to się często robi? Czy powinienem po prostu wysłać wiadomość do dwóch osobnych kolejek, z jednym konsumentem?

Author: mikemaccana, 2012-05-16

10 answers

Czy każdy konsument może otrzymywać te same wiadomości? Ie, obaj konsumenci otrzymują wiadomość 1, 2, 3, 4, 5, 6? Jak to się nazywa w mowie AMQP/RabbitMQ? Jak jest normalnie skonfigurowany?

Nie, Nie, jeśli konsumenci są w tej samej kolejce. Z Poradnika RabbitMQ AMQP Concepts :

Ważne jest, aby zrozumieć, że w AMQP 0-9-1 wiadomości są równoważone między konsumentami.

To wydaje się sugerować, że round-robin zachowanie w kolejce jest podaną , a nie konfigurowalną. Aby ten sam identyfikator wiadomości był obsługiwany przez wielu konsumentów, wymagane są osobne kolejki.

Czy to się często robi? Czy powinienem po prostu wysłać wiadomość do dwóch osobnych kolejek, z jednym konsumentem?

Nie to nie jest, pojedyncza Kolejka / wielu konsumentów z każdym konsumentem obsługującym ten sam identyfikator wiadomości nie jest możliwe. Mając trasę wymiany wiadomość na dwie osobne kolejki jest rzeczywiście lepiej.

Ponieważ nie wymagam zbyt skomplikowanego routingu, wymiana fanout poradzi sobie z tym ładnie. Wcześniej nie skupiałem się zbytnio na wymianach, ponieważ node-amqp ma koncepcję "domyślnej wymiany" pozwalającej publikować wiadomości bezpośrednio do połączenia, jednak większość wiadomości AMQP jest publikowana na określonej giełdzie.

Oto moja wymiana fanout, zarówno wysyłanie jak i odbieranie:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
 78
Author: mikemaccana,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-01 01:51:17

Po prostu przeczytaj rabbitmq tutorial. Publikujesz wiadomość do exchange, a nie do kolejki; jest ona następnie kierowana do odpowiednich kolejek. W Twoim przypadku powinieneś powiązać osobną kolejkę dla każdego konsumenta. W ten sposób mogą konsumować wiadomości całkowicie niezależnie.

 12
Author: driushkin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-05-16 15:11:01

Ostatnie kilka odpowiedzi jest prawie poprawnych - mam mnóstwo aplikacji, które generują wiadomości, które muszą skończyć się z różnymi konsumentami, więc proces jest bardzo prosty.

Jeśli chcesz, aby wielu konsumentów miało tę samą wiadomość, wykonaj następującą procedurę.

Utwórz wiele kolejek, po jednej dla każdej aplikacji, która ma odbierać wiadomość, we właściwościach każdej kolejki "bind" znacznik trasowania z AMQ.wymiana bezpośrednia. Zmień aplikację publikowania, aby wysłać do amq.Kieruj i używaj znacznika routingu (nie Kolejka). AMQP skopiuje wiadomość do każdej kolejki z tym samym powiązaniem. Działa jak urok :)

Przykład: Powiedzmy, że generuję ciąg JSON, publikuję go do "amq.direct " exchange using the routing tag "new-sales-order", I have a queue for my order_printer app that prints order, I have a queue for my billing system that will send a copy of the order and invoice the client and I have a web archive system where I archive orders for historic / compliance reasons i mam interfejs internetowy klienta, gdzie zamówienia są śledzone, gdy inne informacje pojawiają się o zamówieniu.

Więc moje kolejki to: order_printer, order_billing, order_archive i order_tracking Wszystkie mają przypisany do nich znacznik "new-sales-order", wszystkie 4 otrzymają dane JSON.

Jest to idealny sposób na wysyłanie danych bez wiedzy aplikacji publikującej lub dbania o aplikacje odbierające.

 10
Author: z900collector,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-14 01:17:02

Wzór wysyłania jest relacją jeden do jednego. Jeśli chcesz "wysłać" do więcej niż jednego odbiornika, powinieneś używać wzoru pub / sub. Zobacz http://www.rabbitmq.com/tutorials/tutorial-three-python.html Po Więcej Szczegółów.

 5
Author: Peter Ritchie,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-12-27 21:50:16

Tak każdy konsument może otrzymywać te same wiadomości. Zobacz też http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

Dla różnych sposobów przesyłania wiadomości. Wiem, że są dla Pythona i Javy, ale dobrze jest zrozumieć zasady, zdecydować, co robisz, a następnie znaleźć, jak to zrobić w JS. To brzmi jak chcesz zrobić simple fanout (tutorial 3 ), który wysyła wiadomości do wszystkich kolejek podłączonych do exchange.

Różnica w tym, co robisz i co chcesz zrobić, polega na tym, że zamierzasz skonfigurować i wymienić lub wpisać fanout. Fanout excahnges wysyła wszystkie wiadomości do wszystkich podłączonych kolejek. Każda kolejka będzie miała konsumenta, który będzie miał dostęp do wszystkich wiadomości oddzielnie.

Tak jest to często robione, jest to jedna z cech AMPQ.

 4
Author: robthewolf,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-05-16 15:08:40

RabbitMQ / AMQP: pojedyncza Kolejka, wielu konsumentów dla tej samej wiadomości i odświeżania strony.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
 3
Author: durai,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-02 06:16:52

Aby uzyskać pożądane zachowanie, po prostu niech każdy konsument będzie konsumował z własnej kolejki. Będziesz musiał użyć typu non-direct exchange (temat, nagłówek, fanout), aby wysłać wiadomość do wszystkich kolejek naraz.

 1
Author: Skylos,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-11-01 20:11:57

Jak oceniam Twój przypadek to:

  • Mam kolejkę wiadomości (twoje źródło do odbierania wiadomości, nazwijmy je q111)

  • Mam wielu konsumentów, którzy chciałbym robić różne rzeczy z tym samym przesłaniem.

Twój problem polega na tym, że podczas gdy 3 wiadomości są odbierane przez tę kolejkę, wiadomość 1 jest konsumowana przez konsumenta A, inni konsumenci B I C konsumują wiadomość 2 i 3. Gdzie jak jesteś w potrzebie konfiguracji gdzie rabbitmq przechodzi na tym samym kopie wszystkich tych trzech wiadomości (1,2,3) do wszystkich trzech podłączonych konsumentów (A,B,C) jednocześnie.

Chociaż w tym celu można wykonać wiele konfiguracji, prostym sposobem jest użycie następującej koncepcji dwuetapowej:

  • Użyj dynamicznego rabbitmq-shovel, aby odebrać wiadomości z żądanej kolejki(q111) i opublikować je na giełdzie fanout (exchange stworzony i dedykowany wyłącznie do tego celu).
  • Teraz ponownie skonfiguruj swoich konsumentów A, B&C (którzy słuchali queue (q111)), aby słuchać z tej wymiany Fanout bezpośrednio przy użyciu ekskluzywnej i anonimowej kolejki dla każdego konsumenta.

Uwaga: Podczas korzystania z tej koncepcji nie pobieraj bezpośrednio z kolejki źródeł (q111), ponieważ wiadomości już zużyte nie będą przesyłane do wymiany Fanout.

Jeśli uważasz, że to nie spełnia twoich wymagań... zapraszam do zamieszczania swoich sugestii: -)

 1
Author: Anshuman Banerjee,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-08-12 06:59:08

Jeśli przypadkiem używasz biblioteki amqplib tak jak ja, mają poręczny przykład implementacji Publish/Subscribe RabbitMQ tutorial, który może Ci się przydać.

 0
Author: brettjonesdev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-03-02 21:46:55

Myślę, że powinieneś sprawdzić wysyłanie wiadomości za pomocą wymiennika fan-out . W ten sposób otrzymasz tę samą wiadomość dla różnych konsumentów, pod tabelą RabbitMQ tworzy kolejki differents dla każdego z tych nowych konsumentów / subskrybentów.

To jest link do zobacz przykład samouczka w javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

 0
Author: Manuel Alejandro Diaz Serret,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-04 18:46:19