如何向不同的Web服务发送多个异步请求?

17

我需要向许多不同的网络服务发送多个请求并接收结果。问题是,如果我一个一个地发送请求,那么需要逐个发送和处理,时间会很长。

我想知道如何一次性发送所有请求并接收结果。

如下所示的代码显示,我有三个主要方法,每个方法都有自己的子方法。 每个子方法都向其关联的网络服务发送请求并接收结果;因此,例如,要接收web服务9的结果,我必须等待1到8的所有网络服务完成,逐个发送请求并接收它们的结果需要很长时间。

如下所示,没有任何一个方法或子方法彼此相关,因此我可以调用它们所有并以任何顺序接收它们的结果,唯一重要的是接收每个子方法的结果并填充其关联的列表。

private List<StudentsResults> studentsResults = new ArrayList();
private List<DoctorsResults> doctorsResults = new ArrayList();
private List<PatientsResults> patientsResults = new ArrayList();

main (){
    retrieveAllLists();
}

retrieveAllLists(){

     retrieveStudents();
     retrieveDoctors();
     retrievePatients();
}

retrieveStudents(){

    this.studentsResults = retrieveStdWS1();   //send request to Web Service 1 to receive its  list of students
    this.studentsResults = retrieveStdWS2();  //send request to Web Service 2 to receive its  list of students
    this.studentsResults = retrieveStdWS3(); //send request to Web Service 3 to receive its  list of students

}

retrieveDoctors(){

   this.doctorsResults = retrieveDocWS4();   //send request to Web Service 4 to receive its list of doctors
   this.doctorsResults = retrieveDocWS5();  //send request to Web Service 5 to receive its  list of doctors
   this.doctorsResults = retrieveDocWS6(); //send request to Web Service 6 to receive its  list of doctors

}

retrievePatients(){

   this.patientsResults = retrievePtWS7();   //send request to Web Service 7 to receive its list of patients
   this.patientsResults = retrievePtWS8();  //send request to Web Service 8 to receive its list of patients
   this.patientsResults = retrievePtWS9(); //send request to Web Service 9 to receive its list of patients

}

你可能需要在问题中添加一些上下文。乍一看,这里可能建议使用发布-订阅模型,但根据您的用例,这可能不可取。例如,您是否必须等待特定事件才能执行“retrieveAllLists”,还是可以预先检索结果?缓存结果对您有何影响? - kolossus
@kolossus的问题已更新,希望它能回答你的问题。 - J888
有点遗憾你让赏金过期了,我认为你得到了很多高质量的回复。 - flup
6个回答

30

这是一种简单的分支合并方法,但为了清晰起见,您可以启动任意数量的线程,并稍后检索结果,例如使用此方法。

    ExecutorService pool = Executors.newFixedThreadPool(10);
    List<Callable<String>> tasks = new ArrayList<>();
    tasks.add(new Callable<String>() {
        public String call() throws Exception {
            Thread.sleep((new Random().nextInt(5000)) + 500);
            return "Hello world";
        }

    });
    List<Future<String>> results = pool.invokeAll(tasks);

    for (Future<String> future : results) {
        System.out.println(future.get());
    }
    pool.shutdown();

更新已完成:

这是一个冗长的但可行的解决方案。我现场编写它,没有编译它。由于三个列表有不同的类型,并且WS方法是独立的,因此它并不是真正的模块化,但请尝试使用您最好的编程技能,看看是否可以更好地将其模块化。

    ExecutorService pool = Executors.newFixedThreadPool(10);

    List<Callable<List<StudentsResults>>> stasks = new ArrayList<>();
    List<Callable<List<DoctorsResults>>> dtasks = new ArrayList<>();
    List<Callable<List<PatientsResults>>> ptasks = new ArrayList<>();

    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS1();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS2();
        }

    });
    stasks.add(new Callable<List<StudentsResults>>() {
        public List<StudentsResults> call() throws Exception {
            return retrieveStdWS3();
        }

    });

    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS4();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS5();
        }

    });
    dtasks.add(new Callable<List<DoctorsResults>>() {
        public List<DoctorsResults> call() throws Exception {
            return retrieveDocWS6();
        }

    });

    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS7();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS8();
        }

    });
    ptasks.add(new Callable<List<PatientsResults>>() {
        public List<PatientsResults> call() throws Exception {
            return retrievePtWS9();
        }

    });

    List<Future<List<StudentsResults>>> sresults = pool.invokeAll(stasks);
    List<Future<List<DoctorsResults>>> dresults = pool.invokeAll(dtasks);
    List<Future<List<PatientsResults>>> presults = pool.invokeAll(ptasks);

    for (Future<List<StudentsResults>> future : sresults) {
       this.studentsResults.addAll(future.get());
    }
    for (Future<List<DoctorsResults>> future : dresults) {
       this.doctorsResults.addAll(future.get());
    }
    for (Future<List<PatientsResults>> future : presults) {
       this.patientsResults.addAll(future.get());
    }
    pool.shutdown();
