I would like to notice that the scenarion I will describe happen rare enough and in most cases everything works as expected.
I have 1 topic and 1 subscription on Pub/Sub side.
My java application listens for subscription, does some processing and sends acknowledge back. Because of fact that google Pub/Sub guarantees at least once delivery, we do message deduplication on our side based on objectGeneration
header and 'objectId' header.
Sometimes we see that message that was acknowldged is accepted by our application again and again and it is unexpected behaviour.
Log example:
//first
2019-12-17 20:51:57.375 INFO 1 --- [sub-subscriber3] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={....}, headers={.....objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv, ....
....
2019-12-17 20:51:57.698 INFO 1 --- [sub-subscriber3] .i.g.PubSubMessageAcknowledgementHandler : Acknowledged message - 1576615916875106
...
//duplicate 1
2019-12-17 20:51:59.663 INFO 1 --- [sub-subscriber4] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={...}, headers={ objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv", ....
...
2019-12-17 20:51:59.704 INFO 1 --- [sub-subscriber4] c.b.m.i.DiscardedMessagesHandler : Duplicate message received GenericMessage [ headers={idempotent.keys=[objectGeneration.1576615916875106, objectId.Small_files_bunch/100_12_1.csv], ...
....
//duplicate 2
2019-12-17 22:52:02.239 INFO 1 --- [sub-subscriber1] bucketNotificationFlow : Received new message from pub-sub: GenericMessage [payload={...}, headers={objectGeneration=1576615916875106, eventTime=2019-12-17T20:51:56.874940Z, objectId=Small_files_bunch/100_12_1.csv, ...
...
2019-12-17 22:52:02.339 INFO 1 --- [sub-subscriber1] c.b.m.i.DiscardedMessagesHandler : Duplicate message received GenericMessage [ headers={idempotent.keys=[objectGeneration.1576615916875106, objectId.Small_files_bunch/100_12_1.csv], ...
// and so on each 2 hours
Code for acknowledgement:
var generation = message.getHeaders().get("objectGeneration");
pubSubMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class)
pubSubMessage.ack().addCallback(
v -> {
removeFromIdempotentStore(targetMessage, false);
log.info("Acknowledged message - {}", generation); //from logs we see that this line was invoked
},
e -> {
removeFromIdempotentStore(targetMessage, false);
log.error("Failed to acknowledge message - {}", generation, e);
}
);
GCP subscription page contains following diagram:
StackDriver acknowledge diagram:
Any ideas what is going on, how to troubleshoot it and fix it ?