Jak wysłać ostateczny wynik agregacji kafka-streams w oknie czasowym?

To co chciałbym zrobić to:

  1. zużywaj rekordy z tematu liczb (Long ' S)
  2. Agregat (count) wartości dla każdego okna 5 sek
  3. Wyślij ostateczny wynik agregacji do innego tematu

Mój kod wygląda tak:

KStream<String, Long> longs = builder.stream(
        Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
        longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
        .to("long-counts");

Wygląda na to, że wszystko działa zgodnie z oczekiwaniami, ale agregacje są wysyłane do tematu docelowego dla każdego przychodzącego rekordu. Moje pytanie brzmi: Jak mogę wysłać tylko ostateczny wynik agregacji każdego okna?

Author: odavid, 2016-08-13

1 answers

W strumieniach Kafka nie ma czegoś takiego jak "ostateczna agregacja". Okna są cały czas otwarte, aby obsługiwać zapisy o późnym przybyciu (oczywiście okna nie są przechowywane na zawsze, są odrzucane do czasu wygaśnięcia ich czasu przechowywania-jednak nie ma specjalnych działań, gdy okno zostanie odrzucone).

Zobacz dokumentację Confluent po więcej szczegółów: http://docs.confluent.io/current/streams/

Tak więc dla każdej aktualizacji do agregacji generowany jest rekord wyniku (ponieważ strumienie Kafka również aktualizują wynik agregacji na późno przybywających rekordach). Twój "wynik końcowy" będzie ostatnim rekordem wyniku (zanim okno zostanie odrzucone). W zależności od przypadku, ręczne usuwanie duplikacji byłoby sposobem rozwiązania problemu (za pomocą interfejsu API dolnej dźwigni, transform() lub process())

Ten wpis na blogu też może pomóc: https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

Kolejny wpis na blogu dotyczący tego wydanie bez użycia interpunkcji: http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

Update

Z KIP-328 , operator KTable#suppress() jest dodawany, który pozwoli na powstrzymanie kolejnych aktualizacji w ścisły sposób i emitowanie pojedynczego rekordu wyniku na okno; kompromis jest zwiększeniem opóźnienia.

 11
Author: Matthias J. Sax,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-02 17:33:42