Jak mogę wysyłać duże wiadomości z Kafka (powyżej 15MB)?

Wysyłam String-messages do Kafka V. 0. 8 za pomocą Java Producer API. Jeśli rozmiar wiadomości wynosi około 15 MB dostaję MessageSizeTooLargeException. Próbowałem ustawić message.max.bytesna 40 MB, ale nadal dostaję wyjątek. Małe wiadomości działały bez problemów.

(wyjątek pojawia się u producenta, nie mam konsumenta w tej aplikacji.)

Co mogę zrobić, aby pozbyć się tego wyjątku?

My example producer config

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

Error-Log:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
Author: Martin Thoma, 2014-01-09

5 answers

Musisz dostosować trzy (lub cztery) właściwości:

  • Consumer side: fetch.message.max.bytes - to określi największy rozmiar wiadomości, która może zostać pobrana przez konsumenta.
  • Broker side: replica.fetch.max.bytes - pozwoli to replikom w brokerach wysyłać wiadomości w klastrze i upewnić się, że wiadomości są replikowane poprawnie. Jeśli jest to zbyt małe, wiadomość nigdy nie zostanie replikowana, a zatem konsument nigdy nie zobaczy wiadomości, ponieważ wiadomość nigdy nie będzie być zaangażowana (w pełni zreplikowana).
  • Strona brokera: message.max.bytes - jest to największy rozmiar wiadomości, jaki może otrzymać broker od producenta.
  • Strona brokera (na temat): max.message.bytes - jest to największy rozmiar wiadomości, którą broker pozwoli dołączyć do tematu. Ten rozmiar jest sprawdzany przed kompresją. (Domyślnie brokera message.max.bytes.)

Dowiedziałem się ciężko o numerze 2 - nie dostajesz żadnych WYJĄTKÓW, wiadomości lub ostrzeżeń od Kafki, więc upewnij się, że rozważ to podczas wysyłania dużych wiadomości.

 106
Author: laughing_man,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-20 18:34:27

Drobne zmiany wymagane dla Kafka 0.10 i Nowy konsument w porównaniu do laughing_man ' s odpowiedź :

  • Broker: bez zmian, trzeba jeszcze zwiększyć właściwości message.max.bytes i replica.fetch.max.bytes. message.max.bytes musi być równe lub mniejsze (*) niż replica.fetch.max.bytes.
  • Producent: Increase max.request.size Aby wysłać większą wiadomość.
  • konsument: zwiększ max.partition.fetch.bytes, aby otrzymywać większe wiadomości.

(*) Przeczytaj komentarze, aby dowiedzieć się więcej o message.max.bytesreplica.fetch.max.bytes

 28
Author: Sascha Vetter,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:16

Musisz nadpisać następujące właściwości:

Broker Configs ($KAFKA_HOME/config / server.właściwości)

    Replika.aport.max.bajty
  • wiadomość.max.bajty

Consumer Configs ($KAFKA_HOME/config/consumer.właściwości)
ten krok nie zadziałał. Dodałem go do aplikacji konsumenckiej i działał dobrze

    Aport.wiadomość.max.bajty

Uruchom ponownie serwer.

Spójrz na tę dokumentację więcej informacji: http://kafka.apache.org/08/configuration.html

 9
Author: user2550587,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-17 09:56:58

Chodzi o to, aby wiadomość o takiej samej wielkości była wysyłana od producenta Kafki do maklera Kafki, a następnie odbierana przez Kafka Consumer tj.

Kafka producent -- > Kafka Broker -- > Kafka Consumer

Załóżmy, że warunkiem jest wysłanie 15MB wiadomości, to producent, pośrednik i konsument, wszystkie trzy, muszą być zsynchronizowane.

Producent Kafka wysyła 15 MB --> Kafka Broker pozwala / sklepy 15 MB --> Kafka Consumer otrzymuje 15 MB

Dlatego ustawienie powinno być A.) Na Brokerze: wiadomość.max.bajtów=15728640 replika.aport.max.bajtów=15728640

B.) O Konsumencie: aport.wiadomość.max.bajtów=15728640

 7
Author: Ravi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-11 06:43:42

Należy pamiętać, że atrybut message.max.bytesmusi być zsynchronizowany z właściwością fetch.message.max.bytes konsumenta. rozmiar pobierania musi być co najmniej tak duży, jak maksymalny rozmiar wiadomości, w przeciwnym razie może dojść do sytuacji, w której producenci mogą wysyłać wiadomości większe niż konsument może konsumować/pobierać. Warto się temu przyjrzeć.
Jakiej wersji Kafki używasz? Podaj również więcej szczegółów, które otrzymujesz. jest coś takiego ... payload size of xxxx larger than 1000000 coming up in the log?

 5
Author: user2720864,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-01-14 10:14:03