kafka get partition count for a topic
Jak mogę uzyskać Ilość partycji dla dowolnego tematu Kafki z kodu. Sprawdziłem wiele linków, ale żaden nie działa.
Kilka:
Http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count-through-simpleclient-api
Http://grokbase.com/t/kafka/users/151cv3htga/get-replication-and-partition-count-of-a-topic
Http://qnalist.com/questions/5809219/get-replication-and-partition-count-of-a-topic
Które wyglądają jak podobne dyskusje.
Są też podobne linki NA SO, które nie mają na to działającego rozwiązania.
16 answers
W 0.82 Producer API i 0.9 Consumer api możesz użyć czegoś w rodzaju
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
producer.partitionsFor("test")
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-12 14:25:02
Przejdź do katalogu kafka/bin
.
Następnie uruchom to:
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic_name
Powinieneś zobaczyć, co potrzebujesz pod PartitionCount
.
Topic:topic_name PartitionCount:5 ReplicationFactor:1 Configs:
Topic: topic_name Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 1 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 2 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 3 Leader: 1001 Replicas: 1001 Isr: 1001
Topic: topic_name Partition: 4 Leader: 1001 Replicas: 1001 Isr: 1001
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-02-16 16:37:29
Poniżej cmd powłoki można wydrukować liczbę partycji. Powinieneś być w katalogu kafka bin przed wykonaniem cmd:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{print "count of partitions=" $1}'
Zauważ, że musisz zmienić nazwę tematu w zależności od potrzeb. Możesz również potwierdzić to używając warunku if:
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic **TopicName** | awk '{print $2}' | uniq -c |awk 'NR==2{if ($1=="16") print "valid partitions"}'
Powyższe polecenie cmd wyświetla poprawne partycje, jeśli liczba wynosi 16. Możesz zmienić liczbę w zależności od wymagań.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-12-04 15:54:00
Oto Jak to robię:
/**
* Retrieves list of all partitions IDs of the given {@code topic}.
*
* @param topic
* @param seedBrokers List of known brokers of a Kafka cluster
* @return list of partitions or empty list if none found
*/
public static List<Integer> getPartitionsForTopic(String topic, List<BrokerInfo> seedBrokers) {
for (BrokerInfo seed : seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed.getHost(), seed.getPort(), 20000, 128 * 1024, "partitionLookup");
List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<Integer> partitions = new ArrayList<>();
// find our partition's metadata
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
partitions.add(part.partitionId());
}
}
return partitions; // leave on first successful broker (every broker has this info)
} catch (Exception e) {
// try all available brokers, so just report error and go to next one
LOG.error("Error communicating with broker [" + seed + "] to find list of partitions for [" + topic + "]. Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
throw new RuntimeError("Could not get partitions");
}
Zauważ, że po prostu musiałem wyciągnąć identyfikatory partycji, ale możesz dodatkowo pobrać inne metadane partycji, takie jakleader
, isr
, replicas
, ...
I BrokerInfo
jest po prostu prostym POJO, które ma host
i port
pola.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-07-04 14:39:36
W kodzie Javy możemy użyć AdminClient
, Aby uzyskać sumę części jednego tematu.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
AdminClient client = AdminClient.create(props);
DescribeTopicsResult result = client.describeTopics(Arrays.asList("TEST"));
Map<String, KafkaFuture<TopicDescription>> values = result.values();
KafkaFuture<TopicDescription> topicDescription = values.get("TEST");
int partitions = topicDescription.get().partitions().size();
System.out.println(partitions);
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-01-22 09:56:32
Tak więc poniższe podejście działa dla kafka 0.10 i nie używa żadnych API producenta lub konsumenta. Wykorzystuje niektóre klasy z API scala w kafka, takie jak ZkConnection i ZkUtils.
ZkConnection zkConnection = new ZkConnection(zkConnect);
ZkUtils zkUtils = new ZkUtils(zkClient,zkConnection,false);
System.out.println(JavaConversions.mapAsJavaMap(zkUtils.getPartitionAssignmentForTopics(
JavaConversions.asScalaBuffer(topicList))).get("bidlogs_kafka10").size());
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-10-26 06:58:18
Use PartitionList from KafkaConsumer
//create consumer then loop through topics
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
List<PartitionInfo> partitions = consumer.partitionsFor(topic);
ArrayList<Integer> partitionList = new ArrayList<>();
System.out.println(partitions.get(0).partition());
for(int i = 0; i < partitions.size(); i++){
partitionList.add(partitions.get(i).partition());
}
Collections.sort(partitionList);
Powinno zadziałać jak urok. Daj mi znać, jeśli jest prostszy sposób na dostęp do listy partycji z tematu.Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-07-22 19:32:02
Miałem ten sam problem, w którym musiałem uzyskać partycje dla tematu.
Z Pomocą odpowiedzi tutaj udało mi się uzyskać informacje od Zookeepera.
Oto Mój kod w Scali (ale można go łatwo przetłumaczyć na Javę)
import org.apache.zookeeper.ZooKeeper
def extractPartitionNumberForTopic(topicName: String, zookeeperQurom: String): Int = {
val zk = new ZooKeeper(zookeeperQurom, 10000, null);
val zkNodeName = s"/brokers/topics/$topicName/partitions"
val numPartitions = zk.getChildren(zkNodeName, false).size
zk.close()
numPartitions
}
Użycie tego podejścia pozwoliło mi uzyskać dostęp do informacji na temat Kafka topics, a także innych informacji na temat Kafka brokers ...
Z Zookeeper można sprawdzić liczbę partycji do tematu przez przeglądanie do /brokers/topics/MY_TOPIC_NAME/partitions
Używanie zookeeper-client.sh
aby połączyć się ze swoim zookeeperem:
[zk: ZkServer:2181(CONNECTED) 5] ls /brokers/topics/MY_TOPIC_NAME/partitions
[0, 1, 2]
To pokazuje nam, że są 3 partycje dla tematu MY_TOPIC_NAME
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:12
Liczba partycji może być pobrana z zookeeper-shell
Syntax: ls /brokers/topics/<topic_name>/partitions
Poniżej przykład:
root@zookeeper-01:/opt/kafka_2.11-2.0.0# bin/zookeeper-shell.sh zookeeper-01:2181
Connecting to zookeeper-01:2181
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4]
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-11-17 14:32:33
@ Sunil-patil odpowiedź zatrzymała się bez odpowiedzi. Musisz uzyskać rozmiar listy
Producent.partitionsFor ("test").size ()
@vish4071 nie ma sensu wtrącać Sunila, nie wspomniałeś, że używasz ConsumerConnector w pytaniu.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-09-17 00:56:15
Możesz zbadać kafka.utils.ZkUtils
, który ma wiele metod mających na celu pomoc w wyodrębnieniu metadanych o klastrze. Odpowiedzi tutaj są ładne, więc dodaję tylko dla różnorodności:
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
def getTopicPartitionCount(zookeeperQuorum: String, topic: String): Int = {
val client = new ZkClient(zookeeperQuorum)
val partitionCount = ZkUtils.getAllPartitions(client)
.count(topicPartitionPair => topicPartitionPair.topic == topic)
client.close
partitionCount
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-04-05 14:02:57
cluster.availablePartitionsForTopic(topicName).size()
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-10-12 14:28:18
//create the kafka producer
def getKafkaProducer: KafkaProducer[String, String] = {
val kafkaProps: Properties = new Properties()
kafkaProps.put("bootstrap.servers", "localhost:9092")
kafkaProps.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
kafkaProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer")
new KafkaProducer[String, String](kafkaProps)
}
val kafkaProducer = getKafkaProducer
val noOfPartition = kafkaProducer.partitionsFor("TopicName")
println(noOfPartition) //it will print the number of partiton for the given
//topic
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-04-10 05:05:55
Możesz pobrać listę partycji kafka z zookeeper w ten sposób. Jest to prawdziwy numer partycji po stronie serwera kafka.
[zk: zk.kafka:2181(CONNECTED) 43] ls /users/test_account/test_kafka_name/brokers/topics/test_kafka_topic_name/partitions
[35, 36, 159, 33, 34, 158, 157, 39, 156, 37, 155, 38, 154, 152, 153, 150, 151, 43, 42, 41, 40, 202, 203, 204, 205, 200, 201, 22, 23, 169, 24, 25, 26, 166, 206, 165, 27, 207, 168, 208, 28, 29, 167, 209, 161, 3, 2, 162, 1, 163, 0, 164, 7, 30, 6, 32, 5, 160, 31, 4, 9, 8, 211, 212, 210, 215, 216, 213, 19, 214, 17, 179, 219, 18, 178, 177, 15, 217, 218, 16, 176, 13, 14, 11, 12, 21, 170, 20, 171, 174, 175, 172, 173, 220, 221, 222, 223, 224, 225, 226, 227, 188, 228, 187, 229, 189, 180, 10, 181, 182, 183, 184, 185, 186, 116, 117, 79, 114, 78, 77, 115, 112, 113, 110, 111, 118, 119, 82, 83, 80, 81, 86, 87, 84, 85, 67, 125, 66, 126, 69, 127, 128, 68, 121, 122, 123, 124, 129, 70, 71, 120, 72, 73, 74, 75, 76, 134, 135, 132, 133, 59, 138, 58, 57, 139, 136, 56, 137, 55, 64, 65, 62, 63, 60, 131, 130, 61, 49, 143, 48, 144, 145, 146, 45, 147, 44, 148, 47, 149, 46, 51, 52, 53, 54, 140, 142, 141, 50, 109, 108, 107, 106, 105, 104, 103, 99, 102, 101, 100, 98, 97, 96, 95, 94, 93, 92, 91, 90, 88, 89, 195, 194, 197, 196, 191, 190, 193, 192, 198, 199, 230, 239, 232, 231, 234, 233, 236, 235, 238, 237]
I możesz użyć liczby partycji w kodzie konsumenta.
def getNumPartitions(topic: String): Int = {
val zk = CuratorFrameworkFactory.newClient(zkHostList, new RetryNTimes(5, 1000))
zk.start()
var numPartitions: Int = 0
val topicPartitionsPath = zkPath + "/brokers/topics/" + topic + "/partitions"
if (zk.checkExists().forPath(topicPartitionsPath) != null) {
try {
val brokerIdList = zk.getChildren().forPath(topicPartitionsPath).asScala
numPartitions = brokerIdList.length.toInt
} catch {
case e: Exception => {
e.printStackTrace()
}
}
}
zk.close()
numPartitions
}
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2019-08-05 11:26:09
Aby uzyskać listę partycji, idealnym / faktycznym sposobem jest użycie API AdminClients
Properties properties=new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
AdminClient adminClient=KafkaAdminClient.create(properties);
Map<String, TopicDescription> jension = adminClient.describeTopics(Collections.singletonList("jenison")).all().get();
System.out.println(jension.get("jenison").partitions().size());
Może być uruchomiona jako samodzielna metoda java bez zależności producenta/konsumenta.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2020-02-24 10:44:27
Żadna z odpowiedzi nie dostarczyła szybkiego łatwego sposobu liczenia wszystkich partycji dla danego tematu regex. W moim przypadku musiałem sprawdzić, ile partycji znajduje się w moim klastrze, w tym repliki do celów wymiarowych.
Poniżej znajduje się polecenie bash, które możesz uruchomić (bez dodatkowych narzędzi):
kafka-topics --describe --bootstrap-server broker --topic ".*" | grep Configs | awk '{printf "%d\n", $4*$6}' | awk '{s+=$1} END {print s}'
Możesz dostosować regex tematu, zamieniając .*
regex na cokolwiek chcesz. Upewnij się również, że zmienisz broker
na brokera adres.
Szczegóły:
- Stream
kafka-topics
opisz dane wyjściowe dla podanych interesujących tematów - Wyodrębnij tylko pierwszą linię dla każdego tematu, która zawiera liczbę partycji i współczynnik replikacji
- mnożenie partycji przez ReplicationFactor, aby uzyskać całkowite partycje dla tematu
- Sumuj wszystkie liczby i drukuj razem
Bonus:
Jeśli masz zainstalowany docker nie musisz pobierać pliku binarnego Kafka:
docker run -it confluentinc/cp-kafka:6.0.0 /bin/bash
Wtedy możesz uruchom to, aby uzyskać dostęp do wszystkich skryptów Kafka:
cd /usr/bin
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2021-01-14 20:28:39