TLDR: What is required to configure a Spring Boot application that exposes an RSocket interface that supports the WebSocket transport?
I'm learning about RSocket and Spring Boot at the same time, so please bear with me.
After some struggle, I have built a very simple, contrived Spring Boot application that consumes an API exposed by a second Spring Boot application, using RSocket as the protocol. However, I have only been able to get this working when using the TcpClientTransport.
From my perspective, the WebsocketTransport is much more likely to be used, and more useful, in client->server architectures. However, I haven't found any working examples or documentation on how to properly configure a Spring Boot application that accepts RSocket messages using WebSocket as the transport.
The odd part is that, in my tests, my consumer (client) does appear to establish a WebSocket connection to the server/producer; however, the 'handshake' appears to hang and the connection is never fully established. I've tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java libraries (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior in both cases.
To reiterate: using the TCPTransport I am able to connect to the server and invoke requests just fine; however, when using the WebsocketTransport the connection is never established.
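To narrow down where the hang occurs, the WebSocket upgrade can be checked on its own, without any RSocket library involved. The following is a minimal sketch, assuming Java 11+ (for the JDK's built-in java.net.http.WebSocket client); the class name WebSocketHandshakeProbe and the method handshakeCompletes are hypothetical names chosen for illustration. It only reports whether the HTTP upgrade finishes within a timeout and says nothing about RSocket framing.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic helper, not part of the original project.
public class WebSocketHandshakeProbe {

    /**
     * Returns true only if the HTTP upgrade to WebSocket completes within the timeout.
     * A refused connection, a non-upgrade response, or a silent server all yield false.
     */
    public static boolean handshakeCompletes(URI uri, Duration timeout) {
        HttpClient client = HttpClient.newHttpClient();
        try {
            WebSocket ws = client.newWebSocketBuilder()
                    .connectTimeout(timeout)
                    // Listener has only default methods, so an empty implementation is enough here.
                    .buildAsync(uri, new WebSocket.Listener() {})
                    .get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            ws.abort(); // the handshake succeeded; drop the connection immediately
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Timeout, connection refused, or the server never answered the upgrade request.
            return false;
        }
    }

    public static void main(String[] args) {
        // Port 8081 matches spring.rsocket.server.port in the server configuration.
        URI uri = URI.create(args.length > 0 ? args[0] : "ws://localhost:8081/");
        boolean ok = handshakeCompletes(uri, Duration.ofSeconds(5));
        System.out.println(uri + " handshake completed: " + ok);
    }
}
```

If this reports false against port 8081 while the TCP transport works, that would suggest the server is not answering HTTP upgrade requests on that port at all, rather than a problem in the client libraries.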
What is required of a Spring Boot application that aims to support RSocket via the WebsocketClientTransport, beyond consuming spring-boot-starter-rsocket as a dependency?
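One server-side setting that may be relevant, offered as an untested sketch: Spring Boot's RSocket server properties include a transport selector, and a standalone RSocket server that is not told otherwise would be expected to listen with plain TCP. This assumes the spring.rsocket.server.transport property is available in the 2.2.0.M5 milestone used here, and it has not been confirmed to resolve the hang described above.

```properties
# Existing setting: run a standalone RSocket server on its own port
spring.rsocket.server.port=8081
# Assumption: switch that server from the default TCP transport to WebSocket
spring.rsocket.server.transport=websocket
```

With a setting like this, the client's WebsocketClientTransport.create(8081) would be talking to a server that actually performs the WebSocket upgrade on port 8081, while the TCP client would presumably stop working against the same port.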
Server
pom.xml
...
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.M5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
...
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
...
application.properties
spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*
SpringBootRSocketServerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringBootRSocketServerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBootRSocketServerApplication.class, args);
}
}
UserRSocketController
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
@Slf4j
@Controller
public class UserRSocketController {
@Autowired
private UserRepository userRepository;
@MessageMapping("usersList")
public Mono<List<User>> usersList() {
log.info("Handling usersList request.");
return Mono.just(this.userRepository.getUsers());
}
@MessageMapping("usersStream")
Flux<User> usersStream(UserStreamRequest request) {
log.info("Handling request for usersStream.");
List<User> users = userRepository.getUsers();
Stream<User> userStream = Stream.generate(() -> {
Random rand = new Random();
return users.get(rand.nextInt(users.size()));
});
return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
}
@MessageMapping("userById")
public Mono<User> userById(GetUserByIdRequest request) {
log.info("Handling request for userById id: {}.", request.getId());
return Mono.just(this.userRepository.getUserById(request.getId()));
}
}
Startup Logging
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:02,986 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)
Consumer/Client
ClientConfiguration.java
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;
@Configuration
public class ClientConfiguration {
@Bean
public RSocket rSocket() {
// ClientTransport transport = TcpClientTransport.create(8081);
// ^--- TCPTransport works fine
ClientTransport transport = WebsocketClientTransport.create(8081);
// ^--- Connection hangs and application startup stalls
return RSocketFactory
.connect()
.mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(transport)
.start()
.block();
}
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}
}
Startup Logging
:: Spring Boot :: (v2.2.0.M5)
2019-09-08 21:40:52,331 INFO [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default