在Elasticsearch中一次查询返回所有记录。

13

我在 ElasticSearch 中有一个数据库,想要在我的网站页面上获取所有记录。我编写了一个 bean,连接到 ElasticSearch 节点,搜索记录并返回一些响应。我简单的 Java 代码用于进行搜索:

SearchResponse response = getClient().prepareSearch(indexName)
    .setTypes(typeName)              
    .setQuery(queryString("\*:*"))
    .setExplain(true)
    .execute().actionGet();

但是Elasticsearch将默认大小设置为10,而我的响应中有10个命中。我的数据库中有超过10条记录。如果我将大小设置为Integer.MAX_VALUE,我的搜索会变得非常慢,这不是我想要的。

如何在可接受的时间内一次性获取所有记录而不设置响应的大小?


我也遇到了同样的问题。谢谢你的提问。 - Ramaraj Karuppusamy
1
我有100个文档。我将Integer.MAX_VALUE设置为大小。在我的Jboss中也出现了OutOfMemoryError [Java heap space]。如果我给1000,那么就可以正常工作。 - Ramaraj Karuppusamy
10个回答

22
public List<Map<String, Object>> getAllDocs() {
        int scrollSize = 1000;
        List<Map<String,Object>> esData = new ArrayList<Map<String,Object>>();
        SearchResponse response = null;
        int i = 0;
        while (response == null || response.getHits().hits().length != 0) {
            response = client.prepareSearch(indexName)
                    .setTypes(typeName)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setSize(scrollSize)
                    .setFrom(i * scrollSize)
                    .execute()
                    .actionGet();
            for (SearchHit hit : response.getHits()) {
                esData.add(hit.getSource());
            }
            i++;
        }
        return esData;
}

这个方法可以工作,但需要将整个结果列表加载到内存中,这是不必要的,对于非常大的结果集甚至是不可能的。更健壮的解决方案是使用迭代器:https://dev59.com/32Up5IYBdhLWcg3wtZA0#35729505 - Alphaaa

13

目前排名最高的答案可行,但需要将整个结果列表加载到内存中,这可能会导致大型结果集的内存问题,并且在任何情况下都是不必要的。

我创建了一个Java类,实现了一个漂亮的SearchHit迭代器,可以遍历所有结果。在内部,它通过包含from:字段的查询处理分页,并且仅在内存中保留一页结果

用法:

// build your query here -- no need for setFrom(int)
SearchRequestBuilder requestBuilder = client.prepareSearch(indexName)
                                            .setTypes(typeName)
                                            .setQuery(QueryBuilders.matchAllQuery()) 

SearchHitIterator hitIterator = new SearchHitIterator(requestBuilder);
while (hitIterator.hasNext()) {
    SearchHit hit = hitIterator.next();

    // process your hit
}

注意,创建SearchRequestBuilder时,您无需调用setFrom(int),因为这将由SearchHitIterator在内部执行。如果要指定页面大小(即每个页面的搜索命中数),可以调用setSize(int),否则将使用ElasticSearch的默认值。

SearchHitIterator:

import java.util.Iterator;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.SearchHit;

public class SearchHitIterator implements Iterator<SearchHit> {

    private final SearchRequestBuilder initialRequest;

    private int searchHitCounter;
    private SearchHit[] currentPageResults;
    private int currentResultIndex;

    public SearchHitIterator(SearchRequestBuilder initialRequest) {
        this.initialRequest = initialRequest;
        this.searchHitCounter = 0;
        this.currentResultIndex = -1;
    }

    @Override
    public boolean hasNext() {
        if (currentPageResults == null || currentResultIndex + 1 >= currentPageResults.length) {
            SearchRequestBuilder paginatedRequestBuilder = initialRequest.setFrom(searchHitCounter);
            SearchResponse response = paginatedRequestBuilder.execute().actionGet();
            currentPageResults = response.getHits().getHits();

            if (currentPageResults.length < 1) return false;

            currentResultIndex = -1;
        }

        return true;
    }

    @Override
    public SearchHit next() {
        if (!hasNext()) return null;

        currentResultIndex++;
        searchHitCounter++;
        return currentPageResults[currentResultIndex];
    }

}

事实上,意识到拥有这样一个类是多么方便,我想知道为什么ElasticSearch的Java客户端没有提供类似的东西。


如果其他人添加或删除文档,我们有保证能够迭代所有文档吗? - X. Wo Satuk
3
我认为使用滚动查询会更加健壮。 - X. Wo Satuk
这取决于您如何组合SearchRequestBuilder。例如,如果您按升序文档ID对结果进行排序,并仅添加具有更高文档ID的文档,则将遍历所有结果。 此答案是针对ES 1.7编写的,当时还不存在滚动(Scrolls)。实际上,它们可能是更好的选择。 - Alphaaa
使用“from”也会像这里所描述的那样限制结果为10,000个。滚动没有这个问题。 - X. Wo Satuk

4
你可以使用滚动API。 另一个建议是使用searchhit迭代器,但只适用于不需要更新这些命中的情况。
import static org.elasticsearch.index.query.QueryBuilders.*;

QueryBuilder qb = termQuery("multi", "test");

SearchResponse scrollResp = client.prepareSearch(test)
        .addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
        .setScroll(new TimeValue(60000))
        .setQuery(qb)
        .setSize(100).execute().actionGet(); //max of 100 hits will be returned for each scroll
