Avoiding duplicated messages on JMS/ActiveMQ
T

6

22

Is there a way to suppress duplicate messages on a queue defined on an ActiveMQ server?

I tried setting JMSMessageID manually (message.setJMSMessageID("uniqueid")), but the server ignores the change and delivers the message with its own generated JMSMessageID.

I couldn't find anything in the JMS specification about how to deduplicate messages.

In HornetQ, the way to deal with this problem is to set the HornetQ-specific property org.hornetq.core.message.impl.HDR_DUPLICATE_DETECTION_ID on the message.

For example:

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id"; // Could use a UUID for this
jmsMessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

Does anybody know whether there is a similar solution for ActiveMQ?

Trixy answered 8/2, 2011 at 14:41 Comment(0)
E
8

You should look at Apache Camel. It provides an Idempotent Consumer component that works with any JMS provider; see: http://camel.apache.org/idempotent-consumer.html

Using that in combination with the ActiveMQ component makes using JMS quite simple, see: http://camel.apache.org/activemq.html

Endemic answered 9/2, 2011 at 11:55 Comment(1)
I'm not sure this method will solve my problem. I need to keep just one instance of a message with a given JMSMessageID only while that instance is in the queue, so the queue should behave like a Set. I want to be able to put another message with the same JMSMessageID once the previous one has been removed from the queue. I still need to implement and test it, but based on the Idempotent Receiver described in the EAI book, I don't think the concept matches my need. Still, the proposed solution is good. I will study it further and comment here with my results. Thanks. Trixy
B
5

I doubt ActiveMQ supports this natively, but it should be easy to implement an idempotent consumer. One way is to add a unique identifier to each message at the producer end. At the consumer end, a store (database, cache, etc.) is used to check whether the message has been received before, and processing continues based on that check.
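The check described above could be sketched roughly as follows. This is only an illustration, not ActiveMQ or JMS API: the class IdempotentReceiver and its onMessage method are hypothetical names, the store is a plain in-memory set, and the message is reduced to an ID and a String payload so that the example runs without a broker.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of ActiveMQ or the JMS API.
final class IdempotentReceiver {
    // IDs of messages already handled. In-memory and unbounded here; a real
    // deployment would likely use a shared store (database, cache) with expiry.
    private final Set<String> seenIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    IdempotentReceiver(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the payload was processed, false if the ID was seen before.
    boolean onMessage(String uniqueId, String payload) {
        // add() on a concurrent key set is atomic, so within one JVM only
        // one thread can claim a given ID.
        if (!seenIds.add(uniqueId)) {
            return false;
        }
        handler.accept(payload);
        return true;
    }
}
```

In a JMS listener, uniqueId would come from the property set by the producer. Note that this sketch marks an ID as seen before the handler runs, so a handler failure would need extra care, and consumers spread over several processes would need a shared store instead of the in-memory set.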

I see a previous Stack Overflow question along the same lines, "Apache ActiveMQ 5.3 - How to configure a queue to reject duplicate messages?", which may also help.

Bigeye answered 9/2, 2011 at 0:41 Comment(1)
Since the consumer itself can be multi-threaded, one has to implement distributed or in-memory locking to tell whether a message is a duplicate. Right? Calamite
F
4

There is now support for removing duplicate messages baked into ActiveMQ transports. See the configuration values auditDepth and auditMaximumProducerNumber in the Connection Configuration Guide.
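As a rough illustration of where these values go, the sketch below only assembles a connection URI string. It assumes the options are exposed with the jms. prefix, as the connection configuration documentation describes for connection factory properties. The helper AuditUriExample, the tcp:// transport, and the host and port in the usage note are assumptions for illustration.

```java
// Hypothetical helper for illustration; it only builds a URI string and
// does not talk to a broker.
final class AuditUriExample {
    // hostAndPort, e.g. "localhost:61616", is an assumed broker address.
    static String brokerUrl(String hostAndPort, int auditDepth, int maxProducers) {
        return "tcp://" + hostAndPort
                + "?jms.auditDepth=" + auditDepth
                + "&jms.auditMaximumProducerNumber=" + maxProducers;
    }
}
```

For example, AuditUriExample.brokerUrl("localhost:61616", 2048, 64) produces a URI that could then be handed to an ActiveMQConnectionFactory. This only sets the audit window; it does not change how the broker decides that two messages are the same.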

Flamen answered 8/8, 2013 at 15:49 Comment(4)
How do you actually configure those parameters to avoid duplicates? Chyme
@Chyme I'm not sure what you're asking. How, in general, to apply configuration in ActiveMQ? Or what values to use for these specific fields? Flamen
It's just that the description of the parameters isn't clear to me. For auditDepth, for example, does the value mean the number of messages or the number of bytes that will be checked for duplicates? As for auditMaximumProducerNumber, does it mean that only a limited number of producers will be checked? Btw, if a message with the same content is published by 2 different subscribers, is the message by chance considered duplicated? Chyme
@Chris IIUC these parameters guarantee detection of duplicates for 2048 messages each on up to 64 producers. But how does ActiveMQ determine what is a duplicate? If it's by JMSMessageID then we're back to square one because we can't set that. Scrapbook
D
3

There is a way to make ActiveMQ filter duplicates based on a JMS property: it involves writing an ActiveMQ plugin. A basic broker filter that sends duplicate messages to the dead letter queue would look like this (the name of the property to check is passed to the constructor):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.Message;

public class DuplicateFilterBroker extends BrokerFilter {
    // Name of the JMS property whose value identifies a message.
    private final String messagePropertyName;
    // Property values seen so far. Unbounded in this basic version;
    // a real plugin would need to expire old entries.
    private final Set<String> seenValues = ConcurrentHashMap.newKeySet();

    public DuplicateFilterBroker(Broker next, String messagePropertyName) {
        super(next);
        this.messagePropertyName = messagePropertyName;
    }

    // Returns true if the value was seen before; otherwise records it.
    public boolean hasDuplicate(String propertyValue) {
        return !seenValues.add(propertyValue);
    }

    @Override
    public void send(ProducerBrokerExchange producerExchange, Message msg) throws Exception {
        Object msgObj = msg.getMessage();
        String value = null;
        if (msgObj instanceof javax.jms.Message) {
            value = ((javax.jms.Message) msgObj).getStringProperty(messagePropertyName);
        }
        // Messages without the property cannot be compared, so pass them on.
        if (value == null || !hasDuplicate(value)) {
            super.send(producerExchange, msg);
        } else {
            // The signature of this method differs between ActiveMQ releases
            // (later ones take extra arguments); check BrokerFilter for yours.
            sendToDeadLetterQueue(producerExchange.getConnectionContext(), msg);
        }
    }
}
Dressing answered 27/8, 2012 at 9:34 Comment(1)
How does this plugin decide which property will be used for filtering duplicate messages? An explanation of the use case would be really helpful for integrating such a plugin. Thanks in advance for your answer. Hath
B
1

It seems that the approach suggested in the question also works for ActiveMQ Artemis (2016/12). See the activemq-artemis guide. It requires the producer to set a specific property on the message.

Message jmsMessage = session.createMessage();
String myUniqueID = "This is my unique id";   // Could use a UUID for this
jmsMessage.setStringProperty(HDR_DUPLICATE_DETECTION_ID.toString(), myUniqueID);

However, the class containing the property is different: org.apache.activemq.artemis.core.message.impl.HDR_DUPLICATE_DETECTION_ID, and the property value is _AMQ_DUPL_ID.

Badillo answered 13/12, 2016 at 2:25 Comment(0)
I
0

Have you tried setting jms.checkForDuplicates=true?

Irisirisa answered 17/1, 2023 at 11:6 Comment(0)
