Notify only specific user(s) through WebSockets, when something is modified in the database

In order to notify all users through WebSockets, when something is modified in selected JPA entities, I use the following basic approach.

@ServerEndpoint("/Push")
public class Push {

    private static final Set<Session> sessions = new LinkedHashSet<Session>();

    @OnOpen
    public void onOpen(Session session) {
        sessions.add(session);
    }

    @OnClose
    public void onClose(Session session) {
        sessions.remove(session);
    }

    private static JsonObject createJsonMessage(String message) {
        return JsonProvider.provider().createObjectBuilder().add("jsonMessage", message).build();
    }

    public static void sendAll(String text) {
        synchronized (sessions) {
            String message = createJsonMessage(text).toString();

            for (Session session : sessions) {
                if (session.isOpen()) {
                    session.getAsyncRemote().sendText(message);
                }
            }
        }
    }
}

When a selected JPA entity is modified, an appropriate CDI event is raised which is to be observed by the following CDI observer.

@Typed
public final class EventObserver {

    private EventObserver() {}

    public void onEntityChange(@Observes EntityChangeEvent event) {
        Push.sendAll("updateModel");
    }
}

The observer/consumer invokes the static method Push#sendAll() defined in the WebSockets endpoint which sends a JSON message as a notification to all the associated users/connections.

The logic inside the sendAll() method needs to change when only selected users are to be notified:

  • Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login).
  • Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).

When the initial handshake is established, the HttpSession can be accessed as stated in this answer, but that is still insufficient to accomplish the tasks described in the two bullets above. Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available.

What is the most acceptable/canonical way to notify only selected users as mentioned above? Some conditional statement(s) in the sendAll() method, or something else somewhere, is required. It appears that something more than just the user's HttpSession is needed.

I use GlassFish Server 4.1 / Java EE 7.

Merras answered 6/9, 2015 at 18:7 Comment(0)

Session?

Since it is available when the first handshake request is made, any attribute set to that session afterwards will not be available in the server endpoint i.e. in other words, any session attribute set after a handshake is established will not be available

It seems that you got bitten by the ambiguity of the word "session". The lifetime of a session depends on the context and the client. A WebSocket (WS) session does not have the same lifetime as an HTTP session, just as an EJB session does not, and just as a legacy Hibernate session does not. Et cetera. The HTTP session, which you likely already understand, is explained in "How do servlets work? Instantiation, sessions, shared variables and multithreading". The EJB session is explained in "JSF request scoped bean keeps recreating new Stateful session beans on every request?".

WebSocket lifecycle

The WS session is tied to the context represented by the HTML document. The client is basically the JavaScript code. The WS session starts when JavaScript does new WebSocket(url). The WS session stops when JavaScript explicitly invokes the close() function on the WebSocket instance, or when the associated HTML document gets unloaded as a result of a page navigation (clicking a link/bookmark or modifying the URL in the browser's address bar), a page refresh, or a browser tab/window close. Do note that you can create multiple WebSocket instances within the very same DOM, usually each with a different URL path or query string parameters.

Each time a WS session starts (i.e. each time JavaScript does var ws = new WebSocket(url);), it fires a handshake request in which you have access to the associated HTTP session via the Configurator class below, as you already found out:

public class ServletAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        config.getUserProperties().put("httpSession", httpSession);
    }

}

This is thus not called only once per HTTP session or HTML document as you seemed to expect. This is called every time a new WebSocket(url) is created.

Then a brand new instance of the @ServerEndpoint annotated class will be created and its @OnOpen annotated method will be invoked. If you're familiar with JSF/CDI managed beans, just treat that class as if it's a @ViewScoped and the method as if it's a @PostConstruct.

@ServerEndpoint(value="/push", configurator=ServletAwareConfigurator.class)
public class PushEndpoint {

    private Session session;
    private EndpointConfig config;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        this.session = session;
        this.config = config;
    }

    @OnMessage
    public void onMessage(String message) {
        // ...
    }

    @OnError
    public void onError(Throwable exception) {
        // ...
    }

    @OnClose
    public void onClose(CloseReason reason) {
        // ...
    }

}

Note that this class, unlike e.g. a servlet, is not application scoped. It's basically WS session scoped, so each new WS session gets its own instance. That's why you can safely assign Session and EndpointConfig to instance variables. Depending on the class design (e.g. abstract template, etc), you could if necessary add Session back as the first argument of all the other onXxx methods. This is also supported.

The @OnMessage annotated method will be invoked when JavaScript does webSocket.send("some message"). The @OnClose annotated method will be called when the WS session is closed. The exact close reason can, if necessary, be determined from the close codes available in the CloseReason.CloseCodes enum. The @OnError annotated method will be called when an exception is thrown, usually an I/O error on the WS connection (broken pipe, connection reset, etc).

Collect WS sessions by logged-in user

Coming back to your concrete functional requirement of notifying only specific users: after the above explanation you should understand that you can safely rely on modifyHandshake() to extract the logged-in user from the associated HTTP session, every time, provided that new WebSocket(url) is created after the user has logged in.

public class UserAwareConfigurator extends Configurator {

    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
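        // getHttpSession() may return null when no HTTP session exists for this handshake.
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        User user = (httpSession != null) ? (User) httpSession.getAttribute("user") : null;

        // Only expose the user to the endpoint when somebody is actually logged in.
        if (user != null) {
            config.getUserProperties().put("user", user);
        }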
    }

}

Inside the WS endpoint class annotated with @ServerEndpoint(configurator=UserAwareConfigurator.class), you can get hold of it in the @OnOpen annotated method as below:

@OnOpen
public void onOpen(Session session, EndpointConfig config) {
    User user = (User) config.getUserProperties().get("user");
    // ...
}

You should collect them in the application scope. You can collect them in a static field of the endpoint class. Or, better, if CDI support in WS endpoint is not broken in your environment (works in WildFly, not in Tomcat+Weld, not sure about GlassFish), then simply collect them in an application scoped CDI managed bean which you in turn @Inject in the endpoint class.

When the User instance is not null (i.e. when a user is logged in), remember that a user can have multiple WS sessions. So you'd basically need to collect them in a Map<User, Set<Session>> structure, or perhaps in a more fine-grained mapping by user ID or group/role instead, which makes it easier to find specific users. It all depends on the final requirements. Here's at least a kickoff example using an application scoped CDI managed bean:

@ApplicationScoped
public class PushContext {

    private Map<User, Set<Session>> sessions;

    @PostConstruct
    public void init() {
        sessions = new ConcurrentHashMap<>();
    }

    void add(Session session, User user) {
        sessions.computeIfAbsent(user, v -> ConcurrentHashMap.newKeySet()).add(session);
    }

    void remove(Session session) {
        sessions.values().forEach(v -> v.removeIf(e -> e.equals(session)));
    }

}

@ServerEndpoint(value="/push", configurator=UserAwareConfigurator.class)
public class PushEndpoint {

    @Inject
    private PushContext pushContext;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
        User user = (User) config.getUserProperties().get("user");

        // Only track logged-in users; ConcurrentHashMap rejects null keys.
        if (user != null) {
            pushContext.add(session, user);
        }
    }

    @OnClose
    public void onClose(Session session) {
        pushContext.remove(session);
    }

}
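
The "more fine-grained mapping by user ID" mentioned earlier can be sketched with plain JDK collections. This is only a minimal sketch, not part of the original answer: SessionRegistry, its method names, and the String user ID key are assumptions for illustration, and the type parameter S stands in for javax.websocket.Session so that the snippet compiles without a WebSocket API on the classpath. Unlike the remove() above, it needs the user ID on removal, and it drops a user's entry once the last session is gone so the map does not accumulate empty sets.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original answer.
// S stands in for javax.websocket.Session.
class SessionRegistry<S> {

    private final Map<String, Set<S>> sessionsByUserId = new ConcurrentHashMap<>();

    // Registers a session for a user; anonymous connections (null ID) are ignored.
    void add(String userId, S session) {
        if (userId == null) {
            return; // ConcurrentHashMap rejects null keys
        }
        // compute() runs atomically per key, so a concurrent remove() cannot
        // discard the set between its creation and the add.
        sessionsByUserId.compute(userId, (id, existing) -> {
            Set<S> target = (existing != null) ? existing : ConcurrentHashMap.newKeySet();
            target.add(session);
            return target;
        });
    }

    // Removes a session and drops the user's entry once no sessions remain.
    void remove(String userId, S session) {
        if (userId == null) {
            return;
        }
        sessionsByUserId.computeIfPresent(userId, (id, existing) -> {
            existing.remove(session);
            return existing.isEmpty() ? null : existing; // returning null removes the mapping
        });
    }

    // Returns a snapshot of all sessions belonging to the given users.
    Set<S> sessionsOf(Collection<String> userIds) {
        Set<S> result = new HashSet<>();
        for (String userId : userIds) {
            if (userId == null) {
                continue;
            }
            Set<S> existing = sessionsByUserId.get(userId);
            if (existing != null) {
                result.addAll(existing);
            }
        }
        return result;
    }

    // Number of users that currently have at least one session.
    int userCount() {
        return sessionsByUserId.size();
    }
}
```

Looking sessions up per user ID avoids scanning every entry of the map when only a handful of users need to be notified.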

Finally you can send a message to specific user(s) as below in PushContext:

public void send(Set<User> users, String message) {
    // ConcurrentHashMap and its key-set views can be traversed safely
    // while other threads add or remove sessions, so no extra lock is needed.
    Set<Session> userSessions = sessions.entrySet().stream()
        .filter(e -> users.contains(e.getKey()))
        .flatMap(e -> e.getValue().stream())
        .collect(Collectors.toSet());

    for (Session userSession : userSessions) {
        if (userSession.isOpen()) {
            userSession.getAsyncRemote().sendText(message);
        }
    }
}
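
A session can still close between the isOpen() check and the actual send, and an exception thrown for one recipient would abort the loop for everybody after it. Below is a minimal sketch of isolating such failures per recipient. Target and SafeSender are hypothetical names that do not appear in the original answer; Target merely stands in for the two Session operations used in send(), so the snippet needs no WebSocket API to compile.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical abstraction over the two Session operations used in send().
interface Target {
    boolean isOpen();
    void sendText(String message); // may throw if the connection just went away
}

final class SafeSender {

    private SafeSender() {}

    // Sends to every open target and returns the targets whose send failed,
    // so that the caller can log or unregister them.
    static List<Target> sendToAll(Collection<? extends Target> targets, String message) {
        List<Target> failed = new ArrayList<>();
        for (Target target : targets) {
            if (!target.isOpen()) {
                continue; // skip connections already known to be closed
            }
            try {
                target.sendText(message);
            } catch (RuntimeException e) {
                failed.add(target); // one broken connection must not stop the others
            }
        }
        return failed;
    }
}
```

In the real send() method the same idea would amount to wrapping the userSession.getAsyncRemote().sendText(message) call in a try/catch inside the loop.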

The PushContext being a CDI managed bean has the additional advantage that it's injectable in any other CDI managed bean, allowing easier integration.
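
One caveat when User is used as the map key: the User stored in the HTTP session and the User reached later through the changed entity may well be different object instances. The map lookup and users.contains(...) only match them if User implements equals() and hashCode() consistently. A minimal sketch, assuming a hypothetical User with an immutable Long id (the real entity's fields are not shown in the question):

```java
import java.util.Objects;

// Hypothetical stand-in for the application's User entity.
final class User {

    private final Long id;
    private final String name;

    User(Long id, String name) {
        this.id = id;
        this.name = name;
    }

    Long getId() {
        return id;
    }

    String getName() {
        return name;
    }

    // Two User objects denote the same user when their (non-null) IDs match.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof User)) {
            return false;
        }
        return id != null && id.equals(((User) other).id);
    }

    // Must agree with equals(): equal IDs give equal hash codes.
    @Override
    public int hashCode() {
        return Objects.hashCode(id);
    }
}
```

With this in place, a session registered under the instance from the HTTP session is also found when looking up with a separately loaded instance that carries the same ID.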

Fire CDI event with associated users

In your EntityListener, where you most likely fire the CDI event as per your previous related question "Real time updates from database using JSF/Java EE", you already have the changed entity at hand, and thus you should be able to find the users associated with it via their relationships in the model.

Notify only the user who is responsible for modifying the entity in question (it may be an admin user or a registered user who can modify something only after their successful login)

@PostUpdate
public void onChange(Entity entity) {
    Set<User> editors = entity.getEditors();
    beanManager.fireEvent(new EntityChangeEvent(editors));
}

Notify only specific user/s (not all). "Specific" means for example, when a post is voted up on this site, only the post owner is notified (the post may be voted up by any other user with sufficient privileges).

@PostUpdate
public void onChange(Entity entity) {
    User owner = entity.getOwner();
    beanManager.fireEvent(new EntityChangeEvent(Collections.singleton(owner)));
}

And then in the CDI event observer, pass it forth:

public void onEntityChange(@Observes EntityChangeEvent event) {
    pushContext.send(event.getUsers(), "message");
}
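
The snippets above use new EntityChangeEvent(users) and event.getUsers() without showing the event class itself. A minimal sketch of what such a payload could look like follows. Its shape is an assumption, since the question does not show the real class, and the User placeholder only exists so the snippet compiles on its own.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Placeholder so the sketch compiles on its own; use the application's real User class.
final class User {
    final String name;

    User(String name) {
        this.name = name;
    }
}

// Hypothetical event payload: carries the users that should be notified.
final class EntityChangeEvent {

    private final Set<User> users;

    EntityChangeEvent(Set<User> users) {
        // Defensive, read-only copy: later changes to the caller's set do not
        // leak into the event, and observers cannot alter the recipients.
        this.users = Collections.unmodifiableSet(new LinkedHashSet<>(users));
    }

    Set<User> getUsers() {
        return users;
    }
}
```

Keeping the recipient set immutable matters here because the same event instance may be delivered to several observers.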

Silverware answered 9/9, 2015 at 10:6 Comment(11)
Thank you. Expressive answer. I am, however, not getting one point: by whom is the send() method going to be invoked with a set of users? It is expected to be invoked by an appropriate CDI observer when an appropriate CDI event is fired, but the set of users that must be passed to this method is not available there. - Merras
"When a selected JPA entity is modified, an appropriate CDI event is raised which is to be observed by the following CDI observer." At the moment you create the EntityChangeEvent, you already know which entity (and thus which user(s)) is affected, so just do new EntityChangeEvent(users), pass it along, and have the observer extract the users from it via a getter. - Silverware
new happens when a CDI event is fired, like event.fire(new EntityChangeEvent(entity));. A set of users (Set<User>) is not available at that time. The parameter supplied to the constructor is the current entity firing the event. The entity raising the event could be other than User. The currently logged-in user who is responsible for performing the operation/modification which in turn raises the event is not recognizable at that place. (For example, feedback can be submitted (inserted) by any registered user. The entity which gets affected by that "insert" is Feedback, not User.) - Merras
So, Feedback isn't (in)directly related to any User (via e.g. FK in DB and @OneToOne/Many in entity)? How is that possible? :) Or perhaps you just want to notify any user currently on a specific page? That information should then be passed as a path or query parameter in the WS URL, but this is quite a different question than the one currently asked. - Silverware
It is related to User via a foreign key relationship. I thought this should cause LazyInitializationException. (I did not test because GlassFish Server is currently not working. Operating system has some problems and perhaps needs to be reinstalled :)). I will perhaps put this into practice a few days later. Thanks. - Merras
If it's lazy loaded then it indeed needs to be obtained in a transaction. You can inject an EJB in an entity listener, although the JPA impl in earlier WF versions had trouble with this, but it works for us at least since WF 8.2. GF might have similar trouble although it uses EclipseLink, not Hibernate. EJBs are always manually obtainable via JNDI if necessary. - Silverware
The JPA (2.1) specification states, "In general, the lifecycle method of a portable application should not invoke EntityManager or query operations, access other entity instances, or modify relationships within the same persistence context. A lifecycle callback method may modify the non-relationship state of the entity on which it is invoked." Accordingly, is it against the rule to inject an EJB into a listener and then invoke an EJB method from a JPA callback in that listener? It also adds a confusing statement, "Lifecycle callbacks can invoke JNDI, JDBC, JMS, and enterprise beans." - Merras
You're right. The entity listener would be the wrong place for the job of interacting with the DB. You may need to take a step back and do the job inside the EJB method. - Silverware
Can EJBs and/or CDI events be injected into entity listeners in WildFly 8.2 final? I see them failing in WildFly 9.0.2 final. - Merras
Doesn't the HashMap inside the PushContext application scoped bean need to be synchronized (for some circumstances) using java.util.concurrent.ConcurrentHashMap or Collections.synchronizedMap(...), as elements are randomly added to or removed from the instance member Map as WebSocket sessions open and terminate randomly? - Merras
@Tiny, That's correct. It just didn't occur to me while writing the answer. Answer is updated, thanks. - Silverware
