I am using Apache Artemis V2.12.0, started two instance of broker in two VM's
broker.xml (myhost1) [ broker.xml of myhost2 is similar only the port I used was 61616]
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
<core xmlns="urn:activemq:core">
<bindings-directory>./data/bindings</bindings-directory>
<journal-directory>./data/journal</journal-directory>
<large-messages-directory>./data/largemessages</large-messages-directory>
<paging-directory>./data/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">tcp://10.64.60.100:61617</connector><!-- direct ip addres of host myhost1 -->
<connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- ip 10.64.60.101 <- mocked up ip for security reasons -->
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
</acceptors>
<cluster-connections>
<cluster-connection name="myhost1-cluster">
<connector-ref>netty-connector</connector-ref>
<retry-interval>500</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<message-load-balancing>ON_DEMAND</message-load-balancing>
<max-hops>1</max-hops>
<static-connectors>
<connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
</static-connectors>
</cluster-connection>
</cluster-connections>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="amq"/>
<permission type="deleteNonDurableQueue" roles="amq"/>
<permission type="createDurableQueue" roles="amq"/>
<permission type="deleteDurableQueue" roles="amq"/>
<permission type="createAddress" roles="amq"/>
<permission type="deleteAddress" roles="amq"/>
<permission type="consume" roles="amq"/>
<permission type="browse" roles="amq"/>
<permission type="send" roles="amq"/>
<permission type="manage" roles="amq"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>DLQ</dead-letter-address>
<expiry-address>ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<!-- with -1 only the global-max-size is in use for limiting -->
<max-size-bytes>-1</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>PAGE</address-full-policy>
<auto-create-queues>true</auto-create-queues>
<auto-create-addresses>true</auto-create-addresses>
<auto-delete-queues>false</auto-delete-queues>
<auto-delete-created-queues>false</auto-delete-created-queues>
<auto-delete-addresses>false</auto-delete-addresses>
</address-setting>
</address-settings>
</core>
</configuration>
After starting the broker instance on two nodes they joined the cluster, which i can see in logs.
2020-06-03 23:59:17,874 INFO [org.apache.activemq.artemis.core.server] AMQ221020: Started EPOLL Acceptor at 0.0.0.0:61617 for protocols [CORE,AMQP]
2020-06-03 23:59:17,910 INFO [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2020-06-03 23:59:17,910 INFO [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.12.0 [localhost, nodeID=e6c6eab6-a456-11ea-94cf-000d3a306e31]
2020-06-03 23:59:18,240 INFO [org.apache.activemq.artemis.core.server] AMQ221027: Bridge ClusterConnectionBridge@5e9820f4 [name=$.artemis.internal.sf.myhost1-cluster.bd39cc41-a201-11ea-abaa-000d3a315d06, queue=QueueImpl[name=$.artemis.internal.sf.devmq1-cluster.bd39cc41-a201-11ea-abaa-000d3a315d06, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=e6c6eab6-a456-11ea-94cf-000d3a306e31], temp=false]@2b0263f3 targetConnector=ServerLocatorImpl (identity=(Cluster-connection-bridge::ClusterConnectionBridge@5e9820f4 [name=$.artemis.internal.sf.devmq1-cluster.bd39cc41-a201-11ea-abaa-000d3a315d06, queue=QueueImpl[name=$.artemis.internal.sf.devmq1-cluster.bd39cc41-a201-11ea-abaa-000d3a315d06, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=e6c6eab6-a456-11ea-94cf-000d3a306e31], temp=false]@2b0263f3 targetConnector=ServerLocatorImpl [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=10-64-60-100], discoveryGroupConfiguration=null]]::ClusterConnectionImpl@24293395[nodeUUID=e6c6eab6-a456-11ea-94cf-000d3a306e31, connector=TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61617&host=10-64-60-101, address=, server=ActiveMQServerImpl::serverUUID=e6c6eab6-a456-11ea-94cf-000d3a306e31])) [initialConnectors=[TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=10-64-60-100], discoveryGroupConfiguration=null]] is connected
2020-06-03 23:59:18,364 INFO [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
Below java code sends message to the clustered broker,
Step1: both the brokers where running
Step2: The java client was started to send messages to the broker
Step3: From the console of myhost1, i see messages pushed to the queue
Step4: I stop the broker instance in myhost1
Step5: java client log, retries to connect to the other server, after n attempts it throws exception. (My expectation is it should NOT throw any exception)
The java code has JNDI approach which i commented, even in this case the messages where pushed but similar exception occured.
I tired JmsPoolConnectionfactory, even then the same issue, where when one of the broker instance is stopped after few retries it throws exception. (the logs for this are at bottom of the code)
Question:
Using the java code on client side how to achieve auto discovery/failover/reconnect without any exception. I am using static-connector
under the cluster-options
.
package com.demo.artemis.clients;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
public class ArtemisClientClustered
{
public static void main(final String[] args) throws Exception {
//only produces the message
new ArtemisClientClustered().runProducer(true, false);
}
public boolean runProducer(boolean produceMesage, boolean consumeMessage) throws Exception{
Connection connection = null;
InitialContext initalContext = null;
int i = 0;
try {
Properties jndiProp = new Properties();
jndiProp.put("java.naming.factory.initial", "org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory");
//jndiProp.put("connectionFactory.ConnectionFactory", "tcp://localhost:61616?producerMaxRate=50");
jndiProp.put("connectionFactory.ConnectionFactory", "(tcp://myhost2:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;");
jndiProp.put("queue.queue/ahm.load-datawarehouse.queue","ahm.load-datawarehouse.queue");
initalContext = new InitialContext(jndiProp);
// Step 2. Perfom a lookup on the queue
Queue queue = (Queue) initalContext.lookup("queue/myExampleQ.queue");
// Step 3. Perform a lookup on the Connection Factory
//ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?producerMaxRate=50");
ConnectionFactory cf = (ConnectionFactory)initalContext.lookup("ConnectionFactory");
// ConnectionFactory cf= new ActiveMQJMSConnectionFactory("(tcp://myhost2:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;");
//using the PoolconectionFactory
JmsPoolConnectionFactory jmsPoolConnectionFactory = new JmsPoolConnectionFactory();
jmsPoolConnectionFactory.setMaxConnections(8);
jmsPoolConnectionFactory.setConnectionFactory(cf);
// Step 4. Create a JMS Connection
connection = jmsPoolConnectionFactory.createConnection("admin","admin");
// Step 5. Create a JMS Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
if(produceMesage) {
// Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
System.out.println("Will now send as many messages as we can in few seconds...");
// Step 7. Send as many messages as we can in N milliseconds
final long duration = 1200000;
i=0;
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start <= duration) {
TextMessage message = session.createTextMessage("This is text message: " + i++);
producer.send(message);
}
long end = System.currentTimeMillis();
double rate = 1000 * (double) i / (end - start);
System.out.println("We sent " + i + " messages in " + (end - start) + " milliseconds");
System.out.println("Actual send rate was " + rate + " messages per second");
// Step 8. For good measure we consumer the messages we produced.
}
if(consumeMessage) {
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
System.out.println("Now consuming the messages...");
i = 0;
while (true) {
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
if (messageReceived == null) {
break;
}
i++;
}
System.out.println("Received " + i + " messages");
}
return true;
} finally {
// Step 9. Be sure to close our resources!
if (connection != null) {
connection.close();
}
}
}
}
Log message of client code execution: When the client starts both myhost1 and myhost2 was running. After sometime I manually stop the myhost1 broker, expecting the myhost2 will be automatically discovered by the client.
....
2020-06-03 23:58:48 DEBUG ClientSessionFactoryImpl:1102 - Trying to connect with connectorFactory = org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@45d84a20, connectorConfig=TransportConfiguration(name=ConnectionFactory, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=myhost2&reconnectAttempts=-1&ha=true
2020-06-03 23:58:48 DEBUG NettyConnector:486 - Connector NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] using native epoll
2020-06-03 23:58:48 DEBUG client:668 - AMQ211002: Started EPOLL Netty Connector version 4.1.48.Final to myhost2:61616
2020-06-03 23:58:48 DEBUG NettyConnector:815 - Remote destination: myhost2/10.64.60.101:61616
2020-06-03 23:58:48 DEBUG NettyConnector:659 - Added ActiveMQClientChannelHandler to Channel with id = cf33ff23
2020-06-03 23:58:48 DEBUG Recycler:97 - -Dio.netty.recycler.maxCapacityPerThread: 4096
2020-06-03 23:58:48 DEBUG Recycler:98 - -Dio.netty.recycler.maxSharedCapacityFactor: 2
2020-06-03 23:58:48 DEBUG Recycler:99 - -Dio.netty.recycler.linkCapacity: 16
2020-06-03 23:58:48 DEBUG Recycler:100 - -Dio.netty.recycler.ratio: 8
2020-06-03 23:58:48 DEBUG AbstractByteBuf:63 - -Dio.netty.buffer.checkAccessible: true
2020-06-03 23:58:48 DEBUG AbstractByteBuf:64 - -Dio.netty.buffer.checkBounds: true
2020-06-03 23:58:48 DEBUG ResourceLeakDetectorFactory:195 - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@6933b6c6
2020-06-03 23:58:48 DEBUG ClientSessionFactoryImpl:809 - Reconnection successful
2020-06-03 23:58:48 DEBUG NettyConnector:1269 - NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] host 1: 10.44.6.85 ip address: 10.44.6.85 host 2: myhost2 ip address: 10.44.6.85
2020-06-03 23:58:48 DEBUG ClientSessionFactoryImpl:277 - ClientSessionFactoryImpl received backup update for live/backup pair = TransportConfiguration(name=ConnectionFactory, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=myhost2&reconnectAttempts=-1&ha=true / null but it didn't belong to TransportConfiguration(name=ConnectionFactory, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=myhost2&reconnectAttempts=-1&ha=true
Will now send as many messages as we can in few seconds...
...
...
2020-06-04 00:01:09 WARN client:210 - AMQ212037: Connection failure to myhost2/10.64.60.101:61616 has been detected: AMQ219015: The connection was disconnected because of server shutdown [code=DISCONNECTED]
2020-06-04 00:01:09 DEBUG ClientSessionFactoryImpl:800 - Trying reconnection attempt 0/-1
2020-06-04 00:01:09 DEBUG ClientSessionFactoryImpl:1102 - Trying to connect with connectorFactory = org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@45d84a20, connectorConfig=TransportConfiguration(name=ConnectionFactory, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=myhost2&reconnectAttempts=-1&ha=true
2020-06-04 00:01:09 DEBUG NettyConnector:486 - Connector NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] using native epoll
2020-06-04 00:01:09 DEBUG client:668 - AMQ211002: Started EPOLL Netty Connector version 4.1.48.Final to myhost2:61616
2020-06-04 00:01:09 DEBUG NettyConnector:815 - Remote destination: myhost2/10.64.60.101:61616
2020-06-04 00:01:09 DEBUG NettyConnector:659 - Added ActiveMQClientChannelHandler to Channel with id = d4ed884e
2020-06-04 00:01:09 DEBUG ClientSessionFactoryImpl:1063 - Connector towards NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] failed
2020-06-04 00:01:09 DEBUG ClientSessionFactoryImpl:1140 - Backup is not active, trying original connection configuration now.
2020-06-04 00:01:11 DEBUG ClientSessionFactoryImpl:800 - Trying reconnection attempt 1/-1
2020-06-04 00:01:11 DEBUG ClientSessionFactoryImpl:1102 - Trying to connect with connectorFactory = org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@45d84a20, connectorConfig=TransportConfiguration(name=ConnectionFactory, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=myhost2&reconnectAttempts=-1&ha=true
2020-06-04 00:01:11 DEBUG NettyConnector:486 - Connector NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] using native epoll
2020-06-04 00:01:11 DEBUG client:668 - AMQ211002: Started EPOLL Netty Connector version 4.1.48.Final to myhost2:61616
2020-06-04 00:01:11 DEBUG NettyConnector:815 - Remote destination: myhost2/10.64.60.101:61616
2020-06-04 00:01:11 DEBUG NettyConnector:659 - Added ActiveMQClientChannelHandler to Channel with id = 1530857a
2020-06-04 00:01:11 DEBUG ClientSessionFactoryImpl:1063 - Connector towards NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] failed
020-06-04 00:01:37 DEBUG NettyConnector:659 - Added ActiveMQClientChannelHandler to Channel with id = d886a84e
2020-06-04 00:01:37 DEBUG ClientSessionFactoryImpl:1063 - Connector towards NettyConnector [host=myhost2, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] failed
2020-06-04 00:01:37 DEBUG ClientSessionFactoryImpl:1140 - Backup is not active, trying original connection configuration now.
Exception in thread "main" javax.jms.JMSException: AMQ219014: Timed out after waiting 30,000 ms for response when sending packet 71
at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:457)
at org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl.sendBlocking(ChannelImpl.java:361)
at org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQSessionContext.sendFullMessage(ActiveMQSessionContext.java:552)
at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.sendRegularMessage(ClientProducerImpl.java:296)
at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:268)
at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:143)
at org.apache.activemq.artemis.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:125)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.doSendx(ActiveMQMessageProducer.java:483)
at org.apache.activemq.artemis.jms.client.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:220)
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.sendMessage(JmsPoolMessageProducer.java:182)
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:90)
at org.messaginghub.pooled.jms.JmsPoolMessageProducer.send(JmsPoolMessageProducer.java:79)
at com.demo.artemis.clients.ArtemisClientClustered.runProducer(ArtemisClientClustered.java:77)
at com.demo.artemis.clients.ArtemisClientClustered.main(ArtemisClientClustered.java:26)
Caused by: ActiveMQConnectionTimedOutException[errorType=CONNECTION_TIMEDOUT message=AMQ219014: Timed out after waiting 30,000 ms for response when sending packet 71]
... 14 more
NOTE: When I used the Camel consumer to consumer the message from this queue and transform to another queue. During the process when I stop the broker the consumer count is automatically redirected to the other broker instance. From the console I am able to see the consumer counts redirected from one broker to another.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.1.xsd
http://www.springframework.org/schema/jee
http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
<constructor-arg index="0" value="(tcp://myhost2:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
</bean>
<bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
<property name="maxConnections" value="10" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
<property name="concurrentConsumers" value="10" />
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="jmsConfig" />
</bean>
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
<endpoint id="queue1" uri="jms:queue:myExampleQ" />
<endpoint id="queue2" uri="jms:queue:myExampleQ2" />
<route>
<from uri="ref:queue1" />
<convertBodyTo type="java.lang.String" />
<transform>
<simple>MSG FRM queue1 TO queue2 : ${bodyAs(String)}</simple>
</transform>
<to uri="ref:queue2" />
</route>
</camelContext>
</beans>