//Scroll until no hits are returned
do {
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        //Handle the hit...
    }

    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
} while(scrollResp.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

3
很久以前你提出了这个问题,我想发布我的答案供未来的读者参考。
如上面的回答所述,在索引中有数千个文档时,最好通过大小和起始位置来加载文档。在我的项目中,搜索默认加载50个结果并从零索引开始,如果用户想要加载更多数据,则会加载下一个50个结果。以下是在代码中所做的事情:
public List<CourseDto> searchAllCourses(int startDocument) {

    final int searchSize = 50;
    final SearchRequest searchRequest = new SearchRequest("course_index");
    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());

    if (startDocument != 0) {
        startDocument += searchSize;
    }

    searchSourceBuilder.from(startDocument);
    searchSourceBuilder.size(searchSize);

    // sort the document
    searchSourceBuilder.sort(new FieldSortBuilder("publishedDate").order(SortOrder.ASC));
    searchRequest.source(searchSourceBuilder);

    List<CourseDto> courseList = new ArrayList<>();

    try {
        final SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        final SearchHits hits = searchResponse.getHits();

        // Do you want to know how many documents (results) are returned? here is you get:
        TotalHits totalHits = hits.getTotalHits();
        long numHits = totalHits.value;

        final SearchHit[] searchHits = hits.getHits();

        final ObjectMapper mapper = new ObjectMapper();

        for (SearchHit hit : searchHits) {
            // convert json object to CourseDto
            courseList.add(mapper.readValue(hit.getSourceAsString(), CourseDto.class));
        }
    } catch (IOException e) {
        logger.error("Cannot search by all mach. " + e);
    }
    return courseList;
}

信息: - Elasticsearch 版本 7.5.0 - 使用 Java High Level REST Client 作为客户端。

希望对某人有用。


0
SearchResponse response = restHighLevelClient.search(new SearchRequest("Index_Name"), RequestOptions.DEFAULT);
SearchHit[] hits = response.getHits().getHits();

0
你需要在返回结果数量、用户等待时间和服务器可用内存之间做出权衡。如果你已经索引了100万个文档,那么就没有现实的办法在一个请求中检索所有这些结果。我假设你的结果是给一个用户的。你需要考虑系统在负载下的表现。

0
要查询所有记录,您应该构建一个CountRequestBuilder来获取记录的总数(通过CountResponse),然后将该数字设置为您的搜索请求的大小。

0
如果您的主要重点是导出所有记录,您可能希望选择一种不需要任何排序的解决方案,因为排序是一项昂贵的操作。您可以使用ElasticsearchCRUD中描述的扫描和滚动方法here

0

对于版本6.3.2,以下内容有效:

public List<Map<String, Object>> getAllDocs(String indexName, String searchType) throws FileNotFoundException, UnsupportedEncodingException{

    int scrollSize = 1000;
    List<Map<String,Object>> esData = new ArrayList<>();
    SearchResponse response = null;
    int i=0;

    response = client.prepareSearch(indexName)
        .setScroll(new TimeValue(60000))
        .setTypes(searchType)  // The document types to execute the search against. Defaults to be executed against all types.
        .setQuery(QueryBuilders.matchAllQuery())
        .setSize(scrollSize).get(); //max of 100 hits will be returned for each scroll
    //Scroll until no hits are returned
    do {
        for (SearchHit hit : response.getHits().getHits()) {
            ++i;
            System.out.println (i + " " + hit.getId());
            writer.println(i + " " + hit.getId());
        }
        System.out.println(i);

        response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
    } while(response.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.
    return esData;
}

-2

1. 设置最大值,例如:MAX_INT_VALUE;

private static final int MAXSIZE=1000000;

@Override public List getAllSaleCityByCity(int cityId) throws Exception {

    List<EsSaleCity> list=new ArrayList<EsSaleCity>();

    Client client=EsFactory.getClient();
    SearchResponse response= client.prepareSearch(getIndex(EsSaleCity.class)).setTypes(getType(EsSaleCity.class)).setSize(MAXSIZE)
            .setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.boolFilter()
                    .must(FilterBuilders.termFilter("cityId", cityId)))).execute().actionGet();

    SearchHits searchHits=response.getHits();

    SearchHit[] hits=searchHits.getHits();
    for(SearchHit hit:hits){
        Map<String, Object> resultMap=hit.getSource();
        EsSaleCity saleCity=setEntity(resultMap, EsSaleCity.class);
        list.add(saleCity);
    }

    return list;

}

2. 在搜索之前计算 ES 的数量

CountResponse countResponse = client.prepareCount(getIndex(EsSaleCity.class)).setTypes(getType(EsSaleCity.class)).setQuery(queryBuilder).execute().actionGet();

int size = (int)countResponse.getCount();//这是你想要的大小;

然后你可以

SearchResponse response= client.prepareSearch(getIndex(EsSaleCity.class)).setTypes(getType(EsSaleCity.class)).setSize(size);

这段程序相关的内容翻译成中文如下:在验证阶段会发生错误,因为MAX_RESULT_WINDOW设置为10000。您需要在prepareSearch中设置滚动值以避免出现此情况。也许使用从响应对象中获取的scroll Id的prepareSearchScroll将是更好的解决方案。 - Ankit Chaudhary

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接