Purge Kafka Topic
Wcisnąłem wiadomość, która była zbyt duża w Temat wiadomości kafka na mojej lokalnej maszynie, teraz dostaję błąd:
kafka.common.InvalidMessageSizeException: invalid message size
Zwiększenie fetch.size
nie jest tutaj idealne, ponieważ nie chcę przyjmować wiadomości tak dużych. Czy jest sposób na oczyszczenie tematu w Kafce?
13 answers
Tymczasowo zaktualizuj czas retencji w temacie do jednej sekundy:
kafka-topics.sh --zookeeper localhost:13003 --alter --topic MyTopic --config retention.ms=1000
Następnie poczekaj, aż oczyszczanie zacznie działać (około jednej minuty). Po wyczyszczeniu Przywróć poprzednią wartość retention.ms
.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-04-16 13:44:58
Oto kroki, które wykonuję, aby usunąć temat o nazwie MyTopic
:
- Zatrzymaj demona Apache Kafka
- Usuń folder danych tematu:
rm -rf /tmp/kafka-logs/MyTopic-0
- Usuń metadane tematu:
zkCli.sh
następniermr /brokers/MyTopic
- Uruchom demona Apache Kafka
Jeśli przegapisz Krok 3, Apache Kafka będzie nadal zgłaszać temat jako obecny (na przykład, gdy uruchomisz kafka-list-topic.sh
).
Testowany z Apache Kafka 0.8.0.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-19 13:32:42
Aby wyczyścić kolejkę możesz usunąć Temat:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Następnie utwórz go ponownie:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-29 14:14:36
Chociaż przyjęta odpowiedź jest poprawna, ta metoda została przestarzała. Konfiguracja tematu powinna być teraz wykonywana przez kafka-configs
.
kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic
Konfiguracje ustawione za pomocą tej metody mogą być wyświetlane za pomocą polecenia
kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-04-21 17:56:09
Testowane w Kafka 0.8.2, dla przykładu quick-start: Najpierw dodaj jedną linię do serwera.Plik Właściwości w folderze config:
delete.topic.enable=true
Następnie możesz uruchomić następujące polecenie:
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-14 20:02:28
Aktualizacja: ta odpowiedź jest istotna dla Kafka 0.6. Dla Kafka 0.8 i później Zobacz odpowiedź przez @ Patrick.
Tak, zatrzymaj kafka i ręcznie usuń wszystkie pliki z odpowiedniego podkatalogu(łatwo go znaleźć w katalogu kafka data). Po ponownym uruchomieniu kafka temat będzie pusty.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-01 10:25:18
Kafka nie ma bezpośredniej metody czyszczenia / czyszczenia tematu( kolejek), ale może to zrobić poprzez usunięcie tego tematu i odtworzenie go.
Najpierw upewnij się, że sever.plik właściwości ma i jeśli nie dodaj delete.topic.enable=true
Następnie usuń temat
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic
Następnie utwórz go ponownie.
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-09 10:55:18
Czasami, jeśli masz nasycony klaster (zbyt wiele partycji lub zaszyfrowane dane tematu, lub używając SSL, lub kontroler jest na złym węźle, lub połączenie jest flaky, usunięcie tego tematu zajmie dużo czasu.
Wykonuję następujące czynności, szczególnie jeśli używasz Avro.1: Uruchom z kafka tools:
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>
2: Uruchom na węźle rejestru schematu:
kafka-avro-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning
3: Ustaw retencję tematu z powrotem do pierwotnego ustawienia, gdy Temat jest pusty.
bash kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>
Mam nadzieję, że to komuś pomoże, ponieważ nie jest łatwo reklamowane.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-02-15 15:30:18
Najprostszym podejściem jest ustawienie daty poszczególnych plików dziennika na starszą niż okres przechowywania. Następnie broker powinien je wyczyścić i usunąć w ciągu kilku sekund. To oferuje kilka zalet:
- nie trzeba niszczyć brokerów, to operacja runtime.
- unika możliwości nieprawidłowych WYJĄTKÓW offsetowych(więcej na ten temat poniżej).
Z mojego doświadczenia z Kafka 0.7.x, usunięcie plików dziennika i ponowne uruchomienie brokera może prowadzić do nieprawidłowych WYJĄTKÓW offsetowych dla niektórych konsumentów. Stało się tak, ponieważ broker ponownie uruchamia offsety na poziomie zerowym (w przypadku braku istniejących plików dziennika), a konsument, który wcześniej korzystał z tematu, ponownie połączy się, aby zażądać określonego [raz ważnego] offsetu. Jeśli to przesunięcie wykracza poza granice nowych dzienników tematu, nie szkodzi, a konsument wznawia na początku lub na końcu. Ale jeśli przesunięcie mieści się w granicach nowych dzienników tematu, broker próbuje pobrać zestaw wiadomości, ale nie udaje się, ponieważ przesunięcie nie jest wyrównane do rzeczywistej wiadomości.
Można to złagodzić, usuwając również offsety konsumenckie w zookeeperze dla tego tematu. Ale jeśli nie potrzebujesz dziewiczego tematu i po prostu chcesz usunąć istniejącą zawartość, po prostu "dotknij" -kilka dzienników tematów jest o wiele łatwiejsze i bardziej niezawodne, niż zatrzymywanie brokerów, usuwanie dzienników tematów i czyszczenie niektórych węzłów zookeeper.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-06-06 20:09:22
Porady Thomasa są świetne, ale niestety zkCli
w starych wersjach Zookeeper (na przykład 3.3.6) nie wydają się wspierać rmr
. Na przykład porównaj implementację linii poleceń w modern Zookeeper z w wersji 3.3.
Jeśli masz do czynienia ze starą wersją Zookeeper jednym z rozwiązań jest użycie biblioteki klienta, takiej jak ZC.zk dla Pythona. Dla osób nie znających Pythona należy zainstalować go za pomocą pip lub easy_install. Wtedy Uruchom powłokę Pythona (python
) i możesz to zrobić:
import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')
Lub nawet
zk.delete_recursive('brokers')
Jeśli chcesz usunąć wszystkie tematy z Kafka.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-10-15 00:32:51
Aby wyczyścić wszystkie wiadomości z określonego tematu za pomocą grupy aplikacji(nazwa grupy powinna być taka sama jak nazwa grupy aplikacji kafka).
./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-30 20:24:45
Nie można dodać jako komentarz ze względu na rozmiar: Nie wiem, czy to prawda, poza aktualizacją retention.ms i retencji.bajtów, ale zauważyłem, że polityka czyszczenia tematu powinna być "delete" (domyślnie), jeśli "compact", to będzie dłużej trzymać się wiadomości, tzn. jeśli jest "compact" , musisz podać delete.retention.ms również.
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1
Również musiał monitorować najwcześniejsze / najnowsze przesunięcia powinny być takie same, aby potwierdzić to pomyślnie się stało, można również sprawdzić du-h / tmp / kafka-logs/test-topic-3-100-*
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}'
26599762
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}'
26599762
Drugi problem polega na tym, że musisz najpierw pobrać bieżący config , aby pamiętać o przywróceniu po pomyślnym usunięciu:
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-06-14 00:02:30
Innym, raczej ręcznym podejściem do tego jest:
W kategorii brokerzy:
- stop Kafka broker
sudo service kafka stop
- usuń wszystkie pliki dziennika partycji (należy to zrobić na wszystkich brokerach)
sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-*
In zookeeper:
- Uruchom interfejs linii poleceń zookeeper
sudo /usr/lib/zookeeper/bin/zkCli.sh
- użyj zkCli, aby usunąć metadane tematu
rmr /brokers/topic/<some_topic_name>
Znowu w brokerach:
- restart broker serwis
sudo service kafka start
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-10-02 15:18:22