How to implement simple echo socket service in Spring Integration DSL
Asked Answered
B

2

0

Please,
could you help with implementation of a simple, echo style, Heartbeat TCP socket service in Spring Integration DSL? More precisely how to plug Adapter/Handler/Gateway to IntegrationFlows on the client and server side. Practical examples are hard to come by for Spring Integration DSL and TCP/IP client/server communication.

I think, I nailed most of the code, it's just that bit about plugging everything together in the IntegrationFlow.

There is a sample echo service in the SI examples, but it's written in the "old" XML configuration and I really struggle to transform it into code-based configuration.

My Heartbeat service is a simple server waiting for client to ask "status", responding with "OK".

No @ServiceActivator, no @MessageGateways, no proxying, everything explicit and verbose; driven by a plain JDK scheduled executor on client side; server and client in separate configs and projects.

HeartbeatClientConfig

/**
 * Client-side Spring Integration configuration for the heartbeat demo.
 *
 * Declares the two channels handed to HeartbeatClient, a TCP client connection
 * factory pointing at localhost:7777, and the TCP receiving adapter / sending
 * handler beans built on that factory.
 *
 * NOTE(review): heartbeatClientFlow is an unfinished placeholder (the chain
 * contains only comment stubs) and does not compile as written.
 */
@Configuration
@EnableIntegration
public class HeartbeatClientConfig {

    /** Channel the HeartbeatClient sends its requests to; also the source of heartbeatClientFlow. */
    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    /** Queue-backed channel the HeartbeatClient polls for responses. */
    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    /** TCP client connection factory targeting localhost on port 7777. */
    @Bean
    public TcpNetClientConnectionFactory connectionFactory() {
        TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
        return connectionFactory;
    }

    /**
     * Receiving adapter bound to the client connection factory, in client mode,
     * with its output routed to inboundChannel.
     */
    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetClientConnectionFactory connectionFactory,
            MessageChannel inboundChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
        heartbeatReceivingMessageAdapter.setClientMode(true);
        return heartbeatReceivingMessageAdapter;
    }

    /** Sending handler bound to the same client connection factory. */
    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    /**
     * Intended client flow starting at outboudChannel.
     * NOTE(review): the steps after from(...) are unresolved placeholders; the
     * connectionFactory, adapter and handler parameters are not used yet.
     */
    @Bean
    public IntegrationFlow heartbeatClientFlow(
            TcpNetClientConnectionFactory connectionFactory,
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(outboudChannel) // ??????
                .// adapter ???????????
                .// gateway ???????????
                .// handler ???????????
                .get();
    }

    /** The client itself, given the request channel and the response channel. */
    @Bean
    public HeartbeatClient heartbeatClient(
            MessageChannel outboudChannel,
            PollableChannel inboundChannel) {
        return new HeartbeatClient(outboudChannel, inboundChannel);
    }
}

HeartbeatClient

/**
 * Heartbeat client: periodically sends a "status" request on the outbound channel
 * and waits up to one second for an "OK" response on the inbound channel.
 */
public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * @param outboudChannel channel the "status" request is sent to
     * @param inboundChannel channel polled for the server's response
     */
    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the heartbeat schedule when the application context has been refreshed. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    /**
     * Schedules one heartbeat exchange every 10 seconds, starting immediately.
     *
     * The scheduled task performs a single exchange and returns. The earlier
     * version looped forever inside the task, so the first run never finished,
     * the fixed rate was never applied and heartbeats were sent back-to-back.
     * The period was also expressed as 10000 SECONDS.
     */
    public void start() {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::exchangeHeartbeat, 0, 10, TimeUnit.SECONDS);
    }

    /** Performs one request/response exchange and logs the outcome. */
    private void exchangeHeartbeat() {
        try {
            log.info("Sending Heartbeat");
            outboudChannel.send(new GenericMessage<String>("status"));
            Message<?> message = inboundChannel.receive(1000);
            if (message == null) {
                log.error("Heartbeat timeouted");
                return;
            }
            // Decode with an explicit charset instead of the platform default.
            String messageStr = new String((byte[]) message.getPayload(),
                    java.nio.charset.StandardCharsets.UTF_8);
            if (messageStr.equals("OK")) {
                log.info("Heartbeat OK response received");
            } else {
                log.error("Unexpected message content from server: " + messageStr);
            }
        } catch (Exception e) {
            // Must not propagate: an exception escaping a scheduleAtFixedRate task
            // suppresses all subsequent executions.
            log.error("Heartbeat exchange failed", e);
        }
    }
}

