SocketTimeoutException when retrieving or inserting data in Elasticsearch with the REST High Level Client

I am getting SocketTimeoutException when retrieving/inserting data from/into Elastic. The problem occurs at around 10-30 requests per second, where the requests are a mix of get and put.

Here is my Elastic setup:

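  • 3 master nodes, each with 4 GB RAM
  • 2 data nodes, each with 8 GB RAM
  • An Azure load balancer in front of the data nodes above (only port 9200 seems to be open). The Java client connects to this load balancer, since it is the only exposed port.
  • Elastic version: 7.2.0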
  • Rest High Level Client:

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.2.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>7.2.0</version>
    </dependency>
    

Index information:

  • Number of shards: 2
  • Number of replicas: 1
  • Index total fields: 10000
  • Index size from Kibana: total 27.2 MB, primaries 12.2 MB
  • Index structure:
{
  "dev-index": {
    "mappings": {
      "properties": {
        "dataObj": {
          "type": "object",
          "enabled": false
        },
        "generatedID": {
          "type": "keyword"
        },
        "transNames": { //it's array of string
          "type": "keyword"
        }
      }
    }
  }
}
  • Dynamic mapping is disabled.
  • Below is my Elastic config file. There are two client beans, one for reading from Elastic and the other for writing to it.

    ElasticConfig.java:

    @Configuration
    public class ElasticConfig {
    
        @Value("${elastic.host}")
        private String elasticHost;
    
        @Value("${elastic.port}")
        private int elasticPort;
    
        @Value("${elastic.user}")
        private String elasticUser;
    
        @Value("${elastic.pass}")
        private String elasticPass;
    
        @Value("${elastic-timeout:20}")
        private int timeout;
    
        @Bean(destroyMethod = "close")
        @Qualifier("readClient")
        public RestHighLevelClient readClient(){
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
    
            RestClientBuilder builder = RestClient
                    .builder(new HttpHost(elasticHost, elasticPort))
                    .setHttpClientConfigCallback(httpClientBuilder -> 
                            httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider)
                                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                    );
    
            builder.setRequestConfigCallback(requestConfigBuilder -> 
                    requestConfigBuilder
                            .setConnectTimeout(10000)
                            .setSocketTimeout(60000)
                            .setConnectionRequestTimeout(0)
            );
    
            RestHighLevelClient restClient = new RestHighLevelClient(builder);
            return restClient;
        }
    
        @Bean(destroyMethod = "close")
        @Qualifier("writeClient")
        public RestHighLevelClient writeClient(){
    
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticUser, elasticPass));
    
            RestClientBuilder builder = RestClient
                    .builder(new HttpHost(elasticHost, elasticPort))
                    .setHttpClientConfigCallback(httpClientBuilder -> 
                            httpClientBuilder
                                    .setDefaultCredentialsProvider(credentialsProvider)
                                    .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(5).build())
                    );
    
            builder.setRequestConfigCallback(requestConfigBuilder -> 
                    requestConfigBuilder
                            .setConnectTimeout(10000)
                            .setSocketTimeout(60000)
                            .setConnectionRequestTimeout(0)
            );
    
            RestHighLevelClient restClient = new RestHighLevelClient(builder);
            return restClient;
        }
    
    }
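For reference, the 60,000 ms in the stack traces below comes from `setSocketTimeout(60000)` in this config. In Apache HttpClient, that value is the maximum period of inactivity between two data packets on an established connection. It is not a limit on the whole request. `setConnectTimeout(10000)` only covers opening the TCP connection. `setConnectionRequestTimeout(0)` means waiting indefinitely for a free connection from the pool. The low-level RestClient defaults to 10 connections per route and 30 in total, and with a single load-balancer address every request shares one route.

The sketch below reproduces the same symptom with plain `java.net` sockets rather than the Elasticsearch client: a connection that is open, but on which nothing ever arrives, ends in `SocketTimeoutException`. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SocketTimeoutDemo {

    /**
     * Opens a loopback connection whose peer accepts but never writes, then blocks
     * on read() with the given SO_TIMEOUT. Returns true if the read ended with
     * SocketTimeoutException, i.e. the connection stayed open but silent.
     */
    static boolean readTimesOut(int soTimeoutMillis) {
        // Port 0 lets the OS pick a free port.
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
             Socket silentPeer = server.accept()) {   // established, but this side never writes
            client.setSoTimeout(soTimeoutMillis);     // plays the role of setSocketTimeout(...)
            InputStream in = client.getInputStream();
            try {
                in.read();                            // waits for data that never arrives
                return false;
            } catch (SocketTimeoutException e) {
                return true;                          // no packet within soTimeoutMillis
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean timedOut = readTimesOut(500);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("timed out: " + timedOut + " after ~" + elapsedMs + " ms");
    }
}
```

With a 500 ms timeout the read gives up after roughly half a second. A request sent on a connection that a firewall or load balancer has silently dropped fails the same way, only after the full 60 s.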
    

    Here is the function that calls Elasticsearch. If the data already exists in Elasticsearch, it uses that data; otherwise it generates the data and stores it in Elasticsearch.

    public Object getData(Request request) {
    
        DataObj elasticResult = elasticService.getData(request);
        if(elasticResult!=null){
            return elasticResult;
        }
        else{
            //code to generate data
            DataObj generatedData = getData();//some function which generates the data
            //put above data into elastic by Async call.
            elasticAsync.putData(generatedData);
            return generatedData;
        }
    }
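The get-or-generate flow above is a cache-aside (read-through) pattern. Below is a stdlib-only sketch of it. An in-memory `ConcurrentHashMap` stands in for the index, and a single daemon writer thread stands in for `@Async`. All names are hypothetical.

The design point it illustrates is that the asynchronous writes go through an explicitly bounded executor. This may be worth checking in the real service. The thread name `askExecutor-894` in the log further down looks like a truncated executor thread name with a high counter. That could mean `@Async` creates many threads, but this is an inference from the log only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class CacheAsideSketch {

    // Hypothetical stand-in for the Elasticsearch index: _id -> document.
    private final Map<String, String> store = new ConcurrentHashMap<>();

    // One bounded writer thread instead of a new thread per write (daemon so the JVM can exit).
    private final ExecutorService writer = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "es-writer");
        t.setDaemon(true);
        return t;
    });

    private final Function<String, String> generator;
    final AtomicInteger generatedCount = new AtomicInteger();

    CacheAsideSketch(Function<String, String> generator) {
        this.generator = generator;
    }

    /** Return the stored document, or generate it, queue an async write and return it. */
    String getData(String id) {
        String cached = store.get(id);                   // like elasticService.getData(request)
        if (cached != null) {
            return cached;
        }
        String generated = generator.apply(id);          // like "code to generate data"
        generatedCount.incrementAndGet();
        writer.execute(() -> store.put(id, generated));  // like elasticAsync.putData(generatedData)
        return generated;
    }

    /** Block until every write queued so far has run (a single thread runs tasks in FIFO order). */
    void awaitPendingWrites() {
        try {
            writer.submit(() -> { }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CacheAsideSketch cache = new CacheAsideSketch(id -> "generated-" + id);
        System.out.println(cache.getData("42"));        // miss: generated and queued for writing
        cache.awaitPendingWrites();
        System.out.println(cache.getData("42"));        // hit: served from the store
        System.out.println(cache.generatedCount.get()); // 1
    }
}
```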
    

    ElasticService.java getData function:

    @Service
    public class ElasticService {
    
        private static final Logger LOGGER = Logger.getLogger(ElasticService.class.getName());
    
        @Value("${elastic.index}")
        private String elasticIndex;
    
        @Autowired
        @Qualifier("readClient")
        private RestHighLevelClient readClient;
    
        public DataObj getData(Request request){
            String generatedId = request.getGeneratedID();
    
            GetRequest getRequest = new GetRequest()
                    .index(elasticIndex)   //elastic index name
                    .id(generatedId);   //retrieving by index id from elastic _id field (as key-value)
    
            DataObj result = null;
            try {
                GetResponse response = readClient.get(getRequest, RequestOptions.DEFAULT);
                if(response.isExists()) {
                    ObjectMapper objectMapper = new ObjectMapper();
                    result = objectMapper.readValue(response.getSourceAsString(), DataObj.class);
                }
            }  catch (Exception e) {
                LOGGER.error("Exception occurred during fetch from elastic !!!!", e);
            }
            return result;
        }
    
    }
    

    ElasticAsync.java asynchronous putData function:

    @Service
    public class ElasticAsync {
    
        private static final Logger LOGGER = Logger.getLogger(ElasticAsync.class.getName());
    
        @Value("${elastic.index}")
        private String elasticIndex;
    
        @Autowired
        @Qualifier("writeClient")
        private RestHighLevelClient writeClient;
    
        @Async
        public void putData(DataObj generatedData){
            ElasticVO updatedRequest = toElasticVO(generatedData);//ElasticVO matches the structure of the index given above.
    
            try {
                ObjectMapper objectMapper = new ObjectMapper();
                String jsonString = objectMapper.writeValueAsString(updatedRequest);
    
                IndexRequest request = new IndexRequest(elasticIndex);
                request.id(generatedData.getGeneratedID());
                request.source(jsonString, XContentType.JSON);
                request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
                request.timeout(TimeValue.timeValueSeconds(5));
                IndexResponse indexResponse = writeClient.index(request, RequestOptions.DEFAULT);
                LOGGER.info("response id: " + indexResponse.getId());
    
            } catch (Exception e) {
                LOGGER.error("Exception occurred during saving into elastic !!!!",e);
            }
    
    
        }
    
    }
    

    Below is part of the stack trace when the exception occurs while saving data into Elasticsearch:

    2019-07-19 07:32:19.997 ERROR [data-retrieval,341e6ecc5b10f3be,1eeb0722983062b2,true] 1 --- [askExecutor-894] a.c.s.a.service.impl.ElasticAsync        : Exception occurred during saving into elastic !!!!
    
    java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
        at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1448) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1418) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1388) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
        at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:836) ~[elasticsearch-rest-high-level-client-7.2.0.jar!/:7.2.0]
    
    
    Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-34 [ACTIVE]
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        ... 1 common frames omitted
    

    Below is part of the stack trace when the exception occurs while retrieving data from Elasticsearch:

    2019-07-19 07:22:37.844 ERROR [data-retrieval,104cf6b2ab5b3349,b302d3d3cd7ebc84,true] 1 --- [o-8080-exec-346] a.c.s.a.service.impl.ElasticService      : Exception occurred during  fetch from elastic !!!! 
    
    java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
        at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:789) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:225) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestClient.performRequest(RestClient.java:212) ~[elasticsearch-rest-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1433) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1403) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1373) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
        at org.elasticsearch.client.RestHighLevelClient.get(RestHighLevelClient.java:699) ~[elasticsearch-rest-high-level-client-7.1.1.jar!/:7.1.1]
    
    
    
    Caused by: java.net.SocketTimeoutException: 60,000 milliseconds timeout on connection http-outgoing-30 [ACTIVE]
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:92) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:39) ~[httpasyncclient-4.1.3.jar!/:4.1.3]
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:263) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:492) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:213) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[httpcore-nio-4.4.11.jar!/:4.4.11]
        ... 1 common frames omitted
    

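    I have read several Stack Overflow posts and Elastic blogs suggesting that this issue might be caused by the RAM or cluster configuration of Elastic. So, since there are only two data nodes, I reduced my shards from 5 to 2. I also increased the data nodes' memory from 4 GB to 8 GB, since I learned that Elastic uses only 50% of the RAM. The exception now occurs less often, but the problem persists.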
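A note on the 50% rule: it describes how large the heap should be set, not something Elasticsearch 7.2 applies automatically. As far as I know, 7.x releases before 7.11 take the heap from `-Xms`/`-Xmx` in `config/jvm.options`, which is 1 GB in the default file. So adding RAM only enlarges the heap if `jvm.options` is changed too. The remaining RAM is used by the OS filesystem cache. The actual values can be checked with `GET _cat/nodes?v&h=name,heap.max,ram.max`.

The arithmetic behind the rule of thumb is sketched below. The 31 GB cap is an assumption standing in for the compressed-oops threshold, and the names are illustrative.

```java
public class HeapSizing {

    // Assumed conservative cap below the ~32 GB compressed-oops threshold.
    static final long COMPRESSED_OOPS_CAP_MB = 31L * 1024;

    /** Rule of thumb: at most half of the node's RAM, and never above the cap. */
    static long recommendedHeapMb(long nodeRamMb) {
        return Math.min(nodeRamMb / 2, COMPRESSED_OOPS_CAP_MB);
    }

    public static void main(String[] args) {
        System.out.println(recommendedHeapMb(4 * 1024)); // 2048
        System.out.println(recommendedHeapMb(8 * 1024)); // 4096
        // For comparison: the max heap this JVM itself was started with (-Xmx), in MB.
        System.out.println(Runtime.getRuntime().maxMemory() / (1024 * 1024));
    }
}
```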

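    What are the possible ways to solve this? What should I take care of on the Java / Elastic configuration side to avoid getting this kind of SocketTimeoutException so often? Let me know if you need any other details about the configuration.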


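    Hi! Did you solve this problem? I have the same problem with an ES cluster on Azure cloud, but I get the SocketTimeout after the ES client has been idle for a few minutes. - Danila Zharenkov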
    @DanilaZharenkov Not solved yet. Even after increasing the RAM of the ES cluster, it still doesn't work; the SocketTimeoutException has only been delayed by 5 seconds. - AshwinK
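    I am also facing this problem: 30,000 milliseconds timeout on connection http-outgoing-14 [ACTIVE] - Raj Rajeshwar Singh Rathore
    I am also facing this problem, 30,000 milliseconds timeout on connection http-outgoing-14 [ACTIVE], but on a local installation. - christophe spielmann
    I am facing this problem. Were you able to solve the socket timeout exception? - saba safavi
    @sabasafavi I used the configuration provided in the accepted answer. Hope it helps you. - AshwinK
    1 Answer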

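    We had the same problem, and after quite some digging I found the root cause: a configuration mismatch between the firewall sitting between the client and the Elastic servers and the kernel's TCP keepalive parameters. The firewall drops idle connections after 3600 seconds, while the kernel parameter for TCP keepalive was set to 7200 seconds (the default on RedHat 6.x/7.x):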
    sysctl -n net.ipv4.tcp_keepalive_time
    7200
    
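    So the connection is dropped before the first keepalive probe is sent. The asyncHttpClient inside the Elastic HTTP client doesn't seem to handle dropped connections very well; it just waits until the socket timeout.
    So check whether there is any network device (load balancer, firewall, proxy, etc.) between your client and server that has a session timeout or similar feature, and either increase that timeout or lower the tcp_keepalive_time kernel parameter.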
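The kernel sends the first keepalive probe only after `tcp_keepalive_time` seconds of idleness. So keepalive protects a connection only if that value is below the idle timeout of every device on the path.

The sketch below compares the two numbers and reads the current value from `/proc/sys/net/ipv4/tcp_keepalive_time` on Linux. The 3600 s firewall value comes from this answer and is a placeholder for your own device's setting. The names are illustrative.

Two further points are worth verifying for your setup:
- Probes are only sent on sockets with `SO_KEEPALIVE` enabled. For the REST client this means something like `IOReactorConfig.custom().setSoKeepAlive(true)` inside the `HttpClientConfigCallback`.
- In the question's setup, the Azure Load Balancer has its own TCP idle timeout (4 minutes by default, according to Azure's documentation). A client-side `setKeepAliveStrategy(...)` on the `HttpAsyncClientBuilder`, returning a duration below that timeout, is another way to stop pooled connections being reused after the balancer has dropped them.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalLong;

public class KeepaliveCheck {

    /** True if the first keepalive probe fires before the middlebox drops the idle session. */
    static boolean probeBeatsIdleTimeout(long keepaliveTimeSec, long idleTimeoutSec) {
        return keepaliveTimeSec < idleTimeoutSec;
    }

    /** Reads net.ipv4.tcp_keepalive_time on Linux; empty if unavailable (e.g. other OSes). */
    static OptionalLong systemKeepaliveTimeSec() {
        Path p = Paths.get("/proc/sys/net/ipv4/tcp_keepalive_time");
        try {
            String s = new String(Files.readAllBytes(p), StandardCharsets.US_ASCII).trim();
            return OptionalLong.of(Long.parseLong(s));
        } catch (IOException | NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        long firewallIdleSec = 3600; // firewall value from the answer; replace with your device's setting
        System.out.println(probeBeatsIdleTimeout(7200, firewallIdleSec)); // false: RHEL default is too long
        System.out.println(probeBeatsIdleTimeout(300, firewallIdleSec));  // true
        systemKeepaliveTimeSec().ifPresent(t ->
                System.out.println("this host: " + t + " s, safe = " + probeBeatsIdleTimeout(t, firewallIdleSec)));
    }
}
```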
