spring boot configure multiple ActiveMQ instances
Asked Answered
A

4

15

I have requirement to move messages from queues on one ActiveMQ instance to another ActiveMQ instance. Is there a way to connect to two different ActiveMQ instances using spring boot configuration?

Do I need to create multiple connectionFactories? If so then how does JmsTemplate know which ActiveMQ instance to connect to?

  @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory(JMS_BROKER_URL);
    }

Any help and code examples would be useful.

Thanks in advance. GM

Ackack answered 13/4, 2017 at 18:11 Comment(0)
W
17

Additionally to the response of @Chris You have to create different BrokerService instances using differents ports and create different ConnectionFactory to connect to each broker and create different JmsTemplate using these different factories to send messages to differents brokers.

For example :

import javax.jms.ConnectionFactory;
import javax.jms.QueueConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ActiveMQConfigurationForJmsCamelRouteConsumeAndForward {
    public static final String LOCAL_Q = "localQ";
    public static final String REMOTE_Q = "remoteQ";

    @Bean
    public BrokerService broker() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:5671");
        broker.setBrokerName("broker");
        broker.setUseJmx(false);
        return broker;
    }

    @Bean
    public BrokerService broker2() throws Exception {
        final BrokerService broker = new BrokerService();
        broker.addConnector("tcp://localhost:5672");
        broker.setBrokerName("broker2");
        broker.setUseJmx(false);
        return broker;
    }

    @Bean
    @Primary
    public ConnectionFactory jmsConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5671");
        return connectionFactory;
    }

    @Bean
    public QueueConnectionFactory jmsConnectionFactory2() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:5672");
        return connectionFactory;
    }

    @Bean
    @Primary
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory());
        jmsTemplate.setDefaultDestinationName(LOCAL_Q);
        return jmsTemplate;
    }

    @Bean
    public JmsTemplate jmsTemplate2() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory2());
        jmsTemplate.setDefaultDestinationName(REMOTE_Q);
        return jmsTemplate;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2(
            @Qualifier("jmsConnectionFactory2") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;
    }
}

To move messages from one AMQ instance to another instance you can use JmsBridgeConnectors :

Note that by the example below you cannot have multiple consumers on the queue from which you want to forward the messages because Camel or JmsBridgeConnectors consume the message and forward it. If you want a only copy of the message to be forwarded you have some solutions : 1- Convert your queue to a topic, manage the messages for offline consumers by a durable subscriptions or retroactive consumers. 2- convert your queue to a composite queue and use DestinationsInterceptors to copy messages to another queue. 3- use NetworkConnector for Networkof brokers

@Bean
public BrokerService broker() throws Exception {
    final BrokerService broker = new BrokerService();
    broker.addConnector("tcp://localhost:5671");
    SimpleJmsQueueConnector simpleJmsQueueConnector = new SimpleJmsQueueConnector();
    OutboundQueueBridge bridge = new OutboundQueueBridge();
    bridge.setLocalQueueName(LOCAL_Q);
    bridge.setOutboundQueueName(REMOTE_Q);
    OutboundQueueBridge[] outboundQueueBridges = new OutboundQueueBridge[] { bridge };
    simpleJmsQueueConnector.getReconnectionPolicy().setMaxSendRetries(ReconnectionPolicy.INFINITE);
    simpleJmsQueueConnector.setOutboundQueueBridges(outboundQueueBridges);
    simpleJmsQueueConnector.setLocalQueueConnectionFactory((QueueConnectionFactory) jmsConnectionFactory());
    simpleJmsQueueConnector.setOutboundQueueConnectionFactory(jmsConnectionFactory2());
    JmsConnector[] jmsConnectors = new JmsConnector[] { simpleJmsQueueConnector };
    broker.setJmsBridgeConnectors(jmsConnectors);
    broker.setBrokerName("broker");
    broker.setUseJmx(false);
    return broker;
}

Or by using Camel like this below :

