如何在一个库中正确实现同步和异步方法?

6

我需要创建一个库,其中包含同步和异步特性。

  • executeSynchronous() - 等待直到有结果返回,并返回结果。
  • executeAsynchronous() - 立即返回一个Future,如果需要,可以在其他事情完成后进行处理。

我的库的核心逻辑

客户将使用我们的库,并通过传递DataKey构建器对象来调用它。然后,我们将使用该DataKey对象构造一个URL,并通过执行HTTP请求向该URL发出请求。当我们收到响应返回的JSON字符串时,我们将创建一个DataResponse对象,并将该JSON字符串原封不动地发送回给客户。一些客户会调用executeSynchronous()方法,而另一些客户则可能会调用executeAsynchronous()方法,因此我需要在库中分别提供两个方法。

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

然后我有一个实现上述Client接口的DataClient

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

这是一个简单的类,用于执行实际任务:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

我们公司的客户将像以下示例那样使用我的库,通过在他们的代码库中使用我的工厂 -

// if they are calling `executeSynchronous()` method
DataResponse response = DataClientFactory.getInstance().executeSynchronous(dataKey);

// and if they want to call `executeAsynchronous()` method
Future<DataResponse> response = DataClientFactory.getInstance().executeAsynchronous(dataKey);

什么是实现库的同步和异步方法的最佳方式?实现“将同步调用作为异步调用+等待”的做法是否不好?因为它会在当前设置下每个调用消耗一个线程池中的线程?如果是,那么有人可以解释一下为什么这不是一个好主意,并且它是否会有任何性能问题吗?
考虑到上述标准,你将如何实现同步和异步方法?最佳方法是什么?该库将在非常重载的情况下使用,因此必须快速响应,这意味着它应该尽可能地与服务器响应时间相同。
在我的代码库中,我应该使用AsyncRestTemplate吗?这将是异步非阻塞架构吗?
5个回答

4
对于同步调用,执行在单独的线程中绝对不是一个好主意。在这种情况下,您将需要为Thread付出额外的成本和资源,以及线程之间切换上下文的成本。
如果存在大量同步调用,则固定大小的线程池将不必要地阻塞异步调用的线程。在这种情况下,系统的总吞吐量将较小。
例如: 如果有10个客户端每个都调用同步和异步调用,则在您的实现中只有线程才能正常工作。然而,如果您也利用客户端线程并不以同步调用作为异步且等待,则所有20个调用将同时处理。

2
如果即使在同步操作中(实际上并不需要时),您仍然创建新的线程,这将导致性能下降。您基本上是创建新线程(也就是浪费资源)而没有得到任何好处。 话虽如此,我认为更好的方法是将HTTP部分封装在不同的类中。这样,您可以在同步和异步情况下重复使用HTTP访问代码。
class HTTPAccess{
    private RestTemplate restTemplate;
    private DataKey key;

    public HTTPAccess(DataKey key,RestTemplate restTemplate){
        this.key = key;
        this.restTemplate = restTemplate;

    }


    public DataResponse performRequest() {
        DataResponse dataResponse = null;        
        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }

}

现在来说客户端的实现,只需使用这个类即可。
public class DataClient implements Client {

    private ExecutorService executor = Executors.newFixedThreadPool(10);
    private RestTemplate restTemplate;
    private void initRestClient(DataKey key){
    if(restTemplate == null)
        restTemplate = new RestTemplate(clientHttpRequestFactory(key));
    }

    private ClientHttpRequestFactory clientHttpRequestFactory(DataKey key) {
        HttpComponentsClientHttpRequestFactory factory = new HttpComponentsClientHttpRequestFactory();
        factory.setReadTimeout(key.getTimeout());
        factory.setConnectTimeout(key.getTimeout());
        //if you need to set otherparams this is the place we can do it extracting from DataKey obj
        return factory;
    }

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        initRestClient(key);
        DataResponse dataResponse = new HTTPAccess(key).performRequest();
        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(final DataKey key) {
        return executor.submit(new Callable<DataResponse>() {
            @Override
            public DataResponse call() throws Exception {
                return executeSynchronous(key);
            }
        });
    }
}

这样,您的HTTP实现完全独立,在未来如果需要更改接收DataResponse的方式(例如从DB调用),则只需更改HTTPAccess类,其他部分不会受到影响。


请注意,超时将在HTTPAccess类中作为RestClientException捕获。因此,您可能希望更改我们捕获RestClientException的代码,以便根据您的需求使用更具体的内容。 - Nayanjyoti Deka
感谢您的建议。非常感谢您的帮助。我有几个疑问。使用这种方法,我们不是每次都要为每个请求创建RestTemplate吗?而且我想,我们正在为每个请求创建工厂,每个工厂都有连接和线程池,而且我认为这是一个相当重的对象。有没有办法重复使用它们?我想这就是我通过DI从我的DataClien类传递RestTemplate的原因。 - john
是的,您的建议很好。我们可以重复使用 RestTemplate 对象。已经编辑了代码以反映这些更改。 - Nayanjyoti Deka

1
我不会去烦那个任务类。只需让同步方法完成所有工作,并从异步方法异步调用它即可。
public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(final DataKey key) {
        return executor.submit(new Callable<DataResponse>() {
            @Override
            public DataResponse call() throws Exception {
                return executeSynchronous(key);
            }
        });
    }
}

我之所以写上面的代码是因为我认为它看起来更漂亮、更简单。但如果你想限制对 REST 服务的并发请求数量,那么你的版本完全可以胜任。根据你提供的信息,我无法确定哪个版本的性能更好。这取决于 REST 服务的工作方式以及你的库的客户端的操作。最好的方法是先试一种再试另一种,并测量哪种性能更好。很抱歉,我不知道 AsyncRestTemplate 的任何信息。 - mikeyreilly

1
我认为这样更好:
@Override
public DataResponse executeSynchronous(DataKey key) {
    Task task = new Task(key, restTemplate);
    return task.call();
}

它执行相同的任务,清晰、更短,没有额外开销。
请注意,这也清除了你当前具有的重复异常处理。
如果超时是必须的,则可以使用 RestTemplate 类的底层超时选项,如 Spring RestTemplate timeout 中所述。
然后,超时将导致一个 RestClientException,你或库客户端可以处理。

我确实考虑过这个问题。那么我该如何实现超时功能呢?我仍然需要这样做,以便在服务器响应时间过长时进行超时处理。 - john
我在回答后注意到了超时。我在没有阅读您的评论的情况下修改了答案。根据您展示的代码,我认为超时不应该放在DataKey上,如果超时是必须的,那么这实际上是一个很好的实现。要实现超时,您需要使用第二个线程计时并终止原始的慢速线程。 - wzurita
请查看以下链接:https://dev59.com/8WYr5IYBdhLWcg3wQH6-。最好正确使用底层库的设置,并尽可能简单地编写代码,就像我的原始答案一样(我将编辑它以包含这一部分)。 - wzurita

1
通过异步执行同步任务的上述代码与将所有内容都设为异步相同。如果这是要求,我建议您使用Google Guava的ListenableFuture。我不是支持者,但它具有管理任务超时的方法,以及处理onSuccess、onFailure场景的良好编写的回调函数。https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接