Kafka Consumer get key value pair
Obecnie pracuję z Kafką i Flinkiem, mam Kafkę działającą na moim lokalnym komputerze i stworzyłem temat, który jest konsumowany.
Desktop \ kafka \ bin \ windows>kafka-console-consumer.bat --zookeeper localhost: 2181-topic test
Ale to tylko odzyskiwanie wiadomości,
Czy jest sposób, aby uzyskać więcej szczegółów na temat wiadomości ? powiedzmy, że czas? klucz? Sprawdzilem dokumentacje Kafki ale nie znalazlem czegos na ten temat
2 answers
Używając out of the box console consumer (ja używam Kafka 0.9.0.1) możesz drukować tylko klucz i wartość wiadomości przy użyciu różnych formatów. Aby wydrukować klucz, ustaw właściwość print.key=true
.
Istnieje inna właściwość key.separator
, która domyślnie jest" \t " (zakładka), którą można również zmienić na cokolwiek chcesz.
Aby ustawić te właściwości, możesz utworzyć plik konfiguracyjny i użyć --consumer.config <config file>
lub przekazać właściwości za pomocą --property key=value
.
Możesz również zaimplementować swój własny formatter i używać go z --formatter
opcja, ale nadal będziesz mieć tylko klucz i wartość, ponieważ to jest to, co zapewnia cecha MessageFormatter (Zobacz writeTo poniżej).
trait MessageFormatter {
def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
def init(props: Properties) {}
def close() {}
}
Na przykład:
./bin/kafka-console-consumer.sh --new-consumer --bootstrap-server kafka-1:9092 --topic topic1 --property print.key=true --property key.separator="-" --from-beginning
key-p1
key-p2
key-p3
null-4
null-8
null-0
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-02-22 10:39:23
Użyj poniższego polecenia:
kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_name \
--from-beginning --formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true --property print.value=true \
--property key.deserialzer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-01-03 11:51:54