Kafka consumer pobieranie metadanych dla tematów nie powiodło się

Próbuję napisać klienta Javy dla serwerów Kafka i ZooKeeper stron trzecich. Jestem w stanie wymienić i opisać tematy, ale kiedy próbuję przeczytać dowolny, pojawia się ClosedChannelException. Odtwarzam je tutaj za pomocą klienta linii poleceń.

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         

Alternatywne polecenia:

$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic   PartitionCount:2        ReplicationFactor:1     Configs:
    Topic: eventbustopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    Topic: eventbustopic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic

(IP zostały zredagowane i zastąpione przez 255.255.255.255)

Kiedy wygoogluję ten wyjątek, widzę problemy po stronie producenta-rzeczywiście, źródło ClientUtils.fetchTopicMetadata sugeruje, że jest to głównie używane przez producentów.

Jedną z moich obaw jest to, że może to być produkt układu sieci: pakiety są zniekształcone przez Haproxy i wysyłane przez VPN.

Co dokładnie jest w pracy Tutaj?

Author: Carson Ip, 2015-06-02

4 answers

Broker informuje Klienta, która Nazwa hosta powinna być używana do generowania / konsumowania wiadomości. Domyślnie Kafka używa nazwy hosta systemu, na którym działa. Jeśli ta nazwa hosta nie może zostać rozwiązana przez klienta, otrzymasz ten wyjątek.

Możesz spróbować ustawić advertised.host.name w konfiguracji Kafka na nazwę/adres hosta, którego klienci powinni używać.

 40
Author: Johannes Barop,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-06-10 11:10:12

Oto mój sposób na rozwiązanie tego problemu:

  1. Uruchom bin/kafka-server-stop.sh, aby przestać działać na serwerze kafka.
  2. zmodyfikuj plik właściwości {[1] } dodając linię: listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
  3. restart serwera kafka.

Ponieważ bez ustawienia lisenera, kafka użyje java.net.InetAddress.getCanonicalHostName(), aby uzyskać adres, na którym nasłuchuje serwer socket.

 9
Author: Charles.Zhu,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-07-27 03:45:12

Masz problem z Zookeeperem. 255.255.255.255:2181 nie jest poprawnym adresem Zookeeper; jest to adres broadcast w Twojej sieci lub maska podsieci. Aby wszystko działało, znajdź adres IP lub nazwę hosta komputera z uruchomionym programem Zookeeper.

 1
Author: Douglas Lopez,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-07-25 02:14:52

Natknąłem się na ten błąd na AWS. Problem polegał na tym, że byłem zbyt restrykcyjny w grupie zabezpieczeń i ustawiłem porty 2181 i 9092 na "moje IP". Oznaczało to, że instancja kafka nie mogła znaleźć ZK działającego na tym samym pudełku.

Rozwiązanie-otwórz-trochę.

 0
Author: ethrbunny,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-08-07 23:16:02