Apache Camel - how to "wiretap" synchronously? Or just send a copy of an exchange?

I have an Apache Camel route that processes a POJO on the exchange body.

Please look at the lines marked 1 to 3.

    from("direct:foo")
        .to("direct:doSomething")         // 1 (POJO on the exchange body)
        .to("direct:storeInHazelcast")    // 2 (destroys my pojo! it gets -1)
        .to("direct:doSomethingElse")     // 3 (Where is my POJO??)
    ;

Route 2 uses the put operation of the Hazelcast component, which unfortunately requires me to set the exchange body to the value -1.

    from("direct:storeInHazelcast")
            .setBody(constant(-1))
            .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
            .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
            .to("hazelcast:map:MyNumber")
    ;

For the line marked 2, I would like to send a COPY of the exchange to the storeInHazelcast route.

First I tried .multicast(), but the exchange body was still overwritten (with -1).

        // shouldn't this copy the exchange?
        .multicast().to("direct:storeInHazelcast").end()

Then I tried .wireTap(), which works in "fire and forget" (async) mode, but I actually need it to block and wait for the sub-route to complete. Can you make wireTap block?

        // this works but I need it to be sync processing (not async)
        .wireTap("direct:storeInHazelcast").end()

So I'm looking for some tips here. As far as I can tell from the docs, multicast() should have copied the exchange, but the setBody() in my storeInHazelcast route seems to overwrite the body of the original exchange.

Alternatively maybe there is some other way to do this.

Thanks in advance. I am using Camel 2.10.

Devoir answered 7/1, 2014 at 10:34 Comment(1)
multicast makes a shallow copy, so changes show up in the other multicast routes as well. - Doane
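
To make "shallow copy" concrete, here is a plain-Java sketch. It is not Camel code: `Flight` and `Holder` are hypothetical stand-ins for the POJO and the exchange. It shows that mutating the shared POJO through a copy is visible from the original, while replacing the copy's body is not. If I read the Multicast documentation correctly, multicast without an aggregation strategy also hands back the last reply as the outgoing message, which would explain the -1 regardless of how the copy is made.

```java
// Plain-Java model of a shallow copy; Flight and Holder are hypothetical, not Camel classes.
final class ShallowCopyDemo {

    /** Hypothetical mutable POJO carried as the body. */
    static final class Flight {
        int seq;
        Flight(int seq) { this.seq = seq; }
    }

    /** Hypothetical stand-in for an exchange: it only holds a reference to its body. */
    static final class Holder {
        Object body;
        Holder(Object body) { this.body = body; }

        /** Copies the holder, but not the body object it points to. */
        Holder shallowCopy() { return new Holder(body); }
    }

    /** Returns { seq seen via the original, whether the original still holds the POJO, the copy's body }. */
    static Object[] demo() {
        Flight pojo = new Flight(1);
        Holder original = new Holder(pojo);
        Holder copy = original.shallowCopy();

        // Mutating the shared POJO through the copy is visible from the original.
        ((Flight) copy.body).seq = 2;

        // Replacing the copy's body only rebinds the copy; the original keeps its POJO.
        copy.body = -1;

        return new Object[] { ((Flight) original.body).seq, original.body == pojo, copy.body };
    }

    public static void main(String[] args) {
        Object[] r = demo();
        System.out.println("original seq = " + r[0]);
        System.out.println("original still holds the POJO = " + r[1]);
        System.out.println("copy body = " + r[2]);
    }
}
```

So a shallow copy alone protects the original against setBody() on the copy, but not against in-place changes to the POJO itself.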
D
6

I think I have stumbled on the answer: line 2 can use enrich() from the DSL like this:

    .enrich("direct:storeInHazelcast", new KeepOriginalAggregationStrategy())

where:

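// Package as of Camel 2.x; in Camel 3 the interface is org.apache.camel.AggregationStrategy.
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Ignores the reply from the enriching route and keeps the original exchange.
public class KeepOriginalAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        return oldExchange;
    }
}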

Interestingly, I found an aggregation strategy named UseOriginalAggregationStrategy(), but I couldn't see how to specify the parameter named Exchange original from the DSL.

    .enrich("direct:storeInHazelcast",
        new UseOriginalAggregationStrategy(???, false))

In the absence of some sort of getExchange() method in the DSL, I can't see how to use this aggregation strategy here (but if anyone can advise how, please do).

Devoir answered 7/1, 2014 at 12:3 Comment(3)
While this answer works, I disagree with the use of enrich, since it carries a different semantic and implies a different pattern. - Westfahl
@AlessandroDaRugna I agree, but I have yet to see a better solution. Robert's solution is OK, but I don't think it should be necessary. Camel is all about Integration Patterns; is there really no implemented pattern that suits this use case? - Capello
enrich only implies producing an Exchange; what happens with the returned Exchange is encapsulated within the AggregationStrategy. See the answer from Arek Bazylewicz for a nice solution (I had no idea there were pre-defined strategies either): .enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal()) - Celestinecelestite
M
7

You can do without writing your own aggregation strategy by using

    .enrich("direct:storeInHazelcast", AggregationStrategies.useOriginal())

Mute answered 26/7, 2016 at 11:4 Comment(0)
R
4

Save the body in a header and restore it afterwards.

from("direct:foo")
    .to("direct:doSomething")         // 1 (POJO on the exchange body)
    .setHeader("old_body", body())    // save the body
    .to("direct:storeInHazelcast")    // 2 (replaces the body with -1)
    .setBody(header("old_body"))      // restore the body
    .removeHeader("old_body")         // clean up the header
    .to("direct:doSomethingElse")     // 3 (POJO is back on the exchange body)
;

This is a fairly common paradigm for destructive components.

Rodroda answered 7/1, 2014 at 20:21 Comment(2)
Thanks, I was hoping to avoid the whole 'temp variable' thing, but this shifts the responsibility into my route rather than requiring the caller to use enrich. - Devoir
Actually, I guess I should do the temp variable to/from header inside the storeInHazelcast route; that will leave my 1 - 2 - 3 code lines unchanged. - Devoir
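
The idea in that last comment could look roughly like this. It is a sketch, not a tested route: it assumes the header name old_body from the answer above, reuses the storeInHazelcast route from the question unchanged apart from the three marked lines, and belongs inside the same RouteBuilder.configure() as the other routes.

```java
from("direct:storeInHazelcast")
        .setHeader("old_body", body())    // added: save the caller's body
        .setBody(constant(-1))
        .setHeader(HazelcastConstants.OPERATION, constant(HazelcastConstants.PUT_OPERATION))
        .setHeader(HazelcastConstants.OBJECT_ID, constant(LAST_FLIGHT_UPDATE_SEQ))
        .to("hazelcast:map:MyNumber")
        .setBody(header("old_body"))      // added: restore the caller's body
        .removeHeader("old_body")         // added: clean up the temporary header
;
```

With this in place, the caller can keep using a plain .to("direct:storeInHazelcast") on line 2, because the direct endpoint hands the exchange back with the original body restored.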
R
1

I also had this requirement (synchronous, in-only processing on another route). To achieve it I wrote a custom Processor that programmatically sends a copy of the Exchange. I think this gives a nicer DSL, because the semantics at the point of use are clearer than with enrich.

This static helper method creates the processor:

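import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;

public static Processor synchronousWireTap(String uri) {
    return exchange -> {
        // Send a copy so the target route cannot replace the body of the original exchange.
        Exchange copy = exchange.copy();

        // A template created per exchange must be stopped again, otherwise its resources leak.
        ProducerTemplate template = exchange.getContext().createProducerTemplate();
        try {
            template.send(uri, copy);
        } finally {
            template.stop();
        }

        //ProducerTemplate.send(String,Exchange) does not, unlike other send methods, rethrow an exception
        //on the exchange. We want any unhandled exception to be rethrown, so we must do so here.
        Throwable thrown = copy.getException(Throwable.class);

        if (thrown != null) {
            throw new CamelExecutionException(thrown.getMessage(), exchange, thrown);
        }
    };
}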

And here's an example of use:

from("direct:foo")
    .to("direct:doSomething")                               // 1 (POJO on the exchange body)
    .process(synchronousWireTap("direct:storeInHazelcast")) // 2 (Does not destroy POJO because a copy of the exchange gets sent to this uri)
    .to("direct:doSomethingElse")                           // 3 (POJO is still there)
;

Note that this custom processor is not quite a synchronous analogue of the standard wireTap(). wireTap() is fully in-only, whereas this processor rethrows any unhandled exception that occurs on the target route; the message itself is still left untouched. That was my requirement: I wanted to run some other processing synchronously on another route and be notified if it failed, but otherwise not have the message on my main route affected (roughly the equivalent of calling a void method in procedural code).

Referendum answered 20/7, 2018 at 10:54 Comment(0)
G
-1

You can use the copy="true" option in wiretap to copy the exchange as mentioned in http://camel.apache.org/wire-tap.html or you can create your own processor to do the same.

Gulosity answered 29/6, 2017 at 9:15 Comment(0)
