Czy istnieje sposób na usunięcie wszystkich danych z tematu lub usunięcie tematu przed każdym uruchomieniem?

Czy istnieje sposób na usunięcie wszystkich danych z tematu lub usunięcie tematu przed każdym uruchomieniem?

Czy mogę zmodyfikować KafkaConfig.plik scala do zmiany właściwości logRetentionHours? Czy istnieje sposób, aby wiadomości zostały usunięte, gdy tylko konsument je przeczyta?

Używam producentów do pobierania danych skądś i wysyłania danych do określonego tematu, w którym konsument konsumuje, czy mogę usunąć wszystkie dane z tego tematu przy każdym uruchomieniu? Chcę tylko nowych danych za każdym razem w temat. Czy jest jakiś sposób na ponowne zainicjowanie tematu?

Author: Lorenzo Belli, 2013-07-18

13 answers

nie sądzę, że jest jeszcze wspierany.W tym miejscu znajdziesz wszystkie potrzebne informacje.

Aby usunąć ręcznie:

  1. Zamknięcie gromady
  2. W tym celu należy usunąć Pliki z katalogu, które zostały zapisane w pliku konfiguracyjnym.]}
  3. Uruchom ponownie klaster

Dla danego tematu to co możesz zrobić to

  1. Stop Kafce
  2. Wyczyść log dla partycji, kafka przechowuje swój plik dziennika w formacie "logDir/topic-partition", więc dla tematu o nazwie "MyTopic" dziennik dla partycji o id 0 będzie przechowywany w /tmp/kafka-logs/MyTopic-0 Gdzie {[3] } jest określony przez atrybut log.dir
  3. Restart kafka

To jest NOT dobre i zalecane podejście, ale powinno zadziałać. W pliku konfiguracyjnym Kafka broker atrybut log.retention.hours.per.topic jest używany do zdefiniowania The number of hours to keep a log file before deleting it for some specific topic

Czy istnieje sposób, aby wiadomości były usuwane, gdy tylko konsument czyta to?

Z Dokumentacji Kafka :

Klaster Kafka przechowuje wszystkie opublikowane wiadomości-niezależnie od tego, czy zostały skonsumowane, czy nie-przez konfigurowalny okres czasu. Na przykład, jeśli przechowywanie dziennika jest ustawione na dwa dni, to przez dwa dni po opublikowaniu wiadomości jest ono dostępne do użycia, po czym zostanie odrzucone w celu zwolnienia miejsca. Wydajność Kafki jest skutecznie stała w odniesieniu do wielkości danych, więc przechowywanie wielu danych nie jest problem.

W rzeczywistości jedynymi metadanymi zachowanymi dla każdego konsumenta jest pozycja konsumenta w dzienniku, zwana "offsetem". Przesunięcie to jest kontrolowane przez konsumenta: Zwykle konsument przesunie przesunięcie liniowo, gdy czyta wiadomości, ale w rzeczywistości pozycja jest kontrolowana przez konsumenta i może konsumować wiadomości w dowolnej kolejności. Na przykład konsument może zresetować do starszego offsetu w celu ponownego przetworzenia.

Do znalezienia przesunięcia początkowego do odczytu w Kafka 0.8 prosty przykład konsumenta mówią

Kafka zawiera dwie stałe do pomocy, kafka.api.OffsetRequest.EarliestTime() znajduje początek danych w logach i zaczyna streamować stamtąd, kafka.api.OffsetRequest.LatestTime() będzie streamować tylko nowe wiadomości.

Możesz również znaleźć tam przykładowy kod do zarządzania offsetem na końcu konsumenta.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
 63
Author: Hild,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-06-20 09:12:55

Jak już tu wspomniałem Oczyść kolejkę Kafki :

Testowane w Kafka 0.8.2, dla przykładu quick-start: najpierw dodaj jedną linię do serwera.Plik Właściwości w folderze konfiguracyjnym:

delete.topic.enable=true

Następnie możesz uruchomić następujące polecenie:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
 71
Author: Patrick,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 11:47:32

Testowane z kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Uwaga: jeśli usuwasz foldery tematyczne wewnątrz kafka-logs, ale nie z folderu zookeeper-data, wtedy zobaczysz, że tematy nadal tam są.

 14
Author: Swadeshi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-30 20:14:29

Jako brudne obejście można dostosować ustawienia przechowywania dla poszczególnych tematów, np. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 (retencja.bajtów = 0 może również działać)

Po krótkim czasie kafka powinna zwolnić miejsce. Nie jestem pewien, czy ma to jakieś konsekwencje w porównaniu do ponownego tworzenia tematu.

Ps. Lepiej przywrócić ustawienia retencji, gdy kafka skończy czyszczenie.

Możesz również użyć retention.ms do przechowywania danych historycznych

 8
Author: Ivan Balashov,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-30 20:22:04

Poniżej znajdują się skrypty do opróżniania i usuwania tematu Kafka zakładając localhost jako serwer zookeeper i Kafka_Home jest ustawiona w katalogu instalacyjnym:

