Spring Cloud Function - Separate routing-expression for different Consumer

I have a service that receives differently structured messages from different message queues. With @StreamListener conditions, we can choose for each message type how that message should be handled. For example:

We receive two different types of messages, which have different header fields and values e.g.

Incoming from "order" queue:

Order1: { Header: {catalog:groceries} }
Order2: { Header: {catalog:tools} }

Incoming from "shipment" queue:

Shipment1: { Header: {region:Europe} }
Shipment2: { Header: {region:America} }

There is a binding for each queue, and with the corresponding @StreamListener conditions I can process the messages differently by catalog and region, e.g.

@StreamListener(target = OrderSink.ORDER_CHANNEL, condition = "headers['catalog'] == 'groceries'")
public void onGroceriesOrder(GroceryOrder order){
...
}

So the question is, how to achieve this with the new Spring Cloud Function approach?

The documentation at https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.2.RELEASE/reference/html/spring-cloud-stream.html#_event_routing states:

Also, for SpEL, the root object of the evaluation context is Message so you can do evaluation on individual headers (or message) as well …​.routing-expression=headers['type']

Is it possible to add the routing-expression to the binding like (in application.yml)

onGroceriesOrder-in-0:
  destination: order
  routing-expression: "headers['catalog']==groceries"

?

EDIT after first answer: If the above expression is not possible at this location, as the first answer implies, then my question is as follows:

As far as I understand, an expression like routing-expression: headers['catalog'] must be set globally, because the result maps to certain (consumer) functions.

How can I ensure that the two different messages on each queue are forwarded to their own consumer function, e.g.

Order1 --> MyOrderService.onGroceriesOrder()
Order2 --> MyOrderService.onToolsOrder()
Shipment1 --> MyShipmentService.onEuropeShipment()
Shipment2 --> MyShipmentService.onAmericaShipment()

That was easy with @StreamListener, because each method gets its own @StreamListener annotation with a different condition. How can this be achieved with the new routing-expression setting?

Milissa answered 10/4, 2020 at 7:17 Comment(2)
Hi @Milissa, did you find out how to achieve it? - Salamander
I need the same behavior, but apparently routing-expression cannot be applied at the binding level. - Apodal

The above is not a valid expression; I think you meant headers['catalog']=='groceries'. Even so, what would you expect to happen when it is evaluated, given that the only two possible results are true and false? These questions are rhetorical, but they help to understand the problem and how to fix it.

The expression must evaluate to the name of the function to route to. So:

routing-expression: headers['catalog'] - assumes that the actual value of catalog header is the name of the function to invoke

routing-expression: headers['catalog']=='groceries' ? 'processGroceries' : 'processOther' - maps the value 'groceries' to the 'processGroceries' function and any other value to 'processOther'.
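
To make the idea concrete, here is a minimal plain-Java model of what a routing expression does: it computes a function name from the message headers, and that name is then looked up among the registered functions. This is only a sketch of the mechanism, not Spring code. The class RoutingModel and its methods are hypothetical names invented for illustration; the function names processGroceries and processOther are taken from the expression above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the framework's routing step; not Spring API.
class RoutingModel {
    // Registered functions, keyed by name (in Spring these would be function beans).
    private final Map<String, Consumer<String>> functions = new HashMap<>();
    // Plays the role of the routing expression: headers in, function name out.
    private final Function<Map<String, Object>, String> routingExpression;

    RoutingModel(Function<Map<String, Object>, String> routingExpression) {
        this.routingExpression = routingExpression;
    }

    void register(String name, Consumer<String> fn) {
        functions.put(name, fn);
    }

    // Evaluates the expression, invokes the matching function, returns its name.
    String dispatch(Map<String, Object> headers, String payload) {
        String target = routingExpression.apply(headers);
        Consumer<String> fn = functions.get(target);
        if (fn == null) {
            throw new IllegalStateException("No function named " + target);
        }
        fn.accept(payload);
        return target;
    }

    public static void main(String[] args) {
        // Same logic as: headers['catalog']=='groceries' ? 'processGroceries' : 'processOther'
        RoutingModel model = new RoutingModel(
                h -> "groceries".equals(h.get("catalog")) ? "processGroceries" : "processOther");
        model.register("processGroceries", p -> System.out.println("groceries: " + p));
        model.register("processOther", p -> System.out.println("other: " + p));
        model.dispatch(Map.of("catalog", "groceries"), "Order1"); // prints "groceries: Order1"
        model.dispatch(Map.of("catalog", "tools"), "Order2");     // prints "other: Order2"
    }
}
```

The point of the model is that a boolean result (true/false) would never match a registered name, which is why the expression has to produce the function name itself.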

Piranesi answered 10/4, 2020 at 7:36 Comment(4)
Thank you, this leads to a follow-up question; see the "EDIT after first answer" block. - Milissa
Yes, to get the equivalent of the 'condition' we had in StreamListener (and more), you can use routing-expression as a message header. Refer to this test for an example: github.com/spring-cloud/spring-cloud-stream/blob/master/… - Piranesi
It's rather useless, as you have to define this header when you're sending (creating) a message. So you need to embed the function name (an architecture detail) in a message. If the message comes from an external system, it knows nothing about the architecture of my system, so it doesn't know which header to set. Do you suggest naming functions based on what's in the header? The whole power of messaging is decoupling, which doesn't happen in your solution. You tie the message to the consumer function name. - Elnoraelnore
I never said or suggested any of what you just said. The expression allows you to evaluate anything, including a message header, and determine the function definition based on that. Just like in the provided example, the function echo will be invoked if the message's content type is text/plain. It certainly does the same thing as the legacy annotation approach while keeping your actual code clean of infrastructure details. So what coupling are you referring to? - Piranesi

For custom routing, you can use the MessageRoutingCallback strategy:

MessageRoutingCallback

The MessageRoutingCallback is a strategy to assist with determining the name of the route-to function definition.

public interface MessageRoutingCallback {
  FunctionRoutingResult routingResult(Message<?> message);
  . . .
}

All you need to do is implement and register it as a bean to be picked up by the RoutingFunction. For example:

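@Bean
public MessageRoutingCallback customRouter() {
  return new MessageRoutingCallback() {
      // Interface methods are implicitly public, so the override must be public too.
      @Override
      public FunctionRoutingResult routingResult(Message<?> message) {
          // Route to the function whose name is carried in the "func_name" header.
          return new FunctionRoutingResult((String) message.getHeaders().get("func_name"));
      }
  };
}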

Spring Cloud Function
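
For the four consumers in the question, the routing decision itself could look like the plain-Java sketch below. It is not taken from the Spring documentation: ConsumerNameResolver and resolve are hypothetical names, and the returned strings assume that functions are registered under exactly the names onGroceriesOrder, onToolsOrder, onEuropeShipment and onAmericaShipment. The header keys and values come from the question.

```java
import java.util.Map;

// Hypothetical helper; returned names are assumed to match registered function beans.
class ConsumerNameResolver {

    // Maps the catalog/region headers from the question to a consumer name.
    static String resolve(Map<String, Object> headers) {
        Object catalog = headers.get("catalog");
        if ("groceries".equals(catalog)) {
            return "onGroceriesOrder";
        }
        if ("tools".equals(catalog)) {
            return "onToolsOrder";
        }
        Object region = headers.get("region");
        if ("Europe".equals(region)) {
            return "onEuropeShipment";
        }
        if ("America".equals(region)) {
            return "onAmericaShipment";
        }
        // Fail loudly instead of silently dropping an unroutable message.
        throw new IllegalArgumentException("No route for headers " + headers);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of("catalog", "tools")));   // prints "onToolsOrder"
        System.out.println(resolve(Map.of("region", "Europe")));   // prints "onEuropeShipment"
    }
}
```

Inside routingResult, the result of such a method could then be wrapped in a FunctionRoutingResult, passing message.getHeaders() as the map. This keeps the mapping from header values to function names in the consumer application, so the sender does not need to know any function names.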

Transported answered 24/1, 2022 at 17:25 Comment(0)