HeartbeatServerConfig

/**
 * Server-side Spring Integration configuration for the heartbeat demo.
 *
 * Declares the two channels handed to HeartbeatServer, a TCP server connection
 * factory on port 7777, and the TCP receiving adapter / sending handler beans
 * built on that factory.
 *
 * NOTE(review): the receiving adapter writes to outboudChannel, while
 * HeartbeatServer polls inboundChannel and sends to outboudChannel; nothing
 * visible here feeds inboundChannel. Confirm the intended wiring.
 */
@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    /** Direct channel; set as the receiving adapter's output and passed to HeartbeatServer. */
    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    /** Queue-backed channel that HeartbeatServer polls for requests. */
    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    /** TCP server connection factory on port 7777. */
    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
        return connectionFactory;
    }

    /** Receiving adapter bound to the server connection factory, with output set to outboudChannel. */
    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
        return heartbeatReceivingMessageAdapter;
    }

    /** Sending handler bound to the same server connection factory. */
    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    /**
     * Flow from the receiving adapter straight to the sending handler.
     * NOTE(review): the outboudChannel parameter is unused, and the adapter
     * already has an explicit output channel set in its bean method; verify
     * that using it as a flow source as well is intended.
     */
    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???????????????
                .handle(heartbeatSendingMessageHandler) // ???????????????
                .get();
    }

    /** The server itself, given the request channel and the response channel. */
    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel, 
            MessageChannel outboudChannel) {
        return new HeartbeatServer(inboundChannel, outboudChannel);
    }
}

HeartbeatServer

/**
 * Heartbeat server: polls the inbound channel for "status" requests and answers
 * each one with "OK" on the outbound channel.
 */
public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * @param inboundChannel channel polled for incoming requests
     * @param outboudChannel channel the "OK" response is sent to
     */
    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the polling loop when the application context has been refreshed. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    /**
     * Runs an endless polling loop on a dedicated single-thread executor.
     * Each iteration waits up to one second for a request.
     */
    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        // Decode with an explicit charset instead of the platform default.
                        String messageStr = new String((byte[]) message.getPayload(),
                                java.nio.charset.StandardCharsets.UTF_8);
                        if (messageStr.equals("status")) {
                            log.info("Heartbeat received");
                            outboudChannel.send(new GenericMessage<>("OK"));
                        } else {
                            log.error("Unexpected message content from client: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    // Keep the loop alive; log with context and the stack trace.
                    log.error("Heartbeat processing failed", e);
                }
            }
        });
    }
}

Bonus question

Why can a channel be set on TcpReceivingChannelAdapter (inbound adapter) but not on TcpSendingMessageHandler (outbound adapter)?

UPDATE
Here is the full project source code, for anyone interested in cloning it with git:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
I will try to put all suggested solutions there.

Baranowski answered 14/3, 2019 at 3:15 Comment(0)
K
2

It's much simpler with the DSL...

/**
 * Self-contained demo: a TCP server flow and a TCP client flow in one Spring Boot
 * application, with a scheduled Runner driving the client through the Gate interface.
 */
@SpringBootApplication
@EnableScheduling
public class So55154418Application {

    public static void main(String[] args) {
        SpringApplication.run(So55154418Application.class, args);
    }

