Echo socket service in Spring Integration DSL using Channels and Gateways
This is a variant of my question How to implement simple echo socket service in Spring Integration DSL. A good working solution was introduced there, but I would like to explore alternatives. In particular, I am interested in a solution that uses inbound and outbound channels explicitly, in both the client and the server implementation. Is that possible?

So far I was able to come up with:

HeartbeatClientConfig

...
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(outboundChannel)
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .channel(inboundChannel)
            .get();
}
...

HeartbeatClient

public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in scheduled intervals in loop 
    outboudChannel.send(new GenericMessage<String>("status"));
    Message<?> message = inboundChannel.receive(1000);
}

The client part seems to work fine. The problem is on the server side.

HeartbeatServer

public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
    this.inboundChannel = inboundChannel;
    this.outboudChannel = outboudChannel;
}
...
void run() {
    // ..in some kind of loop
    Message<?> message = inboundChannel.receive(1000); // presumably a blocking call
    ...
    outboudChannel.send(new GenericMessage<>("OK"));
    ...
}

HeartbeatServerConfig
This is the trickiest part, and I am sure I have it wrong; I just don't know what I should do instead. I naively took the inverse of the client implementation, which seems to work: inverse in the sense of swapping the inbound and outbound channels in the flow definition.

...
@Bean
public IntegrationFlow heartbeatServerFlow(
        MessageChannel outboundChannel,
        PollableChannel inboundChannel) {
    return IntegrationFlows
            .from(inboundChannel)
            .handle(Tcp.inboundGateway(Tcp.netServer(7777)))
            .channel(outboundChannel)
            .get();
}
...

The server does not work. It throws a cryptic exception about Found ambiguous parameter type [class java.lang.Boolean] for method match ..., followed by a long list of Spring and Spring Integration methods.

Full source code can be found here.

Regality answered 24/3, 2019 at 23:20 Comment(0)

You can't start the server-side flow with a channel.

The flow starts with the gateway, which handles all the socket communication. When it receives a message, it sends it to a channel.

You could do this...

@Bean
public IntegrationFlow server(PollableChannel requests, MessageChannel replies) {
    return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234))
                .replyChannel(replies))
            .transform(Transformers.objectToString())
            .channel(requests)
            .get();
}

But I would ask why you would want to, because now you have to manage your own thread to receive from the request channel and write to the reply channel. For this to work, the replyChannel header from the request message must be copied to the reply message. In fact, you don't really need a reply channel; you can send the reply directly to the channel held in the replyChannel header (that's what happens internally: we bridge the reply channel to the header channel).
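The two reply options described above could be sketched roughly as follows. This is only an illustration: the class name HeartbeatReplies and its method names are hypothetical, the payload "OK" is borrowed from the question, and it assumes spring-messaging is on the classpath.

```java
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical helper, not part of the original answer.
public final class HeartbeatReplies {

    private HeartbeatReplies() {
    }

    /** Option 1: send via the configured reply channel, copying the replyChannel header. */
    public static boolean viaReplyChannel(Message<?> request, MessageChannel replies) {
        Message<String> reply = MessageBuilder.withPayload("OK")
                // Without this header the bridge cannot route the reply back to the gateway.
                .setHeader(MessageHeaders.REPLY_CHANNEL, request.getHeaders().getReplyChannel())
                .build();
        return replies.send(reply);
    }

    /** Option 2: skip the reply channel and send straight to the header channel. */
    public static boolean direct(Message<?> request) {
        Object target = request.getHeaders().getReplyChannel();
        if (!(target instanceof MessageChannel)) {
            return false; // header missing or only a channel name; nothing to send to
        }
        return ((MessageChannel) target).send(MessageBuilder.withPayload("OK").build());
    }
}
```

Either method would be called from the thread that polls the request channel, once per received request.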

It's much simpler to handle the request on the gateway's thread.
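As a rough sketch of what handling the request on the gateway's thread might look like, assuming Spring Integration 5.x and the same port as in the example above: the class name InlineHeartbeatServerConfig and the "UNKNOWN" reply are made up for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.ip.dsl.Tcp;

// Hypothetical configuration class; only the flow shape matters here.
@Configuration
public class InlineHeartbeatServerConfig {

    @Bean
    public IntegrationFlow inlineServer() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)))
                // The deserializer produces byte[]; convert it before comparing.
                .transform(Transformers.objectToString())
                // The returned value becomes the reply written back to the socket.
                .<String>handle((payload, headers) ->
                        "status".equals(payload) ? "OK" : "UNKNOWN")
                .get();
    }
}
```

With this shape there is no pollable channel and no extra thread; the handler's return value is routed back to the gateway through the replyChannel header automatically.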

Unfledged answered 25/3, 2019 at 13:48 Comment(0)

Just to complement Gary's perfect answer, here is the full code in case someone is interested.

I had to declare the TcpNetServerConnectionFactory explicitly in order to set ByteArrayLengthHeaderSerializer as both the serializer and the deserializer. It did NOT work without it.
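For context, ByteArrayLengthHeaderSerializer by default frames each message with a 4-byte length header in network byte order, whereas the connection factories default to CRLF-delimited framing, so both ends have to agree on one of them. The following plain-Java sketch only illustrates the length-header wire format; the class LengthHeaderFraming is hypothetical and is not the library's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of length-header framing (not Spring code).
public class LengthHeaderFraming {

    /** Prefixes the payload with its length as a 4-byte big-endian integer. */
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    /** Reads the 4-byte length header, then exactly that many payload bytes. */
    public static byte[] unframe(byte[] wire) {
        ByteBuffer buffer = ByteBuffer.wrap(wire);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] wire = frame("status".getBytes(StandardCharsets.UTF_8));
        System.out.println(wire.length); // 4 header bytes + 6 payload bytes = 10
        System.out.println(new String(unframe(wire), StandardCharsets.UTF_8)); // status
    }
}
```

A receiver expecting CRLF termination would keep waiting for a delimiter that never arrives in such a frame, which is one plausible reason the defaults did not work here.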

HeartbeatServerConfig full code

@Bean
public TcpNetServerConnectionFactory connectionFactory() {
    TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
    connectionFactory.setSerializer(new ByteArrayLengthHeaderSerializer());
    connectionFactory.setDeserializer(new ByteArrayLengthHeaderSerializer());
    return connectionFactory;
}

@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory connectionFactory,
        PollableChannel inboundChannel, 
        MessageChannel outboundChannel) {
    return IntegrationFlows.from(Tcp.inboundGateway(connectionFactory)
            .replyChannel(outboundChannel))
            .channel(inboundChannel)
            .get();
}

HeartbeatServer full code

public void start() {
    Executors.newSingleThreadExecutor().execute(() -> {
        while (true) {
            try {
                Message<?> request = inboundChannel.receive();
                if (request == null) {
                    log.error("Heartbeat timed out");
                } else {
                    MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel();
                    String requestPayload = new String((byte[]) request.getPayload());
                    if (requestPayload.equals("status")) {
                        log.info("Heartbeat received");
                        outboudChannel.send(new GenericMessage<>("OK"));
                    } else {
                        log.error("Unexpected message content from client: " + requestPayload);
                    }
                }
            } catch (Exception e) {
                log.error(e);
            }
        }
    });
}

The crucial bit is, of course, getting the outbound channel from the request message itself: MessageChannel outboudChannel = (MessageChannel)request.getHeaders().getReplyChannel()

Full code can be found here.

Regality answered 31/3, 2019 at 23:59 Comment(0)
