How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels?
I have developed asynchronous Spring Cloud Stream services, and I am trying to develop an edge service that uses @MessagingGateway to provide synchronous access to services that are async by nature.

I am currently getting the following stack trace:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

My @MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

If I consume the message on the reply channel via a @StreamListener, it works just fine:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      }
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);
    }
  }

On the producer side, I am configuring requiredGroups to ensure that multiple consumers can process the message, and correspondingly, the consumers have matching group configurations.

Consumer:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created

Producer:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

The bit of code on the producer side that processes the request and sends the response:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

I can debug and see that the request is received and processed, but when the response is sent to the reply channel, that's when the error occurs.

To get the @MessagingGateway working, what configurations and/or code am I missing? I know I'm combining Spring Integration and Spring Cloud Stream, so I'm not sure if using them together is causing the issues.

Tussore answered 13/12, 2017 at 19:9 Comment(0)

It's a good question and a really good idea, but it isn't going to work so easily.

First of all, we have to establish that a gateway means request/reply, and therefore correlation. In @MessagingGateway this is available via the replyChannel header, in the form of a TemporaryReplyChannel instance. Even if you have an explicit replyChannel = AccountChannels.ACCOUNT_CREATED, the correlation is done only via the mentioned header and its value. The problem is that this TemporaryReplyChannel is not serializable and can't be transferred over the network to the consumer on the other side.

Luckily, Spring Integration provides a solution for us. It is part of the HeaderEnricher: its headerChannelsToString option, backed by the HeaderChannelRegistry:

Starting with Spring Integration 3.0, a new sub-element <int:header-channels-to-string/> is available; it has no attributes. This converts existing replyChannel and errorChannel headers (when they are a MessageChannel) to a String and stores the channel(s) in a registry for later resolution when it is time to send a reply, or handle an error. This is useful for cases where the headers might be lost; for example when serializing a message into a message store or when transporting the message over JMS. If the header does not already exist, or it is not a MessageChannel, no changes are made.

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher

In this case, however, you have to introduce an internal channel from the gateway to the HeaderEnricher, and only the latter will send the message to AccountChannels.CREATE_ACCOUNT_REQUEST. That way the replyChannel header is converted to a string representation and can travel over the network. On the consumer side, when you send a reply, you should ensure that you transfer that replyChannel header as well, unchanged. Then, when the message arrives at AccountChannels.ACCOUNT_CREATED on the producer side, where we have that @MessagingGateway, the correlation mechanism is able to convert the channel identifier to the proper TemporaryReplyChannel and correlate the reply to the waiting gateway call.
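
To make the round trip concrete, here is a minimal plain-Java sketch of the idea behind the header channel registry. It is a model under stated assumptions, not Spring Integration code: TempReplyChannel, Registry, channelToName and nameToChannel are hypothetical names, and a CompletableFuture stands in for the caller blocked on the gateway.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Plain-Java model (not Spring Integration code) of a header channel registry. */
public class HeaderChannelRegistrySketch {

    // Hypothetical stand-in for the gateway's in-memory TemporaryReplyChannel.
    static final class TempReplyChannel {
        final CompletableFuture<String> reply = new CompletableFuture<>();
    }

    // Maps a serializable string id to the live, non-serializable channel object.
    static final class Registry {
        private final Map<String, TempReplyChannel> channels = new ConcurrentHashMap<>();

        String channelToName(TempReplyChannel channel) {
            String name = UUID.randomUUID().toString();
            channels.put(name, channel);
            return name;
        }

        TempReplyChannel nameToChannel(String name) {
            return channels.get(name);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        TempReplyChannel waiting = new TempReplyChannel();

        // Outbound: replace the channel object with a string that can cross the broker.
        String headerValue = registry.channelToName(waiting);

        // Inbound: the reply carries the same string back; resolve it and complete the call.
        registry.nameToChannel(headerValue).reply.complete("FOO");

        System.out.println(waiting.reply.join());
    }
}
```

Only the string crosses the broker in this model; the live channel object never leaves the JVM that registered it.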

The only problem here is that your producer application must be the single consumer in the group for AccountChannels.ACCOUNT_CREATED: we have to ensure that only one instance in the cloud is operating at a time, because only one instance has that TemporaryReplyChannel in its memory.
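
The limitation can be illustrated with a small plain-Java model. This is only a sketch, not Spring code: GatewayInstance, sendRequest and onReply are hypothetical names, and a Set of ids stands in for the channels each instance holds in memory.

```java
import java.util.HashSet;
import java.util.Set;

/** Plain-Java model (not Spring code) of two gateway instances sharing one reply destination. */
public class SingleInstanceLimitSketch {

    static final class GatewayInstance {
        private final String name;
        private final Set<String> pendingReplyIds = new HashSet<>(); // lives only in this JVM
        private int sequence;

        GatewayInstance(String name) {
            this.name = name;
        }

        // Registers a waiting caller and returns the id that travels in the replyChannel header.
        String sendRequest() {
            String id = name + "-" + (++sequence);
            pendingReplyIds.add(id);
            return id;
        }

        // Returns true only if this instance is the one that issued the request.
        boolean onReply(String replyChannelId) {
            return pendingReplyIds.remove(replyChannelId);
        }
    }

    public static void main(String[] args) {
        GatewayInstance edgeA = new GatewayInstance("edge-a");
        GatewayInstance edgeB = new GatewayInstance("edge-b");

        String id = edgeA.sendRequest();

        // If the broker hands the reply to the other instance, nobody is waiting for it there.
        System.out.println("edge-b correlated: " + edgeB.onReply(id));
        System.out.println("edge-a correlated: " + edgeA.onReply(id));
    }
}
```

Within a consumer group each reply is delivered to one member only, so with several instances in the group a reply can land on one that never issued the request.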

More info about gateway: https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

UPDATE

Some code to help:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();

}

UPDATE

A simple application to demonstrate the PoC:

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}

The application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

I use spring-cloud-starter-stream-rabbit.

The following expression

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()

does the trick by copying the request headers to the reply message. On the reply side, the gateway is then able to convert the channel identifier in the headers to the appropriate TemporaryReplyChannel and convey the reply to the caller of the gateway.
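
As a plain-Java illustration of why the copy matters, here is a sketch in which a Map stands in for the message headers. The helpers replyDroppingHeaders, replyCopyingHeaders and canCorrelate are hypothetical and not Spring APIs, and the header values are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Spring code) of a reply with and without the request headers. */
public class CopyHeadersSketch {

    static final String REPLY_CHANNEL = "replyChannel";

    // Builds reply headers from scratch, so the request's replyChannel id is lost.
    static Map<String, Object> replyDroppingHeaders(Map<String, Object> requestHeaders) {
        return new HashMap<>();
    }

    // Carries every request header over to the reply, including the replyChannel id.
    static Map<String, Object> replyCopyingHeaders(Map<String, Object> requestHeaders) {
        return new HashMap<>(requestHeaders);
    }

    // The gateway side can only correlate a reply that still names its reply channel.
    static boolean canCorrelate(Map<String, Object> replyHeaders) {
        return replyHeaders.get(REPLY_CHANNEL) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> requestHeaders = new HashMap<>();
        requestHeaders.put(REPLY_CHANNEL, "some-channel-id"); // placeholder id
        requestHeaders.put("Authorization", "token");         // placeholder value

        System.out.println(canCorrelate(replyDroppingHeaders(requestHeaders)));
        System.out.println(canCorrelate(replyCopyingHeaders(requestHeaders)));
    }
}
```

A reply built from the payload alone, as in the question's producer code, corresponds to the first case: there is nothing left in the headers for the gateway to resolve.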

The SCSt issue on the matter: https://github.com/spring-cloud/spring-cloud-stream/issues/815

Gotham answered 13/12, 2017 at 19:34 Comment(29)
Thanks for the quick reply, Artem. I think I understand the gist of what you are stating, but I want to make sure I understand. Are you stating that I need to create a HeaderEnricher bean on the side with the @MessagingGateway? If so, how would I configure its inputChannel and outputChannel attributes given that the current requestChannel is AccountChannels.CREATE_ACCOUNT_REQUEST?Tussore
Also, I'm not clear on how the TemporaryReplyChannel plays into the creation of the HeaderEnricher bean.Tussore
The gateway creates a TemporaryReplyChannel and populates it into the message it sends to the requestChannel.Gotham
That requestChannel must be the input for the HeaderEnricher: something internal, not a binding destination. The outputChannel of the HeaderEnricher will then be AccountChannels.CREATE_ACCOUNT_REQUEST.Gotham
So, if my bean has @Transformer(inputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST_HEADERS, outputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST), which channel do I specify as the argument to headerChannelRegistry.channelToChannelName(channel)? Sorry if I'm missing something obvious here.Tussore
You don't specify any channel there, you have to take a look into Java DSL HeaderEnricherSpec and use its headerChannelsToString() operation. The raw HeaderEnricher doesn't expose that option. So, you should declare a simple IntegrationFlow instead of @TransformerGotham
I've been trying to figure out how to do what you're suggesting and have hit a wall. I haven't worked with the Spring Integration DSL before. Is it possible for you to share the beans I need to configure to get this done, taking into account the code I originally shared? If so, I really appreciate it.Tussore
See an UPDATE in my answer.Gotham
Your update helped. However, I am still getting 2017-12-13 16:37:46.582 WARN [accounts-edge,84460ed05e3dabdf,de980fadff1f95df,false] 24308 --- [nt-created-1] o.s.a.r.r.RejectAndDontRequeueRecoverer : Retries exhausted for message (Body:'{.... I see in my logs that the replyChannel is populated with the channel name I specified, but is it possible that I should specify the same name as I have configured in requiredGroups? Could that be why the message isn't being sent?Tussore
No, the replyChannel header must be exactly a TemporaryReplyChannel instance or its string representation after converting to a string via that header-enricher function. Looks like you somehow override it somewhere downstream, or even in the consumer on the other side.Gotham
I can see on the @MessagingGateway side that the message is received. I see this in the logs: headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=account-created, amqp_deliveryTag=2, amqp_consumerQueue=account-created.accounts-edge-account-created, amqp_redelivered=false, messageSent=true, spanTraceId=38056f7842484161, spanId=c02b30b3a8dc7fb6, amqp_receivedRoutingKey=account-created, replyChannel=account-created, id=81f5b33a-0e37-3425-c692-5b4197a828b2, spanSampled=0, amqp_consumerTag=amq.ctag-giNMTOHkOzGxVRTqOwjXrA, contentType=application/json;charset=UTF-8}].Tussore
But the message is never processed. What I don't understand is that on the service side, I set the replyChannel to the TemporaryReplyChannel value, but the amqp_consumerQueue is still set to the value I used when I had everything wired up to process the message with a @StreamListener. I'm trying, on the processing side, to both return a response to the MessagingGateway AND send it through the asynchronous channel to other processes. Also, I'm not clear why I would set the replyChannel to AccountChannels.ACCOUNT_CREATED when on the processing side I set it to the TemporaryReplyChannel value.Tussore
??? You shouldn't set anything yourself. That's wrong. The gateway does that automatically because it is about request/reply and therefore correlation. If you override it, you get unexpected behavior. Would it be so hard for you to share a Spring Boot app with us so we can play with the code?Gotham
I sure wish I could share the code, but I can't due to NDA with my client. In fact, I've had to mask the actual code I'm developing and posting here. Has Pivotal developed any examples where Spring Cloud Stream is used to provide an asynchronous, event-driven set of services while at the same time providing for a synchronous messaging gateway to the services? Specifically, what I'm doing is using the groups/requiredGroups configurations to allow for multiple asynchronous nodes to receive messages. But, I want to also allow for a messaging gateway to send and receive a response.Tussore
I see, but I really don't understand what overrides the replyChannel header with that account-created value, or how. The MessagingGateway doesn't do that. There is no such sample. I'll try to cook something up. Although I need to understand the requirements: why would one do request/reply with streaming?Gotham
Well, actually I requested a simple Spring Boot project. I don't need the whole client one, and I'm sure I wouldn't understand it. All we need is a short snippet in the scope of the problem to reproduce and play with.Gotham
I want to do the request/reply in combination with streaming because I want all of the transactions-based services (i.e., those that create/update/delete data) to be asynchronous and using Spring Cloud Stream. At the same time, I want to be able to provide request/reply messaging gateways for clients who need to/want to wait for a response (e.g., a web client who sends the account data and waits for the account to be created so it can be displayed in the UI). Does that give you enough of an idea about what I'm trying to do?Tussore
M-m-m, my friend. I see you don't carry the request headers over to the reply. You don't even care about them in your @StreamListener. You have to copy the request message headers into the reply message before sending it to accountChannels.accountCreated().Gotham
Which headers from the request message should I copy into the reply message? Another oddity is that until my request times out on the producer side (i.e., @MessagingGateway side), the message isn't received by the consumer. Once the request times out, I see the message hit my CREATE_ACCOUNT_REQUEST @StreamListener. The request processes and sends the response message, but I want the message to be sent back to the @MessagingGateway and at the same time be sent out over the existing channel to the asynchronous @StreamListeners that are identified in my configuration with requiredGroups.Tussore
Right, bring us a simple app so we can understand your requirements and current PoC; otherwise it isn't clear. It would be better if you copied all the request headers, but the most important are replyChannel and errorChannel.Gotham
See an UPDATE in my answer.Gotham
Artem, one more question. When using this approach, string to object conversion isn't happening via the Jackson message converter. Is there an additional configuration to the IntegrationFlow that needs to be done to get the required Content-Type configured?Tussore
There is JsonToObjectTransformer in SI on the matter to be used in the IntegrationFlow: docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/…. But that all sounds like a fully separate story, not related to this Gateway subject.Gotham
When I get the message via the RabbitMQ UI, I see that the content-type is text/plain. That appears to probably be the culprit. Normally, when I build the REST controllers, I just specify the produces/consumes attributes on the mapping annotations and the correct headers are generated. When using Spring Integration, and specifically via the headerEnricherFlow example you provided, how is content-type negotiation done?Tussore
There is no "content-type negotiation" at all. We are talking here only about some header changes. Nothing is done to the payload. When you send the message to AccountChannels.CREATE_ACCOUNT_REQUEST, that's where the binder does payload conversion and content-type header population. Not sure why you think HeaderEnricher is some kind of bottleneck here...Gotham
I don't think it is. I am currently receiving org.springframework.core.convert.ConverterNotFoundException: No converter found capable of converting from type [java.lang.String] to type x.y.z.Account, so it looks to me like the Accept/Content-Type headers aren't getting set in my code. I was just wondering if I had to set those headers within the HeaderEnricher or if I need to do that somewhere else. I apologize about my ignorance on SI as I'm still new to it. Given the solution, and SI in general, do you have advice on how best to set these headers so complex objects can be mapped?Tussore
Please start a new SO thread. It's too complicated to figure out what's going on here. Thanks.Gotham
Artem, referring back to your comment: "Only the problem here that your producer application must be as single consumer in the group for the AccountChannels.ACCOUNT_CREATED - we have to ensure that only one instance in the cloud is operating at a time. Just because only one instance has that TemporaryReplyChannel in its memory." My concern is that the @MessagingGateway service can't scale since there can only be one instance of it running. Is there any way to get this solution working with more than a single instance?Tussore
As I said before, this is a home-made solution. There is no guarantee how it is going to work in every possible scenario. And right, some limitations might apply. You can scale, though, as long as you have proper partition sticking.Gotham

With Artem's help, I've found the solution I was looking for. I have taken the code Artem posted and split it into two services, a Gateway service and a CloudStream service. I also added a @RestController for testing purposes. This essentially mimics what I was wanting to do with durable queues. Thanks Artem for your assistance! I really appreciate your time! I hope this helps others who want to do the same thing.

Gateway Code

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  interface GatewayChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String process(String payload);
  }

  private static final String ENRICH = "enrich";

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> getUser(@PathVariable("string") String string) {
      return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
    }
  }

}

Gateway Config (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
server:
  port: 8080

CloudStream Code

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    MessageChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    SubscribableChannel toUppercaseRequest();
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> process(Message<String> request) {
    return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        .copyHeaders(request.getHeaders()).build();
  }

}

CloudStream Config (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            required-groups: gateway-to-uppercase-reply
server:
  port: 8081
Tussore answered 15/12, 2017 at 20:54 Comment(1)
It's working fine. But how do I convert the payload to an object?Lobscouse

Hmm, I am a bit confused as well as to what you are trying to accomplish, but let's see if we can figure this out. Mixing SI and SCSt is definitely natural, as one builds on the other, so it should all work. Here is an example code snippet I just dug up from an old sample that exposes a REST endpoint yet delegates (via a gateway) to Source's output channel. See if that helps:

@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
    . . . 

    @Autowired
    private Source channels;

    @Autowired
    private CompletionService completionService;

    @RequestMapping("/complete")
    public String completeRequest(@RequestParam int id) {
        this.completionService.complete("foo");
        return "OK";
    }

    @MessagingGateway
    interface CompletionService {
        @Gateway(requestChannel = Source.OUTPUT)
        void complete(String message);
    }
}
Villarreal answered 13/12, 2017 at 19:34 Comment(1)
See my answer as well.Gotham
