Kafka: Get broker host from ZooKeeper

Ze szczególnych powodów muszę używać zarówno - ConsumerGroup (Alias konsument wysokiego szczebla) jak i SimpleConsumer (Alias konsument niskiego szczebla), aby czytać z Kafki. Dla ConsumerGroup używam konfiguracji bazującej na Zookeeperze i jestem z niej całkowicie zadowolony, ale SimpleConsumer wymaga tworzenia instancji seed brokerów.

[6]} nie chcę prowadzić listy zarówno opiekunów ZooKeeper jak i broker. Dlatego szukam sposobu, aby automatycznie odkryć brokerów dla konkretnego tematu od ZooKeeper.

Ponieważ niektóre pośrednie informacje i przekonanie , że dane te są przechowywane w ZooKeeper pod jedną z następujących ścieżek:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /brokerzy / ids /

Jednak, gdy próbuję odczytać dane z tych węzłów, dostaję błąd serializacji (używam do tego com.101tec.zkclient):

Org.I0Itec.zkclient.wyjątek.ZkMarshallingError: java. io. StreamCorruptedException: invalid stream header: 7B226A6D na org.I0Itec.zkclient.serializuj.SerializableSerializer.deserialize (SerializableSerializer.Java:37) w org.I0Itec.zkclient.ZkClient.derializable (ZkClient.Java: 740) w org.I0Itec.zkclient.ZkClient.readData (ZkClient.wyswietlen: 773) w org.I0Itec.zkclient.ZkClient.readData (ZkClient.Java:761) w org.I0Itec.zkclient.ZkClient.readData (ZkClient.Java: 750) w org.I0Itec.zkclient.ZkClient.readData (ZkClient.Java: 744) ... 64,00 zł Spowodowane przez: java. io. StreamCorruptedException: invalid nagłówek strumienia: 7B226A6D w java. io. ObjectInputStream. readStreamHeader (ObjectInputStream.java:804) w java. io. ObjectInputStream. (ObjectInputStream. java:299) w org.I0Itec.zkclient.serializuj.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java: 30) w org.I0Itec.zkclient.serializuj.SerializableSerializer.deserialize (SerializableSerializer.java:31) ... 69 więcej

Mogę bez problemu pisać i czytać własne obiekty Javy (np. Stringi), więc uważam, że to nie jest to problem klienta, ale raczej skomplikowane kodowanie. Dlatego chcę wiedzieć:

  1. jeśli jest to właściwa droga, Jak prawidłowo odczytać te węzły?
  2. jeśli całe podejście jest złe, co jest właściwe ?
Author: ffriend, 2015-04-07

5 answers

Tak zrobił jeden z moich kolegów, żeby zdobyć listę maklerów Kafki. Myślę, że jest to prawidłowy sposób, gdy chcesz dynamicznie uzyskać listę brokerów.

Oto przykładowy kod, który pokazuje, jak uzyskać listę.

public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
        List<String> ids = zk.getChildren("/brokers/ids", false);
        for (String id : ids) {
            String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
            System.out.println(id + ": " + brokerInfo);
        }
    }
}

Uruchomienie kodu na klastrze składającym się z trzech brokerów skutkuje

1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}
 27
Author: Heejin,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-12 14:08:30

Okazuje się, że Kafka używa ZKStringSerializer do odczytu i zapisu danych do znodes. Aby naprawić błąd, musiałem dodać go jako ostatni parametr w konstruktorze ZkClient:

val zkClient = new ZkClient(zkQuorum, Integer.MAX_VALUE, 10000, ZKStringSerializer)

Używając go, napisałem kilka przydatnych funkcji do odkrywania identyfikatorów brokerów, ich adresów i innych rzeczy:

import kafka.utils.Json
import kafka.utils.ZKStringSerializer
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.KafkaException


def listBrokers(): List[Int] = {
  zkClient.getChildren("/brokers/ids").toList.map(_.toInt)
}

def listTopics(): List[String] = {
  zkClient.getChildren("/brokers/topics").toList
}

def listPartitions(topic: String): List[Int] = {
  val path = "/brokers/topics/" + topic + "/partitions"
  if (zkClient.exists(path)) {
    zkClient.getChildren(path).toList.map(_.toInt)
  } else {
    throw new KafkaException(s"Topic ${topic} doesn't exist")
  }
}

def getBrokerAddress(brokerId: Int): (String, Int) = {
  val path = s"/brokers/ids/${brokerId}"
  if (zkClient.exists(path)) {
    val brokerInfo = readZkData(path)
    (brokerInfo.get("host").get.asInstanceOf[String], brokerInfo.get("port").get.asInstanceOf[Int])
  } else {
    throw new KafkaException("Broker with ID ${brokerId} doesn't exist")
  }
}

def getLeaderAddress(topic: String, partitionId: Int): (String, Int) = {
  val path = s"/brokers/topics/${topic}/partitions/${partitionId}/state"
  if (zkClient.exists(path)) {
    val leaderStr = zkClient.readData[String](path)
    val leaderId = Json.parseFull(leaderStr).get.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]
    getBrokerAddress(leaderId)
  } else {
    throw new KafkaException(s"Topic (${topic}) or partition (${partitionId}) doesn't exist")
  }
}
 14
Author: ffriend,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-07-12 14:07:46

Aby to zrobić używając powłoki:

zookeeper-shell myzookeeper.example.com:2181
ls /brokers/ids
  => [2, 1, 0]
get /brokers/ids/2
get /brokers/ids/1
get /brokers/ids/0 
 4
Author: phayes,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-01-24 19:55:09

W rzeczywistości istnieje ZkUtils z wewnątrz Kafki (przynajmniej dla 0,8.x linii), które możesz użyć z jednym małym zastrzeżeniem: będziesz musiał ponownie zaimplementować ZkStringSerializer, który konwertowałby ciągi znaków jako macierze bajtów zakodowane w UTF-8. Jeśli chcesz używać API strumieniowych Java8, możesz iterację nad kolekcjami Scali throug scala.collection.JavaConversions. To właśnie pomogło w mojej sprawie.

 3
Author: nefo_x,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-01-12 23:25:47
 public KafkaProducer(String zookeeperAddress, String topic) throws IOException,
        KeeperException, InterruptedException {

    this.zookeeperAddress = zookeeperAddress;
    this.topic = topic;

    ZooKeeper zk = new ZooKeeper(zookeeperAddress, 10000, null);
    List<String> brokerList = new ArrayList<String>();

    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
        String brokerInfoString = new String(zk.getData("/brokers/ids/" + id, false, null));
        Broker broker = Broker.createBroker(Integer.valueOf(id), brokerInfoString);
        if (broker != null) {
            brokerList.add(broker.connectionString());
        }
    }

    props.put("serializer.class", KAFKA_STRING_ENCODER);
    props.put("metadata.broker.list", String.join(",", brokerList));
    producer = new Producer<String, String>(new ProducerConfig(props));
}
 2
Author: ljp,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-12-15 03:44:12