每个Callable都返回一个结果列表,并在其自己的单独线程中调用。
当您调用Future.get()方法时,将结果获取回主线程。
结果在Callable完成之前不可用,因此不存在并发问题。

谢谢你的回答,我对线程不是很熟悉,只是想知道它是否适用于并发使用? - J888
每个线程在自己的线程中运行,并最终生成结果。该结果通过Future对象传递回调用方。它尽可能地保证了线程安全,是执行此操作的首选方式。 - Niels Bech Nielsen
1
哦,还有要增加可信度的话。我曾经在大学教授Java并发编程。 - Niels Bech Nielsen
1
他的意思是问这个是否线程安全就像问锤子是不是锤子一样。这些例子使用了线程的创建,他的和我的(基本上是一样的)。所以这些都是线程。很难解释哈哈。 - D-Klotz
1
尽管Camel是一个非常好的用于集成的工具,特别是如果你有深入的集成模式知识,但它也有一套复杂的用例,你将不得不处理,所以特别针对这个任务,我发现它过于复杂。使用Camel是一项架构决策,而不是任务选择标准。 - Niels Bech Nielsen
显示剩余3条评论

3

为了好玩,我提供两个可行的示例。第一个示例展示了在Java 1.5之前完成此操作的传统方法。第二个示例展示了使用Java 1.5中可用的工具更清晰的方法:

import java.util.ArrayList;

public class ThreadingExample
{
    private ArrayList <MyThread> myThreads;

    public static class MyRunnable implements Runnable
    {
        private String data;

        public String getData()
        {
            return data;
        }

        public void setData(String data)
        {
            this.data = data;
        }

        @Override
        public void run()
        {
        }
    }

    public static class MyThread extends Thread
    {
        private MyRunnable myRunnable;

        MyThread(MyRunnable runnable)
        {
            super(runnable);
            setMyRunnable(runnable);
        }

        /**
         * @return the myRunnable
         */
        public MyRunnable getMyRunnable()
        {
            return myRunnable;
        }

        /**
         * @param myRunnable the myRunnable to set
         */
        public void setMyRunnable(MyRunnable myRunnable)
        {
            this.myRunnable = myRunnable;
        }
    }

    public ThreadingExample()
    {
        myThreads = new ArrayList <MyThread> ();
    }

    public ArrayList <String> retrieveMyData ()
    {
        ArrayList <String> allmyData = new ArrayList <String> ();

        if (isComplete() == false)
        {
            // Sadly we aren't done
            return (null);
        }

        for (MyThread myThread : myThreads)
        {
            allmyData.add(myThread.getMyRunnable().getData());
        }

        return (allmyData);
    }

    private boolean isComplete()
    {
        boolean complete = true;

        // wait for all of them to finish
        for (MyThread x : myThreads)
        {
            if (x.isAlive())
            {
                complete = false;
                break;
            }
        }
        return (complete);
    }

