Purge Kafka Topic

Wcisnąłem wiadomość, która była zbyt duża w Temat wiadomości kafka na mojej lokalnej maszynie, teraz dostaję błąd:

kafka.common.InvalidMessageSizeException: invalid message size

Zwiększenie fetch.size nie jest tutaj idealne, ponieważ nie chcę przyjmować wiadomości tak dużych. Czy jest sposób na oczyszczenie tematu w Kafce?

Author: cricket_007, 2013-04-29

13 answers

Tymczasowo zaktualizuj czas retencji w temacie do jednej sekundy:

kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000

Następnie poczekaj, aż oczyszczanie zacznie działać (około jednej minuty). Po wyczyszczeniu Przywróć poprzednią wartość retention.ms.

 283
Author: steven appleyard,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-16 13:44:58

Oto kroki, które wykonuję, aby usunąć temat o nazwie MyTopic:

  1. Zatrzymaj demona Apache Kafka
  2. Usuń folder danych tematu: rm -rf /tmp/kafka-logs/MyTopic-0
  3. Usuń metadane tematu: zkCli.sh następnie rmr /brokers/MyTopic
  4. Uruchom demona Apache Kafka

Jeśli przegapisz Krok 3, Apache Kafka będzie nadal zgłaszać temat jako obecny (na przykład, gdy uruchomisz kafka-list-topic.sh).

Testowany z Apache Kafka 0.8.0.

 41
Author: Thomas Bratt,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-19 13:32:42

Aby wyczyścić kolejkę możesz usunąć Temat:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Następnie utwórz go ponownie:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
 38
Author: rjaiswal,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-29 14:14:36

Chociaż przyjęta odpowiedź jest poprawna, ta metoda została przestarzała. Konfiguracja tematu powinna być teraz wykonywana przez kafka-configs.

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Konfiguracje ustawione za pomocą tej metody mogą być wyświetlane za pomocą polecenia

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
 32
Author: Shane Perry,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-04-21 17:56:09

Testowane w Kafka 0.8.2, dla przykładu quick-start: Najpierw dodaj jedną linię do serwera.Plik Właściwości w folderze config:

delete.topic.enable=true

Następnie możesz uruchomić następujące polecenie:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
 23
Author: Patrick,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-14 20:02:28

Aktualizacja: ta odpowiedź jest istotna dla Kafka 0.6. Dla Kafka 0.8 i później Zobacz odpowiedź przez @ Patrick.

Tak, zatrzymaj kafka i ręcznie usuń wszystkie pliki z odpowiedniego podkatalogu(łatwo go znaleźć w katalogu kafka data). Po ponownym uruchomieniu kafka temat będzie pusty.

 3
Author: Wildfire,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-01 10:25:18

Kafka nie ma bezpośredniej metody czyszczenia / czyszczenia tematu( kolejek), ale może to zrobić poprzez usunięcie tego tematu i odtworzenie go.

Najpierw upewnij się, że sever.plik właściwości ma i jeśli nie dodaj delete.topic.enable=true

Następnie usuń temat bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic

Następnie utwórz go ponownie.

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
 2
Author: Manish Jaiswal,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-09 10:55:18

