Skuteczna strategia unikania duplikatów wiadomości w Apache kafka consumer

Od miesiąca studiuję Apache kafka. Jednak utknąłem w pewnym momencie. Mój przypadek użycia polega na tym, że mam dwa lub więcej procesów konsumenckich działających na różnych maszynach. Przeprowadziłem kilka testów, w których opublikowałem 10 000 wiadomości na serwerze kafka. Następnie podczas przetwarzania tych wiadomości zabiłem jeden z procesów konsumenckich i uruchomiłem go ponownie. Konsumenci zapisywali przetworzone wiadomości w pliku. Więc po zakończeniu konsumpcji plik wyświetlał więcej niż 10K wiadomości. Więc niektóre wiadomości były powielone.

W procesie konsumenckim wyłączyłem Auto commit. Konsumenci ręcznie dokonują przesunięć wsadowych. Tak więc np. jeśli do pliku zapisanych jest 100 wiadomości, konsument zobowiązuje się do kompensacji. Gdy proces pojedynczego konsumenta jest uruchomiony i ulega awarii i odzyskuje w ten sposób unika się powielania. Ale gdy działa więcej niż jeden konsument, a jeden z nich zawiesza się i odzyskuje, zapisuje zduplikowane wiadomości do pliku.

Czy jest jakaś skuteczna strategia, aby uniknąć tych duplikatów wiadomości?

Author: Shades88, 2015-04-15

3 answers

Krótka odpowiedź brzmi: nie.

To, czego szukasz, to dokładnie-po przetworzeniu. Chociaż często może wydawać się wykonalne, nigdy nie należy na nim polegać, ponieważ zawsze są zastrzeżenia.

Nawet w celu uniknięcia duplikatów trzeba by użyć prostego konsumenta. W jaki sposób to podejście działa dla każdego konsumenta, gdy wiadomość jest zużywana z jakiejś partycji, Zapisz partycję i przesunięcie zużytej wiadomości na dysk. Gdy konsument uruchamia ponownie po awarii, odczyt z dysku ostatniego zużytego offsetu dla każdej partycji.

Ale nawet z tym wzorem konsument nie może zagwarantować, że nie przetworzy ponownie wiadomości po awarii. Co zrobić, jeśli konsument konsumuje wiadomość, a następnie nie powiedzie się, zanim przesunięcie zostanie spłukane na dysk? Jeśli napiszesz na dysk przed przetworzeniem wiadomości, co jeśli napiszesz offset, a następnie zawiedziesz przed przetworzeniem wiadomości? Ten sam problem istniałby nawet, gdybyś miał zlecić przesunięcia do ZooKeeper po każdym wiadomość.

Są jednak przypadki, w których dokładnie - raz przetwarzanie jest bardziej osiągalne, ale tylko w niektórych przypadkach użycia. Wymaga to po prostu, aby offset był przechowywany w tym samym miejscu, co wyjście aplikacji jednostkowej. Na przykład, jeśli piszesz konsumenta, który liczy wiadomości, przechowując ostatnio zliczone przesunięcie z każdym zliczeniem możesz zagwarantować, że przesunięcie jest przechowywane w tym samym czasie, co stan konsumenta. Oczywiście, aby zagwarantować dokładnie-po przetworzeniu tego Wymagaj, aby zużywać dokładnie jedną wiadomość i aktualizować stan dokładnie raz dla każdej wiadomości, a to jest całkowicie niepraktyczne dla większości aplikacji konsumenckich Kafka. Ze swej natury Kafka konsumuje wiadomości w partiach ze względu na wydajność.

Zazwyczaj twój czas będzie lepiej spędzony, a Twoja aplikacja będzie o wiele bardziej niezawodna, jeśli po prostu zaprojektujesz ją tak, aby była idempotentna.

 15
Author: kuujo,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-31 10:51:55

Oto co Kafka FAQ ma do powiedzenia na temat dokładnie-raz:

Jak dostać dokładnie-raz wiadomości z Kafka?

Dokładnie raz semantyka ma dwie części: unikanie duplikatów podczas produkcji danych i unikanie duplikatów podczas zużycia danych.

Istnieją dwa podejścia do uzyskiwania semantyki dokładnie raz podczas produkcji danych:

  • użyj jednego edytora na partycję i za każdym razem, gdy otrzymasz sprawdzenie błędu sieci ostatnia wiadomość w tej partycji, aby sprawdzić, czy twój ostatni zapis się powiódł
  • Dołącz klucz główny (UUID lub coś takiego) w wiadomości i deduplikuj na konsumencie.

Jeśli zrobisz jedną z tych rzeczy, dziennik, który hostuje Kafka, będzie wolny od duplikatów. Jednak czytanie bez duplikatów zależy również od pewnej współpracy ze strony konsumenta. Jeśli konsument okresowo sprawdza swoją pozycję, to jeśli zawiedzie i uruchomi się ponownie, uruchomi się ponownie ze wskazanego pozycji. Tak więc, jeśli Dane wyjściowe i punkt kontrolny nie są zapisywane atomicznie, będzie można uzyskać duplikaty również tutaj. Ten problem dotyczy zwłaszcza systemu pamięci masowej. Na przykład, jeśli używasz bazy danych, możesz zatwierdzić je razem w transakcji. Ładowarka HDFS Camus, którą napisał LinkedIn, robi coś takiego dla obciążeń Hadoop. Inną alternatywą, która nie wymaga transakcji, jest przechowywanie offsetu z danymi załadowanymi i deduplikacją za pomocą połączenie tematu/partycji/przesunięcia.

Myślę, że są dwie ulepszenia, które znacznie by to ułatwiły:

  • Producer idempotence moĹźe byÄ ‡ wykonane automatycznie i znacznie taniej poprzez opcjonalnÄ ... integracjÄ ™ wsparcia dla tego na serwerze.
  • istniejący konsument wysokiego szczebla nie naraża dużo bardziej drobnoziarnistej kontroli przesunięć (np. aby zresetować swoją pozycję). Wkrótce będziemy nad tym pracować
 14
Author: RaGe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-28 08:50:57

Zgadzam się z deduplikacją RaGe ' a po stronie konsumenta. I używamy Redis do deduplikacji wiadomości Kafka.

Załóżmy, że Klasa Message ma element o nazwie 'uniqId', który jest wypełniany przez stronę producenta i jest gwarantowany jako unikalny. Używamy losowego ciągu o długości 12. (regexp to '^[A-Za-z0-9]{12}$')

Strona konsumencka używa SETNX Redis do deduplikacji i wygaśnięcia, aby automatycznie wyczyścić wygasłe klucze. Przykładowy kod:

Message msg = ... // eg. ConsumerIterator.next().message().fromJson();
Jedis jedis = ... // eg. JedisPool.getResource();
String key = "SPOUT:" + msg.uniqId; // prefix name at will
String val = Long.toString(System.currentTimeMillis());
long rsps = jedis.setnx(key, val);
if (rsps <= 0) {
    log.warn("kafka dup: {}", msg.toJson()); // and other logic
} else {
    jedis.expire(key, 7200); // 2 hours is ok for production environment;
}

Powyższy kod wykrył duplikaty wiadomości kilka razy kiedy Kafka (Wersja 0.8.x) miały sytuacje. Dzięki Naszemu Dziennikowi audytu balansu wejścia/wyjścia nie doszło do utraty wiadomości ani dup.

 12
Author: peihan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-12 10:23:06