    public void kickOffQueries()
    {
        myThreads.clear();

        MyThread a = new MyThread(new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list A");
            }
        });
        myThreads.add(a);

        MyThread b = new MyThread (new MyRunnable()
        {
            @Override
            public void run()
            {
                // This is where you make the call to external services
                // giving the results to setData("");
                setData("Data from list B");
            }
        });
        myThreads.add(b);

        for (MyThread x : myThreads)
        {
            x.start();
        }

        boolean done = false;

        while (done == false)
        {
            if (isComplete())
            {
                done = true;
            }
            else
            {
                // Sleep for 10 milliseconds
                try
                {
                    Thread.sleep(10);
                }
                catch (InterruptedException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }


    public static void main(String [] args)
    {
        ThreadingExample example = new ThreadingExample();
        example.kickOffQueries();

        ArrayList <String> data = example.retrieveMyData();
        if (data != null)
        {
            for (String s : data)
            {
                System.out.println (s);
            }
        }
    }
}

这是较为简单的工作版本:
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadingExample
{

    public static void main(String [] args)
    {
        ExecutorService service = Executors.newCachedThreadPool();
        Set <Callable<String>> callables = new HashSet <Callable<String>> ();

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service A, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service B, and put its results here";
            }
        });

        callables.add(new Callable<String>()
        {
            @Override
            public String call() throws Exception
            {
                return "This is where I make the call to web service C, and put its results here";
            }
        });

        try
        {
            List<Future<String>> futures = service.invokeAll(callables);
            for (Future<String> future : futures)
            {
                System.out.println (future.get());
            }
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        catch (ExecutionException e)
        {
            e.printStackTrace();
        }
    }
}

我从未使用过Camel。但如果这个描述准确的话,“Apache Camel是具有路由功能的消息技术粘合剂。它将消息起点和终点连接在一起,允许将来自不同来源的消息传输到不同的目的地。例如:JMS -> JSON,HTTP -> JMS或漏斗式FTP -> JMS,HTTP -> JMS,JSON -> JMS”,只要您有一个API调用,在上述“调用”方法中返回给您一个Java对象,那么当然可以。 - D-Klotz
感谢您在示例中包含了导入 :) - shareef

2
您可以要求您的jax-ws实现为Web服务生成异步绑定。
我认为有两个优点:
1. 如Asynchronous web services calls with JAX-WS: Use wsimport support for asynchrony or roll my own?所讨论的,jax-ws将为您生成经过充分测试(可能更加复杂)的代码,您无需自行实例化ExecutorService。因此,这对您来说可以减少工作量!(但也意味着您无法控制线程实现详情)
2. 生成的绑定包括一个方法,您可以在其中指定回调处理程序,这可能比在调用retrieveAllLists()的线程上同步get()获取所有响应列表更适合您的需求。它允许每次服务调用进行错误处理,并且会并行处理结果,如果处理不是微不足道的,则非常好。
Metro的示例可以在Metro网站找到。请注意自定义绑定文件custom-client.xml的内容:
<bindings ...>    
    <bindings node="wsdl:definitions">
        <enableAsyncMapping>true</enableAsyncMapping>
    </bindings>    
</bindings>

当您将此绑定文件指定给wsimport时,它将生成一个客户端,该客户端返回实现javax.xml.ws.Response<T>的对象。Response扩展了Future接口,当您自己实现时,其他人也建议使用它。
因此,如果您没有使用回调函数,代码将与其他答案类似。
public void retrieveAllLists() throws ExecutionException{
    // first fire all requests
    Response<List<StudentsResults>> students1 = ws1.getStudents();
    Response<List<StudentsResults>> students2 = ws2.getStudents();
    Response<List<StudentsResults>> students3 = ws3.getStudents();

    Response<List<DoctorsResults>> doctors1 = ws4.getDoctors();
    Response<List<DoctorsResults>> doctors2 = ws5.getDoctors();
    Response<List<DoctorsResults>> doctors3 = ws6.getDoctors();

    Response<List<PatientsResults>> patients1 = ws7.getPatients();
    Response<List<PatientsResults>> patients2 = ws8.getPatients();
    Response<List<PatientsResults>> patients3 = ws9.getPatients();

    // then await and collect all the responses
    studentsResults.addAll(students1.get());
    studentsResults.addAll(students2.get());
    studentsResults.addAll(students3.get());

    doctorsResults.addAll(doctors1.get());
    doctorsResults.addAll(doctors2.get());
    doctorsResults.addAll(doctors3.get());

    patientsResults.addAll(patients1.get());
    patientsResults.addAll(patients2.get());
    patientsResults.addAll(patients3.get());
}

