Apache Camel: How to persist property or header between multiple Exchanges after split and subsequent Exception?

I have a complex route, part of which is shown below:

.when(header("KEY_1").isNull())
.choice()
    .when(header("KEY_2").isNull())
        .split().method(SplitExample.class, "invokeSplitter").streaming().parallelProcessing().executorService(threadPoolExecutor)   // first split                  
            .policy(requires_new)
                .bean(SplitExample.class, "enrich")
                .bean(persister,"populateRecordAndXRef")
                .bean(initializer, "initialize")
                .bean(validator, "validateInMsg")
                .bean(suppressResolver, "resolve")
                .choice()
                    .when(header("KEY_3").isNull())
                        .bean(MsgConverter.class,"doInvoke" )  // #1 property or header set here
                        .split(body()) // second split
                        .bean(validator, "validateOutMsg")                                 
                        .to(toURI.toArray(new String[ toURI.size()]))
                        .process(new Processor() {
                            @Override
                            public void process(Exchange exchange) throws Exception {
                                System.out.println(exchange.getException());  // #2 queue server is shut down here so that transaction failure occurs                                       
                            }
                        })
                    .endChoice() //end when                            
                .end() //end choice
            .end() //end policy                     
        .end() //end split                      
.endChoice() //end when 

I have also defined following exception policy:

 onException(JMSException.class)        
    .handled(true)
    .process(new QueueOperationFailureProcessor()); // #3 property or header should be accessible here

Now, my intention is to set a bean as an Exchange property ("RECOVERY_DETAIL") in MsgConverter (#1) and retrieve the same bean in QueueOperationFailureProcessor (#3).

By debugging, I can see the property ("RECOVERY_DETAIL") in the in-line processor (#2). On JMSException, when my exception policy kicks in, I would like to retrieve the property ("RECOVERY_DETAIL") in QueueOperationFailureProcessor (#3).

But as it happens, the Exchange available in QueueOperationFailureProcessor (#3) is different from the one available in the in-line processor (#2), and the property ("RECOVERY_DETAIL") is nowhere to be found.

Please help me out.

P.S. My Camel version is 2.16.0 and I may not be able to use any solution that requires a version upgrade.

Cobwebby answered 26/3, 2019 at 11:17 Comment(1)
What exactly is the exchange you find at #3? Is it one of the splitter sub-exchanges? You should be able to use the exchangeId property to trace where it has come from. - Tradesman

Just guessing, but since there is a Splitter between the place where the property is created and the place where it is used, the Splitter could be the reason the property is lost.

A Splitter creates new messages from parts of an existing message. The key question here is: does Camel copy all the properties and headers from the incoming message to every split, outgoing message?

You could easily try this out by moving the creation of the property inside the Splitter. If the property is then available in the Exception handler, the Splitter is responsible.

In this case you could implement the Splitter logic yourself (just a Java Bean, see the chapter "Using a Pojo to do the splitting" of the Splitter docs). Your implementation can then take care of the properties.
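
As a rough illustration of that idea, here is a minimal plain-Java sketch of a POJO splitter that attaches the recovery detail to every part it produces, so the detail travels in the body instead of relying on property propagation. The names PartWithRecovery, RecoveryAwareSplitter and splitWithRecovery are hypothetical, and how Camel would bind the second argument (for example from the "RECOVERY_DETAIL" property) is deliberately left out. Whether the detail then reaches the onException processor in Camel 2.16 has not been verified here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper: one split part together with the recovery detail
// that was known when the parent message was split.
final class PartWithRecovery {
    final String payload;
    final String recoveryDetail;

    PartWithRecovery(String payload, String recoveryDetail) {
        this.payload = payload;
        this.recoveryDetail = recoveryDetail;
    }
}

// Hypothetical POJO splitter. In a route it would be referenced with
// .split().method(RecoveryAwareSplitter.class, "splitWithRecovery").
// Binding the second argument to an exchange property is not shown.
final class RecoveryAwareSplitter {

    // Returns one part per payload; every part carries the same recovery detail.
    public List<PartWithRecovery> splitWithRecovery(List<String> payloads, String recoveryDetail) {
        List<PartWithRecovery> parts = new ArrayList<>();
        for (String payload : payloads) {
            parts.add(new PartWithRecovery(payload, recoveryDetail));
        }
        return parts;
    }
}
```

With this shape, any processor that receives a split part can read recoveryDetail directly from the body, regardless of which properties the sub-exchange happens to carry.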

Gaberones answered 29/3, 2019 at 6:3 Comment(3)
The new property is introduced (#1) after the first split operation, which is done using a method: (SplitExample.class, "invokeSplitter"). At this stage the value of the property is not known. It is not about "implement the Splitter logic". It is about propagating the value of the newly acquired property on exception. A new aggregation strategy is required which can propagate the value of the newly acquired property. - Cobwebby
My answer is not about "implement the Splitter logic". There is a .split(body()) between #1 and #2. What I am saying is that this can be responsible for losing parts of the message, such as properties, because when messages are split it depends on the implementation whether they are copied to the new messages or not. - Gaberones
In my experience exchange properties are copied across to sub-exchanges. I think the OP confirms this: "By debugging, I can see the property ("RECOVERY_DETAIL") in the in-line processor (#2)" - Tradesman

Try setting .shareUnitOfWork() on your splitters; you may find that exceptions are propagated as you expect.
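
For placement only, a minimal sketch of where the option sits in the Java DSL, assuming Camel 2.x on the classpath. The class name SharedUnitOfWorkRoute and the endpoints direct:convert and direct:send are hypothetical stand-ins, not part of the route in the question.

```java
import org.apache.camel.builder.RouteBuilder;

// Hypothetical route showing only where shareUnitOfWork() is placed.
class SharedUnitOfWorkRoute extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:convert")                   // hypothetical input endpoint
            .split(body()).shareUnitOfWork()     // sub-exchanges share the parent's unit of work
                .to("direct:send")               // hypothetical stand-in for the queue endpoints
            .end();                              // end split
    }
}
```

In the route from the question, the same call would go directly after .split(body()), and possibly also on the first splitter.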

Relevant docs here

Tradesman answered 3/4, 2019 at 15:15 Comment(0)

The exception happening inside the split may not be propagated all the way to your onException. You could try defining a doTry/doCatch inside the split to handle this error.

Abilene answered 26/3, 2019 at 20:56 Comment(1)
As you can see in the route, I am not concerned with the exception happening inside the split. It is the Exception that is thrown inside the transaction boundary of "requires_new". This is correctly propagated to the "OnException" clause. But Exchange properties or simple message headers that were set in ConverterInvoker (#1) are lost in QueueOperationFailureProcessor (#3). - Cobwebby