    /**
     * Server flow: starts from a TCP inbound gateway on a net server at port 1234,
     * converts the payload with Transformers.objectToString(), logs it, and the
     * handler returns "OK" (presumably sent back as the gateway's reply).
     */
    @Bean
    public IntegrationFlow server() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)))
                .transform(Transformers.objectToString())
                .log()
                .handle((p, h) -> "OK")
                .get();
    }

    /**
     * Client flow: starts from the Gate interface, passes the message to a TCP
     * outbound gateway connected to localhost:1234, converts the result with
     * Transformers.objectToString() and prints it. The handler returns null.
     */
    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((p, h) -> {
                    System.out.println("Received:" + p);
                    return null;
                })
                .get();
    }

    /** Creates the scheduled Runner; declared to depend on the "client" flow bean. */
    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    /** Sends "foo" through the Gate with a fixed delay of 5000 between runs. */
    public static class Runner {

        private final Gate gateway;

        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            this.gateway.send("foo");
        }

    }

    /** Entry point into the client flow; send(...) returns nothing to the caller. */
    public interface Gate {

        void send(String out);

    }

}

Or, get the reply from the Gate method...

    /**
     * Client flow variant without a terminal handler: starts from the Gate
     * interface, passes the message to a TCP outbound gateway connected to
     * localhost:1234, then converts the result with Transformers.objectToString().
     * Presumably the converted value becomes the return value of the Gate method.
     */
    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .get();
    }

    /** Creates the scheduled Runner; declared to depend on the "client" flow bean. */
    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    /**
     * Scheduled driver for the client flow: calls the Gate with "foo" and prints
     * whatever comes back, with a fixed delay of 5000 between runs.
     */
    public static class Runner {

        private final Gate gateway;

        /** @param gateway entry point into the client flow */
        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        /** Performs one request and prints the reply; a null reply means the call timed out. */
        @Scheduled(fixedDelay = 5000)
        public void run() {
            System.out.println("Received:" + this.gateway.sendAndReceive("foo"));
        }

    }

    /** Entry point into the client flow that also hands the reply back to the caller. */
    public interface Gate {

        /** Sends {@code out} and returns the reply; replyTimeout is set to 5000. */
        @Gateway(replyTimeout = 5000)
        String sendAndReceive(String out);

    }

Bonus:

Consuming endpoints are actually comprised of 2 beans; a consumer and a message handler. The channel goes on the consumer. See here.

EDIT

An alternative, for a single bean for the client...

/**
 * Single-bean client variant: a polled supplier produces "foo" with a fixed delay
 * of 5 seconds, the message goes to a TCP outbound gateway connected to
 * localhost:1234, the result is converted with Transformers.objectToString()
 * and printed. The final handler returns null.
 */
