Java, Jak uzyskać ilość wiadomości w temacie w apache kafka

Używam Apache kafka do przesyłania wiadomości. Zaimplementowałem producenta i konsumenta w Javie. Jak możemy uzyskać liczbę wiadomości w temacie?

Author: Eric Leschinski, 2015-02-18

13 answers

Jedyny sposób, który przychodzi na myśl z konsumenckiego punktu widzenia, to rzeczywiście spożywać wiadomości i liczyć je wtedy.

Broker Kafka wystawia liczniki JMX na liczbę wiadomości otrzymanych od uruchomienia, ale nie możesz wiedzieć, ile z nich zostało już wyczyszczonych.

W najczęstszych scenariuszach wiadomości w Kafce są najlepiej postrzegane jako strumień nieskończony, a uzyskanie dyskretnej wartości liczby, która jest obecnie przechowywana na dysku, nie ma znaczenia. Ponadto rzeczy skomplikuj się, gdy masz do czynienia z klastrem brokerów, którzy mają podzbiór wiadomości w temacie.

 16
Author: Lundahl,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-02-19 21:57:45

To nie jest java, ale może być przydatne

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell 
  --broker-list <broker>:  <port> 
  --topic <topic-name> --time -1 --offsets 1 
  | awk -F  ":" '{sum += $3} END {print sum}'
 54
Author: ssemichev,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-15 20:02:04

Używam tego do porównywania mojego POC. Element, którego chcesz użyć ConsumerOffsetChecker. Możesz go uruchomić za pomocą skryptu bash, jak poniżej.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

I poniżej jest wynik : Tutaj wpisz opis obrazka Jak widać na czerwonym polu, 999 to numer wiadomości aktualnie w temacie.

Aktualizacja: ConsumerOffsetChecker jest przestarzały od wersji 0.10.0, możesz zacząć używać ConsumerGroupCommand.

 13
Author: Rudy,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-17 16:25:05

Użyj https://prestodb.io/docs/current/connector/kafka-tutorial.html

Silnik super SQL, dostarczany przez Facebook, który łączy się z kilkoma źródłami danych (Cassandra, Kafka, JMX, Redis ...).

PrestoDB działa jako serwer z opcjonalnymi workerami (istnieje tryb autonomiczny bez dodatkowych workerów), następnie do tworzenia zapytań używa się małego pliku wykonywalnego jar (zwanego presto CLI).

Gdy dobrze skonfigurujesz Serwer Presto, możesz użyć tradycyjnego SQL:

SELECT count(*) FROM TOPIC_NAME;
 9
Author: Thomas Decaux,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-21 19:24:13

Aby uzyskać wszystkie wiadomości zapisane dla tematu, możesz odszukać konsumenta do początku i końca strumienia dla każdej partycji i zsumować wyniki

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
 3
Author: AutomatedMike,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-27 11:02:56

Polecenie Apache Kafka, aby uzyskać nie obsługiwane wiadomości na wszystkich partycjach tematu:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

Druki:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

Kolumna 6 to wiadomości nieobsługiwane. Dodaj je tak:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

Awk odczytuje wiersze, pomija linię nagłówka i dodaje szóstą kolumnę, a na końcu wypisuje sumę.

Druki

5
 2
Author: Eric Leschinski,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-31 14:07:58

W najnowszych wersjach Kafka Manager znajduje się kolumna zatytułowana podsumowująca Ostatnie przesunięcia .

Tutaj wpisz opis obrazka

 2
Author: f01,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-01-05 06:38:48

Czasami warto poznać liczbę wiadomości w każdej partycji, na przykład podczas testowania niestandardowego partycjonera.Kolejne kroki zostały przetestowane pod kątem pracy z Kafka 0.10.2.1-2 z Confluent 3.2. Biorąc pod uwagę temat Kafki, kt i następującą linię poleceń:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

Który wyświetla Przykładowe wyjście pokazujące liczbę wiadomości w trzech partycjach:

kt:2:6138
kt:1:6123
kt:0:6137

Liczba wierszy może być mniej więcej zależna od liczby partycji dla tematu.

 2
Author: pdp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-29 04:26:16

Nie próbowałem tego, ale to ma sens.

Możesz również użyć kafka.tools.ConsumerOffsetChecker (źródło).

 1
Author: hba,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-02 21:30:21

Używając klienta Javy Kafka 2.11-1.0.0, możesz zrobić następujące rzeczy:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

Wyjście jest takie:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
 1
Author: Christophe Quintard,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-15 17:34:04

./kafka-console-consumer.sh --from-beginning --new-consumer --bootstrap-server yourbroker: 9092 --property print.key = true --property print.value = false --property print.partition --topic yourtopic --timeout-ms 5000 | tail -n 10 / grep "przetworzony w sumie"

 0
Author: Borislav Markov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-06 14:37:06

Fragmenty Kafka docs

Deprecations in 0.9.0.0

The kafka-consumer-offset-checker.sh kafka.narzędzia.ConsumerOffsetChecker) został wycofany. Idąc do przodu, Proszę użyć kafka-consumer-groups.sh kafka.admin.ConsumerGroupCommand) dla tej funkcjonalności.

Uruchamiam Kafka broker z włączonym SSL zarówno dla serwera, jak i klienta. Poniżej polecenia używam

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

Gdzie /tmp / ssl_config jest jak poniżej

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
 0
Author: S R Bandi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-20 09:25:43

Jeśli masz dostęp do interfejsu JMX serwera, przesunięcia początku i końca są obecne w:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(musisz wymienić TOPICNAME & PARTITIONNUMBER). Pamiętaj, że musisz sprawdzić dla każdej z replik danej partycji, lub musisz dowiedzieć się, który z brokerów jest liderem dla danej partycji (i to może się zmieniać w czasie).

Alternatywnie, można użyć Kafka Consumer metody beginningOffsets i endOffsets.

 0
Author: Adam Kotwasinski,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-08-20 15:18:23