My Scs application has two Kafka producers with this configuration:
spring:
cloud:
function:
definition: myProducer1;myProducer2
stream:
bindings:
myproducer1-out-0:
destination: topic1
producer:
useNativeEncoding: true
myproducer2-out-0:
destination: topic2
producer:
useNativeEncoding: true
kafka:
binder:
brokers: ${kafka.brokers:localhost}
min-partition-count: 3
replication-factor: 3
producerProperties:
enable:
idempotence: false
retries: 10000
acks: all
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${schema-registry.url:http://localhost:8081}
It starts in about ~10 seconds:
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 11.288 seconds (JVM running for 11.868)
I need my producers to be idempotent so I set enabled.idempotence: true
. With this change the startup time is 7x slower (sometimes even more than 10x):
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MyAppApplicationKt : Started MyAppApplicationKt in 71.489 seconds (JVM running for 72.127)
How can I speed up the startup?
UPDATE:
I've found a problem during the startup (Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms.
), sometimes it happens in one of the producers, others in both and others in none of them. When it doesn't show up, the startup is as fast as it used to be.
In the following log, it happens only in one producer:
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864007183
org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 32029 with epoch 0
Then after having been stuck for 30 seconds in ProducerId set to 32029 with epoch 0
, it logs the info message of Proceeding to force close...
and initializes the second producer without any problems:
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer1-out-0' has 1 subscriber(s).
o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: topic2
o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
...
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1586864038612
org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to 32030 with epoch 0
o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-2] Proceeding to force close the producer since pending
o.s.c.s.m.DirectWithAttributesChannel : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8084
e.p.i.m.MetricsIngestorApplicationKt : Started MetricsIngestorApplicationKt in 66.834 seconds (JVM running for 67.544)
UPDATE 2:
I've debugged the logic behind this, it happends during the doBindProducer()
method. It gets the partitions for the topic, for which it creates a ProducerFactory in KafkaMessageChannelBinder
.
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
MessageChannel channel, MessageChannel errorChannel) throws Exception {
/*
* IMPORTANT: With a transactional binder, individual producer properties for
* Kafka are ignored; the global binder
* (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
* instead, for all producers. A binder is transactional when
* 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
*/
final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
? this.transactionManager.getProducerFactory()
: getProducerFactory(null, producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
producerProperties.getPartitionCount(), false, () -> {
Producer<byte[], byte[]> producer = producerFB.createProducer();
List<PartitionInfo> partitionsFor = producer
.partitionsFor(destination.getName());
producer.close();
if (this.transactionManager == null) {
((DisposableBean) producerFB).destroy();
}
return partitionsFor;
}, destination.getName());
After retrieving correctly this list List<PartitionInfo> partitionsFor
, it gets stuck in KafkaProducer.destroy() until the 30 seconds timeout expires:
Why does it block there? Could it be a bug of the binder?