org.apache.kafka.common.errors.RecordTooLargeException in Flume Kafka Sink
I am trying to read data from a JMS source and push it into a Kafka topic. After a few hours I noticed that the push rate to the Kafka topic dropped to almost zero, and some initial analysis turned up the following exception in the Flume logs:

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
        ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

My Flume logs show the currently configured value of max.request.size as 1048576 (the Kafka producer default of 1 MiB), which is clearly smaller than 1399305. Increasing max.request.size should eliminate these exceptions, but I am unable to find the correct place to update that value.

My Flume configuration:

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data

a1.sources.r1.type = jms

a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true

a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1

Any help would be really appreciated!

Keishakeisling asked 28/2/2017 at 11:25
This change has to be made on the Kafka side. Update the Kafka producer configuration file producer.properties with a larger value, for example:

max.request.size=10000000
Bereniceberenson answered 28/2/2017 at 11:33
I am using Flume, which uses the Kafka producer library to push messages onto topics, but I can't see this as configurable in Flume. Do I need to change any hard-coded value in the producer classes? – Keishakeisling
@RiteshSharma Are you saying that you do not have Kafka installed on the server? – Bereniceberenson
Actually this max.request.size issue is coming from Flume, where I am using the Kafka sink to push data to the Kafka brokers. So Flume uses the Kafka producer libraries (via the Kafka sink) to push data to the brokers, but Flume doesn't provide a dedicated configuration file like producer.properties; you need to set Kafka producer properties in the Flume configuration itself. – Keishakeisling
Yes, Flume uses Kafka. So my solution was to change this property at the Kafka level, since max.request.size belongs to the Kafka producer, which in turn is what Flume uses! – Bereniceberenson
Yeah, thanks for the help! – Keishakeisling
It seems I have resolved my issue. As suspected, increasing max.request.size eliminated the exception. To set such Kafka sink (producer) properties, Flume provides the constant prefix kafka.producer., which can be prepended to any Kafka producer property.

So mine goes as: a1.sinks.k1.kafka.producer.max.request.size = 5271988
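For context, a minimal sketch of how the sink section from the configuration above could look with the prefixed property added (5271988 is just the value used here; any value larger than your biggest serialized record should work):

# Kafka sink; any Kafka producer property can be passed through the kafka.producer. prefix
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.max.request.size = 5271988
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1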

Keishakeisling answered 1/3/2017 at 7:47
Wow, I never knew this was possible! – Bereniceberenson
