当pgconnection失效时,如何在Java中使用LISTEN/NOTIFY?

9
我正在使用PostgreSQL数据库并应用其LISTEN / NOTIFY功能。因此,我的侦听器位于我的AS(应用程序服务器)上,并且我已经在我的DB上配置了触发器,使得当对表执行CRUD操作时,在AS上发送NOTIFY 请求。
Java中的LISTENER类:
        @Singleton
        @Startup
    NotificationListenerInterface.class)
        public class NotificationListener extends Thread implements NotificationListenerInterface {

            @Resource(mappedName="java:/RESOURCES") 
            private DataSource ds;

            @PersistenceContext(unitName = "one")
            EntityManager em;

            Logger logger = Logger.getLogger(NotificationListener.class);

            private Connection Conn;
            private PGConnection pgConnection = null;
            private NotifyRequest notifyRequest = null;

            @PostConstruct
            public void notificationListener() throws Throwable {

                System.out.println("Notification****************");
                try
                {


                    Class.forName("com.impossibl.postgres.jdbc.PGDriver");
                    String url = "jdbc:pgsql://192.xx.xx.126:5432/postgres";


                    Conn = DriverManager.getConnection(url,"postgres","password");
                    this.pgConnection = (PGConnection) Conn;

                    System.out.println("PG CONNECTON: "+ pgConnection);
                    Statement listenStatement = Conn.createStatement();
                    listenStatement.execute("LISTEN notify_channel");
                    listenStatement.close();

                    pgConnection.addNotificationListener(new PGNotificationListener() {

                        @Override
                        public void notification(int processId, String channelName, String payload){

                            System.out.println("*********INSIDE NOTIFICATION*************");

                            System.out.println("Payload: " + jsonPayload);

}

当我的AS启动时,我已经配置了listener类的调用(@Startup注释),并开始在通道上监听。

现在,如果我手动编辑数据库中的表,通知就会生成,并被listener接收。

但是,当我以编程方式发送UPDATE请求时,UPDATE成功执行,但listener没有接收到任何内容。

我觉得当我发送请求时,listener连接断开了(它也连接以编辑entities),但我不确定。我读过关于永久连接和池化连接的文章,但无法决定如何继续下去。

我使用pgjdbc (http://impossibl.github.io/pgjdbc-ng/) jar进行异步通知,因为jdbc连接需要轮询。

编辑:

当我尝试使用标准jdbc jar(而不是pgjdbc)通过轮询来运行以上listener时,我可以得到通知。

我执行以下命令:PGNotification notif[] = con.getNotifications(),然后我就能够得到通知,但如果像下面这样异步执行,我就不能得到通知。

    pgConnection.addNotificationListener(new PGNotificationListener() {

         @Override
         public void notification(int processId, String channelName, String payload){

            System.out.println("*********INSIDE NOTIFICATION*************");
         }

解决方案:

我的监听器在函数执行完成后就失去了作用域,因为我的监听器在函数作用域内。所以我将其保存在启动Bean类的成员变量中,然后它就起作用了。


在您的监听器中,变量“jsonPayload”不存在。此外,您是否使用相同的连接来编写更新?您附加的侦听器的连接可能会超出范围并被GC销毁,这是可行的。 - Luke A. Leber
我没有使用相同的连接。但是我使用 netstat 进行了检查,发现连接处于已建立状态,即旧连接并未丢失。netstat --numeric-ports|grep 5432|grep my.ip 显示了两个连接(一个旧连接和一个新连接),并且都处于 ESTABLISHED 状态:tcp 0 0 192.168.5.126:5432 192.168.105.213:46802 ESTABLISHED tcp 0 0 192.168.5.126:5432 192.168.105.213:46805 ESTABLISHED - Siddharth Trikha
@LukeA.Leber:请检查问题的编辑。 - Siddharth Trikha
@LukeA.Leber: 由于我的连接没有关闭,我感觉我注册的监听器 pgConnection.addNotificationListener(new PGNotificationListener() {}) 已经超出了会话。有什么意见吗? - Siddharth Trikha
1
通知监听器在该库中被内部维护为弱引用,这意味着您必须在外部保持一个硬引用,以便它们不会被垃圾回收。请查看BasicContext类: synchronized (notificationListeners) { notificationListeners.put(key, new WeakReference<NotificationListener>(listener)); } 如果GC捕获了您的监听器,则对弱引用的“get”调用将返回null,并且不会触发任何操作。 - Luke A. Leber
1个回答

8

通知监听器是由该库作为弱引用内部维护的,这意味着您必须在外部保持一个硬引用,以便它们不会被垃圾回收。请查看BasicContext类的第642-655行:

public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {

    name = nullToEmpty(name);
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);

    synchronized (notificationListeners) {
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
    }

}

如果垃圾回收器(GC)捡起了你的监听器(listener),那么在弱引用上调用"get"将返回null,并且不会触发从第690到710行所看到的内容。

  @Override
  public synchronized void reportNotification(int processId, String channelName, String payload) {

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
    while (iter.hasNext()) {

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();

      NotificationListener listener = entry.getValue().get();
      if (listener == null) {

        iter.remove();
      }
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {

        listener.notification(processId, channelName, payload);
      }

    }

}

为了解决这个问题,请按照以下方式添加您的通知监听器:
/// Do not let this reference go out of scope!
    PGNotificationListener listener = new PGNotificationListener() {

    @Override
    public void notification(int processId, String channelName, String payload) {
        // interesting code
    };
};
    pgConnection.addNotificationListener(listener);

在我看来,使用弱引用的这种情况相当奇怪...


{btsdaf} - sanket1729

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接