Extremely slow startup of a Spring Cloud Stream Kafka application when enable.idempotence is true

My Spring Cloud Stream (SCS) application has two Kafka producers with this configuration:

spring:
  cloud:
    function:
      definition: myProducer1;myProducer2
    stream:
      bindings:
        myproducer1-out-0:
          destination: topic1
          producer:
            useNativeEncoding: true
        myproducer2-out-0:
          destination: topic2
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          min-partition-count: 3
          replication-factor: 3
          producerProperties:
            enable:
              idempotence: false
            retries: 10000
            acks: all
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            schema:
              registry:
                url: ${schema-registry.url:http://localhost:8081}
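The binder stores producerProperties as a map of raw Kafka client properties, so the nested enable: / idempotence: keys above reach the producer as the single dotted key enable.idempotence. The sketch below is a hypothetical, stdlib-only illustration of that flattening (it is not Spring Boot's binding code), plus a rough version of the preconditions the Kafka producer enforces when idempotence is on and these settings are set explicitly: acks must be all, retries must be non-zero, and max.in.flight.requests.per.connection must be at most 5. The class and method names are made up for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT Spring Boot's binder: it only illustrates how
// nested YAML keys end up as dotted Kafka client property names.
class ProducerPropsSketch {

    // Recursively flattens {enable={idempotence=true}} into {enable.idempotence=true}.
    static Map<String, String> flatten(String prefix, Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : nested.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flat.putAll(flatten(key, child));
            } else {
                flat.put(key, String.valueOf(value));
            }
        }
        return flat;
    }

    // Roughly mirrors the producer's checks for explicitly set values
    // when enable.idempotence=true; throws if a setting is incompatible.
    static void checkIdempotenceSettings(Map<String, String> props) {
        if (!Boolean.parseBoolean(props.getOrDefault("enable.idempotence", "false"))) {
            return; // nothing to check for a non-idempotent producer
        }
        String acks = props.get("acks");
        if (acks != null && !(acks.equals("all") || acks.equals("-1"))) {
            throw new IllegalArgumentException("acks must be 'all' for an idempotent producer");
        }
        String retries = props.get("retries");
        if (retries != null && Integer.parseInt(retries) == 0) {
            throw new IllegalArgumentException("retries must be non-zero for an idempotent producer");
        }
        String inFlight = props.get("max.in.flight.requests.per.connection");
        if (inFlight != null && Integer.parseInt(inFlight) > 5) {
            throw new IllegalArgumentException("max.in.flight.requests.per.connection must be <= 5");
        }
    }
}
```

With the question's values (acks: all, retries: 10000), the idempotent setting passes these checks, so the slowdown does not come from an invalid combination.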

It starts in about 11 seconds:

 o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
 o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8084
 e.p.i.m.MyAppApplicationKt     : Started MyAppApplicationKt in 11.288 seconds (JVM running for 11.868)

I need my producers to be idempotent, so I set enable.idempotence: true. With this change, startup is about 7x slower (sometimes even more than 10x):

 o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
 o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8084
 e.p.i.m.MyAppApplicationKt     : Started MyAppApplicationKt in 71.489 seconds (JVM running for 72.127)

How can I speed up the startup?

UPDATE:

I've found a problem during startup: the producer logs "Proceeding to force close the producer since pending requests could not be completed within timeout 30000 ms." It is intermittent: sometimes it happens for one of the producers, sometimes for both, and sometimes for neither. When it doesn't show up, startup is as fast as it used to be.
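The reported times are consistent with each affected producer adding one full close timeout to startup. A back-of-the-envelope sketch of that assumption, using the 11.288 s non-idempotent run as the baseline and the 30000 ms timeout from the log message; the class and method names are hypothetical:

```java
import java.time.Duration;

// Hypothetical model: startup = baseline + (producers whose close hit the timeout) * closeTimeout.
class StartupEstimate {

    static Duration estimate(Duration baseline, int stuckProducers, Duration closeTimeout) {
        return baseline.plus(closeTimeout.multipliedBy(stuckProducers));
    }

    public static void main(String[] args) {
        Duration baseline = Duration.ofMillis(11_288);     // non-idempotent startup from the question
        Duration closeTimeout = Duration.ofMillis(30_000); // "within timeout 30000 ms"
        for (int stuck = 0; stuck <= 2; stuck++) {
            System.out.println(stuck + " stuck producer(s): ~"
                    + estimate(baseline, stuck, closeTimeout).toMillis() / 1000.0 + " s");
        }
    }
}
```

With both producers stuck, the model gives about 71.3 s, close to the 71.489 s run; this is a rough model, not a measurement.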

In the following log, it happens only in one producer:

o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586864007183
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 32029 with epoch 0

Then, after being stuck for 30 seconds at "ProducerId set to 32029 with epoch 0", it logs the "Proceeding to force close..." info message and initializes the second producer without any problems:

o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Proceeding to force close the producer since pending 
o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer1-out-0' has 1 subscriber(s).
o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: topic2
o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values: 
...
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Instantiated an idempotent producer.
o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586864038612
org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: lkc-nvqmv
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 30000 ms.
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to 32030 with epoch 0
o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-2] Proceeding to force close the producer since pending 
o.s.c.s.m.DirectWithAttributesChannel    : Channel 'my-app-1.myproducer2-out-0' has 1 subscriber(s).
o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8084
e.p.i.m.MetricsIngestorApplicationKt     : Started MetricsIngestorApplicationKt in 66.834 seconds (JVM running for 67.544)
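The two "Kafka startTimeMs" lines give a direct measurement of how long the first producer held up the binding. A small sketch that extracts those values from log text and computes the gap; the log format is taken from the excerpt above, and the class name is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class StartTimeGap {

    private static final Pattern START_TIME = Pattern.compile("Kafka startTimeMs: (\\d+)");

    // Returns every startTimeMs value in the order it appears in the log.
    static List<Long> startTimes(String log) {
        List<Long> times = new ArrayList<>();
        Matcher m = START_TIME.matcher(log);
        while (m.find()) {
            times.add(Long.parseLong(m.group(1)));
        }
        return times;
    }

    public static void main(String[] args) {
        String log = String.join("\n",
                "o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586864007183",
                "o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.",
                "o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1586864038612");
        List<Long> times = startTimes(log);
        long gapMs = times.get(1) - times.get(0);
        System.out.println("gap between producer-1 and producer-2: " + gapMs + " ms");
    }
}
```

For these two lines the gap is 31,429 ms: roughly the 30,000 ms close timeout plus about 1.4 s of other work before producer-2 was created.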

UPDATE 2:

I've debugged the logic behind this: it happens during the doBindProducer() method. To get the partitions for the topic, KafkaMessageChannelBinder creates a ProducerFactory and a temporary producer from it:

    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
            MessageChannel channel, MessageChannel errorChannel) throws Exception {
        /*
         * IMPORTANT: With a transactional binder, individual producer properties for
         * Kafka are ignored; the global binder
         * (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used
         * instead, for all producers. A binder is transactional when
         * 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text.
         */
        final ProducerFactory<byte[], byte[]> producerFB = this.transactionManager != null
                ? this.transactionManager.getProducerFactory()
                : getProducerFactory(null, producerProperties);
        Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(
                producerProperties.getPartitionCount(), false, () -> {
                    Producer<byte[], byte[]> producer = producerFB.createProducer();
                    List<PartitionInfo> partitionsFor = producer
                            .partitionsFor(destination.getName());
                    producer.close();
                    if (this.transactionManager == null) {
                        ((DisposableBean) producerFB).destroy();
                    }
                    return partitionsFor;
                }, destination.getName());
        // ... (remainder of the method omitted)
    }

After correctly retrieving the list (List<PartitionInfo> partitionsFor), it gets stuck in destroy() (the ((DisposableBean) producerFB).destroy() call above, which closes the underlying KafkaProducer) until the 30-second timeout expires.
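The symptom matches a close(timeout) contract: the producer waits up to the timeout for pending requests to finish and, if they don't, force-closes. The following is a self-contained simulation of that behaviour, not the Kafka client: SimulatedProducer is a hypothetical stand-in whose single pending request never completes, and the timeout is scaled down from 30 s to 200 ms so it runs quickly. (The real KafkaProducer does have a close(Duration) overload in kafka-clients 2.2 and later.)

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a producer with one in-flight request.
// It is NOT the Kafka client; it only mimics the close(timeout) contract.
class SimulatedProducer {

    private final CountDownLatch pendingRequests = new CountDownLatch(1);
    private volatile boolean forceClosed;

    // Called when the broker answers; in the failing scenario this never happens.
    void completePendingRequest() {
        pendingRequests.countDown();
    }

    // Waits up to 'timeout' for pending requests, then force-closes.
    // Returns true for a graceful close, false if it had to force close.
    boolean close(Duration timeout) {
        boolean drained;
        try {
            drained = pendingRequests.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            drained = false;
        }
        if (!drained) {
            forceClosed = true; // analogous to "Proceeding to force close the producer ..."
        }
        return drained;
    }

    boolean isForceClosed() {
        return forceClosed;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofMillis(200); // stands in for the 30000 ms default
        SimulatedProducer stuck = new SimulatedProducer();
        long start = System.nanoTime();
        boolean graceful = stuck.close(timeout);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("graceful=" + graceful + ", blocked for ~" + elapsedMs + " ms");

        SimulatedProducer healthy = new SimulatedProducer();
        healthy.completePendingRequest(); // the request finished before close()
        System.out.println("graceful=" + healthy.close(timeout));
    }
}
```

The stuck producer blocks for the whole timeout every time, which is why a shorter, configurable close timeout would bound the startup penalty even without fixing the root cause.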

Why does it block there? Could it be a bug in the binder?

Solothurn asked on 14/4/2020 at 10:33 (4 comments)
Follower: Instead of closing the underlying producer, did you try reset() and then destroy() on the bean? From the docs, reset() will "Close the Producer(s) and clear the cache of transactional Producer(s)." From what I understand, close() will be called from reset()/destroy(), so there may be no need for it... just a guess.
Solothurn: I'm not closing the producer manually; the binder does it, as you can see in the stack trace.
Aesthetics: @Solothurn, were you able to solve the issue? If not, did you file a bug or ask the folks on Gitter?
Solothurn: No, I couldn't solve it. I guess I could ask on Gitter...

I am not sure why the close is timing out, but you should be able to configure that timeout.

Please open an issue against the binder; it currently does not support reducing the close timeout from its default (30 seconds).

Bunnie answered on 7/5/2020 at 13:14 (1 comment)
Solothurn: Done (github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/…). Thanks, Gary.
