org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)

I am trying to push data from Flume channels to a Kafka cluster using the Kafka sink. I can see the related data arriving in the associated topic, but at the same time I am observing the exception trace below in the Kafka logs very frequently:

[2017-03-21 16:47:56,250] WARN Unexpected error from /10.X.X.X; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)  

Initial analysis led me to my Flume logs, where I observed the following exception trace:

21 Mar 2017 16:25:32,560 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:229)
        ... 3 more

From the first stack trace it seems that Flume is trying to push a request of 30662099 bytes, but the request size accepted by the Kafka broker is limited to 30662028 bytes.
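For context, the limit this check enforces on the broker side is socket.request.max.bytes, which bounds the size of an entire request as it is read off the socket, not just the record payload. A minimal server.properties sketch of the relevant settings; the values here are assumptions derived from the numbers in the question, not from a verified deployment:

    # server.properties (broker side) - sketch, values assumed from the question
    # Upper bound on a whole socket request; NetworkReceive throws
    # InvalidReceiveException when an incoming request is larger than this.
    # Leave headroom above the producer's max.request.size for protocol
    # framing (request header, topic/partition fields, batch metadata).
    socket.request.max.bytes=31000000

    # Largest record batch the broker will accept once the request has been read.
    message.max.bytes=30662028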

I have configured the same message size limit, 30662028 bytes, on both the producer (Flume) and the broker (Kafka). What concerns me is: if Flume is sending at most 30662028 bytes, what are these extra bytes that accumulate on top of my producer's message, producing a final request of 30662099 bytes and causing the message to be dropped?
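If the two limits really are identical, the 71-byte difference is most plausibly produce-request framing: the producer's max.request.size check applies to the serialized records, while the broker's socket.request.max.bytes check sees the full wire request, including the request header and topic/partition metadata. Below is a sketch of the Flume side, assuming Flume 1.7's KafkaSink property naming (producer settings passed through under the kafka.producer. prefix); the agent name a1, sink name k1, broker address, and topic are placeholders:

    # flume.conf (producer side) - sketch, names and values are placeholders
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = broker1:9092
    a1.sinks.k1.kafka.topic = my-topic

    # Caps what the producer serializes per request; the wire request adds
    # framing bytes on top of this, so keep it strictly below the broker's
    # socket.request.max.bytes rather than exactly equal to it.
    a1.sinks.k1.kafka.producer.max.request.size = 30000000

With that arrangement, the framing overhead no longer pushes the request past the broker's limit.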

Any help would be greatly appreciated!

Unwholesome asked 21/3, 2017 at 13:17 (2 comments)
Looking at some posts online from other user groups, it looks like this could be a mismatched-protocol issue: either some parts speaking SSL and others not, or some using an older Kafka protocol version than others. – Shitty
Is this possibly a matter of a little overhead? Did you try whether reducing the Flume limit or increasing the Kafka limit by a few hundred bytes makes a difference? Also, in general: if you found the answer, please post it below. If you think this is unlikely to still be relevant for future visitors (e.g. because of old versions), consider closing the question. – Fusible
