在Vaadin 7应用程序中使用Push技术展示相同数据在多个客户端之间的同步问题

4

我想要将同一组数据分享给多个客户端。我需要使用Push来自动更新他们在屏幕上的视图。

我已经阅读了问题和答案,Vaadin7应用程序中Push的最小示例(“@Push”)。现在我需要一个更加实际、更加健壮的示例。首先,我知道在Servlet环境中永无止境的线程不是一个好主意。

而且我不希望每个用户都有自己的线程,每个用户都单独访问数据库。更合理的做法似乎是有一个单独的线程检查数据库中是否有新鲜的数据。当找到新鲜的数据时,该线程应该将新鲜的数据发布到所有等待更新的用户UI/Layouts。


相关说明:如需了解在Vaadin 8中使用Push异步更新UI小部件的完整工作示例,请参见我在“Vaadin:数据返回后更新UI”问题(https://stackoverflow.com/q/50880506/642706)上的[我的答案](https://stackoverflow.com/a/50885540/642706)。 - Basil Bourque
1个回答

22

完整示例

以下是几个类的代码。它们一起构成了一个完整的 Vaadin 7.3.8 应用程序示例,使用新的内置 Push 功能将单个数据集同时发布到任意数量的用户。我们通过随机生成一组数据值来模拟检查数据库以获取新鲜数据。

当您运行此示例应用程序时,会出现一个窗口,显示当前时间以及一个按钮。时间每秒更新一次,共更新一百次。

screen shot of first window in example app

这个时间更新不是真正的例子。时间更新器有两个其他用途:
  • 它的简单代码检查了在您的Vaadin应用程序、Web服务器和Web浏览器中是否正确配置了Push。
  • 遵循The Book Of VaadinServer Push section中给出的示例代码。我们的时间更新器几乎完全从那个示例中提取出来,只是在他们每分钟更新一个chart的地方,我们更新了一段文本。

要查看此应用程序的真实预期示例,请单击“打开数据窗口”按钮。第二个窗口将打开以显示三个文本字段。每个字段都包含一个随机生成的值,我们假装它来自数据库查询。

screen shot of Database Display window with three text fields

这需要一些工作,需要几个部分。让我们来看看这些部分。

diagram of various classes and objects in this example app’s design

推送

在当前版本的 Vaadin 7.3.8 中,无需插件或附加组件即可启用推送技术。甚至 Push 相关的 .jar 文件也捆绑在 Vaadin 中。

有关详细信息,请参见Vaadin 之书。但实际上,您只需要将 @Push 注释添加到 UI 的子类中即可。

请使用最新版本的 Servlet 容器和 Web 服务器。Push 是相对较新的技术,并且实现正在不断发展,特别是WebSocket类型的实现。例如,如果使用 Tomcat,请确保使用最新的 Tomcat 7 或 8 更新。

定期检查新数据

我们必须有一种方式来重复查询数据库以获取最新数据。

在Servlet环境中,一个永不停止的线程并不是最好的方法,因为当Web应用程序被取消部署或Servlet容器关闭时,该线程将不会结束。该线程将继续在JVM中运行,浪费资源,导致内存泄漏和其他问题。

Web应用程序启动/关闭钩子

理想情况下,我们希望在Web应用程序启动(部署)和关闭(或取消部署)时得到通知。当收到通知时,我们可以启动或中断数据库查询线程。幸运的是,每个Servlet容器都提供了这样的钩子。Servlet规范要求容器支持ServletContextListener接口。

我们可以编写一个实现该接口的类。当我们的Web应用程序(我们的Vaadin应用程序)部署时,会调用我们的监听器类的 contextInitialized 方法。在取消部署时,将调用 contextDestroyed 方法。

执行器服务

从这个钩子中,我们可以启动一个线程。但是有更好的方法。Java配备了ScheduledExecutorService。该类具有一组线程池,以避免实例化和启动线程的开销。您可以将一个或多个任务(Runnable)分配给执行器以定期运行。

Web应用程序监听器

这是我们的Web应用监听器类,使用Java 8中可用的Lambda语法。
package com.example.pushvaadinapp;

import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

/**
 * Reacts to this web app starting/deploying and shutting down.
 *
 * @author Basil Bourque
 */
@WebListener
public class WebAppListener implements ServletContextListener
{

    ScheduledExecutorService scheduledExecutorService;
    ScheduledFuture<?> dataPublishHandle;

    // Constructor.
    public WebAppListener ()
    {
        this.scheduledExecutorService = Executors.newScheduledThreadPool( 7 );
    }

    // Our web app (Vaadin app) is starting up.
    public void contextInitialized ( ServletContextEvent servletContextEvent )
    {
        System.out.println( Instant.now().toString() + " Method WebAppListener::contextInitialized running." );  // DEBUG logging.

        // In this example, we do not need the ServletContex. But FYI, you may find it useful.
        ServletContext ctx = servletContextEvent.getServletContext();
        System.out.println( "Web app context initialized." );   // INFO logging.
        System.out.println( "TRACE Servlet Context Name : " + ctx.getServletContextName() );
        System.out.println( "TRACE Server Info : " + ctx.getServerInfo() );

        // Schedule the periodic publishing of fresh data. Pass an anonymous Runnable using the Lambda syntax of Java 8.
        this.dataPublishHandle = this.scheduledExecutorService.scheduleAtFixedRate( () -> {
            System.out.println( Instant.now().toString() + " Method WebAppListener::contextDestroyed->Runnable running. ------------------------------" ); // DEBUG logging.
            DataPublisher.instance().publishIfReady();
        } , 5 , 5 , TimeUnit.SECONDS );
    }

    // Our web app (Vaadin app) is shutting down.
    public void contextDestroyed ( ServletContextEvent servletContextEvent )
    {
        System.out.println( Instant.now().toString() + " Method WebAppListener::contextDestroyed running." ); // DEBUG logging.

        System.out.println( "Web app context destroyed." );  // INFO logging.
        this.scheduledExecutorService.shutdown();
    }

}

DataPublisher

在这段代码中,你会看到DataPublisher实例被定期调用,要求其检查新鲜数据,并且如果发现,则将其传递给所有感兴趣的Vaadin布局或小部件。

package com.example.pushvaadinapp;

import java.time.Instant;
import net.engio.mbassy.bus.MBassador;
import net.engio.mbassy.bus.common.DeadMessage;
import net.engio.mbassy.bus.config.BusConfiguration;
import net.engio.mbassy.bus.config.Feature;
import net.engio.mbassy.listener.Handler;

/**
 * A singleton to register objects (mostly user-interface components) interested
 * in being periodically notified with fresh data.
 *
 * Works in tandem with a DataProvider singleton which interacts with database
 * to look for fresh data.
 *
 * These two singletons, DataPublisher & DataProvider, could be combined into
 * one. But for testing, it might be handy to keep them separated.
 *
 * @author Basil Bourque
 */
public class DataPublisher
{

    // Statics
    private static final DataPublisher singleton = new DataPublisher();

    // Member vars.
    private final MBassador<DataEvent> eventBus;

    // Constructor. Private, for simple Singleton pattern.
    private DataPublisher ()
    {
        System.out.println( Instant.now().toString() + " Method DataPublisher::constructor running." );  // DEBUG logging.
        BusConfiguration busConfig = new BusConfiguration();
        busConfig.addFeature( Feature.SyncPubSub.Default() );
        busConfig.addFeature( Feature.AsynchronousHandlerInvocation.Default() );
        busConfig.addFeature( Feature.AsynchronousMessageDispatch.Default() );
        this.eventBus = new MBassador<>( busConfig );
        //this.eventBus = new MBassador<>( BusConfiguration.SyncAsync() );
        //this.eventBus.subscribe( this );
    }

    // Singleton accessor.
    public static DataPublisher instance ()
    {
        System.out.println( Instant.now().toString() + " Method DataPublisher::instance running." );   // DEBUG logging.
        return singleton;
    }

    public void register ( Object subscriber )
    {
        System.out.println( Instant.now().toString() + " Method DataPublisher::register running." );   // DEBUG logging.
        this.eventBus.subscribe( subscriber ); // The event bus is thread-safe. So hopefully we need no concurrency managament here.
    }

    public void deregister ( Object subscriber )
    {
        System.out.println( Instant.now().toString() + " Method DataPublisher::deregister running." );   // DEBUG logging.
        // Would be unnecessary to deregister if the event bus held weak references.
        // But it might be a good practice anyways for subscribers to deregister when appropriate.
        this.eventBus.unsubscribe( subscriber ); // The event bus is thread-safe. So hopefully we need no concurrency managament here.
    }

    public void publishIfReady ()
    {
        System.out.println( Instant.now().toString() + " Method DataPublisher::publishIfReady running." );   // DEBUG logging.

        // We expect this method to be called repeatedly by a ScheduledExecutorService.
        DataProvider dataProvider = DataProvider.instance();
        Boolean isFresh = dataProvider.checkForFreshData();
        if ( isFresh ) {
            DataEvent dataEvent = dataProvider.data();
            if ( dataEvent != null ) {
                System.out.println( Instant.now().toString() + " Method DataPublisher::publishIfReady…post running." );   // DEBUG logging.
                this.eventBus.publishAsync( dataEvent ); // Ideally this would be an asynchronous dispatching to bus subscribers.
            }
        }
    }

    @Handler
    public void deadEventHandler ( DeadMessage event )
    {
        // A dead event is an event posted but had no subscribers.
        // You may want to subscribe to DeadEvent as a debugging tool to see if your event is being dispatched successfully.
        System.out.println( Instant.now() + " DeadMessage on MBassador event bus : " + event );
    }

}

访问数据库

DataPublisher类使用DataProvider类来访问数据库。在我们的情况下,我们不是实际访问数据库,而是生成随机数据值。

package com.example.pushvaadinapp;

import java.time.Instant;
import java.util.Random;
import java.util.UUID;

/**
 * Access database to check for fresh data. If fresh data is found, package for
 * delivery. Actually we generate random data as a way to mock database access.
 *
 * @author Basil Bourque
 */
public class DataProvider
{

    // Statics
    private static final DataProvider singleton = new DataProvider();

    // Member vars.
    private DataEvent cachedDataEvent = null;
    private Instant whenLastChecked = null; // When did we last check for fresh data.

    // Other vars.
    private final Random random = new Random();
    private Integer minimum = Integer.valueOf( 1 ); // Pick a random number between 1 and 999.
    private Integer maximum = Integer.valueOf( 999 );

    // Constructor. Private, for simple Singleton pattern.
    private DataProvider ()
    {
        System.out.println( Instant.now().toString() + " Method DataProvider::constructor running." );   // DEBUG logging.
    }

    // Singleton accessor.
    public static DataProvider instance ()
    {
        System.out.println( Instant.now().toString() + " Method DataProvider::instance running." );   // DEBUG logging.
        return singleton;
    }

    public Boolean checkForFreshData ()
    {
        System.out.println( Instant.now().toString() + " Method DataProvider::checkForFreshData running." );   // DEBUG logging.

        synchronized ( this ) {
            // Record when we last checked for fresh data.
            this.whenLastChecked = Instant.now();

            // Mock database access by generating random data.
            UUID dbUuid = java.util.UUID.randomUUID();
            Number dbNumber = this.random.nextInt( ( this.maximum - this.minimum ) + 1 ) + this.minimum;
            Instant dbUpdated = Instant.now();

            // If we have no previous data (first retrieval from database) OR If the retrieved data is different than previous data --> Fresh.
            Boolean isFreshData = ( ( this.cachedDataEvent == null ) ||  ! this.cachedDataEvent.uuid.equals( dbUuid ) );

            if ( isFreshData ) {
                DataEvent freshDataEvent = new DataEvent( dbUuid , dbNumber , dbUpdated );
                // Post fresh data to event bus.
                this.cachedDataEvent = freshDataEvent; // Remember this fresh data for future comparisons.
            }

            return isFreshData;
        }
    }

    public DataEvent data ()
    {
        System.out.println( Instant.now().toString() + " Method DataProvider::data running." );   // DEBUG logging.

        synchronized ( this ) {
            return this.cachedDataEvent;
        }
    }

}

数据打包

DataProvider将新鲜的数据打包以便传递给其他对象。我们定义了一个DataEvent类来作为这个包裹。或者,如果你需要传递多组数据或对象而不是单一的数据,则可以在你的DataHolder中放置一个集合。打包任何对于想要显示这些新鲜数据的布局或小部件有意义的内容。

package com.example.pushvaadinapp;

import java.time.Instant;
import java.util.UUID;

/**
 * Holds data to be published in the UI. In real life, this could be one object
 * or could hold a collection of data objects as might be needed by a chart for
 * example. These objects will be dispatched to subscribers of an MBassador
 * event bus.
 *
 * @author Basil Bourque
 */
public class DataEvent
{

    // Core data values.
    UUID uuid = null;
    Number number = null;
    Instant updated = null;

    // Constructor
    public DataEvent ( UUID uuid , Number number , Instant updated )
    {
        this.uuid = uuid;
        this.number = number;
        this.updated = updated;
    }

    @Override
    public String toString ()
    {
        return "DataEvent{ " + "uuid=" + uuid + " | number=" + number + " | updated=" + updated + " }";
    }

}

分发数据

将新鲜数据打包成一个DataEvent后,DataProvider将其交给DataPublisher。因此,下一步是将这些数据传递给感兴趣的Vaadin布局或小部件,以呈现给用户。但是,我们如何知道哪些布局/小部件对此数据感兴趣?我们如何将这些数据传递给它们?

一种可能的方式是观察者模式。我们在Java Swing以及Vaadin中看到这种模式,例如Vaadin中ButtonClickListener。这种模式意味着观察者和被观察者彼此知道。这也意味着在定义和实现接口时需要更多的工作。

事件总线

在我们的情况下,数据的生产者(DataPublisher)和消费者(Vaadin布局/小部件)不需要彼此知道。所有小部件想要的只是数据,不需要进一步与生产者交互。因此,我们可以使用另一种方法,即事件总线。在事件总线中,一些对象在发生有趣的事情时发布“事件”对象。其他对象注册它们的兴趣,以便在事件对象发布到总线上时得到通知。发布后,总线通过调用特定的方法并传递事件将该事件发布给所有已注册的订阅者。在我们的情况下,将传递DataEvent对象。
但是已注册的订阅对象上的哪个方法将被调用?通过Java的注释、反射和内省技术的魔法,任何方法都可以被标记为要调用的方法。只需使用注释标记所需的方法,然后在发布事件时让总线在运行时找到该方法。
不需要自己构建任何此事件总线。在Java世界中,我们可以选择事件总线实现。
Google Guava EventBus
最为著名的可能是Google Guava EventBusGoogle Guava是谷歌公司内部开发的各种实用项目的集合,然后开源供其他人使用。EventBus包是其中之一。我们可以使用Guava EventBus。事实上,我最初就是使用这个库构建了这个示例。但是Guava EventBus有一个限制:它保留强引用。

弱引用

当对象注册它们对通知感兴趣时,任何事件总线都必须通过保持对注册对象的引用来维护这些订阅列表。理想情况下,这应该是一个弱引用,这意味着如果订阅对象变得无用并成为垃圾收集的候选对象,那么该对象可以进行垃圾回收。如果事件总线持有强引用,则该对象无法进行垃圾回收。弱引用告诉JVM我们不是真正关心这个对象,我们有点关心但不足以坚持保留该对象。使用弱引用,事件总线在尝试通知新事件的订阅者之前会检查空引用。如果为空,事件总线可以删除其对象跟踪集合中的该插槽。
您可能认为,作为解决保持强引用问题的一种解决方法,您可以让已注册的Vaadin小部件覆盖detach方法。当该Vaadin小部件不再使用时,您将收到通知,然后您的方法将从事件总线中注销。如果订阅对象被从事件总线中移除,则不再有强引用和问题。但是,就像Java Object方法finalize并不总是被调用一样,Vaadin detach方法也并不总是被调用。有关详细信息,请参见Vaadin专家Henri Sara此线程上发布的帖子。依赖detach可能会导致内存泄漏和其他问题。

MBassador事件总线

请参阅我的博客文章,了解有关各种Java事件总线库的讨论。在其中,我选择了MBassador来用于此示例应用程序。它的存在理由是使用弱引用。

UI类

线程之间

要实际更新Vaadin布局和小部件的值,有一个重要的问题。这些小部件在其自己的用户界面处理线程中运行(对于此用户而言,主Servlet线程)。同时,您的数据库检查、数据发布和事件总线分派都在由执行器服务管理的后台线程上发生。 绝不要从单独的线程访问或更新Vaadin小部件! 这个规则是非常关键的。更麻烦的是,在开发期间这样做可能会起作用。但是如果您在生产中这样做,那么您将处于困境之中。

所以,我们如何将后台线程中的数据传递到在主Servlet线程中运行的小部件中进行通信呢?UI类提供了一个专门用于此目的的方法:access。您将Runnable传递给access方法,Vaadin会安排该Runnable在主用户界面线程上执行。简单易行。

剩余的类

为了完成这个示例应用程序,这里是剩下的类。 "MyUI"类替换了new Maven archetype for Vaadin 7.3.7创建的默认项目中同名的文件。
package com.example.pushvaadinapp;

import com.vaadin.annotations.Push;
import com.vaadin.annotations.Theme;
import com.vaadin.annotations.VaadinServletConfiguration;
import com.vaadin.annotations.Widgetset;
import com.vaadin.server.BrowserWindowOpener;
import com.vaadin.server.VaadinRequest;
import com.vaadin.server.VaadinServlet;
import com.vaadin.ui.Button;
import com.vaadin.ui.Label;
import com.vaadin.ui.UI;
import com.vaadin.ui.VerticalLayout;
import java.time.Instant;
import javax.servlet.annotation.WebServlet;

/**
 * © 2014 Basil Bourque. This source code may be used freely forever by anyone
 * absolving me of any and all responsibility.
 */
@Push
@Theme ( "mytheme" )
@Widgetset ( "com.example.pushvaadinapp.MyAppWidgetset" )
public class MyUI extends UI
{

    Label label = new Label( "Now : " );
    Button button = null;

    @Override
    protected void init ( VaadinRequest vaadinRequest )
    {
        // Prepare widgets.
        this.button = this.makeOpenWindowButton();

        // Arrange widgets in a layout.
        VerticalLayout layout = new VerticalLayout();
        layout.setMargin( Boolean.TRUE );
        layout.setSpacing( Boolean.TRUE );
        layout.addComponent( this.label );
        layout.addComponent( this.button );

        // Put layout in this UI.
        setContent( layout );

        // Start the data feed thread
        new FeederThread().start();
    }

    @WebServlet ( urlPatterns = "/*" , name = "MyUIServlet" , asyncSupported = true )
    @VaadinServletConfiguration ( ui = MyUI.class , productionMode = false )
    public static class MyUIServlet extends VaadinServlet
    {
    }

    public void tellTime ()
    {
        label.setValue( "Now : " + Instant.now().toString() ); // If before Java 8, use: new java.util.Date(). Or better, Joda-Time.
    }

    class FeederThread extends Thread
    {

        // This Thread class is merely a simple test to verify that Push works.
        // This Thread class is not the intended example.
        // A ScheduledExecutorService is in WebAppListener class is the intended example.
        int count = 0;

        @Override
        public void run ()
        {
            try {
                // Update the data for a while
                while ( count < 100 ) {
                    Thread.sleep( 1000 );

                    access( new Runnable() // Special 'access' method on UI object, for inter-thread communication.
                    {
                        @Override
                        public void run ()
                        {
                            count ++;
                            tellTime();
                        }
                    } );
                }

                // Inform that we have stopped running
                access( new Runnable()
                {
                    @Override
                    public void run ()
                    {
                        label.setValue( "Done. No more telling time." );
                    }
                } );
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
        }
    }

    Button makeOpenWindowButton ()
    {
        // Create a button that opens a new browser window.
        BrowserWindowOpener opener = new BrowserWindowOpener( DataUI.class );
        opener.setFeatures( "height=300,width=440,resizable=yes,scrollbars=no" );

        // Attach it to a button
        Button button = new Button( "Open data window" );
        opener.extend( button );

        return button;
    }
}

"DataUI"和"DataLayout"是这个Vaadin应用程序中的7个.java文件之一。
package com.example.pushvaadinapp;

import com.vaadin.annotations.Push;
import com.vaadin.annotations.Theme;
import com.vaadin.annotations.Widgetset;
import com.vaadin.server.VaadinRequest;
import com.vaadin.ui.UI;
import java.time.Instant;
import net.engio.mbassy.listener.Handler;

@Push
@Theme ( "mytheme" )
@Widgetset ( "com.example.pushvaadinapp.MyAppWidgetset" )
public class DataUI extends UI
{

    // Member vars.
    DataLayout layout;

    @Override
    protected void init ( VaadinRequest request )
    {
        System.out.println( Instant.now().toString() + " Method DataUI::init running." );   // DEBUG logging.

        // Initialize window.
        this.getPage().setTitle( "Database Display" );
        // Content.
        this.layout = new DataLayout();
        this.setContent( this.layout );

        DataPublisher.instance().register( this ); // Sign-up for notification of fresh data delivery.
    }

    @Handler
    public void update ( DataEvent event )
    {
        System.out.println( Instant.now().toString() + " Method DataUI::update (@Subscribe) running." );   // DEBUG logging.

        // We expect to be given a DataEvent item.
        // In a real app, we might need to retrieve data (such as a Collection) from within this event object.
        this.access( () -> {
            this.layout.update( event ); // Crucial that go through the UI:access method when updating the user interface (widgets) from another thread.
        } );
    }

}

……和……

/*
 * To change this license header, choose License Headers in Project Properties.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
package com.example.pushvaadinapp;

import com.vaadin.ui.TextField;
import com.vaadin.ui.VerticalLayout;
import java.time.Instant;

/**
 *
 * @author brainydeveloper
 */
public class DataLayout extends VerticalLayout
{

    TextField uuidField;
    TextField numericField;
    TextField updatedField;
    TextField whenCheckedField;

    // Constructor
    public DataLayout ()
    {
        System.out.println( Instant.now().toString() + " Method DataLayout::constructor running." );   // DEBUG logging.

        // Configure layout.
        this.setMargin( Boolean.TRUE );
        this.setSpacing( Boolean.TRUE );

        // Prepare widgets.
        this.uuidField = new TextField( "UUID : " );
        this.uuidField.setWidth( 22 , Unit.EM );
        this.uuidField.setReadOnly( true );

        this.numericField = new TextField( "Number : " );
        this.numericField.setWidth( 22 , Unit.EM );
        this.numericField.setReadOnly( true );

        this.updatedField = new TextField( "Updated : " );
        this.updatedField.setValue( "<Content will update automatically>" );
        this.updatedField.setWidth( 22 , Unit.EM );
        this.updatedField.setReadOnly( true );

        // Arrange widgets.
        this.addComponent( this.uuidField );
        this.addComponent( this.numericField );
        this.addComponent( this.updatedField );
    }

    public void update ( DataEvent dataHolder )
    {
        System.out.println( Instant.now().toString() + " Method DataLayout::update (via @Subscribe on UI) running." );   // DEBUG logging.

        // Stuff data values into fields. For simplicity in this example app, using String directly rather than Vaadin converters.
        this.uuidField.setReadOnly( false );
        this.uuidField.setValue( dataHolder.uuid.toString() );
        this.uuidField.setReadOnly( true );

        this.numericField.setReadOnly( false );
        this.numericField.setValue( dataHolder.number.toString() );
        this.numericField.setReadOnly( true );

        this.updatedField.setReadOnly( false );
        this.updatedField.setValue( dataHolder.updated.toString() );
        this.updatedField.setReadOnly( true );
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接