RabbitMQ: persistent message with Topic exchange

Jestem nowy w RabbitMQ.

Skonfigurowałem wymianę "tematu". Konsument może zostać uruchomiony po wydawcy. Chciałbym, aby konsumenci mogli otrzymywać wiadomości, które zostały wysłane przed ich powstaniem, a które nie zostały jeszcze skonsumowane.

Wymiana jest skonfigurowana z następującymi parametrami:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

Wiadomości są publikowane z tym parametrem:

delivery_mode => 2

Konsumenci używają get() do pobierania wiadomości z exchange.

Niestety, każda wiadomość opublikowana przed uruchomieniem klienta jest tracona. Używałem różnych kombinacji.

Moim problemem jest to, że wymiana nie przechowuje wiadomości. Może muszę mieć kolejkę między wydawcą a kolejką. Ale to nie wydaje się działać z wymianą "temat", gdzie wiadomości są kierowane przez klucz.

Jakieś pomysły, jak mam postąpić. Używam bindowania Perla Net:: RabbitMQ (shouldn ' t matter) oraz RabbitMQ 2.2.0.
Author: alanc10n, 2011-05-27

2 answers

Potrzebujesz trwałej kolejki do przechowywania wiadomości, Jeśli nie ma podłączonych konsumentów dostępnych do przetwarzania wiadomości w momencie ich opublikowania.

Exchange nie przechowuje wiadomości, ale kolejka może. Mylące jest to, że giełdy mogą być oznaczone jako "trwałe", ale wszystko, co naprawdę oznacza, to to, że sama Giełda nadal tam będzie, jeśli ponownie uruchomisz brokera, ale to nie oznacza, że wszelkie wiadomości wysyłane do tej giełdy są automatycznie / align = "left" /

Biorąc to pod uwagę, oto dwie opcje:

  1. wykonaj krok administracyjny zanim zaczniesz samodzielnie tworzyć kolejki. Można to zrobić za pomocą interfejsu WWW lub narzędzi wiersza poleceń. Upewnij się, że utworzysz ją jako trwałą kolejkę, aby przechowywała wszelkie wiadomości kierowane do niej, nawet jeśli nie ma aktywnych konsumentów.
  2. zakładając, że konsumenci są zakodowani, aby zawsze deklarować (a zatem automatycznie tworzyć) swoje wymiany i kolejki przy starcie (i że deklarują je jako trwałe), po prostu Uruchom wszystkich konsumentów co najmniej raz przed uruchomieniem jakichkolwiek wydawców. Zapewni to, że wszystkie Twoje kolejki zostaną poprawnie utworzone. Następnie możesz wyłączyć konsumentów, dopóki nie będą naprawdę potrzebni, ponieważ kolejki będą uporczywie przechowywać wszelkie przyszłe wiadomości kierowane do nich.

Wybrałbym #1. Nie może być wiele kroków do wykonania i zawsze można skrypt kroki wymagane, aby mogły być powtórzone. Ponadto, jeśli wszyscy twoi konsumenci będą wyciągać z tej samej pojedynczej kolejki (zamiast mieć dedykowaną kolejkę), to naprawdę minimalny nakład administracyjny.

Kolejki są czymś, co należy odpowiednio zarządzać i kontrolować. W przeciwnym razie możesz skończyć z nieuczciwymi konsumentami deklarującymi trwałe kolejki, używającymi ich przez kilka minut, ale nigdy więcej. Wkrótce potem pojawi się stale rosnąca kolejka, w której nic nie zmniejsza jej rozmiaru, i zbliżająca się apokalipsa brokerów.

 58
Author: Brian Kelly,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2011-05-27 17:29:34

Jak wspomniał Brian, exchange nie przechowuje wiadomości i jest głównie odpowiedzialny za przekierowanie Wiadomości do innego exchange / s lub kolejki / S. Jeśli exchange nie jest związane z kolejką, wtedy wszystkie wiadomości wysłane do tej giełdy zostaną "utracone"

Nie należy deklarować stałych kolejek klientów w skrypcie wydawcy, ponieważ może to nie być skalowalne. Kolejki mogą być tworzone dynamicznie przez wydawców i kierowane wewnętrznie za pomocą exchange-to-exchange Wiązanie.

RabbitMQ obsługuje wiązania exchange-to-exchange, które pozwolą na elastyczność topologii, odsprzęganie i inne korzyści. Możesz przeczytać więcej tutaj na RabbitMQ Exchange to Exchange Bindings [AMPQ]

RabbitMQ Exchange To Exchange Binding

Przykładowa Topologia

Przykładowy kod Pythona do tworzenia powiązania exchange-to-exchange z zachowaniem trwałości, jeśli żaden konsument nie jest obecny przy użyciu queue.

#!/usr/bin/env python
import pika
import sys


connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()


#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
 16
Author: Skillachie,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-27 05:37:26