confluent-kafka-python: how to timeout initial connection when Broker is not available?
I am using confluent-kafka-python and see that it hangs indefinitely when I try to connect to a broker that is down. None of the timeout settings I found in the docs seem to apply:

from confluent_kafka import Consumer

conf = {'bootstrap.servers': f"{self.host}:{self.port}",
        'group.id': "foo",
        'auto.offset.reset': 'smallest',
        'socket.timeout.ms': 2000,
        'socket.max.fails': 2,
        'metadata.request.timeout.ms': 5000,
        'reconnect.backoff.max.ms': 5000,
        'api.version.request.timeout.ms': 5000,
        # api.version.fallback.ms
        'session.timeout.ms': 2000,
        # heartbeat.interval.ms
        'coordinator.query.interval.ms': 1000,
        # max.poll.interval.ms
        # auto.commit.interval.ms
        'debug': 'generic, broker, topic, metadata',
        }

try:
    self.consumer = Consumer(conf)

and in the log I see:

%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1584702589.067|BROKERFAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1584702589.067|FAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connect to ipv4#x.x.x.x:6667 failed: Connection refused (after 1ms in state CONNECT) 
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state CONNECT -> DOWN
%7|1584702589.067|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state DOWN -> INIT

This repeats indefinitely, and even Ctrl-C cannot stop it; I have to use kill.

I have previously used kafka-python, and it did fail when it could not connect to a broker.

Is there a setting that can make confluent-kafka-python fail when the broker is not available?

I have seen the question Kafka Streams broker connection timeout setting and want to make sure: is my situation the same?

Update: Another related issue: on a dual-homed node, if I configure advertised.listeners to an internal address and try to send a message from an external interface, I again face an infinite loop:

%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1584727507.340|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "de-cn1:6667"
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1584727507.340|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: broker in state TRY_CONNECT connecting
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.359|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1584727507.360|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN

So it cannot resolve the internal name de-cn1 and does not fail. Same question: how can I make it fail on such errors?

Oliana answered 20/3, 2020 at 11:28 Comment(2)
The constructor, Consumer(), will not hang; it returns immediately, and broker connections are attempted in background threads. – Alberic
Temporary errors, such as connection failures, are automatically retried by the underlying client and will not raise an error. You can set up an error callback to see which errors occur, but these are typically informational, since the client will retry indefinitely. Instead of trying to handle the myriad of possible cluster errors, it is recommended to error out on a timeout: if your consumer has not received an assignment or a message in X minutes, there is probably something wrong. Reacting drastically to each error in a distributed system is counterproductive. – Alberic