Stateful Rsocket Application
H

3

5

In my project I want to have multiple clients connecting to a service. I am using the Java RSocket implementation.

The service should maintain state for each client. One option is to manage the clients by some identifier, which I have already implemented. But I do not want to manage the sessions manually using strings.

So another idea is to identify the clients by the Rsocket connection. Is there a way to use Rsocket channel for identification of a specific client?

Imagine an example service and a couple of clients. Each client has the Rsocket channel with the service up and running. Is there a way to identify these clients on the server side using the Rsocket channel? Would be amazing if you could show a programmatic example of such behavior. Thank you!

EDIT (describing the case in more detail)

Here is my example.

We currently have three CORBA objects that are used as demonstrated in the diagram:

  • LoginObject (to which a reference is retrieved via NamingService). Clients can call a login() method to obtain a session
  • The Session object has various methods for querying details about the current service context and, most importantly, for obtaining a Transaction object
  • The Transaction object can be used to execute various commands via a generic method that takes a commandName and a list of key-value pairs as parameters. After the client has executed n commands, it can commit or roll back the transaction (also via methods on the Transaction object).

[Diagram not available]

So here we use the Session object to execute transactions on our service.

Now we have decided to move away from CORBA to RSocket. Thus we need the RSocket microservice to be able to store the session's state, otherwise we can't know what is going to be committed or rolled back. Can this be done with just an individual Publisher for each client?

Highkeyed answered 3/5, 2019 at 11:15 Comment(3)
It's unclear what you are trying to do. Most of the time it's not recommended to keep state in the web server, and if you are using a channel you can create a Publisher for each client, so why do you need state?Thicket
@Thicket I edited the question with a more thorough description :)Highkeyed
I recommend taking a look at akka.io. It's a very powerful reactive framework.Kanter
E
4

Here's an example I made the other day that will create a stateful RSocket using Netifi's broker: https://github.com/netifi/netifi-stateful-socket

Unfortunately you'd need to build our develop branch locally to try it out (https://github.com/netifi/netifi-java) - there should be a release with the code by the end of the week if you don't want to build it locally.

I'm working on a pure RSocket example too, but if you want to see how it would work, take a look at the StatefulSocket found in the example. It should give you a clue how to deal with the session with pure RSocket.
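
As a rough illustration of the general idea (not the StatefulSocket code from the linked repository), per-client state can live in the fields of a handler object that is created once per connection. In rsocket-java that role is played by the RSocket returned from SocketAcceptor.accept, which is called once for each connecting client. The sketch below models this with plain JDK types; ConnectionHandler and Acceptor are hypothetical names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-connection handler: its fields are one client's state.
final class ConnectionHandler {
    private final List<String> pending = new ArrayList<>();

    // Records a command and returns how many this client has queued so far.
    int execute(String command) {
        pending.add(command);
        return pending.size();
    }
}

// Hypothetical stand-in for an acceptor that is invoked once per connection.
final class Acceptor {
    ConnectionHandler accept() {
        // A fresh handler per connection, so no string identifiers are needed.
        return new ConnectionHandler();
    }
}
```

Two calls to accept() give two handlers whose command lists never mix, which is the property the question asks for.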

Regarding your other question about a transaction manager: you would need to tie your transaction to the Reactive Streams signals that are being emitted. If you receive a cancel or an onError you roll back, and if you receive an onComplete you commit the transaction. There are side-effect methods on Flux/Mono (such as doOnCancel, doOnError and doOnComplete) that should make this easy to deal with. Depending on what you are doing, you could also use BaseSubscriber, as it has hooks for the different Reactive Streams signals.
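
A minimal sketch of that signal-to-transaction mapping, using the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams signals) so that it runs without RSocket or Reactor. RecordingTransaction and TransactionalSubscriber are hypothetical names made up for the example, and commands are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical stand-in for the application's transaction object.
final class RecordingTransaction {
    final List<String> executed = new ArrayList<>();
    String outcome = "open";

    void execute(String command) { executed.add(command); }
    // Only the first terminal decision counts.
    void commit() { if (outcome.equals("open")) outcome = "committed"; }
    void rollback() { if (outcome.equals("open")) outcome = "rolled back"; }
}

// Ties one transaction to the lifecycle of one stream of commands.
final class TransactionalSubscriber implements Flow.Subscriber<String> {
    private final RecordingTransaction tx;
    private Flow.Subscription subscription;

    TransactionalSubscriber(RecordingTransaction tx) { this.tx = tx; }

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(Long.MAX_VALUE); // unbounded demand keeps the sketch short
    }
    @Override public void onNext(String command) { tx.execute(command); }
    @Override public void onError(Throwable t) { tx.rollback(); }
    @Override public void onComplete() { tx.commit(); }

    // Assumes onSubscribe has already been called; cancelling means rollback.
    void cancel() {
        subscription.cancel();
        tx.rollback();
    }
}
```

With Reactor, the same three branches would sit in doOnComplete, doOnError and doOnCancel, or in the corresponding BaseSubscriber hooks.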

Thanks, Robert

Epigraphy answered 15/5, 2019 at 5:47 Comment(0)
Q
2

An example of resuming connections, i.e. maintaining the state on the server, has landed in the rsocket-java repo:

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

This resumes a whole connection, including whatever state is associated with each individual channel etc.

There is an rsocket-cli project that lets you try this out. Start and stop the socat process and observe the client and server progress.

$ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
$ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
$ ./rsocket-cli -i client --stream --resume tcp://localhost:5001
Queenie answered 16/5, 2019 at 6:17 Comment(2)
Umm, now a question to the commit, how does the server manage the state? I do not see any statefulness there...Highkeyed
That depends on what state you want. It could be any of: a) the stream is inherently stateful, e.g. a counter incrementing, b) the stream has stateful operators, c) you use Project Reactor's Context, d) you use something stateful in the app, like a singleton static Map that you key with a session id.Queenie
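
Option d) could look roughly like the sketch below: a registry that maps a session id to that session's pending commands. SessionRegistry and its methods are hypothetical names for illustration, and how the id travels with each request (for example in the setup payload or in request metadata) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry keyed by session id; state is a list of commands.
final class SessionRegistry {
    private final Map<String, List<String>> sessions = new ConcurrentHashMap<>();

    // Creates a session and returns the id the client must present later.
    String open() {
        String id = UUID.randomUUID().toString();
        sessions.put(id, new ArrayList<>());
        return id;
    }

    // computeIfPresent runs atomically per key, so concurrent adds are safe.
    void addCommand(String id, String command) {
        List<String> pending = sessions.computeIfPresent(id, (key, list) -> {
            list.add(command);
            return list;
        });
        if (pending == null) {
            throw new IllegalStateException("unknown session " + id);
        }
    }

    // Removes the session and returns a snapshot of what it had queued.
    List<String> close(String id) {
        List<String> pending = sessions.remove(id);
        return pending == null ? List.of() : List.copyOf(pending);
    }
}
```

The trade-off is the one raised in the question: the server now manages string identifiers itself and has to remove sessions that are never closed.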
T
0

From your description it looks like a channel will work best. I haven't used channels before, so I can't really guarantee this (sorry), but I'd recommend trying something like this:

A transaction controller:

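import io.rsocket.Payload;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Sinks;

// One instance per channel: its fields are that client's state.
public class TransactionController implements Publisher<Payload> {

    // Transaction is the application's own type.
    private final List<Transaction> transactions = new ArrayList<>();
    // Responses are emitted through this sink (Sinks needs Reactor 3.4+).
    private final Sinks.Many<Payload> responses =
        Sinks.many().unicast().onBackpressureBuffer();

    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {
        responses.asFlux().subscribe(subscriber);
    }

    public void processPayload(Payload payload) {
        // handle transactions, then e.g. responses.tryEmitNext(...)
    }
}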

And in your RSocket implementation override the requestChannel:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    // Create a new controller for each channel, so state is kept per client
    TransactionController controller = new TransactionController();
    Flux.from(payloads)
        .subscribe(controller::processPayload);
    return Flux.from(controller);
}
Thicket answered 9/5, 2019 at 13:48 Comment(2)
Hey, I am completely new to RSocket so I might ask amateur questions... So let's say I created a service which implements the protobuf-generated service. The service defines a method: public Mono<TransactionResult> transaction(Publisher<Command> messages, ByteBuf metadata) which was derived from protobuf's rpc transaction (stream Command) returns (TransactionResult) {} and is supposed to handle a stream of client requests and answer with a Mono indicating whether the transaction was committed or canceled.Highkeyed
At this point, I will need to add the TransactionController logic into the service's Mono<TransactionResult> transaction(Publisher<Command> messages, ByteBuf metadata) method, as you did in your requestChannel example. Am I correct?Highkeyed
