我该如何设计这个程序?

6

我正在使用Java编写一个程序。如下图片所示:

设计 主方法启动了三个线程。SAX处理器处理输入的XML文件,生成JAXB对象并将它们放入Guava缓存中。Guava缓存由另一个线程处理。每当有任何对象进入缓存时,该线程会通知第三个线程,即MDL生成器(它涉及相似的JAXB对象,相互连接并生成另一个XML文件,称为MDL)。我已针对主类编写了以下内容 -

package test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainClass {

    public static void main(String args[]) {

        ExecutorService xMLService = Executors.newFixedThreadPool(1);
        xMLService.execute(new XMLProcessor());

        ExecutorService cacheService = Executors.newFixedThreadPool(1);
        cacheService.execute(new CacheProcessor());

        ExecutorService mdlService = Executors.newFixedThreadPool(1);
        mdlService.execute(new MDLProcessor());

        xMLService.shutdown();
        cacheService.shutdown();
        mDLService.shutdown();
    }
}

但是现在我对如何在线程之间传递对象以及如何在缓存中出现新对象时通知MDL生成器存在疑虑。在Java旧的线程模型中,我们可以使用notify(),但我想使用当前的ExecutorService,并且还有异步回调。所以我想知道如何设计这个框架。如何传递对象并通知线程?我们将缓存对象保存在HashMap中,CacheService线程需要将密钥传递给MDLService。那么我应该使用哪种模式呢?


