Jak mogę wysyłać duże wiadomości z Kafka (powyżej 15MB)?
Wysyłam String-messages do Kafka V. 0. 8 za pomocą Java Producer API.
Jeśli rozmiar wiadomości wynosi około 15 MB dostaję MessageSizeTooLargeException
.
Próbowałem ustawić message.max.bytes
na 40 MB, ale nadal dostaję wyjątek. Małe wiadomości działały bez problemów.
(wyjątek pojawia się u producenta, nie mam konsumenta w tej aplikacji.)
Co mogę zrobić, aby pozbyć się tego wyjątku?
My example producer config
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
Error-Log:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
5 answers
Musisz dostosować trzy (lub cztery) właściwości:
- Consumer side:
fetch.message.max.bytes
- to określi największy rozmiar wiadomości, która może zostać pobrana przez konsumenta. - Broker side:
replica.fetch.max.bytes
- pozwoli to replikom w brokerach wysyłać wiadomości w klastrze i upewnić się, że wiadomości są replikowane poprawnie. Jeśli jest to zbyt małe, wiadomość nigdy nie zostanie replikowana, a zatem konsument nigdy nie zobaczy wiadomości, ponieważ wiadomość nigdy nie będzie być zaangażowana (w pełni zreplikowana). - Strona brokera:
message.max.bytes
- jest to największy rozmiar wiadomości, jaki może otrzymać broker od producenta. - Strona brokera (na temat):
max.message.bytes
- jest to największy rozmiar wiadomości, którą broker pozwoli dołączyć do tematu. Ten rozmiar jest sprawdzany przed kompresją. (Domyślnie brokeramessage.max.bytes
.)
Dowiedziałem się ciężko o numerze 2 - nie dostajesz żadnych WYJĄTKÓW, wiadomości lub ostrzeżeń od Kafki, więc upewnij się, że rozważ to podczas wysyłania dużych wiadomości.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-05-20 18:34:27
Drobne zmiany wymagane dla Kafka 0.10 i Nowy konsument w porównaniu do laughing_man ' s odpowiedź :
- Broker: bez zmian, trzeba jeszcze zwiększyć właściwości
message.max.bytes
ireplica.fetch.max.bytes
.message.max.bytes
musi być równe lub mniejsze (*) niżreplica.fetch.max.bytes
. - Producent: Increase
max.request.size
Aby wysłać większą wiadomość. - konsument: zwiększ
max.partition.fetch.bytes
, aby otrzymywać większe wiadomości.
(*) Przeczytaj komentarze, aby dowiedzieć się więcej o message.max.bytes
replica.fetch.max.bytes
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-05-23 10:31:16
Musisz nadpisać następujące właściwości:
Broker Configs ($KAFKA_HOME/config / server.właściwości)
- Replika.aport.max.bajty
- wiadomość.max.bajty
Consumer Configs ($KAFKA_HOME/config/consumer.właściwości)
ten krok nie zadziałał. Dodałem go do aplikacji konsumenckiej i działał dobrze
- Aport.wiadomość.max.bajty
Uruchom ponownie serwer.
Spójrz na tę dokumentację więcej informacji: http://kafka.apache.org/08/configuration.html
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-02-17 09:56:58
Chodzi o to, aby wiadomość o takiej samej wielkości była wysyłana od producenta Kafki do maklera Kafki, a następnie odbierana przez Kafka Consumer tj.
Kafka producent -- > Kafka Broker -- > Kafka Consumer
Załóżmy, że warunkiem jest wysłanie 15MB wiadomości, to producent, pośrednik i konsument, wszystkie trzy, muszą być zsynchronizowane.
Producent Kafka wysyła 15 MB --> Kafka Broker pozwala / sklepy 15 MB --> Kafka Consumer otrzymuje 15 MB
Dlatego ustawienie powinno być A.) Na Brokerze: wiadomość.max.bajtów=15728640 replika.aport.max.bajtów=15728640
B.) O Konsumencie: aport.wiadomość.max.bajtów=15728640
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-09-11 06:43:42
Należy pamiętać, że atrybut message.max.bytes
musi być zsynchronizowany z właściwością fetch.message.max.bytes
konsumenta. rozmiar pobierania musi być co najmniej tak duży, jak maksymalny rozmiar wiadomości, w przeciwnym razie może dojść do sytuacji, w której producenci mogą wysyłać wiadomości większe niż konsument może konsumować/pobierać. Warto się temu przyjrzeć.
Jakiej wersji Kafki używasz? Podaj również więcej szczegółów, które otrzymujesz. jest coś takiego ... payload size of xxxx larger
than 1000000
coming up in the log?
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2014-01-14 10:14:03