Spring websocket send message from multiple threads

I'm using the Spring WebSocket server implementation in one of my Spring-based projects. I ran into an error saying that the remote endpoint was in state [TEXT_PARTIAL_WRITING], which is an invalid state. I found out that the problem is writing to the WebSocket from different threads at the same time.

How I temporarily fixed it: suppose I have implemented the method below,

void sendMessageToSession(WebSocketSession session, String message);

which sends a TextMessage to a WebSocket session. I can't make this whole method synchronized, because multiple threads can call it for different WebSocket sessions and messages. I also can't put the session in a synchronized block (I tried, and it didn't work).

However, I fixed my problem like this:

synchronized(session.getId()){ 
    //sending message;
}

and I no longer see that issue. But it does not seem to be good practice to synchronize on Strings. So what other solutions do I have? What is the best way to send messages asynchronously?

PS: I already wrap the session in ConcurrentWebSocketSessionDecorator after the connection is established, and I am using the updated session. It didn't help.

session = new ConcurrentWebSocketSessionDecorator(session, (int) StaticConfig.MAXIMUM_WS_ASYNC_SEND_TIMEOUT, StaticConfig.MAXIMUM_WS_BINARY_BUFFER_SIZE * 2);

NOTE: I keep my WebSocket sessions in a map, where the key is session.getId() and the value is the session itself.

Unlike in some other WebSocket implementations, Spring WebSocket session references do not seem to be the same on each message. I saved the sessions in a map by their ID, and on each message I compare the session passed to the handler with the one I already put in my map; the result is false.
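
One way to avoid synchronizing on the ID string itself is to keep a dedicated lock object per session ID. The sketch below assumes the exception comes from two threads writing to the same session at once. SessionLocks, runExclusive, and forget are hypothetical names, not part of Spring, and the demo only counts overlapping "sends" instead of calling a real session.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper: one lock per session ID, so sends to the same session
// are serialized while sends to different sessions can run in parallel.
class SessionLocks {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    // Runs the action while holding the lock that belongs to sessionId.
    void runExclusive(String sessionId, Runnable action) {
        ReentrantLock lock = locks.computeIfAbsent(sessionId, id -> new ReentrantLock());
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }

    // Call only after the connection is closed and no more sends are issued,
    // so the map does not grow without bound.
    void forget(String sessionId) {
        locks.remove(sessionId);
    }
}

class SessionLocksDemo {
    // Starts several threads that all "send" to the same session ID and returns
    // the highest number of sends that were ever in progress at the same time.
    static int maxConcurrentSends(int threads, int sendsPerThread) {
        SessionLocks locks = new SessionLocks();
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < sendsPerThread; i++) {
                    locks.runExclusive("session-1", () -> {
                        int now = inFlight.incrementAndGet();
                        maxInFlight.accumulateAndGet(now, Math::max);
                        // A real handler would call session.sendMessage(...) here.
                        inFlight.decrementAndGet();
                    });
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent sends: " + maxConcurrentSends(4, 1000));
    }
}
```

Because every send for "session-1" holds the same lock, the demo should report a maximum of 1. Unlike synchronized(session.getId()), this does not depend on getId() returning the same String instance on every call.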

Oden answered 4/2, 2018 at 11:57 Comment(5)
As far as I know, synchronized(session.getId()) cannot solve your problem..Appleton
@user27149 well I'm not facing any exceptions now that I'm using it and system is working fine so I can say it actually did solve my problem (temporarily because I asked this question to find proper way to solve it)Oden
Yeah, I understand you are seeking for a better solution...Appleton
What happens when you try synchronized(session)?Minefield
@WarrenDew still same error. Also refer to my noteOden
A
7

ConcurrentWebSocketSessionDecorator works like a charm in multithreaded code; it is designed for it. You may have a problem in your map implementation.

Sample code:

private final Map<String, SessionData> sessions = new ConcurrentHashMap<>();

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception 
{
    // Storing the raw session like this will crash under concurrent sends:
    //sessions.put(session.getId(), new SessionData(session));

    // Wrapping it in ConcurrentWebSocketSessionDecorator is safe
    // (2000 = send time limit in ms, 4096 = buffer size limit in bytes):
    sessions.put(session.getId(), new SessionData(new ConcurrentWebSocketSessionDecorator(session, 2000, 4096)));
    super.afterConnectionEstablished(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception
{
    sessions.remove(session.getId());
    super.afterConnectionClosed(session, status); 
}

public void send(WebSocketSession session, String msg) throws MessagingException {
    try {
        session.sendMessage (new TextMessage(msg));
    } catch (IOException ex) {
        throw new MessagingException(ex.getMessage());
    }
}

To easily test the behaviour with multiple threads:

public void sendMT(WebSocketSession session, String msg) throws MessagingException {
    // Start three threads that all send to the same session at once.
    for (int i = 0; i < 3; i++) {
        new Thread() {
            @Override
            public void run() {
                send(session, msg);
            }
        }.start();
    }
}
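
For trying out the idea without a running server, the following is a Spring-free sketch of the same kind of test. Session, FragileSession, SerializingSession, and SendHarness are hypothetical stand-ins: FragileSession rejects overlapping writes roughly the way the container does with TEXT_PARTIAL_WRITING, and SerializingSession only illustrates the decorator idea with a plain synchronized method. It is not how ConcurrentWebSocketSessionDecorator is implemented, which also buffers messages and enforces the send-time and buffer-size limits passed to its constructor.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a WebSocket session; not a Spring type.
interface Session {
    void send(String message);
}

// Rejects a send that starts while another send is still in progress.
class FragileSession implements Session {
    private final AtomicBoolean writing = new AtomicBoolean();
    final AtomicInteger delivered = new AtomicInteger();
    final AtomicInteger rejected = new AtomicInteger();

    @Override
    public void send(String message) {
        if (!writing.compareAndSet(false, true)) {
            rejected.incrementAndGet(); // overlapping write detected
            return;
        }
        try {
            Thread.yield(); // widen the window in which another thread could overlap
            delivered.incrementAndGet();
        } finally {
            writing.set(false);
        }
    }
}

// Decorator that lets only one thread at a time reach the wrapped session.
class SerializingSession implements Session {
    private final Session delegate;

    SerializingSession(Session delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void send(String message) {
        delegate.send(message);
    }
}

class SendHarness {
    // Sends perThread messages from each of the given number of threads and waits for them.
    static void hammer(Session session, int threads, int perThread) {
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    session.send("msg-" + i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        FragileSession raw = new FragileSession();
        hammer(new SerializingSession(raw), 3, 1000);
        System.out.println("delivered=" + raw.delivered.get() + " rejected=" + raw.rejected.get());
    }
}
```

The important detail is that every thread goes through the same wrapper instance. Passing the raw FragileSession to hammer instead may produce rejected sends, although how many depends on thread timing.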
Alecto answered 17/8, 2021 at 9:28 Comment(7)
This is the correct answer... The "update 2020" accepted Answer above is not thread safe.Aestivation
Yeah, the accepted answer is a bit weird.Astrobiology
This example updates sessions but never reads it.Flagging
As seen in the question, I've already wrapped the websocket session. But reading my answer again, i can say it could be easily issue of the map. Accepting this as an answer :)Oden
needs MRE, no idea afterConnectionClosed method is inherited fromLatitudinarian
session.sendMessage is still not using the decorator.Unshackle
This code goes in your own class that extends org.springframework.web.socket.handler.AbstractWebSocketHandlerAlecto
O
8

I solved the problem by adding the volatile keyword to the WebSocketSession field where I persist my sessions. I would be glad to know if this, too, is bad practice. My idea is that when writing to a WebSocket session from multiple threads, these threads lose the state of the WebSocket because it has not been updated yet, and that is why this exception is thrown.

By adding volatile, we make sure that the WebSocket state has been updated before another thread uses it, so writing to the WebSocket works in a synchronized way, as expected.

I created a class named SessionData which holds the WebSocketSession and all the other data I need about the session.

public class SessionData {
    private volatile WebSocketSession websocketSession;
    //...other 
    // getters and setters ...
}

and I used SessionData as the value of the map, where session IDs are the keys.

Then, when getting the WebSocketSession from SessionData and writing to it from different threads, volatile helped me get the updated WebSocketSession.


Update (2020)

One key note here is that you should use sessionData.getWebSocketSession().sendMessage(...) every time you want to send a message to the session. You should never use the session directly, which means code like this is bad practice:

WebSocketSession websocketSession = sessionData.getWebSocketSession();
websocketSession.sendMessage(...);

You would never know what changes have been applied to the WebSocket session between these two lines of code (which might be more than two lines in your case).

Code like this is better:

sessionData.getWebSocketSession().sendMessage(...);

Also, never publish directly to the sessions that are passed to you inside Spring WebSocket message handlers. Otherwise you will probably get that error again.

This is why it's good practice to map the sessionId of the WebSocketSession to SessionData when the connection opens. You can use this repository to get the volatile session by session ID instead of using the session directly.
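
The lookup-by-ID pattern described above could be sketched as follows. This is a minimal illustration under stated assumptions: the generic SessionData<S> holder and the SessionRepository class with onOpen, onClose, and find are hypothetical names, and S stands in for WebSocketSession so that the sketch compiles without Spring.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder mirroring the SessionData class above.
// S stands in for WebSocketSession (or a decorated session).
class SessionData<S> {
    private volatile S session;

    SessionData(S session) {
        this.session = session;
    }

    S getSession() {
        return session;
    }

    void setSession(S session) {
        this.session = session;
    }
}

// Hypothetical repository keyed by session ID.
class SessionRepository<S> {
    private final Map<String, SessionData<S>> byId = new ConcurrentHashMap<>();

    // Call when the connection opens; store the session you intend to send through.
    void onOpen(String sessionId, S session) {
        byId.put(sessionId, new SessionData<>(session));
    }

    // Call when the connection closes.
    void onClose(String sessionId) {
        byId.remove(sessionId);
    }

    // Returns the stored session for this ID, if the connection is still registered.
    Optional<S> find(String sessionId) {
        return Optional.ofNullable(byId.get(sessionId)).map(SessionData::getSession);
    }
}

class SessionRepositoryDemo {
    public static void main(String[] args) {
        SessionRepository<String> repository = new SessionRepository<>();
        repository.onOpen("abc", "stored-session");
        // In a handler, look the session up by ID instead of using the one passed in.
        repository.find("abc").ifPresent(s -> System.out.println("sending via " + s));
        repository.onClose("abc");
        System.out.println("still registered: " + repository.find("abc").isPresent());
    }
}
```

In a handler, the ID would come from the session passed to the callback, and the object used for sending would be the one returned by find.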

Oden answered 5/2, 2018 at 6:7 Comment(8)
Wow, thanks! This is what i was looking for. Is there a significant difference between marking volatile the whole map (where session IDs are keys) and marking volatile the session object inside map value (SessionData class)?Octameter
@NikolayShevchenko I suggest you put it behind the session in sessionData class. Just thinking about the concept, I should say it means that we are accessing last state of websocketSession after we get it from map. But if you put it behind map, you are accessing maps last state which does not guarantee anything about objects inside the map. Just make sure you are using ConcurrentHashMap.Oden
@SepGH what exactly do you mean by "Also never publish directly into sessions that are passed to you inside Spring websocket MessageHandlers. Otherwise you probably get that error again". Would be really helpful if you can elaborate.Strange
@JaykshatriyaShaktawat well, the whole idea is that the getter of the websocket session that you write in SessionData passes you the volatile session, but if you introduce a new variable to hold that reference, it will no longer be volatile. Therefore, instead of creating a new variable with the value from the getter, it may be a better idea to directly call the method that you want right after you get the session. You can imagine that if your application is running on multiple threads, there won't be a guarantee that the sessions are in sync if you use a new variable,Oden
In other words, this is good: sessionData.getWebSocketSession().sendMessage(...) and this is bad: WebSocketSession websocketSession = sessionData.getWebSocketSession(); websocketSession.sendMessage()Oden
This really is not the case. There is no difference under the hood for those two alternatives. You need to wrap the session in a ConcurrentWebSocketSessionDecorator.Aestivation
this answer needs update, getWebSocketSession method do not exist anymoreLatitudinarian
@Latitudinarian you seem to have got it wrong. That method belongs to the class that wraps the websocket connection.Oden
F
1

See SubProtocolWebSocketHandler as a proven usage of the ConcurrentWebSocketSessionDecorator.

Flagging answered 24/8, 2022 at 7:15 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.