LISTEN/NOTIFY pgconnection goes down java?
Asked Answered
S

1

9

I am using PostgreSQL DB and applying it's LISTEN/NOTIFY functionality. So my listener is at my AS (Application Server) and I have triggers configured on my DB such that when CRUD operations are performed on a table a NOTIFY request is sent on AS.

LISTENER class in java:

        @Singleton
        @Startup
    NotificationListenerInterface.class)
        public class NotificationListener extends Thread implements NotificationListenerInterface {

            @Resource(mappedName="java:/RESOURCES") 
            private DataSource ds;

            @PersistenceContext(unitName = "one")
            EntityManager em;

            Logger logger = Logger.getLogger(NotificationListener.class);

            private Connection Conn;
            private PGConnection pgConnection = null;
            private NotifyRequest notifyRequest = null;

            @PostConstruct
            public void notificationListener() throws Throwable {

                System.out.println("Notification****************");
                try
                {


                    Class.forName("com.impossibl.postgres.jdbc.PGDriver");
                    String url = "jdbc:pgsql://192.xx.xx.126:5432/postgres";


                    Conn = DriverManager.getConnection(url,"postgres","password");
                    this.pgConnection = (PGConnection) Conn;

                    System.out.println("PG CONNECTON: "+ pgConnection);
                    Statement listenStatement = Conn.createStatement();
                    listenStatement.execute("LISTEN notify_channel");
                    listenStatement.close();

                    pgConnection.addNotificationListener(new PGNotificationListener() {

                        @Override
                        public void notification(int processId, String channelName, String payload){

                            System.out.println("*********INSIDE NOTIFICATION*************");

                            System.out.println("Payload: " + jsonPayload);

}

So as my AS is up, I have configured that at startup the listener class is called (@Startup annotation) and it's start listening on the channel.

Now this works fine if like say for testing I edit my table in DB manually, the notification is generated and the LISTENER receives it.

However, when I programmatically send a UPDATE request on the table, the UPADTE is performed successfully but LISTENER is not receiving anything.

I feel my connection of the LISTENER goes down when I send a request (it also makes a connection to edit entities), but I am not sure. I read about permanent connections and pooled connections, but not able to decide how to pursue that.

I am using pgjdbc (http://impossibl.github.io/pgjdbc-ng/) jar for async notifications as jdbc connection requires polling.

EDIT:

When I try the above listener with polling by using the standard jdbc jar (not pgjdbc), I get the notifications.

I do PGNotification notif[] = con.getNotifications() and I get notifications, however doing it asynchronously like below I don't get notifications.

    pgConnection.addNotificationListener(new PGNotificationListener() {

         @Override
         public void notification(int processId, String channelName, String payload){

            System.out.println("*********INSIDE NOTIFICATION*************");
         }

SOLVED:

My listener was going out of scope after the function execution was completed as my listener had the function scope. So kept it into a member variable of my startup bean class and then it worked.

Shornick answered 20/6, 2016 at 7:10 Comment(5)
Inside your listener, the variable 'jsonPayload' does not exist. Also, are you using the same connection to write your updates? It's feasible that your connection with the listener attached goes out of scope and is destroyed by the GC.Vaulted
I am not using the same connection. But I checked using netstat that the connections were in established state i.e the old connection was not lost. netstat --numeric-ports|grep 5432|grep my.ip gave two connections (one old and one new) and both in ESTABLISHED state: tcp 0 0 192.168.5.126:5432 192.168.105.213:46802 ESTABLISHED tcp 0 0 192.168.5.126:5432 192.168.105.213:46805 ESTABLISHEDShornick
@LukeA.Leber: Please check the edit to the question.Shornick
@LukeA.Leber: Since my connection is not closed, I feel my listener which I register pgConnection.addNotificationListener(new PGNotificationListener() {}) goes out of session. Any comments?Shornick
The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected. Check out the BasicContext class: synchronized (notificationListeners) { notificationListeners.put(key, new WeakReference<NotificationListener>(listener)); } If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire.Vaulted
V
8

The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected. Check out the BasicContext class lines 642 - 655:

public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {

    name = nullToEmpty(name);
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);

    synchronized (notificationListeners) {
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
    }

}

If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire as seen from lines 690 - 710

  @Override
  public synchronized void reportNotification(int processId, String channelName, String payload) {

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
    while (iter.hasNext()) {

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();

      NotificationListener listener = entry.getValue().get();
      if (listener == null) {

        iter.remove();
      }
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {

        listener.notification(processId, channelName, payload);
      }

    }

}

To fix this, add your notification listeners as such:

/// Do not let this reference go out of scope!
    PGNotificationListener listener = new PGNotificationListener() {

    @Override
    public void notification(int processId, String channelName, String payload) {
        // interesting code
    };
};
    pgConnection.addNotificationListener(listener);

Quite an odd use-case for weak references in my opinion...

Vaulted answered 24/6, 2016 at 1:38 Comment(1)
Thanks, you saved my day. I was very confused over thisFavour

© 2022 - 2024 — McMap. All rights reserved.