How to make only one subscriber process a message (when there is more than one) in Redisson (Redis Java client)

In my project, I've created a simple pub/sub topic with Redisson (https://github.com/redisson/redisson). The publisher publishes a message, and multiple subscribers run on different machines. When I publish, all the subscribers receive the message. But I want only one of the subscribers to process it, i.e. if one processes the message, the others should simply ignore it. Is this possible with Redisson?

LISTENER:

RTopic topic = redisson.getTopic("topic2");
topic.addListener(Person.class, new MessageListener<Person>() {
    @Override
    public void onMessage(CharSequence charSequence, Person person) {
        System.out.println("PERSON : "+person.toString());
    }
});

PUBLISHER:

 Person person = new Person("anyName","female");
 RedissonClient redisson = Redisson.create();
 RTopic topic = redisson.getTopic("topic2");
 topic.publish(person);

Is it possible to use a lock or something similar so that only one subscriber handles each message? Do any other tools support this behavior?

Strip answered 30/4, 2019 at 14:1 Comment(2)
Hi @Strip, did you find a solution? I am facing the same issue. - Ladybird
Hi @RoieBeck, no, unfortunately I didn't find a solution for this, but I'm now using github.com/gresrun/jesque, which seems to fit my use case. - Strip

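Pub/sub is not designed for this: every subscriber receives every published message. Look at queues in ActiveMQ, RabbitMQ, or SQS instead, where each message is normally handed to a single consumer, which fits your scenario.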

Amphigory answered 6/9, 2022 at 5:8 Comment(1)
As it’s currently written, your answer is unclear. Please edit to add additional details that will help others understand how this addresses the question asked. You can find more information on how to write good answers in the help center. - Moppet

What you're trying to do falls more in line with a message queue, where at most one consumer receives each message. This is possible with Redisson, but you won't be able to use the pub/sub model.

Instead, you can create an RBoundedBlockingQueue on which every consumer waits for a new element to arrive. When a new element shows up, the consumer that has been waiting the longest removes it from the head of the queue, and the rest keep waiting. This ensures that each message is read by only one consumer. The blocking operations (for example take()) are built on the Redis BLPOP command, which RedissonBlockingQueue also uses. The example uses the non-blocking poll() because the queue is filled before the consumers start.

Here is an example:

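import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.redisson.Redisson;
import org.redisson.api.RBoundedBlockingQueue;
import org.redisson.api.RedissonClient;

public class MessageQueueRedisson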
{
    public static void main(String[] args) throws InterruptedException
    {
        RedissonClient redissonClient = Redisson.create();

        RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
        //optionally set maximum size of queue
        blockingQueue.trySetCapacity(10000);

        //insert 10000 unique elements
        for(int i = 0; i < 10000; ++i) {
            blockingQueue.offer(i + "");
        }

        //thread-safe list that will add all elements polled from redis
        final List<String> copyOnWriteList = new CopyOnWriteArrayList<>();

        final List<Future<?>> results = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; ++i) {
            //create 5 consumers on the consumer-queue
            results.add(executorService.submit(createRunnable(redissonClient, copyOnWriteList)));
        }


        for(Future<?> future: results) {
            while(!future.isDone()) {
                //wait for queue to empty and runnables to finish
                TimeUnit.SECONDS.sleep(5L);
            }
        }

        //Convert final arraylist to set. if list.size() == set.size(), then each element was only polled by one consumer
        Set<String> setResults = new HashSet<>(copyOnWriteList);
        System.out.println(copyOnWriteList.size() == setResults.size());
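        System.out.println("list size: " + copyOnWriteList.size() + ", set size: " + setResults.size());

        //release the thread pool and the Redisson client so the JVM can exit
        executorService.shutdown();
        redissonClient.shutdown();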
    }

    public static Runnable createRunnable(RedissonClient redissonClient, List<String> copyOnWriteList) {
        return () -> {
            //get redis blocking queue
            RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
            while(!blockingQueue.isEmpty()) {
                //get next element from head of queue
                String nextValue = blockingQueue.poll();
                if(nextValue != null) {
                    copyOnWriteList.add(nextValue);
                }
            }
        };
    }
}
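
For long-running subscribers that wait for messages instead of draining a pre-filled queue, the consumer loop would block on the queue rather than check isEmpty(). Below is a minimal sketch of that competing-consumers pattern, written against the standard java.util.concurrent.BlockingQueue interface and backed by an in-process LinkedBlockingQueue so it runs without a Redis server. Redisson's RBlockingQueue extends the same interface, so the loop should carry over to a queue obtained from the client, but that part is an assumption and is not tested here. The class name, the consumeAll helper, and the 200 ms idle timeout are made up for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class CompetingConsumersSketch {

    // Hypothetical helper: starts `consumers` workers that all block on the same queue.
    // Each worker stops once it has seen no message for 200 ms.
    public static List<String> consumeAll(BlockingQueue<String> queue, int consumers) {
        List<String> processed = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.execute(() -> {
                try {
                    String message;
                    // poll(timeout) blocks until an element is available or the timeout expires;
                    // an element is removed from the queue by exactly one caller
                    while ((message = queue.poll(200, TimeUnit.MILLISECONDS)) != null) {
                        processed.add(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 1000; i++) {
            queue.offer("msg-" + i);
        }
        List<String> processed = consumeAll(queue, 5);
        // equal sizes mean no message was handled by more than one consumer
        System.out.println("processed: " + processed.size()
                + ", distinct: " + new HashSet<>(processed).size());
    }
}
```

A real subscriber would more likely call take() in an endless loop and never time out; the timeout here only lets the sketch terminate on its own.
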
Mabe answered 3/2 at 22:49 Comment(0)
