I don't think the accepted answer here is entirely accurate, so here is my take on this:
You can add a condition and break out of the for loop as soon as it is met:
for message in consumer:
    if condition:
        break
In your case, you want to stop when all messages are consumed, so you have to find a way to tell the consumer that all messages have arrived.
For example, the producer could send a final message carrying that information, and your condition would then check whether the consumed message is the one reporting that all messages have arrived.
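A minimal sketch of that marker-message idea, assuming the producer sends a final message whose deserialized value is a dict with a hypothetical `"type": "end_of_stream"` field. The marker, the helper name `consume_until_sentinel`, and the dict-shaped payloads are assumptions for illustration. The helper accepts any iterable of objects with a `.value` attribute, which is how a kafka-python consumer yields records, so a plain list stands in for the consumer here:

```python
from types import SimpleNamespace

# Hypothetical marker agreed on between producer and consumer.
END_MARKER = {"type": "end_of_stream"}


def consume_until_sentinel(consumer, end_marker=END_MARKER):
    """Collect message values until the end marker is seen."""
    received = []
    for message in consumer:
        if message.value == end_marker:
            break  # the producer has told us nothing more is coming
        received.append(message.value)
    return received


# Stand-in for a real consumer so the sketch runs without a broker.
fake_consumer = [
    SimpleNamespace(value={"unique_id": 0}),
    SimpleNamespace(value={"unique_id": 1}),
    SimpleNamespace(value=END_MARKER),
    SimpleNamespace(value={"unique_id": 2}),  # never reached
]
result = consume_until_sentinel(fake_consumer)
```

Keep in mind that Kafka only guarantees ordering within a partition, so with several partitions the marker may arrive before messages from other partitions; this sketch is probably only safe as-is for a single-partition topic.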
Another approach that was mentioned here before is simply to assume that if no message arrives for a certain amount of time, no more messages are coming. (1 second was suggested here, but a few more seconds might be safer.)
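With kafka-python, this idle-timeout behaviour can be had through the `consumer_timeout_ms` option of `KafkaConsumer`, which ends the iteration when no message arrives within that many milliseconds. A rough sketch, assuming kafka-python is the client in use; the helper name `consume_until_idle` and its `consumer_cls` parameter are hypothetical and only there so the function can be exercised without a broker:

```python
def consume_until_idle(topic, bootstrap_servers, idle_seconds=5, consumer_cls=None):
    """Read message values until no message arrives for idle_seconds."""
    if consumer_cls is None:
        # Imported lazily so the sketch can be loaded without kafka-python.
        from kafka import KafkaConsumer
        consumer_cls = KafkaConsumer

    consumer = consumer_cls(
        topic,
        bootstrap_servers=bootstrap_servers,
        # Iteration stops once this many milliseconds pass with no message.
        consumer_timeout_ms=int(idle_seconds * 1000),
    )
    try:
        # The for loop simply ends when the timeout is hit; no break needed.
        return [message.value for message in consumer]
    finally:
        consumer.close()
```

The trade-off is that a slow producer can look the same as a finished one, so the timeout should be comfortably longer than the longest gap you expect between messages.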
The way I did it was by checking that every ID I expected had been received at least once (counting each ID only once, so duplicates don't matter). That requires knowing exactly what you're receiving, plus some more logic that is probably beyond the scope of this question, but I found it to be a very useful and elegant way to decide when to stop consuming. Here is some of the code you would need for that:
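# Expected IDs, each mapped to its payload once it has arrived.
data = {
    0: None,
    1: None,
    2: None,
    3: None,
}
received = 0  # distinct expected IDs seen so far (avoids shadowing the built-in sum)

for message in consumer:
    payload = message.value  # assumes the value is deserialized to a dict
    unique_id = payload["unique_id"]
    # Count each expected ID only once; skip duplicates and unknown IDs.
    if unique_id in data and data[unique_id] is None:
        data[unique_id] = payload
        received += 1
    if received == len(data):
        break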
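A much easier way, if you know how many messages you will be consuming, is to use enumerate like this:
amount_of_messages_to_be_consumed = 40  # as an example
# start=1 so the loop stops right after the 40th message
# instead of blocking while it waits for a 41st one.
for index, message in enumerate(consumer, start=1):
    # process the message here
    if index == amount_of_messages_to_be_consumed:
        break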
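If the count is known up front, `itertools.islice` from the standard library is another way to cap the loop: it stops after `n` items without asking the consumer for one more message. A small sketch; `consume_n` is a hypothetical helper name, and it assumes each message exposes its payload as `.value`:

```python
from itertools import islice


def consume_n(consumer, n):
    """Return the values of the first n messages, then stop iterating."""
    # islice pulls at most n items from the consumer and never requests
    # an (n + 1)-th one, so the loop cannot block on a message that
    # will not arrive.
    return [message.value for message in islice(consumer, n)]
```

Calling `consume_n(consumer, 40)` would then cover the example of 40 messages.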
Of course, after you break out of the for loop you can and should close the consumer (but you were probably just stuck on getting out of the endless for loop...):
consumer.close()
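To make sure the consumer is closed even if something raises inside the loop, the loop can be wrapped in `contextlib.closing`, which calls `close()` on exit. This is only a sketch: `consume_and_close` and its `should_stop` callback are hypothetical names, and it assumes the consumer object has a `close()` method and yields messages with a `.value` attribute:

```python
from contextlib import closing


def consume_and_close(consumer, should_stop):
    """Consume until should_stop(message) is true, always closing the consumer."""
    received = []
    with closing(consumer):  # calls consumer.close() on exit, even on error
        for message in consumer:
            received.append(message.value)
            if should_stop(message):
                break
    return received
```

For example, `consume_and_close(consumer, lambda m: m.value == b"done")` would stop at a message whose value is `b"done"` and close the consumer afterwards.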
consumer.close() instead of KafkaConsumer.close(). – Ammonite