@Bean
public IntegrationFlow client() {
    return IntegrationFlows.from(() -> "foo", 
                    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
            .transform(Transformers.objectToString())
            .handle((p, h) -> {
                System.out.println("Received:" + p);
                return null;
            })
            .get();
}
Kudos answered 14/3, 2019 at 13:36 Comment(8)
Thank you for your fast response. Sorry to be such a PIA, but would a solution be possible where the channels are exposed? Where I can actually see explicit send() and receive() calls in my client and server classes? Or would it be too difficult? Your solution is clearly shorter in terms of code, but simplicity is a somewhat subjective word.Baranowski
I also noticed that both your solutions effectively splits Client into request producing part (Gate) and response processing part (handling lambda). Is that necessary? Can whole Client live in a single class, or even better, a single method? I was under impression that channels, if connected correctly, can deliver exactly this.Baranowski
I added an alternative for the client, eliminating Gate. I am not sure what you mean in your first comment.Kudos
Gary, thank you very much! Still, my mind is still intrigued with solution based on outbound and inbound channels being handed over to Client to call send() on the outboundChannel and then blocking receive() on inboundChannel; reversely on Server side. Is it possible to configure something like IntegrationFlows.from(outboundChannel).handle(Tcp.outboundGateway(..)).channel(inboundChannel).get() for the client and IntegrationFlows.from(inboundChannel).handle(Tcp.inboundGateway(..)).channel(outboundChannel).get() for the server?Baranowski
Yes, you can do that by adding a Queue .channel() after the outbound gateway and then receive() from it. But my question is "why would you want to?". It adds complexity. One goal of the framework is to allow you to rise above low-level APIs and get rid of boilerplate code. If you want to code at a low level, why not just use the Socket APIs directly? Perhaps you have some unwritten business need that you are not explaining.Kudos
I am still learning the framework, exploring what can be done and what not. I've accepted your post but if you could be so kind and check the "channel solution"; client side seems to be working, implemented as suggested, but server does not even start.Baranowski
The admins here don't like extended commentary on questions/answers; I suggest you ask a new question with what you are trying to do; your current server implementation is completely wrong, but I don't have room here to explain the solution (and it's really not clear what you are trying to do).Kudos
Sure, kindly check #55330030 ..and yet again many thanks!Baranowski
B
2

For anyone interested, here is one of the working solutions I made with help from Gary Russell. All credits to Gary Russell. Full project source code here.

Highlights:

  • IntegrationFlows: Use only inbound and outbound Gateways.
  • No Adapters or Channels needed; no ServiceActivators or Message Gate proxies.
  • No need for ScheduledExecutor or Executors; client and server code got significantly simpler.
  • IntegrationFlows directly calls methods on client class and server class; I like this type of explicit connection.
  • Split client class on two parts, two methods: request producing part and response processing part; this way it can be better chained to flows.
  • explicitly define clientConnectionFactory/serverConnectionFactory. This way more things can be explicitly configured later.

HeartbeatClientConfig

/**
 * Client flow: polls heartbeatClient.send() with a fixed delay of 5 seconds,
 * passes the produced message to a TCP outbound gateway built on the given
 * client connection factory, and hands the result to heartbeatClient.receive().
 */
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    return IntegrationFlows.from(heartbeatClient::send,  e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
}

HeartbeatClient

/**
 * Heartbeat client split into a request-producing method and a
 * response-processing method so both can be chained into an integration flow.
 */
public class HeartbeatClient {
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * Produces the heartbeat request.
     *
     * @return a message whose payload is "status"
     */
    public GenericMessage<String> send() {
        log.info("Sending Heartbeat");
        return new GenericMessage<>("status");
    }

    /**
     * Checks the server's response and logs the outcome.
     *
     * @param payload        raw response bytes
     * @param messageHeaders headers of the response message (not used)
     * @return always null, so nothing is passed further down the flow
     */
    public Object receive(byte[] payload, MessageHeaders messageHeaders) { // LATER: use transformer() to receive String here
        // Decode with an explicit charset instead of the platform default.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if (messageStr.equals("OK")) {
            log.info("Heartbeat OK response received");
        } else {
            log.error("Unexpected message content from server: " + messageStr);
        }
        return null;
    }
}

HeartbeatServerConfig

/**
 * Server flow: starts from a TCP inbound gateway built on the given server
 * connection factory and hands each request to heartbeatServer.processRequest().
 */
@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer) {
    return IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .handle(heartbeatServer::processRequest)
            .get();
}

HeartbeatServer

/**
 * Heartbeat server: answers a "status" request with "OK".
 */
public class HeartbeatServer {
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * Handles one incoming request.
     *
     * @param payload        raw request bytes
     * @param messageHeaders headers of the request message (not used)
     * @return a message with payload "OK" for a "status" request, otherwise null
     */
    public Message<String> processRequest(byte[] payload, MessageHeaders messageHeaders) {
        // Decode with an explicit charset instead of the platform default.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if (messageStr.equals("status")) {
            log.info("Heartbeat received");
            return new GenericMessage<>("OK");
        } else {
            log.error("Unexpected message content from client: " + messageStr);
            return null;
        }

    }
}
Baranowski answered 18/3, 2019 at 1:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.