How to implement an HTTP sink correctly?


I want to send the results computed by my DataStream to another service over HTTP. I see two possible ways to implement this:

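  1. Use the synchronous Apache HttpClient in the sink
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;

// SessionItem is the asker's own POJO (not shown).
public class SyncHttpSink extends RichSinkFunction<SessionItem> {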
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        try(CloseableHttpResponse response = httpClient.execute(httpPost)) {
            int httpStatusCode = response.getStatusLine().getStatusCode();

            httpStatusesAccumulator.add(httpStatusCode);
        }
    }
}
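  2. Use the asynchronous Apache HttpAsyncClient in the sink
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.accumulators.Histogram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.Consts;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.message.BasicNameValuePair;

// SessionItem is the asker's own POJO (not shown).
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {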
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpAsyncClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                .build();
        httpClient.start();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                int httpStatusCode = response.getStatusLine().getStatusCode();

                httpStatusesAccumulator.add(httpStatusCode);
            }

            @Override
            public void failed(Exception ex) {
                httpStatusesAccumulator.add(-1); // -1 - failed
            }

            @Override
            public void cancelled() {
                httpStatusesAccumulator.add(-2); // -2 - cancelled
            }
        });
    }
}

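Questions:

  1. Should I use a sync or an async HTTP client in the sink?

  2. If I use the sync client, it will block the sink and, through backpressure, Flink will block the source. Right?

  3. If I use the async client, it will not block the sink. Right?

  4. Are accumulators not thread-safe? That is, can I use them in an async callback?

  5. Is RuntimeContext not thread-safe? That is, can I use it in an async callback?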

1 Answer


1. Should I use a sync or an async HTTP client in the sink?

To avoid backpressure caused by blocking HTTP calls, I would recommend using the asynchronous HTTP client.
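A minimal JDK-only sketch of the non-blocking pattern, with its assumptions spelled out: the class `AsyncSinkSketch` and its methods are hypothetical, and the HTTP client is modeled as a `Function<String, CompletableFuture<Integer>>` that resolves to a status code, standing in for `HttpAsyncClient` so the example runs without Flink or a network. `invoke()` only starts a request and records it as in flight; `flush()` waits for the outstanding ones, which is what `close()` would need to do before closing the client (the `close()` in the question closes the client right away, so requests still in flight at that moment may never complete).

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: models the non-blocking sink pattern without Flink or HttpAsyncClient.
class AsyncSinkSketch {
    // Assumed async client: takes a payload and returns a future HTTP status code.
    private final Function<String, CompletableFuture<Integer>> client;
    // Requests that have been started but have not completed yet.
    private final Set<CompletableFuture<Integer>> inFlight = ConcurrentHashMap.newKeySet();

    AsyncSinkSketch(Function<String, CompletableFuture<Integer>> client) {
        this.client = client;
    }

    // Plays the role of invoke(): starts the request and returns without waiting for it.
    void invoke(String payload) {
        CompletableFuture<Integer> request = client.apply(payload);
        inFlight.add(request);
        // Runs on whichever thread completes the request; the set is thread-safe.
        request.whenComplete((status, error) -> inFlight.remove(request));
    }

    int pending() {
        return inFlight.size();
    }

    // Plays the role of the first step of close(): wait for every outstanding request.
    // If an outstanding request failed, join() rethrows that failure in a CompletionException.
    void flush() {
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0])).join();
    }
}
```

For at-least-once delivery the same wait would also be needed at checkpoints, e.g. from `snapshotState()` if the sink implements `CheckpointedFunction`; that wiring is not shown here.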

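2. If I use the sync client, it will block the sink and, through backpressure, Flink will block the source. Right?

Yes, you are right. The backpressure will propagate through the topology to the sources.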
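As a toy analogy only (Flink's actual mechanism is bounded network buffers between tasks, not a `java.util.concurrent` queue), a bounded buffer between a fast producer and a sink stuck in a blocking call shows why a slow sink eventually stalls everything upstream. `BackpressureDemo` and its method are hypothetical names for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy analogy: a bounded buffer between a "source" and a "sink" that never drains it.
class BackpressureDemo {
    // Returns how many records the source hands over before it would have to block.
    static int recordsAcceptedBeforeBlocking(int bufferCapacity, int recordsToSend)
            throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(bufferCapacity);
        int accepted = 0;
        for (int i = 0; i < recordsToSend; i++) {
            // The sink is stuck in a blocking HTTP call and never takes from the buffer,
            // so once it is full offer() times out; a real source would block here.
            if (!buffer.offer(i, 10, TimeUnit.MILLISECONDS)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }
}
```

However many records the source wants to send, only `bufferCapacity` of them get through before it stalls.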

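3. If I use the async client, it will not block the sink. Right?

Correct. invoke() hands the request to the client and returns without waiting for the response.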
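Because `invoke()` no longer waits, nothing limits how many requests are outstanding, so with a slow endpoint pending requests can pile up in memory. One common remedy, sketched here under the same assumption of a hypothetical `Function<String, CompletableFuture<Integer>>` client, is a `Semaphore` that caps in-flight requests: `send()` blocks only when the cap is reached, which brings back backpressure at a level you choose. `BoundedAsyncSender`, `send()` and `trySend()` are illustrative names, not a library API. Flink's Async I/O operator (`AsyncDataStream` with a `capacity` argument) applies a similar bound if the HTTP call can be moved out of the sink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical sketch: caps the number of outstanding async requests with a Semaphore.
class BoundedAsyncSender {
    // Assumed async client: takes a payload and returns a future HTTP status code.
    private final Function<String, CompletableFuture<Integer>> client;
    // One permit per request that may be in flight at the same time.
    private final Semaphore permits;

    BoundedAsyncSender(Function<String, CompletableFuture<Integer>> client, int maxInFlight) {
        this.client = client;
        this.permits = new Semaphore(maxInFlight);
    }

    // Blocks the caller (e.g. invoke()) only while maxInFlight requests are outstanding.
    void send(String payload) throws InterruptedException {
        permits.acquire();
        start(payload);
    }

    // Non-blocking variant: returns false instead of waiting when the cap is reached.
    boolean trySend(String payload) {
        if (!permits.tryAcquire()) {
            return false;
        }
        start(payload);
        return true;
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    // Starts the request while holding a permit; the permit is returned when it completes.
    private void start(String payload) {
        CompletableFuture<Integer> request;
        try {
            request = client.apply(payload);
        } catch (RuntimeException e) {
            permits.release(); // the request never started, so give the permit back
            throw e;
        }
        // Success, failure and cancellation all complete the future, so all release the permit.
        request.whenComplete((status, error) -> permits.release());
    }
}
```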

4. Are accumulators not thread-safe? That is, can I use them in an async callback?

Accumulators are not thread-safe, so access to them has to be synchronized.
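A minimal sketch of what synchronized access can look like, assuming the callbacks run on the HTTP client's I/O threads while the task thread may touch the same object. `UnsafeHistogram` is a stand-in written for this example (it mimics a histogram accumulator that keeps its counts in a plain `TreeMap`), and `SynchronizedStatusRecorder` is hypothetical. Every read and write goes through one lock; note that the lock only orders accesses made by your own code.

```java
import java.util.Map;
import java.util.TreeMap;

// Stand-in for a non-thread-safe histogram accumulator backed by a plain TreeMap.
class UnsafeHistogram {
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void add(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    Map<Integer, Integer> getLocalValue() {
        return counts;
    }
}

// Hypothetical wrapper: all access to the histogram is guarded by the same lock.
class SynchronizedStatusRecorder {
    private final UnsafeHistogram histogram = new UnsafeHistogram();
    private final Object lock = new Object();

    // Safe to call from async callback threads.
    void record(int statusCode) {
        synchronized (lock) {
            histogram.add(statusCode);
        }
    }

    // Reads take the same lock and return a copy, so callers never see the live map.
    Map<Integer, Integer> snapshot() {
        synchronized (lock) {
            return new TreeMap<>(histogram.getLocalValue());
        }
    }
}
```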

5. Is RuntimeContext not thread-safe? That is, can I use it in an async callback?

RuntimeContext is not thread-safe, so access to it has to be synchronized.
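An alternative to locking, swapped in here rather than taken from the answer, is thread confinement through a hand-off queue: callbacks never touch `RuntimeContext` or anything obtained from it; they only push status codes into a `ConcurrentLinkedQueue`, and the task thread (the one that calls `invoke()`) drains the queue and updates the non-thread-safe state itself, for example at the start of each `invoke()` and once more in `close()` after outstanding requests finish. `CallbackHandOff` is a hypothetical name, and its `TreeMap` stands in for the accumulator obtained in `open()`.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: callbacks only enqueue results; the task thread owns the histogram.
class CallbackHandOff {
    // Thread-safe hand-off point between I/O threads and the task thread.
    private final ConcurrentLinkedQueue<Integer> completedStatuses = new ConcurrentLinkedQueue<>();
    // Stands in for the accumulator from getRuntimeContext(); touched only by the task thread.
    private final TreeMap<Integer, Integer> taskThreadHistogram = new TreeMap<>();

    // Safe to call from any callback thread.
    void onResponse(int statusCode) {
        completedStatuses.add(statusCode);
    }

    // Call on the task thread, e.g. at the start of invoke() and in close().
    void drain() {
        Integer code;
        while ((code = completedStatuses.poll()) != null) {
            taskThreadHistogram.merge(code, 1, Integer::sum);
        }
    }

    // Task-thread-only view of the counts.
    Map<Integer, Integer> histogram() {
        return taskThreadHistogram;
    }
}
```

The trade-off is that the counts lag behind the responses until the next `drain()`, which is usually acceptable for metrics.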


Source: Stack Overflow.