I am trying to push data from Flume channels to Kafka cluster using Kafka sink and i can see related data into associated topic but simultaneously i am observing below mentioned exception trace in Kafka logs too frequently ,
[2017-03-21 16:47:56,250] WARN Unexpected error from /10.X.X.X; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 30662099 larger than 30662028)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
at kafka.network.Processor.run(SocketServer.scala:413)
at java.lang.Thread.run(Thread.java:745)
Initial analysis led me to my Flume logs and observed below exception trace in it ,
21 Mar 2017 16:25:32,560 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:229)
... 3 more
from the first stack trace it seems that Flume is trying to push data of size 30662099 bytes but msg accepting limit of Kafka broker is limited to 30662028 bytes .
i have kept similar message sending and receiving size on producer (Flume) and Broker (Kafka) i.e. 30662028 , i am concerned about if my Flume is sending only 30662028 bytes then what are these extra bytes which is accumulating with my producers's message and forming the final message of size 30662099 and causing this message dropping.
Any help will be really appreciable !!