你好。这是一个很好的概念问题。我认为没有绝对的解决方案。在我看来,这是一个很好的使用案例来介绍队列处理。你的 XMLProcessor 将结果发布到队列中,然后你的 CacheProcessor 监听队列并处理项目,CacheServiceMDLService 同理。 - Mickael
3
在你的程序中,不需要三个线程来工作,用来处理的线程可以删除,然后Guava缓存也不需要了。你可以使用一个BlockingQueue来代替。SAX处理器处理XML输入并添加到BlockingQueue中,而MDL生成器从BlockingQueue中获取数据。如果你需要代码示例,我可以提供支持。 - TongChen
嗨@TongChen:我需要Guava缓存,因为MDL生成器需要使用哈希键(应该从Cache线程本身传递)多次查询它以生成MDL。所以如果对象在缓存中,速度会更快。事实上,如果将xMLService和cacheService放在同一个线程中,问题就会简化。因此,第一个线程生成哈希映射,通知第二个线程并传递哈希键。如何做到这一点?run()方法不接受参数。那么我们如何同时通知和传递字符串? - Nirmalya
1
您可能想看一下Guava [EventBus](https://javadoc.scijava.org/Guava/com/google/common/eventbus/EventBus.html)。它在功能上有点原始(仍处于测试版),但对于简单的情况可用。所有执行器都将移动到“EventBus”控制下。通过传递对象,生产者会将其转换为发布到总线上,并在消费者端异步调用回调函数。所有分派都是基于对象的类类型完成的。 - yegodm
3个回答

4
如何传递对象并通知线程?我们在HashMap中保存缓存对象,CacheService线程需要将键传递给MDLService。所以我应该使用哪种模式呢?
在我看来,你多了一个线程。读取XML的线程和写入MDL的线程是有意义的,但是为了把东西放入内存缓存而创建一个线程似乎太复杂了。如果MDL生成器需要使用Guava缓存,那么它应该“拥有”缓存并将事物放入其中。
这样,你就只剩下一个输入SAX处理器线程和一个输出MDL生成器线程了。很好的。为了连接这两个线程,我会使用像LinkedBlockingQueue这样的BlockingQueue。根据读取速度是否快于写入速度以及作业中有多少记录,您可能需要为队列设置大小限制。
因此,您的主线程将创建BlockingQueue,然后将其传递给输入和输出线程。SAX输入线程在队列上调用put(),MDL输出线程调用take()将对象放入Guava缓存中,然后进行MDL生成。
希望这可以帮到你。

3

既然您正在使用Guava Cache,那么您可以使用Guava AsyncEventBus来在任务之间分发消息,并且不再需要三个独立的专用ExecutorServices。


2
这应该是一条注释,而不是一个答案。 - Stultuske
@Stultuske,您需要解释原因。这不是要求更多信息或改进建议,而是回答问题“如何在线程之间传递对象以及如何在缓存中出现新对象时通知MDL生成器”的答案。答案简短并不意味着它应该是评论。 - Torben
这实际上是你问他的问题。 - Stultuske
这不是我的需求。你只是发布了一些你不确定的东西,你表明自己基本上只是在猜测。这就是为什么它更像是一条评论。 - Stultuske
我只是表达了我的意见,当你询问原因后,我也给出了解释。 - Stultuske
显示剩余4条评论

2
以下是上述情况的示例实现。请注意,即使一些其他人已经回复提到了没有Guava缓存也可以实现,但我认为Nirmalaya请求使用它可能有一个有效的原因。我能想到的一个原因是将缓存溢出到存储设备或数据库中,以节省运行时内存。 employee-records.xml
<?xml version="1.0" encoding="UTF-8"?>
<Employees>
    <Employee id="1">
        <name>Thomas</name>
    </Employee>
    <Employee id="2">
        <name>Lisa</name>
    </Employee>
    <Employee id="3">
        <name>Ronald</name>
    </Employee>
    <Employee id="4">
        <name>Erica</name>
    </Employee>
</Employees>

Employee.java

package com.technoroy.examples.guava;

/**
 * A value holder POJO implementation for Employee records
 * @author Rahul R
 *
 */
class Employee {
    private Integer id = null;
    private String name = null;

    public Employee() {
        super();
    }

    public Employee(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Employee [id=" + id + ", name=" + name + "]";
    }
}

GuavaCacheProcessor.java

package com.technoroy.examples.guava;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.xml.sax.Attributes;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.DefaultHandler;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

/**
 * The primary executable class
 * 
 * @author Rahul R
 *
 */
public class GuavaCacheProcessor {
    private final static BlockingQueue<Integer> notificationQueue = new LinkedBlockingQueue<>();

    public static void main(String... arguments) {
        Runnable xmlProcessor = new Runnable() {
            public void run() {
                parseDataFile();
            }
        };

        Runnable mdlGenerator = new Runnable() {
            public void run() {
                try {
                    while (true) {
                        Integer id = notificationQueue.take();
                        Employee record = ApplicationCacheUtil.getRecord(id);
                        generateContent(record);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(xmlProcessor);
        executorService.submit(mdlGenerator);
    }

    public static void generateContent(Employee employee) {
        System.out.println(employee);
    }

    public static void parseDataFile() {
        SAXParserFactory saxParserFactory = SAXParserFactory.newInstance();
        InputStream dataInputStream = GuavaCacheProcessor.class.getResourceAsStream("employee-records.xml");

        try {
            SAXParser saxParser = saxParserFactory.newSAXParser();
            saxParser.parse(dataInputStream, new DefaultHandler() {
                private Employee employee = null;
                private StringBuilder elementValue = null;

                @Override
                public void startElement(String uri, String localName, String qName, Attributes attributes)
                        throws SAXException {
                    if (qName.equalsIgnoreCase("Employee")) {
                        employee = new Employee();

                        String id = attributes.getValue("id");
                        if (id.matches("-?\\d+(\\.\\d+)?")) {
                            employee.setId(Integer.valueOf(id));
                        }
                    }

                    elementValue = new StringBuilder();
                }

                @Override
                public void characters(char ch[], int start, int length) throws SAXException {
                    if (elementValue != null) {
                        elementValue.append(new String(ch, start, length));
                    }
                }

                @Override
                public void endElement(String uri, String localName, String qName) throws SAXException {
                    if (qName.equalsIgnoreCase("name")) {
                        if (employee != null && elementValue != null) {
                            employee.setName(elementValue.toString());
                        }
                    } else if (qName.equalsIgnoreCase("Employee")) {
                        ApplicationCacheUtil.putRecord(employee.getId(), employee);
                        try {
                            notificationQueue.put(employee.getId());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    elementValue = null;
                }
            });
        } catch (ParserConfigurationException | SAXException | IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * The Cache utilities class, that initializes and returns a handle to the
 * cache.
 * 
 * @author Rahul R
 *
 */
class ApplicationCacheUtil {
    private static Cache<Integer, Employee> cache = CacheBuilder.newBuilder().build();

    public static Cache<Integer, Employee> getCache() {
        return cache;
    }

    public static void putRecord(Integer key, Employee value) {
        cache.put(key, value);
    }

    public static Employee getRecord(Integer key) {
        return cache.getIfPresent(key);
    }
}

抱歉,我错过了评论,其中@Nirmalaya在对TongChen的一些评论中描述了他需要使用Guava缓存的原因。 - Rahul R.
嗨@Rahul R-非常感谢您提供了我所期望的最详细的答案!请鞠躬。 - Nirmalya

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接