Receiving multiple messages from MQ asynchronously

I use Spring + Hibernate + JPA in my application.

I need to read messages from WebSphere MQ and insert them into a database. Sometimes messages arrive continuously, sometimes only a few arrive, and sometimes the queue is empty.

Currently I read messages one at a time and insert each one into the database, which performs poorly.

When there is a large backlog (for example, 300k messages in the queue), I cannot insert them fast enough. The number of entities inserted per second is low because I commit after every single entity.

I want to use Hibernate batch processing so that I can insert a list of entities in a single commit (for example, 30 to 40 messages per commit).

Questions:

  1. How can I receive multiple messages from the queue at once? (I have seen that BatchMessageListenerContainer may help, but I could not find a reference or example.)

  2. Should I move the DB insertion out of the onMessage method, so that the thread is released back to the pool and can pick up the next message from the queue?

  3. Should I use parallel threads?

Current implementation:

Message Listener:

<bean id="myMessageListener" class="org.mypackage.MyMessageListener"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name="concurrentConsumers" value="10"/>
    <property name="maxConcurrentConsumers" value="50"/>
</bean>

Listener Class:

package org.mypackage;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.mypackage.service.MyService;
import org.springframework.beans.factory.annotation.Autowired;

public class MyMessageListener implements MessageListener {

    @Autowired
    private MyService myService;

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            // getText() is the call that can throw JMSException
            String payload = textMessage.getText();
            // parse the message
            // Process the message to DB
        } catch (JMSException e1) {
            e1.printStackTrace();
        }
    }
}
Colubrid answered 11/3, 2019 at 17:23 Comment(4)
To bulk insert, you need a list of items to insert in one batch. I think you have to extend your queue to deliver a batch of orders depending on time and/or size. - Hammack
@Hammack In my case, the sender cannot send batches of orders to the queue based on time and/or size. - Colubrid
I think you should update your question to include an MCVE. Currently it is very difficult to suggest a solution without seeing your implementation details. - Hammack
@FlorianDe, I have updated the question by adding my implementation code. - Colubrid

It's not clear what your requirements are.

The Spring Batch project provides a BatchMessageListenerContainer.

Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.

Tojo answered 29/3, 2019 at 17:12 Comment(10)
I have updated my question. I am using WebSphere MQ and I want to receive messages in chunks so that I can insert multiple records into the DB in a single commit. - Colubrid
Sorry; I didn't read the tags; I answered for RabbitMQ. See the edit to my answer. - Tojo
Thank you for the answer. Do you mean I can use BatchMessageListenerContainer instead of DMLC? Should I change the onMessage method argument to List<Message>? I couldn't find an example of BatchMessageListenerContainer usage. - Colubrid
No, you still get one message at a time, so you need to accumulate them; then, when you have a complete batch, update the db and the transactions will commit (db and batch). - Tojo
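The "accumulate, then write the whole batch" idea from this comment can be sketched without any JMS or Hibernate types. This is only an illustration: `BatchAccumulator` and its `batchWriter` callback are hypothetical names, and the callback stands in for whatever code saves the entities and commits once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Collects items one at a time and hands them over in fixed-size batches. */
class BatchAccumulator<T> {
    private final int batchSize;
    // Hypothetical callback: e.g. persist every entity in the list, then commit once.
    private final Consumer<List<T>> batchWriter;
    private final List<T> buffer = new ArrayList<>();

    BatchAccumulator(int batchSize, Consumer<List<T>> batchWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.batchWriter = batchWriter;
    }

    /** Called once per received message; writes only when the batch is full. */
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is buffered, even if the batch is not full. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        batchWriter.accept(batch);
    }
}
```

Note that a size-only rule like this never writes a partial batch unless something calls `flush()` explicitly.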
Suppose batchSize is 30, only 10 messages are available in the queue, and the remaining 20 may arrive an hour later. Waiting until the batch size is reached is not acceptable for me. I want to wait a short time (e.g. 3000 ms): if 30 messages are received within that time, I write them to the DB; otherwise I write however many messages are available. Is this achievable with this container? - Colubrid
You might be able to do something if you can tell from the data that it's the last message in a batch, but not if the queue is simply empty, since the completion policy is only consulted when a message arrives. I don't believe there is an out-of-the-box solution for your needs. - Tojo
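A hand-rolled version of the "30 messages or 3000 ms, whichever comes first" rule could look roughly like the sketch below. It assumes, purely for illustration, that the listener puts each payload on an in-memory `BlockingQueue` and a separate writer thread calls `nextBatch`; `TimedBatchDrainer` is a hypothetical name, not a Spring class. Be aware that with such a hand-off a message may already be acknowledged to MQ before the DB commit happens, so a crash in between can lose messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedBatchDrainer {
    /**
     * Blocks until a first item arrives, then keeps collecting until either
     * maxSize items are gathered or maxWaitMillis has passed since the first item.
     * If the thread is interrupted, the items collected so far are returned.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long maxWaitMillis) {
        List<T> batch = new ArrayList<>(maxSize);
        try {
            batch.add(queue.take()); // wait as long as needed for the first message
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
            while (batch.size() < maxSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // time budget used up: return the partial batch
                }
                T next = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing arrived before the deadline
                }
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}
```

The writer thread would loop over `nextBatch(queue, 30, 3000)` and persist each returned list in one transaction.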
Are there out-of-the-box solutions via spring-kafka or Apache Kafka? - Colubrid
You could use a JmsTemplate to "pull" messages from a queue instead of using a message listener container; it will return null when there are no more messages. spring-integration-kafka has a KafkaMessageSource which provides similar functionality for Kafka; or, the Kafka message listener container can be configured to emit a ListenerContainerIdleEvent when there are no new messages after some timeout, so your listener can use that to send the partial batch. The JMS container has no equivalent mechanism. - Tojo
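The pull approach can be sketched as a loop that keeps receiving until the batch is full or the source returns null. To keep the example free of JMS dependencies, a plain `Supplier` stands in for the receive call; in a real setup it would presumably wrap something like `JmsTemplate.receiveAndConvert(...)` with a receive timeout configured, since without a timeout the call may block instead of returning null. `PullingBatchReader` and `batchWriter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class PullingBatchReader {
    /**
     * Pulls messages until the batch is full or the source reports that
     * nothing is available (null), then writes the batch in one go.
     * Returns the number of messages written.
     */
    static <T> int readOneBatch(Supplier<T> receive, int batchSize, Consumer<List<T>> batchWriter) {
        List<T> batch = new ArrayList<>(batchSize);
        while (batch.size() < batchSize) {
            T message = receive.get(); // stand-in for a receive call with a timeout
            if (message == null) {
                break; // queue is idle: stop and write what we have
            }
            batch.add(message);
        }
        if (!batch.isEmpty()) {
            batchWriter.accept(batch); // hypothetical: save all, commit once
        }
        return batch.size();
    }
}
```

Calling `readOneBatch` repeatedly from a scheduled task or a dedicated thread gives full batches under load and partial batches when the queue runs dry.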
I read about maxMessagesPerTask in DMLC: "Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling)." But this does not pass a list of messages to the MessageListener. - Colubrid
Can you please help me set a timeout on DMLC? I opened a new question: stackoverflow.com/questions/55750449 - Colubrid
