Kolejka martwych liter (DLQ) dla Kafki ze sprężyną-kafka

Jak najlepiej zaimplementować koncepcję kolejki martwych liter (DLQ) w aplikacji Spring Boot 2.0 przy użyciu spring-kafka 2.1.x mieć wszystkie wiadomości, które nie zostały przetworzone przez @KafkaListener metoda jakiejś fasoli wysłane do jakiegoś predefiniowanego tematu Kafka DLQ i nie stracić jednej wiadomości?

Tak więc zapis Kafki jest albo:

  1. pomyślnie przetworzone,
  2. nie został przetworzony i jest wysyłany do DLQ temat,
  3. nie został przetworzony, nie jest wysyłany do tematu DLQ (z powodu nieoczekiwanego problemu), więc zostanie ponownie skonsumowany przez słuchacza.

Próbowałem utworzyć kontener słuchacza z niestandardową implementacjąErrorHandler wysyłanie rekordów nie udało się przetworzyć do tematu DLQ przy użyciu KafkaTemplate. Używanie wyłączonego automatycznego zatwierdzania i rekord AckMode.

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

Wygląda na to, że ta implementacja nie gwarantuje #3. Jeśli wyjątek zostanie wrzucony do DlqErrorHandler rekord nie zostanie ponownie skonsumowany przez słuchacza.

Czy użycie kontenera transactional listener pomoże?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

Czy Jest jakiś wygodny sposób na wdrożenie koncepcji DLQ przy użyciu Spring Kafka?

Aktualizacja 28/03/2018

Dzięki odpowiedzi Gary ' ego Russella udało mi się osiągnąć pożądane zachowanie, implementując DlqErrorHandler w następujący sposób

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

W ten sposób, jeśli ankieta konsumencka zwróci 3 rekordy (1, 2, 3) i 2. nie można przetworzyć:

  • 1 zostanie przetworzony
  • 2 nie będą przetwarzane i wysyłane do DLQ
  • 3 dzięki Consumer dążyć do nagrywania.offset() + 1, zostanie dostarczony do słuchacza

Jeśli wysłanie do DLQ nie powiedzie się, konsument szuka rekordu.offset () i zapis zostanie ponownie dostarczony do słuchacza (a wysłanie do DLQ prawdopodobnie zostanie wycofane).

Author: Evgeniy Khyst, 2018-03-27

1 answers

Zobacz SeekToCurrentErrorHandler.

Gdy wystąpi wyjątek, zwraca się do konsumenta, aby wszystkie nieprzetworzone zapisy zostały ponownie dostarczone w następnej ankiecie.

Możesz użyć tej samej techniki (np. podklasy), aby zapisać do DLQ i szukać bieżącego offsetu (i innych nieprzetworzonych), jeśli zapis DLQ nie powiedzie się, i szukać tylko pozostałych rekordów, jeśli zapis DLQ powiedzie się.

 3
Author: Gary Russell,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-06-22 08:46:54