RabbitMQ QueueingConsumer możliwy wyciek pamięci

Mam następujący kod do zadeklarowania kolejki:

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);

I następujące, aby uzyskać następny obiekt dostawy i przetworzyć go:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        }
    }

Kod deserializacji. Jak widać używam Kryo:

public T deserialise(byte[] body) {
    Kryo kryo= new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}

Jeśli uruchamiam to z kolejką zawierającą dużą liczbę obiektów, po około 2,7 milionie obiektów otrzymuję wyjątek out of memory. Znalazłem to pierwotnie, uruchamiając go w nocy z danymi wchodzącymi z JMeter z szybkością ~90 / s, która na początku zużywa bez żadnych problemów, ale rano zauważyłem dużą liczbę w RabbitMQ i wyjątek out of memory na konsumenta. Uruchomiłem go ponownie i użyłem analizatora pamięci Eclipse, aby określić, gdzie ta pamięć była używana. Z tego widzę, że java.util./ align = "left" / LinkedBlockingQueue, do którego odwołuje się com.rabbitmq.klient.QueueingConsumer rośnie i rośnie, aż zabraknie mu pamięci.

Czy muszę coś zrobić, aby powiedzieć Rabbitowi, aby uwolnił zasoby?

I może zwiększyć rozmiar sterty, ale obawiam się, że jest to tylko krótkoterminowa poprawka i może być coś w moim kodzie, co może mnie ugryźć wyciekiem pamięci kilka miesięcy po wdrożeniu produkcji.

Author: Arthur, 2012-10-02

4 answers

Mój błąd polegał na tym, że ustawiałem kanał na auto ack. Oznacza to, że każda wiadomość od królika była odbierana (uznana za odebraną). Naprawiłem (i przetestowałem) to przez dekalarowanie kanału do not auto-ACK: channel.basicConsume(getQueueName(), false,consumer); i po przetworzeniu queue, ack wiadomość: consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);.

Tak teraz wygląda moja Kolejka:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false,consumer);

Oraz następujące elementy do przetworzenia kolejki:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());
            process(queue);
            consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        } catch (IOException e) {
            logger.error("Could not ack message: {}",e);
            break;
        }
    }

Widzę teraz na ekranie zarządzania RabbitMQ, że wiadomości są dostarczane w bardzo wysokim tempie, ale nie są one ack ' D w tym tempie. Jeśli następnie zabiję mojego konsumenta, w ciągu około 30s wszystkie te wiadomości nie-ack ' D są przenoszone z powrotem do gotowej kolejki. Jedną z ulepszeń, które wprowadzę, jest ustawienie wartości basicQos: channel.basicQos(10); tak, aby nie było zbyt wiele wiadomości dostarczanych, ale nie-ack ' D. jest to pożądane, ponieważ oznacza to, że mogę uruchomić innego konsumenta w tej samej kolejce i rozpocząć przetwarzanie kolejki, a nie wszystko kończy się w pamięci nie-ack ' D i niedostępne dla innych konsumentów.

 6
Author: Arthur,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-10-08 12:04:17

Rozwiązaniem jest ustawienie basicQos- channel.basicQos(2);. Moja deklaracja kanału wygląda teraz tak:

        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), true,consumer);
        channel.basicQos(2);

Ustawienie basicQos na 2 oznacza przechowywanie tylko 2 wiadomości w pamięci wewnętrznej. Więcej informacji oraz ciekawą dyskusję na temat wykorzystania algorytmu CoDel można znaleźć w http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/

 2
Author: Arthur,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-10-02 10:59:34

Problem polega na tym, że konsument nie może nadążyć za producentem, co powoduje, że kolejka rośnie bez ograniczeń. Musisz ograniczyć rozmiar kolejki i spowolnić producenta, gdy limit zostanie osiągnięty. Chciałbym również spojrzeć na optymalizację konsumenta, aby mógł nadążyć.

 1
Author: Peter Lawrey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-10-02 09:06:35

Może to być problem, że obiekty nie zostaną zniszczone po ich zużyciu. Możesz pokazać kod do deserialize? Podejrzewam, że wysyłasz obiekty przez kolejkę i deserializujesz je za pomocą jakiegoś strumienia wejściowego obiektu / strumienia wejściowego tablicy bajtów. Jeśli nie zamykasz strumieni prawidłowo, może to spowodować wyciek pamięci.

 1
Author: robthewolf,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2012-10-02 09:53:59