Jak uzyskać dane ze starego offsetu w Kafce?

Używam zookeeper do pobierania danych z Kafki. I tutaj zawsze dostaję dane z ostatniego przesunięcia. Czy jest jakiś sposób, aby określić czas przesunięcia, aby uzyskać stare dane?

Jest jedna opcja autooffset.reset. Przyjmuje najmniejsze lub największe. Czy ktoś może wyjaśnić, co jest najmniejsze i największe. Can autooffset.reset pomaga w uzyskaniu danych ze starego punktu przesunięcia zamiast najnowszego punktu przesunięcia?

Author: Lorenzo Belli, 2013-02-18

7 answers

Konsumenci zawsze należą do grupy i dla każdej partycji Zookeeper śledzi postęp tej grupy konsumentów w partycji.

Aby pobrać Od początku, możesz usunąć wszystkie dane związane z postępem zgodnie z instrukcją

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

Możesz również określić przesunięcie partycji, jak określono w core/src/main/scala/kafka/tools / UpdateOffsetsInZK.scala

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

Jednak przesunięcie nie jest indeksowane w czasie, ale wiesz dla każdego partycja jest sekwencją.

Jeśli Twoja wiadomość zawiera znacznik czasu (i uważaj, że ten znacznik czasu nie ma nic wspólnego z momentem, w którym Kafka otrzymała Twoją wiadomość), możesz spróbować zrobić indekser, który próbuje pobrać jeden wpis w krokach, zwiększając przesunięcie O N, i zapisać krotkę (temat X, część 2, offset 100, znacznik czasu) gdzieś.

Jeśli chcesz pobrać wpisy z określonego momentu w czasie, możesz zastosować wyszukiwanie binarne do szorstkiego indeksu, aż znajdziesz wejście chcesz i pobrać stamtąd.

 23
Author: Alex Rodrigues,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-06-13 10:09:28

Z Dokumentacji Kafki mówią "kafka.api.OffsetRequest.EarliestTime() znajduje początek danych w logach i zaczyna stamtąd przesyłać strumieniowo, kafka.api.OffsetRequest.LatestTime() będzie strumieniować tylko nowe wiadomości. Nie należy zakładać, że offset 0 jest offsetem początku, ponieważ wiadomości starzeją się z dziennika w czasie. "

Użyj SimpleConsumerExample tutaj: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0 + SimpleConsumer + Example

Podobne pytanie: Kafka High Level Consumer pobiera wszystkie wiadomości z tematu za pomocą Java API (odpowiednik --from-beginning)

To może pomóc

 7
Author: Hild,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 12:18:17

Zapoznaj się z dokumentem o kafka config: http://kafka.apache.org/08/configuration.html dla zapytania o najmniejsze i największe wartości parametru offset.

BTW, badając Kafkę, zastanawiałem się, jak odtworzyć wszystkie wiadomości dla konsumenta. Chodzi mi o to, czy grupa konsumentów przepytała wszystkie wiadomości i chce je odzyskać.

Sposobem na to jest usunięcie danych z zookeeper. Użyj Kafki.utils.Klasa zkutils do usunięcia węzła w zookeeper. Poniżej znajduje się jego sposób użycia:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
 3
Author: Hussain Pirosha,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2013-11-05 12:38:55

Na Razie

Kafka FAQ Daj odpowiedź na ten problem.

Jak dokładnie uzyskać przesunięcia wiadomości dla określonego znacznika czasu za pomocą OffsetRequest?

Kafka pozwala odpytywać offsety wiadomości według czasu i robi to przy ziarnistości segmentu. Parametr timestamp jest uniksowym znacznikiem czasu i zapytanie o offset przez timestamp zwraca najnowsze możliwe przesunięcie wiadomości, która jest dołączana nie później niż podany timestamp. Istnieją 2 specjalne wartości znacznika czasu-najnowsze i najwcześniejsze. Dla każdej innej wartości uniksowego znacznika czasu Kafka otrzyma przesunięcie początkowe segmentu dziennika, które zostanie utworzone nie później niż podany znacznik czasu. Z tego powodu, a także ponieważ żądanie przesunięcia jest obsługiwane tylko przy ziarnistości segmentu, żądanie przesunięcia zwraca mniej dokładne wyniki dla większych rozmiarów segmentów.

Aby uzyskać dokładniejsze wyniki, możesz skonfigurować rozmiar segmentu dziennika na podstawie czasu (log.roll.ms) zamiast rozmiaru (log.segment.bajtów). Należy jednak zachować ostrożność, ponieważ może to zwiększyć liczbę programów obsługi plików z powodu częstego zwijania segmentów dziennika.


Plan Przyszłości

Kafka doda znacznik czasu do formatu wiadomości. Zobacz

Https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

 2
Author: zheolong,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-06 03:49:16

Kafka Protocol Doc jest doskonałym źródłem do zabawy z żądaniem/odpowiedzią/offsetami / wiadomościami: https://cwiki.apache.org/confluence/display/KAFKA/A + Guide + To + the+Kafka + Protocol używasz prostego przykładu konsumenckiego, gdzie następujący kod pokazuje stan:

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

Ustaw readOffset, aby rozpocząć początkowe przesunięcie od. ale musisz sprawdzić maksymalne przesunięcie, jak również powyżej, zapewni ograniczoną liczbę przesunięć, jak na FetchSize w ostatniej param metody addFetch.

 1
Author: usman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-01 06:47:50

Używając Kafkaconsumera możesz używać Seek, SeekToBeginning i SeekToEnd, aby poruszać się w strumieniu.

Https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#seekToBeginning(java.util.Collection)

Ponadto, jeśli nie jest zapewniona Żadna partycja, będzie szukać pierwszego przesunięcia dla wszystkich aktualnie przypisanych partycji.

 0
Author: CamW,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-27 07:59:07

Próbowałeś tego?

Bin/kafka-console-consumer.sh --bootstrap-server localhost: 9092 --topic test --from-beginning

Wyświetli wszystkie wiadomości dla danego tematu, "test" w tym przykładzie.

Więcej szczegółów z tego linku https://kafka.apache.org/quickstart

 0
Author: Gang Peng,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-01-20 06:06:46