Google PubSub Python multiple subscriber clients receiving duplicate messages
Asked Answered
H

2

2

I have a pretty straightforward app that starts a PubSub subscriber StreamingPull client. I have this deployed on Kubernetes so I can scale. When I have a single pod deployed, everything works as expected. When I scale to 2 containers, I start getting duplicate messages. I know that some small of duplicate messages is to be expected, but almost half the messages, sometimes more, are received multiple times.

My process takes about 600ms to process a message. The subscription acknowledgement deadline is set to 600s. I published 1000 messages, and the subscription was emptied in less than a minute, but the acknowledge_message_operation metric shows ~1500 calls, with a small amount with response_code expired. There were no failures in my process and all messages were acked upon processing. Logs show that the same message was received by the two containers at the exact same time. The minute to process all the messages was well below the acknowledgement deadline of the subscription, and the Python client is supposed to handle lease management, so I'm not sure why there were any expired messages at all. I also don't understand why the same message is sent to multiple subscriber clients at the same time.

Minimal working example:

import time

from google.cloud import pubsub_v1

PROJECT_ID = 'my-project'
PUBSUB_TOPIC_ID = 'duplicate-test'
PUBSUB_SUBSCRIPTION_ID = 'duplicate-test'

def subscribe(sleep_time=None):
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        PROJECT_ID, PUBSUB_SUBSCRIPTION_ID)

    def callback(message):
        print(message.data.decode())
        if sleep_time:
            time.sleep(sleep_time)
        print(f'acking {message.data.decode()}')
        message.ack()

    future = subscriber.subscribe(
        subscription_path, callback=callback)
    print(f'Listening for messages on {subscription_path}')
    future.result()


def publish(num_messages):
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, PUBSUB_TOPIC_ID)
    for i in range(num_messages):
        publisher.publish(topic_path, str(i).encode())

In two terminals, run subscribe(1). In a third terminal, run publish(200). For me, this will give duplicates in the two subscriber terminals.

Highchair answered 18/9, 2019 at 1:17 Comment(0)
S
4

It is unusual for two subscribers to get the same message at the same time unless:

  1. The message got published twice due to a retry (and therefore as far as Cloud Pub/Sub is concerned, there are two messages). In this case, the content of the two messages would be the same, but their message IDs would be different. Therefore, it might be worth ensuring that you are looking at the service-provided message ID to ensure the messages are indeed duplicates.
  2. The subscribers are on different subscriptions, which means each of the subscribers would receive all of the messages.

If neither of these is the case, then duplicates should be relatively rare. There is an edge case in dealing with large backlogs of small messages with streaming pull (which is what the Python client library uses). Basically, if messages that are very small are published in a burst and subscribers then consume that burst, it is possible to see the behavior you are seeing. All of the messages would end up being sent to one of the two subscribers and would be buffered behind the flow control limits of the number of outstanding messages. These messages may exceed their ack deadline, resulting in redelivery, likely to the other subscriber. The first subscriber still has these messages in its buffer and will see these messages, too.

However, if you are consistently seeing two subscribers freshly started immediately receive the same messages with the same message IDs, then you should contact Google Cloud support with your project name, subscription name, and a sample of the message IDs. They will better be able to investigate why this immediate duplication is happening.

Scriptorium answered 18/9, 2019 at 22:31 Comment(1)
I eluded to this in a comment on another answer, I believe your explanation explains what I'm seeing and found in the source code. I am indeed processing a large backlog of small messages, using PubSub as a "to-do" list of sorts for bulk processing of data. Solution here is probably the one suggested in the documentation to instead use a synchronous pull client.Highchair
N
0

(Edited as I misread the deadlines)

Looking at the Streaming Pull docs, this seems like an expected behavior:

The gRPC StreamingPull stack is optimized for high throughput and therefore 
buffers messages. This can have some consequences if you are attempting to 
process large backlogs of small messages (rather than a steady stream of new 
messages). Under these conditions, you may see messages delivered multiple times
and they may not be load balanced effectively across clients.

From: https://cloud.google.com/pubsub/docs/pull#streamingpull

Nichy answered 18/9, 2019 at 12:14 Comment(2)
Ack deadline is set to 600s - seconds. This is the highest value allowed.Highchair
I did see that. My initial understanding was individual clients would just pull a large amount of messages and take awhile to process them, and only if messages were beyond the max lease management deadline (2 hours in python) would they be resent. Upon digging through the source code, it appears that messages in the buffer are not actually under lease management, only 100 at a time (controlled by the FlowControl object). So seems that messages in the buffer that are not under lease management expire and get redelivered, but are still processed by the original client holding them in buffer.Highchair

© 2022 - 2024 — McMap. All rights reserved.