Elasticsearch REST客户端仍然提示IOException: 打开文件过多。

8

这是对我之前帖子提供的解决方案的跟进:

如何正确关闭原始RestClient以实现最佳性能使用Elastic Search 5.5.0?

这个完全相同的错误消息又出来了!

2017-09-29 18:50:22.497 ERROR 11099 --- [8080-Acceptor-0] org.apache.tomcat.util.net.NioEndpoint   : Socket accept failed

java.io.IOException: Too many open files
    at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) ~[na:1.8.0_141]
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) ~[na:1.8.0_141]
    at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) ~[na:1.8.0_141]
    at org.apache.tomcat.util.net.NioEndpoint$Acceptor.run(NioEndpoint.java:453) ~[tomcat-embed-core-8.5.15.jar!/:8.5.15]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_141]

2017-09-29 18:50:23.885  INFO 11099 --- [Thread-3] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5387f9e0: startup date [Wed Sep 27 03:14:35 UTC 2017]; root of context hierarchy
2017-09-29 18:50:23.890  INFO 11099 --- [Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647
2017-09-29 18:50:23.891  WARN 11099 --- [Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Failed to stop bean 'documentationPluginsBootstrapper'

    ... 7 common frames omitted

2017-09-29 18:50:53.891  WARN 11099 --- [Thread-3] o.s.c.support.DefaultLifecycleProcessor  : Failed to shut down 1 bean with phase value 2147483647 within timeout of 30000: [documentationPluginsBootstrapper]
2017-09-29 18:50:53.891  INFO 11099 --- [Thread-3] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2017-09-29 18:50:53.894  INFO 11099 --- [Thread-3] com.app.controller.SearchController  : Closing the ES REST client

我尝试使用之前帖子中的解决方案。

ElasticsearchConfig:

@Configuration
public class ElasticsearchConfig {

@Value("${elasticsearch.host}")
private String host;

@Value("${elasticsearch.port}")
private int port;

@Bean
public RestClient restClient() {
    return RestClient.builder(new HttpHost(host, port))
    .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
        @Override
        public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
            return requestConfigBuilder.setConnectTimeout(5000).setSocketTimeout(60000);
        }
    }).setMaxRetryTimeoutMillis(60000).build();
}

搜索控制器:

@RestController
@RequestMapping("/api/v1")
public class SearchController {

    @Autowired
    private RestClient restClient;

    @RequestMapping(value = "/search", method = RequestMethod.GET, produces="application/json" )
    public ResponseEntity<Object> getSearchQueryResults(@RequestParam(value = "criteria") String criteria) throws IOException {

        // Setup HTTP Headers
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", "application/json");

        // Setup query and send and return ResponseEntity...

        Response response = this.restClient.performRequest(...);
    }

    @PreDestroy
    public void cleanup() {
        try {
            logger.info("Closing the ES REST client");
            this.restClient.close();
        } 
        catch (IOException ioe) {
            logger.error("Problem occurred when closing the ES REST client", ioe);
        }
    }
}    

pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
</parent>

<dependencies>
    <!-- Spring -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- Elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>5.5.0</version>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.5.0</version>
    </dependency>

    <!-- Apache Commons -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.6</version>
    </dependency>

    <!-- Log4j -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

这让我觉得RestClient从一开始就没有显式地关闭连接...这很令人惊讶,因为我的基于Elasticsearch Spring Boot的微服务在两个不同的AWS EC-2服务器上进行负载均衡。
日志文件中报告了大约2000次该异常,并且只有在最后preDestroy()才关闭了客户端。请看@PreDestroy()清理方法的INFO被记录在StackTrace的末尾。
我需要在SearchController中显式设置finally子句并显式关闭RestClient连接吗?
这个IOException不能再次发生非常关键,因为这个搜索微服务依赖于许多不同的移动客户端(iOS&Android)。
需要使其具有容错性和可扩展性......或者至少不会出现故障。
这个原因是它只出现在日志文件的底部:
2017-09-29 18:50:53.894  INFO 11099 --- [Thread-3] com.app.controller.SearchController : Closing the ES REST client

