How to listen to topic using spring boot jms
Asked Answered
F

3

11

I am trying to listen to topic using the below snippet. However its listening to queue by default. There is no xml config in this case. I am completely relying on annotations. Moreover I have relied completely on the AutoConfiguration provided by Spring boot. I am not sure how to set the destination type as topic, In JmsListener. Spring JMS gurus please help.

    @Component
    public class MyTopicListener {

        @JmsListener(destination = "${trans.alert.topic}")
        public void receiveMessage(TransactionAlert alert) {
            logger.info("AlertSubscriberEmail :: Sending Email => <" + alert + ">");
        }
    }
Filmy answered 20/10, 2016 at 2:46 Comment(0)
O
16

I just took the complete Spring boot example from : https://github.com/spring-guides/gs-messaging-jms/

In this it is created for sending and receipt of messages from a queue. To Change this to a topic , you have to set the Pub-Sub property in the Factory instance.

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

@SpringBootApplication
@EnableJms
public class JmsSampleApplication {

public void registerBeans(ConfigurableApplicationContext context ){
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(JmsTemplate.class);
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

    builder.addPropertyValue("connectionFactory", cachingConnectionFactory);      // set property value
    DefaultListableBeanFactory factory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
    factory.registerBeanDefinition("jmsTemplateName", builder.getBeanDefinition());
}

@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}

@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory,
                                                           DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    //factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}
public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(JmsSampleApplication.class, args);

    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

    // Send a message with a POJO - the template reuse the message converter
    System.out.println("Sending an email message.");
    jmsTemplate.convertAndSend("mailbox.topic", new Email("[email protected]", "Hello"));
    jmsTemplate.convertAndSend("mailbox.queue", new Email("[email protected]", "Hello"));

    }
}

The listener

package org.springboot.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Created by RGOVIND on 10/20/2016.
 */
@Component
public class HelloTopicListener {

    @JmsListener(destination = "mailbox.topic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

    @JmsListener(destination = "mailbox.queue", containerFactory = "queueListenerFactory")
    public void receiveQueueMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }
}

Once this is done , you are all set to subscribe to the topic of choice.

There are multiple approaches to this of course , you can have a map of beans for different jmsTemplates , each of which can be used when you need them based on queue or topic. The template & beans can be instantiated in a method you choose to like discussed in this SO Question. Hope it helps

Oldline answered 20/10, 2016 at 4:47 Comment(4)
When I try with the JmsListenerContainerFactory bean or setting this property spring.jms.pub-sub-domain=true in application.properties, all my other code for queue sender and listeners started behaving as if they are reading and writing to topics instead of queues. Is there a solution that will make the listeners for both queue and topic co-exist together.Filmy
You can always choose to have multiple factories. In the case of code , you can choose to have both queue and topics. The problem is when you choose to receive , you will have to choose the factory. In that case you can have 2 factories , to get from a topic as well as to get from a queue. Edited my answer as wellOldline
I tried using multiple factories , even then topic so created is behaving like a queue only i.e on creating multiple listeners messages are distributed among themselves . Instead each subscriber should get all the messagesPalaeo
The above example wont work, which is why you are saying is still acts like a queue and not a topic. The reason is because the "factory.setPubSubDomain(true);" must done AFTER the "configurer.configure(factory, connectionFactory);".Hege
H
19

The answer marked correct is ALMOST correct. It still wont work because:

factory.setPubSubDomain(true) 

must come AFTER:

configurer.configure(factory, connectionFactory);

Otherwise the pubSubDomain flag being set to true is lost when configuring the defaults and that factory instance will still work with queues and not topics.

Hege answered 7/6, 2017 at 14:54 Comment(1)
Thank you for this. It drove me crazy for a few hours.Contumely
O
16

I just took the complete Spring boot example from : https://github.com/spring-guides/gs-messaging-jms/

In this it is created for sending and receipt of messages from a queue. To Change this to a topic , you have to set the Pub-Sub property in the Factory instance.

import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

import javax.jms.ConnectionFactory;

@SpringBootApplication
@EnableJms
public class JmsSampleApplication {

public void registerBeans(ConfigurableApplicationContext context ){
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(JmsTemplate.class);
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();

    builder.addPropertyValue("connectionFactory", cachingConnectionFactory);      // set property value
    DefaultListableBeanFactory factory = (DefaultListableBeanFactory) context.getAutowireCapableBeanFactory();
    factory.registerBeanDefinition("jmsTemplateName", builder.getBeanDefinition());
}

@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    // You could still override some of Boot's default if necessary.
    return factory;
}

@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory,
                                                           DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    //factory.setPubSubDomain(true);
    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}
public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(JmsSampleApplication.class, args);

    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);

    // Send a message with a POJO - the template reuse the message converter
    System.out.println("Sending an email message.");
    jmsTemplate.convertAndSend("mailbox.topic", new Email("[email protected]", "Hello"));
    jmsTemplate.convertAndSend("mailbox.queue", new Email("[email protected]", "Hello"));

    }
}

The listener

package org.springboot.jms;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Created by RGOVIND on 10/20/2016.
 */
@Component
public class HelloTopicListener {

    @JmsListener(destination = "mailbox.topic", containerFactory = "topicListenerFactory")
    public void receiveTopicMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

    @JmsListener(destination = "mailbox.queue", containerFactory = "queueListenerFactory")
    public void receiveQueueMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }
}

Once this is done , you are all set to subscribe to the topic of choice.

There are multiple approaches to this of course , you can have a map of beans for different jmsTemplates , each of which can be used when you need them based on queue or topic. The template & beans can be instantiated in a method you choose to like discussed in this SO Question. Hope it helps

Oldline answered 20/10, 2016 at 4:47 Comment(4)
When I try with the JmsListenerContainerFactory bean or setting this property spring.jms.pub-sub-domain=true in application.properties, all my other code for queue sender and listeners started behaving as if they are reading and writing to topics instead of queues. Is there a solution that will make the listeners for both queue and topic co-exist together.Filmy
You can always choose to have multiple factories. In the case of code , you can choose to have both queue and topics. The problem is when you choose to receive , you will have to choose the factory. In that case you can have 2 factories , to get from a topic as well as to get from a queue. Edited my answer as wellOldline
I tried using multiple factories , even then topic so created is behaving like a queue only i.e on creating multiple listeners messages are distributed among themselves . Instead each subscriber should get all the messagesPalaeo
The above example wont work, which is why you are saying is still acts like a queue and not a topic. The reason is because the "factory.setPubSubDomain(true);" must done AFTER the "configurer.configure(factory, connectionFactory);".Hege
D
6

In Spring Boot's Application.properties, try setting the following property:

spring.jms.pub-sub-domain=true

Then, use this property for the container factory that you are using to listen to the topic.

Dikdik answered 20/10, 2016 at 4:46 Comment(4)
actually the default container factory that Boot auto-configures will use that flag automatically.Tomikotomkiel
I doubt using this property will make all the available listeners to be listening to topics. I am looking for a way how to have separate listeners for queue and topics in the same project itself.Filmy
You can't have the same connectionFactory for listening to queues and topics. Your listeners will be derived from a connectionFactory, which will be configured to listen to either a point-to-point (Queue) or a Publish-Subscribe (Topic). So, have two connectionFactories, one each for Topic and Queue and then use them as you want. The default setting for pub-sub-domain is set to false which means by default it will listen to Queue.Dikdik
Setting the property is overriding all factories created pubsubdomain flag and thus all listeners are behaving as either queue or topic depending upon the value of the flag setPalaeo

© 2022 - 2024 — McMap. All rights reserved.