How to support WebSocket transport with Spring Boot RSocket server?

TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface that supports the WebSocket transport?


I'm learning about RSocket and Spring Boot at the same time, so please bear with me.

So far I have managed to build a very simple, contrived setup in which one Spring Boot application consumes an API exposed by a second Spring Boot application over RSocket. However, I can only get this to work when using the TcpClientTransport.

From my perspective, the WebsocketClientTransport is much more likely to be used, and more useful, in client->server architectures. However, I haven't found any working examples or documentation on how to properly configure a Spring Boot application to accept RSocket messages over WebSocket.

The odd part is that, in my tests, my consumer (client) does appear to open a WebSocket connection to the server/producer, but the 'handshake' hangs and the connection is never fully established. I've tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior in both cases.

To reiterate: with the TcpClientTransport I can connect to the server and invoke requests just fine, but with the WebsocketClientTransport the connection is never established.

What is required of a Spring Boot application that aims to support RSocket via the WebsocketClientTransport, beyond adding spring-boot-starter-rsocket as a dependency?

Server


pom.xml

...

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

...

application.properties

spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*

SpringBootRSocketServerApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRSocketServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}

UserRSocketController

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    }
}

Startup Logging

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:02,986 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO  [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO  [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO  [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO  [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)

Consumer/Client


ClientConfiguration.java

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine

        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls

        return RSocketFactory
            .connect()
            .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(transport)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

Startup Logging

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:52,331 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
Zapata asked 9/9, 2019 at 2:13

You only need two things for an RSocket application to expose endpoints over the WebSocket transport:

First, you need both webflux and rsocket dependencies as you'll probably need to serve web pages and static resources as well:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

Then you need to configure the RSocket server accordingly in your application.properties file:

# server.port=8080 is already the default
spring.rsocket.server.transport=websocket
spring.rsocket.server.mapping-path=/rsocket

You'll find more about that in the Spring Boot reference documentation about RSocket.

The websocket client can now connect to ws://localhost:8080/rsocket.
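As a rough illustration of how that URL follows from the configuration above, here is a minimal sketch in plain Java. The class and method names (RSocketWsUri, fromProperties) are hypothetical and not part of Spring Boot; the sketch assumes RSocket is plugged into the web server, so server.port (8080 when unset) applies rather than spring.rsocket.server.port.

```java
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not a Spring Boot API: derives the client URL from server properties.
final class RSocketWsUri {

    static URI fromProperties(String host, Properties props) {
        // Spring Boot's RSocket server defaults to the TCP transport when this property is unset.
        String transport = props.getProperty("spring.rsocket.server.transport", "tcp");
        if (!"websocket".equalsIgnoreCase(transport)) {
            throw new IllegalStateException(
                    "RSocket transport is '" + transport + "', not websocket");
        }
        // Assumption: RSocket shares the web server, so the web server port is the one to use.
        String port = props.getProperty("server.port", "8080");
        String path = props.getProperty("spring.rsocket.server.mapping-path", "/");
        if (!path.startsWith("/")) {
            path = "/" + path;
        }
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "spring.rsocket.server.transport=websocket\n"
                + "spring.rsocket.server.mapping-path=/rsocket\n"));
        // Prints ws://localhost:8080/rsocket
        System.out.println(fromProperties("localhost", props));
    }
}
```

If the transport property is missing, the helper fails fast instead of producing a URL that would point at a TCP-only endpoint.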

Note that as of the current 2.2.0 SNAPSHOTs, the RSocket protocol has evolved and the rsocket-js library is currently catching up, especially in the metadata support. You'll find a working sample here as well.

On the Java client side of things, Spring Boot provides you with an RSocketRequester.Builder that's already configured and customized to your needs with codecs and interceptors:

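import java.net.URI;

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.stereotype.Component;

@Component
public class MyService {

    private final RSocketRequester rsocketRequester;

    public MyService(RSocketRequester.Builder builder) {
        // Connects over WebSocket to the path set in spring.rsocket.server.mapping-path
        this.rsocketRequester = builder
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                .block();
    }
}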
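
Before wiring up the requester, it can help to check that the endpoint completes a WebSocket opening handshake at all, since the question describes a connection that opens but then hangs. The following is a minimal sketch using only the JDK 11+ java.net.http client; WebSocketProbe and handshakeCompletes are hypothetical names, and the default URL simply assumes the configuration shown above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic, not part of Spring or RSocket.
final class WebSocketProbe {

    /** Returns true only if the WebSocket opening handshake finishes within the timeout. */
    static boolean handshakeCompletes(URI uri, Duration timeout) {
        HttpClient client = HttpClient.newHttpClient();
        try {
            WebSocket ws = client.newWebSocketBuilder()
                    .connectTimeout(timeout)
                    .buildAsync(uri, new WebSocket.Listener() { })
                    .get(timeout.toMillis() + 1000, TimeUnit.MILLISECONDS);
            // Handshake succeeded; drop the connection without speaking RSocket on it.
            ws.abort();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Connection refused, handshake timeout, or a non-WebSocket response.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed default; pass another ws:// URL as the first argument to override it.
        URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:8080/rsocket");
        System.out.println(uri + " handshake completed: "
                + handshakeCompletes(uri, Duration.ofSeconds(3)));
    }
}
```

A false result here only says the handshake did not complete in time; it does not tell you why, so the server's transport and mapping-path settings are still the first things to check.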
Petrie answered 10/9, 2019 at 7:16

Based on this blog post, the correct port to connect to is the one configured via server.port (8080 here).

Server Config

server.port=8080
spring.rsocket.server.port=8081
spring.rsocket.server.mapping-path=/ws
spring.rsocket.server.transport=websocket

Java Consumer Client Configuration

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

import java.net.URI;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        URI websocketUri = URI.create("ws://127.0.0.1:8080/ws");
        WebsocketClientTransport ws = WebsocketClientTransport.create(websocketUri);
        return RSocketFactory
            .connect()
            .mimeType(
                MetadataExtractor.ROUTING.toString(),
                MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(ws)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
            rSocket(),
            MimeTypeUtils.APPLICATION_JSON,
            MetadataExtractor.ROUTING,
            rSocketStrategies);
    }
}

JavaScript Client Configuration

import { RSocketClient, JsonSerializers } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

const transport = new RSocketWebSocketClient({
    url: 'ws://127.0.0.1:8080/ws'
});

const client = new RSocketClient({
    // send/receive JSON objects instead of strings/buffers
    serializers: JsonSerializers,
    setup: {
        // ms between keepalive frames sent to the server
        keepAlive: 60000,

        // ms timeout if no keepalive response
        lifetime: 180000,

        // format of `data`
        dataMimeType: 'application/json',

        // format of `metadata`
        metadataMimeType: 'application/json',
    },
    transport,
});

client.connect().then((rsocket) => {
    // work with rsocket
});
Zapata answered 10/9, 2019 at 5:52