How to subscribe to a list of multiple kafka wildcard patterns using kafka-python?
I'm subscribing to Kafka using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer id.

consumer.subscribe(pattern='customer.*.validations')

This works well, because I can pluck the customer ID from the topic string. But now I need to expand the functionality to listen to a similar topic for a slightly different purpose. Let's call it customer.*.additional-validations. The code needs to live in the same project because so much functionality is shared, but I need to be able to take a different path based on the type of topic.

In the kafka-python documentation I can see that it is possible to subscribe to a list of topics. However, these are hard-coded strings, not patterns that allow for flexibility.

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

So I'm wondering if it is possible to somehow do a combination of the two? Kind of like this (non-working):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
Lindsy answered 15/9, 2016 at 20:56 Comment(0)

In kafka-python, KafkaConsumer.subscribe() accepts either a list of topics or a single regex pattern:

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

    def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

So you can combine both patterns into a single regex with the OR operator |. That should work for subscribing to multiple dynamic topic patterns, because kafka-python uses the re module internally for matching. For example:

(customer.*.validations)|(customer.*.additional-validations)
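
A minimal sketch of how the combined pattern could be tightened and used for routing, written against Python's re module only so it runs without a broker. The names TOPIC_PATTERN, TOPIC_RE and route, and the named groups customer_id and kind, are hypothetical; it also assumes customer IDs never contain a dot. The last part illustrates that the unescaped pattern from the question is looser than it looks: "." matches any character and match() does not anchor the end of the string, so it already matches the additional-validations topics.

```python
import re

# Hypothetical combined pattern: dots are escaped, the customer id is limited
# to non-dot characters, and the topic type is captured in its own group.
TOPIC_PATTERN = (
    r"^customer\.(?P<customer_id>[^.]+)"
    r"\.(?P<kind>validations|additional-validations)$"
)
TOPIC_RE = re.compile(TOPIC_PATTERN)


def route(topic):
    """Return (customer_id, kind) for a matching topic, or None otherwise."""
    m = TOPIC_RE.match(topic)
    if m is None:
        return None
    return m.group("customer_id"), m.group("kind")


# The unescaped pattern from the question, for comparison.
loose = re.compile("customer.*.validations")

print(route("customer.42.validations"))
print(route("customer.42.additional-validations"))
print(bool(loose.match("customer.42.additional-validations")))
```

With kafka-python this would presumably be wired up as consumer.subscribe(pattern=TOPIC_PATTERN), calling route(msg.topic) on each received message to pick the code path.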

Bedford answered 19/9, 2016 at 13:38 Comment(1)
What happens if a new topic matching the regexp is created after the consumer has subscribed? Will the consumer also receive events from the new topic without re-subscribing or restarting the client application? - Carlin

In the Confluent Kafka library (confluent-kafka-python), subscribe() has no pattern keyword. Instead, any topic string in the list that starts with ^ is treated as a regex, so several patterns can be passed in one call.
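
As a rough sketch, the two topic families from the question could be expressed as a list of ^-prefixed patterns like this. The helper to_regex_subscriptions is hypothetical, not part of confluent-kafka, and the patterns assume the underlying librdkafka regex engine accepts the same simple syntax as Python's re, which is worth verifying for anything more elaborate.

```python
def to_regex_subscriptions(patterns):
    """Prefix each pattern with "^" unless it already starts with one.

    Hypothetical helper: it only builds the list of topic strings and
    does not talk to Kafka.
    """
    return [p if p.startswith("^") else "^" + p for p in patterns]


topics = to_regex_subscriptions([
    r"customer\..*\.validations$",
    r"^customer\..*\.additional-validations$",
])
print(topics)
```

The resulting list would then be passed as consumer.subscribe(topics), and msg.topic() can be inspected per message to decide which path to take.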

def subscribe(self, topics, on_assign=None, *args, **kwargs):
    """
    Set subscription to a supplied list of topics
    This replaces a previous subscription.
        
    Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
        
        consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
    """
Noted answered 23/11, 2020 at 22:10 Comment(0)