How to send large files to ActiveMQ using camel

I'm trying to send a X?-GB large file as a stream to an ActiveMQ queue for processing.

I know ActiveMQ supports streams, and so does camel-jms, but nothing I try to set on the queue seems to make any difference. The only change I see is that turning off stream caching results in a "stream closed" exception instead.

I am open to using a processor or custom class (as long as resources get cleaned up), but this should be possible from the blueprint. How do I properly process a large file through camel-activemq without getting an OutOfMemoryError?

Using

  • servicemix-7.0.0
  • camel-2.16.4
  • activemq-5.14.3

Here is my camel blueprint

<?xml version="1.0" encoding="UTF-8"?>
<blueprint
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0">

    <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->    
    <bean id="toStreamBody" class="my.test.toInputStream"/>

    <!-- define a bean of type StreamCachingStrategy which CamelContext will automatically use -->
    <bean id="streamStrategy" class="org.apache.camel.impl.DefaultStreamCachingStrategy">
        <property name="spoolDirectory" value="${java.io.tmpdir}TestTemp/#uuid#/"/>
        <property name="spoolThreshold" value="131072"/>
        <property name="spoolUsedHeapMemoryThreshold" value="70"/>
        <property name="anySpoolRules" value="true"/>
    </bean>

    <!-- streamCaching="true" is "not a valid attribute" -->
    <camelContext streamCache="true" xmlns="http://camel.apache.org/schema/blueprint">

        <route id="file_route">
            <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/>
            <!-- just calls exchange.setBody(exchange.getBody(InputStream.class)) -->
            <bean ref="toStreamBody"/>
            <to uri="activemq:queue:TestQ"/>
        </route>

        <route id="myTestQ">
            <from uri="activemq:queue:TestQ?concurrentConsumers=1&amp;maxConcurrentConsumers=64&amp;maxMessagesPerTask=100&amp;asyncConsumer=true&amp;jmsMessageType=Stream&amp;mapJmsMessage=false"/>
            <bean ref="toStreamBody"/>
            <log message="FINISHED" loggingLevel="WARN"/>
        </route>

    </camelContext>
</blueprint>

Here is the error I keep getting

2017-10-17 08:46:53,859 | ERROR |  - RecipientList | DefaultErrorHandler              | 43 - org.apache.camel.camel-core - 2.16.4 | Failed delivery for (MessageId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-4 on ExchangeId: ID-DESKTOP-H2O66PO-62468-1508242908251-4-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[file_route        ] [file_route        ] [file://FileUploadBin?delete=false&moveFailed=.error                           ] [      3764]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-DESKTOP-H2O66PO-62468-1508242908251-4-5
        ExchangePattern     InOnly
        Headers             {breadcrumbId=ID-DESKTOP-H2O66PO-62468-1508242908251-4-4, fileName=Die.txt}
        BodyType            org.apache.camel.converter.stream.FileInputStreamCache
        Body                [Body is instance of org.apache.camel.StreamCache]
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.TypeConversionException: Error during type conversion from type: java.lang.String to the required type: byte[] with value [Body is instance of org.apache.camel.StreamCache] due java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.createTypeConversionException(BaseTypeConverterRegistry.java:610)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:137)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)[40:org.apache.camel.camel-blueprint:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.doProcessParallel(MulticastProcessor.java:823)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor.access$200(MulticastProcessor.java:84)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:319)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.MulticastProcessor$1.call(MulticastProcessor.java:304)[43:org.apache.camel.camel-core:2.16.4]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: org.apache.camel.RuntimeCamelException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1652)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1247)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        ... 24 more
Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:3236)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)[:1.8.0_121]
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)[:1.8.0_121]
        at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferToArbitraryChannel(FileChannelImpl.java:567)[:1.8.0_121]
        at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:616)[:1.8.0_121]
        at org.apache.camel.converter.stream.FileInputStreamCache.writeTo(FileInputStreamCache.java:108)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.converter.stream.StreamCacheConverter.convertToByteArray(StreamCacheConverter.java:102)[43:org.apache.camel.camel-core:2.16.4]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)[:1.8.0_121]
        at java.lang.reflect.Method.invoke(Method.java:498)[:1.8.0_121]
        at org.apache.camel.util.ObjectHelper.invokeMethod(ObjectHelper.java:1243)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.StaticMethodTypeConverter.convertTo(StaticMethodTypeConverter.java:59)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.doConvertTo(BaseTypeConverterRegistry.java:293)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.impl.converter.BaseTypeConverterRegistry.convertTo(BaseTypeConverterRegistry.java:120)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.core.osgi.OsgiTypeConverter.convertTo(OsgiTypeConverter.java:108)
        at org.apache.camel.component.jms.JmsBinding.createJmsMessageForType(JmsBinding.java:560)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.createJmsMessage(JmsBinding.java:490)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:303)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:300)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:483)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:426)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:440)[46:org.apache.camel.camel-jms:2.16.4]
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[154:org.apache.servicemix.bundles.spring-jms:3.2.17.RELEASE_1]
        at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:437)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:413)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:367)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:153)[46:org.apache.camel.camel-jms:2.16.4]
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[43:org.apache.camel.camel-core:2.16.4]
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:196)[43:org.apache.camel.camel-core:2.16.4]
2017-10-17 08:46:57,641 | WARN  | ://FileUploadBin | GenericFileOnCompletion          | 43 - org.apache.camel.camel-core - 2.16.4 | Rollback file strategy: org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy@294a3d48 for file: GenericFile[Die.txt]
Idolah answered 17/10, 2017 at 13:28 Comment(4)
Related to: this question. But no answers, I'm not using hornet/fuse, and the question is unhelpful. - Idolah
Have you tried camel-stream? camel.apache.org/stream.html. Just replace the file component with "stream:file?fileName=fileName.in" and send directly to the queue. - Hyrax
@Itsallas The stream component isn't working any better. - Idolah
I'm interested in this myself. I'm not sure that Camel streams are integrated with ActiveMQ streams -- broadly Camel just does standard JMS stuff. So if Camel determines that your message body needs (say) a TextMessage, it will accumulate data in the JVM heap to compose that message. I believe the "approved" way to do this stuff is to use off-line storage, and just pass references in JMS messages to the storage identifier or URI. The use of the ActiveMQ BlobMessage gives a little bit of support for this but (a) it's again not really integrated into Camel and (b) you still have to do the work. - Boettcher
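
The "pass a reference" idea from the last comment could look roughly like the blueprint fragment below. This is an untested sketch, not something from the original thread: the route ids are made up, it assumes the producer and consumer can both reach the same file system, and it assumes `noop=true` is acceptable so that the file stays where the path points. Only the file's absolute path (the standard `CamelFileAbsolutePath` header set by the file component) travels through the queue, so the broker never sees the large payload.

```xml
<!-- Sketch only: route ids are hypothetical, and both routes are assumed to share a file system -->
<route id="file_reference_route">
    <!-- noop=true leaves the file in place, so the path sent below stays valid -->
    <from uri="file://FileUploadBin?noop=true"/>
    <!-- replace the file body with its absolute path: a short string instead of gigabytes -->
    <setBody>
        <simple>${header.CamelFileAbsolutePath}</simple>
    </setBody>
    <to uri="activemq:queue:TestQ"/>
</route>

<route id="reference_consumer_route">
    <from uri="activemq:queue:TestQ"/>
    <!-- the body is only the path; a processor placed here would open and stream the file itself -->
    <log message="Received reference to ${body}" loggingLevel="INFO"/>
</route>
```

Cleaning up the file once the consumer has finished with it is left out here and would still have to be handled separately.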

This is not really supported on the classic ActiveMQ broker.

However the next generation ActiveMQ Artemis supports large messages, and we have just added support for this in camel-jms as well. I wrote a blog entry about this: http://www.davsclaus.com/2017/10/working-with-large-messages-using.html

We also added support for the javax.jms.StreamMessage type in camel-jms. This API is not ideal for large messages, so its usefulness is limited. Nevertheless, from Camel 2.21 onwards you can turn it on with the new component option streamMessageTypeEnabled; if the message body is a streaming type, Camel will then send it as a StreamMessage instead of a BytesMessage.
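
As a rough illustration of the streamMessageTypeEnabled option, the producer route from the question might be adjusted as in the fragment below. Treat it as an untested sketch under several assumptions: it needs camel-jms 2.21 or later (the question uses 2.16.4, where the option does not exist), the option is set here on the endpoint URI rather than on the component bean, the route id is made up, and the explicit conversion to `java.io.InputStream` is an assumption about what gets the body recognised as a streaming type.

```xml
<!-- Sketch only: requires Camel 2.21+; the route id is hypothetical -->
<route id="file_route_stream_message">
    <from uri="file://FileUploadBin?delete=false&amp;moveFailed=.error"/>
    <!-- assumption: hand camel-jms an InputStream so the body counts as a streaming type -->
    <convertBodyTo type="java.io.InputStream"/>
    <!-- with the option on, a streaming body is sent as a StreamMessage instead of a BytesMessage -->
    <to uri="activemq:queue:TestQ?streamMessageTypeEnabled=true"/>
</route>
```

As the answer notes, StreamMessage is of limited use for really large payloads, so this may still not be enough for multi-gigabyte files on the classic broker.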

Pape answered 22/10, 2017 at 14:4 Comment(0)
