Spring Integration with JMS + ActiveMQ: Messages remain in JDBC Message Store after reconnect

I'm trying to configure JMS with Spring Integration and an ActiveMQ message broker. My outbound channel should be backed by a JDBC Message Store to prevent data loss, e.g. if the broker or my application goes offline.

My configuration seems to work so far, but the JDBC Message Store does not behave as I expect. If I disconnect the broker, messages sent to the outbound channel are persisted as intended, but after a reconnect they remain in the DB and are not sent to the queue. However, messages I send after the reconnect do reach the queue, and if I restart my application the persisted messages are finally sent as well.

application-context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:jms="http://www.springframework.org/schema/jms"
   xmlns:int="http://www.springframework.org/schema/integration"
   xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
   xmlns:oxm="http://www.springframework.org/schema/oxm"
   xmlns:j2ee="http://www.springframework.org/schema/jee"
   xmlns:amq="http://activemq.apache.org/schema/core"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
   http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
   http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
   http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
   http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- ActiveMQ Configuration -->
<jms:annotation-driven/>
<int:annotation-config/>

<!-- Factory for ActiveMQ connections from JNDI -->
<j2ee:jndi-lookup jndi-name="java:comp/env/jms/connectionFactory"
                  id="jmsConnectionFactory"
                  expected-type="org.apache.activemq.ActiveMQConnectionFactory"
                  proxy-interface="javax.jms.ConnectionFactory"
                  lookup-on-startup="false"
                  resource-ref="true"
                  cache="true"/>


<bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
    <property name="maxConnections" value="8"/>
    <property name="reconnectOnException" value="true"/>
</bean>


<!-- JAXB-marshaller from spring-oxm used to handle XML-payload of messages -->
<oxm:jaxb2-marshaller id="jax2bMarshaller" context-path="de.br.ecomx"/>

<!-- message-converter -->
<bean id="messageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <property name="marshaller" ref="jax2bMarshaller"/>
    <property name="unmarshaller" ref="jax2bMarshaller"/>
</bean>

<!-- Message Store implementation -->
<!-- Persists messages temporarily in DB to prevent data loss on server-shutdown -->
<!-- Has to be injected specifically in Queue Channels -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider">
        <bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider"/>
    </property>
    <property name="usingIdCache" value="true"/>
    <property name="region" value="BWH"/>
    <property name="tablePrefix" value="BWH_"/>
</bean>

<!-- default poller for (non-message-driven) channel-adapters -->
<int:poller id="defaultPoller" default="true" fixed-delay="500">
    <int:transactional propagation="REQUIRED" isolation="DEFAULT" transaction-manager="transactionManager"/>
</int:poller>

<!-- LOGGING -->
<!--<int:logging-channel-adapter id="logger" level="DEBUG" log-full-message="true"/>-->


<!-- CHANNEL DEFINITIONS -->

<!-- intermediate channel for message enrichment -->
<int:channel id="output_enricher_channel"><int:queue/></int:channel>

<!-- general output-channel - specific channel is chosen by a router -->
<int:channel id="output_router"><int:queue/></int:channel>

<!-- channel definitions connected to consumer queues -->
<int:channel id="bwh_articleMasterData_input_channel"><int:queue/></int:channel>
<int:channel id="bwh_transferOrder_input_channel"><int:queue/></int:channel>
<!-- add your channel here... -->

<!-- channel definitions connected to producer queues -->
<int:channel id="ax_transferOrderPacked_input_channel"><int:queue message-store="store"/></int:channel>
<!-- add your channel here... -->


<!-- MESSAGE HANDLING -->

<!-- Enriches the 'messageId'-property of payload with 'id'-value from header and sends them to router -->
<int:enricher input-channel="output_enricher_channel" output-channel="output_router">
    <int:property name="messageId" expression="headers.id"/>
</int:enricher>

<!-- routing of outgoing messages to their type-specific channel -->
<int:payload-type-router input-channel="output_router">
    <int:mapping type="de.br.ecomx.TransferOrdersPacked" channel="ax_transferOrderPacked_input_channel"/>
</int:payload-type-router>


<!-- ENDPOINTS -->

<!-- endpoint configurations connecting queues with channels -->
<int-jms:message-driven-channel-adapter id="articleMasterDataConsumerChannelAdapter" connection-factory="connectionFactory"
                                        destination-name="${queue.bwh.articlemasterdata.in}" channel="bwh_articleMasterData_input_channel"
                                        message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="10"
                                        auto-startup="false"/>
<int-jms:message-driven-channel-adapter id="transferOrderConsumerChannelAdapter" connection-factory="connectionFactory"
                                        destination-name="${queue.bwh.transferorder.in}" channel="bwh_transferOrder_input_channel"
                                        message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="5"
                                        auto-startup="false"/>
<int-jms:outbound-channel-adapter id="transferOrderPackedProducerChannelAdapter" connection-factory="connectionFactory"
                                  destination-name="${queue.ax.transferorderpacked.in}" channel="ax_transferOrderPacked_input_channel"
                                  message-converter="messageConverter" delivery-persistent="true" session-transacted="true"
                                  auto-startup="false"/>

<!-- add your endpoint here... -->
<!-- REMEMBER: If auto-startup is set to false (which is recommended for all channel adapters) -->
<!--           the channel adapter has to be registered in ecomxConsumerChannelAdapterContainer below!!! -->


<!-- gateway definitions usually injected by producer-services and used to send messages to an outbound-channel-adapter -->
<int:gateway default-request-channel="output_enricher_channel" service-interface="de.lo.bwh.jms.gateway.EcomxProducerGateway"/>

<!-- connect endpoints with services -->
<int:service-activator input-channel="bwh_articleMasterData_input_channel" ref="ecomxArticleMasterDataConsumer"/>
<int:service-activator input-channel="bwh_transferOrder_input_channel" ref="ecomxTransferOrderConsumer"/>


<!-- container for endpoints used for start / stop all registered consumers / producers at once -->
<bean id="ecomxConsumerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxConsumerChannelAdapterContainer">
    <property name="endpoints">
        <list value-type="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
            <ref bean="articleMasterDataConsumerChannelAdapter"/>
            <ref bean="transferOrderConsumerChannelAdapter"/>
        </list>
    </property>
</bean>

<bean id="ecomxProducerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxProducerChannelAdapterContainer">
    <property name="endpoints">
        <list value-type="org.springframework.integration.endpoint.PollingConsumer">
            <ref bean="transferOrderPackedProducerChannelAdapter"/>
        </list>
    </property>
</bean>

</beans>

context.xml

<Resource name="jms/connectionFactory"
        auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory"
        description="JMS Connection Factory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        brokerURL="tcp://my.url.net:61616"
        brokerName="myActiveMQBroker"/>

I'm using the EcomxProducerGateway to send messages. It sends them to output_enricher_channel, which forwards them to output_router, which in turn routes each message to the correct outbound channel by payload type.

When I disconnect the broker, several exceptions are logged after a few seconds. The following stack traces may be helpful:

[WARN 08:37:07] SingleConnectionFactory.onException(322) | Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: Operation timed out
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1997)
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:2016)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160)
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:200)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Operation timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
... 1 more

[ERROR 08:37:07] LoggingHandler.handleMessageInternal(145) | org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.JMSException: Operation timed out
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:424)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404)
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:85)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:954)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:515)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy484.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.jms.JMSException: Operation timed out
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420)
at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761)
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:327)
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:574)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:384)
at com.sun.proxy.$Proxy602.commit(Unknown Source)
at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:184)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:421)
... 27 more
Caused by: java.net.SocketException: Operation timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
... 1 more

[ERROR 08:37:08] LoggingHandler.handleMessageInternal(145) | org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler#0]; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at sun.reflect.GeneratedMethodAccessor464.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy484.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:579)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:705)
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:145)
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 31 more
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:360)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:305)
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
at com.sun.proxy.$Proxy156.createConnection(Unknown Source)
at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:365)
at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:305)
at org.springframework.jms.connection.SingleConnectionFactory.getConnection(SingleConnectionFactory.java:283)
at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:224)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.core.JmsTemplate.access$600(JmsTemplate.java:90)
at org.springframework.jms.core.JmsTemplate$JmsTemplateResourceFactory.createConnection(JmsTemplate.java:1205)
at org.springframework.jms.connection.ConnectionFactoryUtils.doGetTransactionalSession(ConnectionFactoryUtils.java:312)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:480)
... 36 more
Caused by: java.net.UnknownHostException: my.url.net
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:501)
at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:464)
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:138)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:340)
... 54 more
Complexion answered 28/4, 2015 at 8:15

Well, that doesn't work for rolled-back messages because of usingIdCache="true". The messages are returned to the DB, but they aren't picked up by the next poll because their ids are still in the in-memory idCache.

We recommend the following to deal with processed or rolled-back messages:

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>

...

<int:poller id="defaultPoller" default="true" fixed-delay="500">
    <int:transactional propagation="REQUIRED" isolation="DEFAULT" 
                       synchronization-factory="syncFactory" 
                       transaction-manager="transactionManager"/>
</int:poller>

With that, the message id is removed from the idCache regardless of the transaction outcome, so in your case the message will be ready to be polled again.
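
To make the mechanism easier to see, here is a minimal, self-contained Java sketch. It does not use Spring Integration at all: IdCacheSketch, SketchStore and its add, poll, commit and rollback methods are hypothetical stand-ins invented for illustration, and removeFromIdCache only mirrors the name of the real store method used in the expressions above. It assumes a simplified model in which a poll skips every id that is already in the cache.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

public class IdCacheSketch {

    /** Hypothetical stand-in for a DB-backed channel store with an in-memory id cache. */
    static class SketchStore {
        private final Set<String> rows = new LinkedHashSet<>(); // simulated DB table
        private final Set<String> idCache = new HashSet<>();    // ids already handed to a poller

        void add(String id) {
            rows.add(id);
        }

        /** Returns the first stored id that is not yet cached, or null if none is eligible. */
        String poll() {
            for (String id : rows) {
                if (idCache.add(id)) { // add() returns false when the id is already cached
                    return id;
                }
            }
            return null;
        }

        /** Simulates a committed transaction: the row is deleted for good. */
        void commit(String id) {
            rows.remove(id);
        }

        /** Simulates a rollback: the row stays in the table and the cache is left untouched. */
        void rollback(String id) {
            // nothing to undo in this sketch; the row was never removed
        }

        /** Makes the id eligible for polling again. */
        void removeFromIdCache(String id) {
            idCache.remove(id);
        }
    }

    public static void main(String[] args) {
        SketchStore store = new SketchStore();
        store.add("m1");

        String first = store.poll(); // hands out "m1" and caches its id
        store.rollback(first);       // e.g. the broker was unreachable

        // The row is still stored, but the cached id hides it from the poller.
        System.out.println("poll after rollback: " + store.poll());

        // What the after-rollback expression achieves: the id becomes eligible again.
        store.removeFromIdCache(first);
        System.out.println("poll after cache removal: " + store.poll());
    }
}
```

In this simplified model the second poll returns null although the message is still stored, which matches the symptom in the question; after the id is removed from the cache, the same message is handed out again.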

See the Reference Manual for more information; the JavaDocs for JdbcChannelMessageStore#setUsingIdCache describe the same approach.

Overshine answered 30/4, 2015 at 13:41