Czasami, jeśli masz nasycony klaster (zbyt wiele partycji lub zaszyfrowane dane tematu, lub używając SSL, lub kontroler jest na złym węźle, lub połączenie jest flaky, usunięcie tego tematu zajmie dużo czasu.

Wykonuję następujące czynności, szczególnie jeśli używasz Avro.

1: Uruchom z kafka tools:

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2: Uruchom na węźle rejestru schematu:

kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3: Ustaw retencję tematu z powrotem do pierwotnego ustawienia, gdy Temat jest pusty.

bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

Mam nadzieję, że to komuś pomoże, ponieważ nie jest łatwo reklamowane.

 2
Author: Ben Coughlan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-15 15:30:18

Najprostszym podejściem jest ustawienie daty poszczególnych plików dziennika na starszą niż okres przechowywania. Następnie broker powinien je wyczyścić i usunąć w ciągu kilku sekund. To oferuje kilka zalet:

  1. nie trzeba niszczyć brokerów, to operacja runtime.
  2. unika możliwości nieprawidłowych WYJĄTKÓW offsetowych(więcej na ten temat poniżej).

Z mojego doświadczenia z Kafka 0.7.x, usunięcie plików dziennika i ponowne uruchomienie brokera może prowadzić do nieprawidłowych WYJĄTKÓW offsetowych dla niektórych konsumentów. Stało się tak, ponieważ broker ponownie uruchamia offsety na poziomie zerowym (w przypadku braku istniejących plików dziennika), a konsument, który wcześniej korzystał z tematu, ponownie połączy się, aby zażądać określonego [raz ważnego] offsetu. Jeśli to przesunięcie wykracza poza granice nowych dzienników tematu, nie szkodzi, a konsument wznawia na początku lub na końcu. Ale jeśli przesunięcie mieści się w granicach nowych dzienników tematu, broker próbuje pobrać zestaw wiadomości, ale nie udaje się, ponieważ przesunięcie nie jest wyrównane do rzeczywistej wiadomości.

Można to złagodzić, usuwając również offsety konsumenckie w zookeeperze dla tego tematu. Ale jeśli nie potrzebujesz dziewiczego tematu i po prostu chcesz usunąć istniejącą zawartość, po prostu "dotknij" -kilka dzienników tematów jest o wiele łatwiejsze i bardziej niezawodne, niż zatrzymywanie brokerów, usuwanie dzienników tematów i czyszczenie niektórych węzłów zookeeper.

 1
Author: Andrew Carter,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-06 20:09:22

Porady Thomasa są świetne, ale niestety zkCli w starych wersjach Zookeeper (na przykład 3.3.6) nie wydają się wspierać rmr. Na przykład porównaj implementację linii poleceń w modern Zookeeper z w wersji 3.3.

Jeśli masz do czynienia ze starą wersją Zookeeper jednym z rozwiązań jest użycie biblioteki klienta, takiej jak ZC.zk dla Pythona. Dla osób nie znających Pythona należy zainstalować go za pomocą pip lub easy_install. Wtedy Uruchom powłokę Pythona (python) i możesz to zrobić:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

Lub nawet

zk.delete_recursive('brokers')

Jeśli chcesz usunąć wszystkie tematy z Kafka.

 1
Author: Mark Butler,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-15 00:32:51

Aby wyczyścić wszystkie wiadomości z określonego tematu za pomocą grupy aplikacji(nazwa grupy powinna być taka sama jak nazwa grupy aplikacji kafka).

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

 1
Author: user4713340,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-30 20:24:45

Nie można dodać jako komentarz ze względu na rozmiar: Nie wiem, czy to prawda, poza aktualizacją retention.ms i retencji.bajtów, ale zauważyłem, że polityka czyszczenia tematu powinna być "delete" (domyślnie), jeśli "compact", to będzie dłużej trzymać się wiadomości, tzn. jeśli jest "compact" , musisz podać delete.retention.ms również.

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Również musiał monitorować najwcześniejsze / najnowsze przesunięcia powinny być takie same, aby potwierdzić to pomyślnie się stało, można również sprawdzić du-h / tmp / kafka-logs/test-topic-3-100-*

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

Drugi problem polega na tym, że musisz najpierw pobrać bieżący config , aby pamiętać o przywróceniu po pomyślnym usunięciu: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

 0
Author: kisna,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-06-14 00:02:30

Innym, raczej ręcznym podejściem do tego jest:

W kategorii brokerzy:

  1. stop Kafka broker
    sudo service kafka stop
  2. usuń wszystkie pliki dziennika partycji (należy to zrobić na wszystkich brokerach)
    sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*

In zookeeper:

  1. Uruchom interfejs linii poleceń zookeeper
    sudo /usr/lib/zookeeper/bin/zkCli.sh
  2. użyj zkCli, aby usunąć metadane tematu
    rmr /brokers/topic/<some_topic_name>

Znowu w brokerach:

  1. restart broker serwis
    sudo service kafka start
 0
Author: Danny Mor,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-02 15:18:22