I am using confluent-kafka-python and see that it hangs indefinitely when I try to connect to a broker which is down. I can't seem to apply any of the timeout settings I found in the docs:
from confluent_kafka import Consumer

conf = {'bootstrap.servers': f"{self.host}:{self.port}",
        'group.id': "foo",
        'auto.offset.reset': 'smallest',
        'socket.timeout.ms': '2000', 'socket.max.fails': 2,
        'metadata.request.timeout.ms': 5000,
        'reconnect.backoff.max.ms': '5000',
        'api.version.request.timeout.ms': '5000',
        # api.version.fallback.ms
        'session.timeout.ms': '2000',
        # heartbeat.interval.ms
        'coordinator.query.interval.ms': '1000',
        # max.poll.interval.ms
        # auto.commit.interval.ms,
        "debug": "generic, broker, topic, metadata",
        }
try:
    self.consumer = Consumer(conf)
and I get in the log:
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: broker in state TRY_CONNECT connecting
%7|1584702589.065|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1584702589.065|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connecting to ipv4#x.x.x.x:6667 (plaintext) with socket 11
%7|1584702589.065|CONNECT|rdkafka#consumer-1| [thrd:app]: Cluster connection already in progress: application metadata request
%7|1584702589.066|CONNECT|rdkafka#consumer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 49ms: application metadata request
%7|1584702589.067|BROKERFAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: failed: err: Local: Broker transport failure: (errno: Connection refused)
%7|1584702589.067|FAIL|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Connect to ipv4#x.x.x.x:6667 failed: Connection refused (after 1ms in state CONNECT)
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state CONNECT -> DOWN
%7|1584702589.067|BROADCAST|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: Broadcasting state change
%7|1584702589.067|STATE|rdkafka#consumer-1| [thrd:x.x.x.x:6667/bootstrap]: x.x.x.x:6667/bootstrap: Broker changed state DOWN -> INIT
It just goes on and on, and even Ctrl-C cannot stop it; I have to use kill.
I previously used kafka-python, and it did fail when it could not connect to a broker.
Is there a setting that makes confluent-kafka-python fail when the broker is not available?
I have seen the question "Kafka Streams broker connection timeout setting" and want to confirm: is my situation the same?
Update: Another related issue: on a dual-homed node, if I set advertised.listeners to an internal address and try to send a message from an external interface, I again get an infinite loop:
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1584727507.340|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "de-cn1:6667"
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received CONNECT op
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Broker node update: (errno: Success)
%7|1584727507.340|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 0ms in state TRY_CONNECT)
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.340|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1001: broker in state TRY_CONNECT connecting
%7|1584727507.340|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1584727507.340|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1584727507.359|BROKERFAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: failed: err: Local: Host resolution failure: (errno: Bad address)
%7|1584727507.360|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> DOWN
So it cannot resolve the internal name de-cn1, yet it doesn't fail. Same question: how do I make it fail on such errors?
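To make the desired "fail fast" behaviour concrete, here is a minimal sketch of a pre-flight check done outside the library, using only the Python standard library. It is not a confluent-kafka-python setting; the helper name broker_reachable and its timeout_s parameter are hypothetical, and it only tests plain TCP reachability of a given host and port.

```python
import socket


def broker_reachable(host: str, port: int, timeout_s: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout_s.

    Hypothetical helper: it checks TCP reachability only and knows nothing
    about the Kafka protocol.
    """
    try:
        # create_connection resolves the host name and then connects;
        # either step can fail.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers "connection refused", timeouts, and name resolution errors
        # (socket.timeout and socket.gaierror are both OSError subclasses).
        return False
```

Calling broker_reachable(host, port) before constructing the Consumer, and raising an exception when it returns False, would cover the "Connection refused" case from the first log. For the second log, the same check would presumably have to be run against the advertised name (de-cn1) as well, since the bootstrap address itself is reachable there.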