What you're trying to do falls more in line with a message queue, where at most one consumer receives each message. This is possible with Redisson, but you won't be able to use the pub/sub model.
Instead, you can create an RBoundedBlockingQueue on which every consumer waits for a new element to arrive. When a new element shows up, the consumer that has been waiting the longest removes it from the head of the queue, and the rest of the consumers keep waiting. This ensures each message is read by only one consumer. This works through the Redis BLPOP command, which RedissonBlockingQueue also uses.
Here is an example:
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.redisson.Redisson;
    import org.redisson.api.RBoundedBlockingQueue;
    import org.redisson.api.RedissonClient;

    public class MessageQueueRedisson
    {
        public static void main(String[] args) throws InterruptedException, ExecutionException
        {
            // create a client with the default configuration
            RedissonClient redissonClient = Redisson.create();
            RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
            // set the maximum size of the queue (a bounded queue expects its capacity to be defined before use)
            blockingQueue.trySetCapacity(10000);
            // insert 10000 unique elements
            for (int i = 0; i < 10000; ++i) {
                blockingQueue.offer(String.valueOf(i));
            }
            // thread-safe list that collects every element polled from Redis
            final List<String> copyOnWriteList = new CopyOnWriteArrayList<>();
            final List<Future<?>> results = new ArrayList<>();
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            for (int i = 0; i < 5; ++i) {
                // create 5 consumers on the consumer-queue
                results.add(executorService.submit(createRunnable(redissonClient, copyOnWriteList)));
            }
            for (Future<?> future : results) {
                // wait for the queue to empty and the runnables to finish
                future.get();
            }
            // convert the final list to a set: if list.size() == set.size(), each element was polled by only one consumer
            Set<String> setResults = new HashSet<>(copyOnWriteList);
            System.out.println(copyOnWriteList.size() == setResults.size());
            System.out.println("list size: " + copyOnWriteList.size() + ", set size: " + setResults.size());
            // release the worker threads and the Redis connection so the JVM can exit
            executorService.shutdown();
            redissonClient.shutdown();
        }

        public static Runnable createRunnable(RedissonClient redissonClient, List<String> copyOnWriteList)
        {
            return () -> {
                // get the Redis blocking queue
                RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
                while (!blockingQueue.isEmpty()) {
                    // get the next element from the head of the queue; poll() does not block and
                    // returns null if another consumer emptied the queue first (use take() to wait)
                    String nextValue = blockingQueue.poll();
                    if (nextValue != null) {
                        copyOnWriteList.add(nextValue);
                    }
                }
            };
        }
    }
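The example above drains a pre-filled queue with the non-blocking poll(). If you want consumers that actually wait for new elements, as described earlier, one possible shape is sketched below. It is written against the JDK's java.util.concurrent.BlockingQueue interface and tried with an in-memory LinkedBlockingQueue, so it runs without a Redis server; since RBoundedBlockingQueue extends that interface, passing the Redisson queue instead should behave the same way, but that is an assumption I have not verified here. The class name CompetingConsumersSketch, the run method, and the STOP sentinel are hypothetical names made up for this illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of Redisson.
final class CompetingConsumersSketch {

    // Sentinel telling a consumer to stop; one is queued per consumer (assumed convention).
    private static final String STOP = "__stop__";

    // Starts consumerCount blocking consumers, publishes messageCount messages,
    // and returns everything the consumers received.
    static List<String> run(BlockingQueue<String> queue, int messageCount, int consumerCount) {
        List<String> received = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumerCount);
        try {
            for (int c = 0; c < consumerCount; c++) {
                pool.submit(() -> {
                    try {
                        // take() blocks until an element is available
                        for (String msg = queue.take(); !STOP.equals(msg); msg = queue.take()) {
                            received.add(msg);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // produce after the consumers are already waiting
            for (int i = 0; i < messageCount; i++) {
                queue.put(String.valueOf(i));
            }
            // each consumer exits after taking exactly one sentinel
            for (int c = 0; c < consumerCount; c++) {
                queue.put(STOP);
            }
            pool.shutdown();
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = run(new LinkedBlockingQueue<>(), 1000, 5);
        Set<String> unique = new HashSet<>(received);
        // equal sizes mean no message was delivered to more than one consumer
        System.out.println("received=" + received.size() + ", unique=" + unique.size());
    }
}
```

The sentinel is only one way to stop blocked consumers; polling with a timeout via poll(timeout, unit) and checking a shutdown flag is another option if you would rather not put control messages on the queue.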