Kafka Consumer get key value pair

Obecnie pracuję z Kafką i Flinkiem, mam Kafkę działającą na moim lokalnym komputerze i stworzyłem temat, który jest konsumowany.

Desktop \ kafka \ bin \ windows>kafka-console-consumer.bat --zookeeper localhost: 2181-topic test

Ale to tylko odzyskiwanie wiadomości, Tutaj wpisz opis obrazka

Czy jest sposób, aby uzyskać więcej szczegółów na temat wiadomości ? powiedzmy, że czas? klucz? Sprawdzilem dokumentacje Kafki ale nie znalazlem czegos na ten temat

Author: Matthias J. Sax, 2016-12-30

2 answers

Używając out of the box console consumer (ja używam Kafka 0.9.0.1) możesz drukować tylko klucz i wartość wiadomości przy użyciu różnych formatów. Aby wydrukować klucz, ustaw właściwość print.key=true.

Istnieje inna właściwość key.separator, która domyślnie jest" \t " (zakładka), którą można również zmienić na cokolwiek chcesz.

Aby ustawić te właściwości, możesz utworzyć plik konfiguracyjny i użyć --consumer.config <config file> lub przekazać właściwości za pomocą --property key=value.

Możesz również zaimplementować swój własny formatter i używać go z --formatter opcja, ale nadal będziesz mieć tylko klucz i wartość, ponieważ to jest to, co zapewnia cecha MessageFormatter (Zobacz writeTo poniżej).

trait MessageFormatter {
    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)

    def init(props: Properties) {}

    def close() {}
}

Na przykład:

./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0
 68
Author: Luciano Afranllie,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-02-22 10:39:23

Użyj poniższego polecenia:

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \ 
      --from-beginning --formatter kafka.tools.DefaultMessageFormatter \
      --property print.key=true --property print.value=true \
      --property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
      --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 10
Author: ankit kansal,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-03 11:51:54