Poniższy skrypt opróżni temat, ustawiając jego czas przechowywania na 1 sekundę, a następnie usuwając konfigurację:

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

Do całkowicie usuń tematy należy zatrzymać odpowiedni broker kafka i usunąć jego katalog z katalogu kafka log (domyślnie: / tmp / kafka-logs), a następnie uruchomić ten skrypt do Usuń temat z zookeeper. Aby zweryfikować, że został usunięty z zookeeper, wyjście ls / brokers / topics nie powinno już zawierać tematu:

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF
 8
Author: vdlen,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-02-25 16:20:55

Próbowaliśmy z umiarkowanym powodzeniem tego, co opisują inne odpowiedzi. To, co naprawdę dla nas działało (Apache Kafka 0.8.1), to polecenie klasy

Sh kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic yourtopic --zookeeper localhost: 2181

 7
Author: Dan M,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-11-20 19:07:44

Dla użytkowników browaru

Jeśli używasz brew jak ja i marnujesz dużo czasu na szukanie niesławnego folderu kafka-logs, nie bój się więcej. (i proszę daj mi znać, czy to działa dla Ciebie i wielu różnych wersji Homebrew, Kafka itp:)) {]}

Prawdopodobnie znajdziesz go pod:

Lokalizacja:

/usr/local/var/lib/kafka-logs


Jak znaleźć tę ścieżkę

(jest to również pomocne w zasadzie dla każdej aplikacji, którą instalujesz przez brew)

1) brew services list

Kafka zaczęła matbhz / Users / matbhz / Library/LaunchAgents / homebrew.mxcl.kafka.plist

2) Otwórz i przeczytaj, że plist znalazłeś powyżej

3) znajdź linię definiującą server.properties lokację otwórz ją, w moim przypadku:

  • /usr/local/etc/kafka/server.properties

4) poszukaj linii log.dirs:

Log.dirs = /usr/local/var/lib / kafka-logs

5) przejdź do tej lokalizacji i usuń logi dla tematów, które wish

6) Restart Kafka z brew services restart kafka

 4
Author: Matheus Felipe,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-09-27 00:39:05

Wszystkie dane o tematach i ich partycjach są przechowywane w tmp/kafka-logs/. Ponadto są one przechowywane w formacie topic-partionNumber, więc jeśli chcesz usunąć temat newTopic, możesz:

  • stop Kafce
  • Usuń pliki rm -rf /tmp/kafka-logs/newTopic-*
 2
Author: Salvador Dali,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-01-26 06:27:29
  1. Stop ZooKeeper i Kafka
  2. na serwerze.właściwości, dziennik zmian.retencja.wartość godzin. Możesz skomentować log.retention.hours i dodać log.retention.ms=1000. To utrzymywałoby rekord na temat Kafki tylko przez sekundę.
  3. Start zookeeper i kafka.
  4. sprawdź konsolę konsumencką. Kiedy po raz pierwszy otworzyłem konsolę, była tam Płyta. Ale kiedy ponownie otworzyłem konsolę, rekord został usunięty.
  5. później możesz ustawić wartość log.retention.hours Na żądaną tak.
 1
Author: earl,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-30 20:14:57

Od wersji kafka 2.3.0 istnieje alternatywny sposób na miękkie usunięcie Kafki (stare podejście jest przestarzałe ).

Update retention.ms na 1 sek (1000ms), a następnie ustawić go ponownie po min, do domyślnego ustawienia tj. 7 dni (168 godzin, 604,800,000 w ms)

Soft deletion: - (rentention. ms=1000) (using kafka-configs.sh)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=1000
Completed Updating config for entity: topic 'kafka_topic3p3r'.

Ustawienie na domyślne: - 7 dni (168 godzin , retention.ms= 604800000)

bin/kafka-configs.sh --zookeeper 192.168.1.10:2181 --alter --entity-name kafka_topic3p3r --entity-type topics  --add-config retention.ms=604800000
 1
Author: brajkishore dubey,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-09-08 17:22:29

W ręcznym usuwaniu tematu z klastra Kafki, możesz to sprawdzić https://github.com/darrenfu/bigdata/issues/6 Ważnym krokiem w większości rozwiązań jest usunięcie /config/topics/<topic_name> w ZK.

 0
Author: Abdurrahman Adebiyi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-06-16 10:50:28

Używam tego skryptu:

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done
 0
Author: Дмитрий Шепелев,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-06-10 13:15:13

Używam poniższego narzędzia do czyszczenia po uruchomieniu testu integracji.

Używa najnowszego api AdminZkClient. Starsze api zostało przestarzałe.

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }


}

Istnieje opcja Usuń temat. Ale to oznacza temat do usunięcia. Zookeeper później usuwa temat. Ponieważ może to być nieprzewidywalnie długie, wolę retention.ms podejście

 0
Author: ForeverLearner,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-10-01 19:26:22