这是因为我做了这个:

kill -3 jvm_pid

我应该保留 @PreDestory cleanup() 方法,但更改 SearchController.getSearchResults() 方法的内容,使其反映出以下内容:

@RequestMapping(value = "/search", method = RequestMethod.GET, produces="application/json" )
public ResponseEntity<Object> getSearchQueryResults(@RequestParam(value = "criteria") String criteria) throws IOException {
    // Setup HTTP Headers
    HttpHeaders headers = new HttpHeaders();
    headers.add("Content-Type", "application/json");

    // Setup query and send and return ResponseEntity...
    Response response = null;

    try {
        // Submit Query and Obtain Response
        response = this.restClient.performRequest("POST", endPoint, Collections.singletonMap("pretty", "true"), entity);
    } 
    catch(IOException ioe) {
        logger.error("Exception when performing POST request " + ioe);
    }
    finally {
        this.restClient.close();
    }
    // return response as EsResponse();
}

这样一来,RestClient连接总是关闭的......希望有人能帮助我解决这个问题。

@PacificNW_Lover 请注意,您的异常是由 sun.nio.ch.ServerSocketChannelImpl.accept0() 抛出的,这表明异常的原因与 Elasticsearch 的 RestClient 可能无关,而且可能不同于您第一篇帖子中的异常信息。也许您的服务接收到了太多的流量,或者只需检查并提高 ECS 实例中的 ulimit - diginoise
我同意@diginoise的观点,问题似乎不是来自RestClient。 - Val
3个回答

8

根据我的看法,你做错了几件事,但我会直接给出解决方案。

我不会写出完整的解决方案(实际上,我没有执行或测试任何东西),但重要的是要理解它。此外,最好将所有与数据访问相关的内容移动到另一层。

步骤1:导入正确的库。

与你的示例几乎相同。 我更新了示例以使用版本5.6.2中推荐的最新客户端库。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.acervera</groupId>
  <artifactId>elastic-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>elastic-example</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <es.version>5.6.2</es.version>
  </properties>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
  </parent>

  <dependencies>
    <!-- Spring -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>

    <!-- Elasticsearch -->
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>${es.version}</version>
    </dependency>

<dependency>
  <groupId>org.elasticsearch.client</groupId>
  <artifactId>elasticsearch-rest-high-level-client</artifactId>
  <version>${es.version}</version>
</dependency>

    <!-- Log4j -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
</project>

步骤2: 在Bean工厂中创建并关闭客户端。

在Bean工厂中,创建并销毁客户端。您可以重复使用同一个客户端。

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;

@Configuration
public class ElasticsearchConfig {


    // Here all init stuff with @Value(....)


    RestClient lowLevelRestClient;
    RestHighLevelClient client;

    @PostConstruct
    public void init() {
        lowLevelRestClient = RestClient.builder(new HttpHost("host", 9200, "http")).build();
        client = new RestHighLevelClient(lowLevelRestClient);
    }

    @PreDestroy
    public void destroy() throws IOException {
        lowLevelRestClient.close();
    }

    @Bean
    public RestHighLevelClient getClient() {
        return client;
    }

}

第三步:使用Java传输客户端执行查询。

使用Java传输客户端执行查询。

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

@RestController
@RequestMapping("/api/v1")
public class SearchController {

    @Autowired
    private RestHighLevelClient client;

