Jak zapisać najnowszy offset, który Spark zużyty do ZK lub Kafka i może odczytać po ponownym uruchomieniu

Używam Kafka 0.8.2 do odbioru danych z AdExchange następnie używam Spark Streaming 1.4.1 do przechowywania danych do MongoDB.

Mój problem polega na ponownym uruchomieniu mojej pracy Spark Streaming, na przykład zaktualizuj nową wersję, Napraw błąd, dodaj nowe funkcje. Będzie nadal czytać najnowsze offset z kafka w tym czasie, a następnie utracę dane ADX push do Kafki podczas ponownego uruchamiania zadania.

Próbuję coś w stylu auto.offset.reset -> smallest, ale odbierze od 0 - > Ostatnio dane były ogromne i zduplikowane w db.

Staram się również ustawić konkretne group.id i consumer.id do Spark ale to samo.

Jak zapisać najnowszą spark offset do zookeeper lub kafka, a następnie odczytać z niej najnowszą offset?

Author: giaosudau, 2015-08-06

4 answers

Jeden z konstruktorów funkcji createDirectStream może uzyskać mapę, która będzie zawierała identyfikator partycji jako klucz i przesunięcie, od którego zaczyna się zużywać jako wartość.

Wystarczy spojrzeć na api tutaj: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html Mapa, o której mówiłem, Zwykle nazywała się: fromOffsets

Możesz wstawić dane do mapy:

startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)

I używać go podczas tworzenia bezpośredniego strumień:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
                streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))

Po każdej iteracji można uzyskać przetworzone offsety używając:

rdd.asInstanceOf[HasOffsetRanges].offsetRanges

Będziesz mógł użyć tych danych do zbudowania mapy fromOffsets w następnej iteracji.

Możesz zobaczyć pełny kod i użycie tutaj: https://spark.apache.org/docs/latest/streaming-kafka-integration.html na końcu strony

 14
Author: Michael Kopaniov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-17 15:15:23

Aby dodać do odpowiedzi Michała Kopaniowa, jeśli naprawdę chcesz używać ZK jako miejsca, z którego przechowujesz i ładujesz mapę offsetów, możesz.

Jednakże, ponieważ Twoje wyniki nie są wysyłane do ZK, nie otrzymasz wiarygodnej semantyki, chyba że Twoja operacja wyjściowa jest idempotentna (co brzmi, jakby nie była).

Jeśli możliwe jest przechowywanie wyników w tym samym dokumencie w Mongo wraz z offsetami w jednej akcji atomowej, może to być lepsze dla Ciebie.

Dla więcej szczegółów, zobacz https://www.youtube.com/watch?v=fXnNEq1v3VA

 2
Author: Cody Koeninger,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-08-10 17:33:18

Oto kod, którego możesz użyć do przechowywania offsetów w ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

A oto kod, którego możesz użyć, aby użyć offsetu podczas wywoływania KafkaUtils.createDirectStream: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/

 2
Author: Felipe Oliveira,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-06-29 22:58:45

Jeszcze tego nie rozgryzłem na 100%, ale najlepiej będzie skonfigurować JavaStreamingContext.checkpoint ().

Zobacz https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing na przykład.

Według niektórych wpisów na bloguhttps://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md są pewne zastrzeżenia, ale wydaje się, że wiąże się to z pewnymi przypadkami, które są tylko aluzjami, a nie właściwie to wyjaśnione.

 -1
Author: PatE,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-08-07 12:43:59