@Bean
public CamelContext camelContext() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addComponent("inboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5671"));
    context.addComponent("outboundQueue", ActiveMQComponent.activeMQComponent("tcp://localhost:5672"));
    context.addRoutes(new RouteBuilder() {
        public void configure() {
            from("inboundQueue:queue:" + LOCAL_Q).to("outboundQueue:queue:" + REMOTE_Q);
        }
    });
    context.start();
    return context;
}

your Producer must be like this to use differents JmsTemplates :

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class Producer implements CommandLineRunner {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("jmsTemplate2")
    private JmsTemplate jmsTemplate2;

    @Override
    public void run(String... args) throws Exception {
        send("Sample message");
    }

    public void send(String msg) {
        this.jmsTemplate.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.LOCAL_Q, msg);
        this.jmsTemplate2.convertAndSend(ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, msg);
    }
}

and Consumer :

import javax.jms.Session;

import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = ActiveMQConfigurationForJmsCamelRouteConsumeAndForward.REMOTE_Q, containerFactory = "jmsListenerContainerFactory2")
    public void receiveQueue(Session session, String text) {
        System.out.println(((ActiveMQSession) session).getConnection().getBrokerInfo());
        System.out.println(text);
    }
}
Whereupon answered 13/4, 2017 at 20:40 Comment(3)
thanks for this detailed answer and it gave me a good start. I managed to get it working. I do like the camel solution... its nice and simple. Thanks agian.Ackack
Thanks!! Its nice and clean. I have the requirement to move message from IBM MQ to Active MQ. Will this approach suit for my requirement too?Poff
@Poff i think yes by adapting the component to use ibm factory, if not you can use some EIP patterns, adapter and/or bridge ? enterpriseintegrationpatterns.com/patterns/messaging/…Whereupon
V
4

You would need to instantiate multiple JmsTemplate instances as Beans in your application and then use a combination of @Qualifier and @Primary annotations to indicate which JmsTemplate instance should go where.

For example

@Bean("queue1")
@Primary
public JmsTemplate getQueue1(@Qualifier("connectionFactory1")ConnectionFactory factory...){
...
}

@Bean("queue2")
@Primary
public JmsTemplate getQueue2(@Qualifier("connectionFactory2")ConnectionFactory factory...){
...
}

...

@Autowired
@Qualifier("queue1")
private JmsTemplate queue1;
...

See here for more info.

Veljkov answered 13/4, 2017 at 18:30 Comment(2)
this gave me a good start. Thanks again. I also used example code from hassen-bennour who expanded on your solution.Ackack
it really works for my case. My requirement is to receive the message from IBM Queue and forward it to Active MQ since some of the legacy system are not ready to move to Active MQ. So we create an workaround to forward it. So created 2 connections factory one for IBM MQ and another one for AMQ.Poff
V
1

You can use the Spring Boot default for the queue consumer

@JmsListener(destination = “queue.name")
public void consumer(String message) {
    // consume the message
}

And for the producer you can create another JmsTemplate @Bean

@Bean
public JmsTemplate jmsTemplate() {
    return new JmsTemplate(new ActiveMQConnectionFactory("tcp://localhost:5671"));
}
Vincents answered 2/7, 2019 at 21:27 Comment(0)
A
0

This way you can register as many brokers/listeners as you wish dynamically:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;

import javax.jms.Message;
import javax.jms.MessageListener;

@Configuration
public class CustomJmsConfigurer implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory();
        amqConnectionFactory.setBrokerURL("brokerUrl");
        amqConnectionFactory.setUserName("user");
        amqConnectionFactory.setPassword("password");
        amqConnectionFactory.setExclusiveConsumer(true);

        DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        containerFactory.setConnectionFactory(amqConnectionFactory);

        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("someIdentifier");
        endpoint.setDestination("queueName");
        endpoint.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // Do your stuff
            }
        });
        registrar.registerEndpoint(endpoint, containerFactory);
    }
}
Alpinist answered 12/2, 2020 at 11:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.