    @RequestMapping(value = "/search", method = RequestMethod.GET, produces="application/json" )
    public ResponseEntity<Tweet> getSearchQueryResults(@RequestParam(value = "criteria") String criteria) throws IOException {

        // This is only one example. Of course, this logic make non sense and you are going to put it in a DAO
        // layer with more logical stuff
        SearchRequest searchRequest = new SearchRequest();
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        SearchResponse searchResponse = client.search(searchRequest);

        if(searchResponse.getHits().totalHits > 0) {
            SearchHit searchHit = searchResponse.getHits().iterator().next();

            // Deserialize to Java. The best option is to use response.getSource() and Jackson
            // This is other option.
            Tweet tweet = new Tweet();
            tweet.setId(searchHit.getField("id").getValue().toString());
            tweet.setTittle(searchHit.getField("tittle").getValue().toString());
            return ResponseEntity.ok(tweet);
        } else {
            return ResponseEntity.notFound().build();
        }

    }

}

此外,使用bean构建响应。

public class Tweet {

    private String id;

    private String tittle;

    public String getTittle() {
        return tittle;
    }

    public void setTittle(String tittle) {
        this.tittle = tittle;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // Here rest of bean stuff (equal, hash, etc) or Lombok
}

步骤:4

享受Elasticsearch!

注: Java REST客户端[5.6] » Java高级REST客户端

PS. 有必要重构示例,这只是为了理解方式。


1
在5.6版本中,他们引入了高级REST客户端,但自5.0版本以来就提供了低级Java REST客户端。低级REST客户端还可以在所有节点之间平衡查询。最后,我建议不要使用传输客户端,因为Elastic将废弃并最终删除传输客户端和Java API:https://www.elastic.co/blog/state-of-the-official-elasticsearch-java-clients - Val
@Val 无论如何,结构都是相同的。一个线程安全的连接器被重复使用,只有在应用程序启动或完成时才需要打开和关闭一次。我会在今晚更新回复。感谢您的意见! - angelcervera
@Val 百分之百同意。我今晚会更新回复。 - angelcervera
你不能使用Java High Level REST Client吗?我更新了例子。请检查你的pom文件,因为在你的例子中你并没有使用它。 - angelcervera
1
@PacificNW_Lover cleanup()方法只会在应用程序关闭和bean被回收时调用。它不会在每个请求之后被调用...也没有必要这样做。我在我的应用程序中具有完全相同的设置,并且没有这样的错误。您是否与/search API端点的客户端保持长期连接? - Val
显示剩余11条评论

1
从您的堆栈跟踪中可以看出,嵌入式Tomcat(您的应用程序容器)由于“打开文件过多”错误而无法再接受新连接。从您的代码来看,“elasticsearch rest client”似乎没有问题。
由于您在处理搜索请求时重复使用了单个“RestClient”实例,因此可能不会有超过30个(“org.elasticsearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL”)与ES集群的打开连接。因此,“RestClient”不太可能引起问题。
其他潜在的根本原因可能是您的服务消费者将连接保持在(Tomcat)服务器上更长时间,或者他们没有正确关闭连接。

我需要在SearchController中显式放置finally子句并显式关闭RestClient连接吗?

不需要。在关闭服务时应关闭Rest客户端(在@PreDestroy方法中,就像您已经正确执行的那样)。

1
你确定每次HTTP请求时不会启动新的(线程池)连接到elasticsearch服务器吗?即,在以下行中:
        Response response = this.restClient.performRequest(...);

在单个HTTP请求后,请仔细检查elasticsearch服务器上的日志。您应该尝试实现Singleton模式,而无需使用@Autowired注释,并查看问题是否仍然存在。


根据Elasticsearch的文档,低级别的RestClient是线程安全的。在日志中我应该寻找什么? - PacificNW_Lover
他已经在使用@Autowired,并且RestClient在配置bean中只被创建一次。 - Val
那么,SearchController只被实例化一次吗?如果不是,为什么不将restClient作为单例的static变量呢? - Zouzias
@PacificNW_Lover 在应用程序日志中,HTTP 请求后应该看到一个新的连接线程池被初始化。如果没有,那么你的应用程序可能会在其他地方泄漏资源。 - Zouzias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接