如果您创建回调处理程序,例如:
private class StudentsCallbackHandler 
            implements AsyncHandler<Response<List<StudentsResults>>> {
    public void handleResponse(List<StudentsResults> response) {
        try {
            studentsResults.addAll(response.get());
        } catch (ExecutionException e) {
            errors.add(new CustomError("Failed to retrieve Students.", e.getCause()));
        } catch (InterruptedException e) {
            log.error("Interrupted", e);
        }
    }
}

您可以这样使用它们:

public void retrieveAllLists() {
    List<Future<?>> responses = new ArrayList<Future<?>>();
    // fire all requests, specifying callback handlers
    responses.add(ws1.getStudents(new StudentsCallbackHandler()));
    responses.add(ws2.getStudents(new StudentsCallbackHandler()));
    responses.add(ws3.getStudents(new StudentsCallbackHandler()));

    ...

    // await completion 
    for( Future<?> response: responses ) {
        response.get();
    }

    // or do some other work, and poll response.isDone()
}

请注意,由于结果将会同时添加,因此现在需要使studentResults集合具有线程安全性!

Apache Camel是一个集成框架。如果您想知道学习和使用它是否值得,您需要提供更多的背景信息,例如:您的数据源有多频繁更改?这些服务还是明年仍将是这些服务,还是源来去去?您正在编写什么?客户端?还是从多个服务中组合数据的服务?或者是服务总线?*在收集此信息后,链条中是否还有更多步骤?您需要导出到Excel吗?写入数据库?打印在用户屏幕上?保存到文件? - flup
或者简单说:这取决于您要做多少集成。如果只有这些,我不会费心去用Camel。如果还有更多,Camel可能是一个非常有趣的选择。 - flup

1

看起来您需要将应用程序与10多个不同的Web服务集成,并使所有调用异步化。这可以很容易地通过使用Apache Camel实现。它是企业集成的一个重要框架,也支持异步处理。您可以使用其CXF组件调用Web服务,使用其路由引擎进行调用和处理结果。请参考以下页面了解Camel的异步路由功能。他们还提供了一个完整的示例,使用CXF异步调用Web服务,可以在其Maven repo中找到。还可以查看以下页面以获取更多详细信息。


1
您可以考虑以下范例,按顺序创建工作,但实际工作是并行完成的。一种方法是:1)让“主”创建一个工作项队列;2)创建一个“doWork”对象,查询工作项队列中的工作;3)让“主”启动一些“doWork”线程(可以与不同服务的数量相同或更少),让“doWork”对象将其结果添加到对象列表中(任何结构都可以,如向量、列表等)。
每个“doWork”对象将标记其队列项为完成状态,将所有结果放入传递的容器中,并检查新工作(如果队列上没有更多工作,则会休眠并重试)。
当然,您会想要看看您能够如何构建您的类模型。如果每个网络服务在解析方面都非常不同,那么您可能需要创建一个接口,让每个“retrieveinfo”类承诺实现它。

-3

它有各种选项来开发这个。

  1. JMS:质量服务和管理,例如重试尝试、死消息队列、负载管理、可扩展性、集群、监控等。
  2. 只需使用观察者模式即可。更多详情请参见OODesign以及如何解决生产者和消费者问题,请参见Kodelog**

JMS如何使这个过程异步化? - user1907906
您的每个方法都作为一个 JMS 组件执行。 - Kumar
@LutzHorn,你有什么建议吗?谢谢@Kumar。 - J888
关于JMS,只需调用JMS组件并执行与响应无关的其他操作,在JMS中完成响应后将其存储在数据库中,并附带事务ID。因此,您可以独立运行所有服务,并在收到所有响应后(使用某种观察者模式,但在使用JMS时更可靠)进行其他操作。 - Kumar
@J888,我认为Niels Bech Nielsen的解决方案非常好,有些类似于我的第二个解决方案。而Hussain Pirosha非常出色,完全依赖于此,但我认为是否使用第三方系统来异步调用多个Web服务或编写自己的代码来完成工作是由您决定的。 - Kumar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接