Elasticsearch scroll search with the Python client

7
When scrolling in Elasticsearch, you have to supply the most recent scroll_id on every scroll request.
Quote: The initial search request and each subsequent scroll request returns a new scroll ID - only the most recent scroll ID should be used.
The following example (taken from here) confuses me. First, the scroll initialization:
rs = es.search(index=['tweets-2014-04-12','tweets-2014-04-13'], 
               scroll='10s', 
               search_type='scan', 
               size=100, 
               preference='_primary_first',
               body={
                 "fields" : ["created_at", "entities.urls.expanded_url", "user.id_str"],
                   "query" : {
                     "wildcard" : { "entities.urls.expanded_url" : "*.ru" }
                   }
               }
   )
sid = rs['_scroll_id']

Then the loop:

tweets = []
while (1):
    try:
        rs = es.scroll(scroll_id=sid, scroll='10s')
        tweets += rs['hits']['hits']
    except:
        break

This code runs fine, but I don't see where sid gets updated. I believe it happens internally in the Python client, but I don't understand how it works...


When does it break out of the loop? I tried this approach, but it kept running and I had to stop it manually. Thanks. - Nguai al
5 Answers

19

This is an old question, but it comes up first when searching for "elasticsearch python scroll". The Python module provides a helper method that does all the work for you. It is a generator function that yields each document while managing the underlying scroll IDs.

https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan

Here is a usage example:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

query = {
    "query": {"match_all": {}}
}

es = Elasticsearch(...)
for hit in scan(es, index="my-index", query=query):
    print(hit["_source"]["field"])

8
Using the Python requests library:
import requests
import json

elastic_url = 'http://localhost:9200/my_index/_search?scroll=1m'
scroll_api_url = 'http://localhost:9200/_search/scroll'
headers = {'Content-Type': 'application/json'}

payload = {
    "size": 100,
    "sort": ["_doc"],
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    }
}

r1 = requests.request(
    "POST",
    elastic_url,
    data=json.dumps(payload),
    headers=headers
)

# first batch data
try:
    res_json = r1.json()
    data = res_json['hits']['hits']
    _scroll_id = res_json['_scroll_id']
except KeyError:
    data = []
    _scroll_id = None
    print('Error: Elastic Search: %s' % str(r1.json()))

while data:
    print(data)
    # scroll to get next batch data
    scroll_payload = json.dumps({
        'scroll': '1m',
        'scroll_id': _scroll_id
    })
    scroll_res = requests.request(
        "POST", scroll_api_url,
        data=scroll_payload,
        headers=headers
    )
    try:
        res_json = scroll_res.json()
        data = res_json['hits']['hits']
        _scroll_id = res_json['_scroll_id']
    except KeyError:
        data = []
        _scroll_id = None
        err_msg = 'Error: Elastic Search Scroll: %s'
        print(err_msg % str(scroll_res.json()))

参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html#search-request-scroll

This is the documentation for "scroll" searches in Elasticsearch. The feature is useful for processing large amounts of data, since it lets you retrieve many documents without hurting performance. In short, a "scroll" search returns results in batches and keeps the search context alive so that you can fetch further results in subsequent requests.
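The documentation linked above also recommends clearing the scroll context once you are done, since every open context holds resources on the cluster. A minimal sketch in the same requests style as the answer above (the helper name and the localhost URL are illustrative assumptions, not part of the original answer):

```python
import json

def build_clear_scroll_request(base_url, scroll_id):
    # Build the URL and JSON body for DELETE /_search/scroll,
    # which releases the server-side search context early.
    url = base_url.rstrip("/") + "/_search/scroll"
    body = json.dumps({"scroll_id": [scroll_id]})
    return url, body

# Usage with requests (assuming the same localhost cluster as above):
# url, body = build_clear_scroll_request("http://localhost:9200", _scroll_id)
# requests.delete(url, data=body, headers={"Content-Type": "application/json"})
```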

Hey @anjan, can we use this if we're on Elastic version 5? I'm guessing scroll is only in Elastic version 6. - ak3191
@ak3191 I believe it's only available in v6, so we can't use it in v5. - anjaneyulubatta505
I have a way to do it... let me know if you want to know. - ak3191

4
There is actually a bug in that code - to use the scroll feature correctly, you are supposed to pass the new scroll_id returned by each call into the next call to scroll(), not keep reusing the first one:

Important

The initial search request and each subsequent scroll request returns a new scroll_id - only the most recent scroll_id should be used.

http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-request-scroll.html

The code works anyway because Elasticsearch does not always change the scroll_id between calls, and for smaller result sets in particular it can keep returning the same scroll_id that was originally returned for some time. This discussion from last year is between two other users who saw the same thing, the same scroll_id being returned for a while:

http://elasticsearch-users.115913.n3.nabble.com/Distributing-query-results-using-scrolling-td4036726.html

So although your code works for smaller result sets, it is not correct - you need to capture the scroll_id returned by each new call to scroll() and use it for the next call.
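A corrected version of the loop from the question might look like the sketch below (written against the same elasticsearch-py client; the function name and parameters are illustrative): re-read _scroll_id after every call, and stop when a page comes back empty instead of waiting for an exception.

```python
def scroll_all(es, index, body, scroll="10s", size=100):
    """Yield every hit, always passing the most recent scroll_id back in."""
    rs = es.search(index=index, body=body, scroll=scroll, size=size)
    sid = rs['_scroll_id']
    hits = rs['hits']['hits']
    while hits:                      # an empty page means we are done
        for hit in hits:
            yield hit
        rs = es.scroll(scroll_id=sid, scroll=scroll)
        sid = rs['_scroll_id']       # capture the NEW scroll id every time
        hits = rs['hits']['hits']
```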


Just to be clear: the bug is in the example code, right? Hopefully the Python client itself is bug-free. - Dror
If that is indeed the case, as I believe it is, could you suggest how to correct the example? Simply adding something like sid = rs['_scroll_id'] inside the try doesn't seem to work. - Dror
Stuck in an infinite loop. The while loop never ends. - Dror
1
There's a second bug in the code from that site - there shouldn't be a while loop waiting for an exception, because when scroll is called correctly I don't believe it throws one. You should check whether you received any hits in that iteration of the loop - if not, you're done. - John Petrone

1
from elasticsearch import Elasticsearch

elasticsearch_user_name ='es_username'
elasticsearch_user_password ='es_password'
es_index = "es_index"

es = Elasticsearch(["127.0.0.1:9200"],
                   http_auth=(elasticsearch_user_name, elasticsearch_user_password))


query = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "es_datetime": {
                            "gte": "2021-06-21T09:00:00.356Z",
                            "lte": "2021-06-21T09:01:00.356Z",
                            "format": "strict_date_optional_time"
                        }
                    }
                }
            ]
        }
    },
    "fields": [
        "*"
    ],
    "_source": False,
    "size": 2000,
}
resp = es.search(index=es_index, body=query, scroll="1m")
old_scroll_id = resp['_scroll_id']
results = resp['hits']['hits']
while len(results):
    for i, r in enumerate(results):
        # do something with data
        pass
    result = es.scroll(
        scroll_id=old_scroll_id,
        scroll='1m'  # length of time to keep search context
    )
    # check if there's a new scroll ID
    if old_scroll_id != result['_scroll_id']:
        print("NEW SCROLL ID:", result['_scroll_id'])
    # keep track of the past scroll _id
    old_scroll_id = result['_scroll_id']

    results = result['hits']['hits']
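Once the loop above finishes, the scroll context can be released explicitly instead of waiting for the '1m' timeout to expire. A small sketch using the client's clear_scroll method (the wrapper function is illustrative; the bare except just tolerates an already-expired context):

```python
def finish_scroll(es, scroll_id):
    """Release the server-side scroll context once iteration is done."""
    try:
        es.clear_scroll(scroll_id=scroll_id)
    except Exception:
        pass  # the context expires on its own after the 'scroll' timeout anyway

# e.g. finish_scroll(es, old_scroll_id) after the while loop above
```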

1

self._elkUrl = "http://Hostname:9200/logstash-*/_search?scroll=1m"

self._scrollUrl="http://Hostname:9200/_search/scroll"


    def GetDataFromELK(self):
        """
        Function to get the data from ELK through the scrolling mechanism
        """
        # implementing scroll and retrieving data from ELK to get more than 100000 records in one search
        # ref: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-request-scroll.html
        try :
            dataFrame=pd.DataFrame()
            if self._elkUrl is None:
                raise ValueError("_elkUrl is missing")
            if self._username is None:
                raise ValueError("_username for elk is missing")
            if self._password is None:
                raise ValueError("_password for elk is missing")
            response=requests.post(self._elkUrl,json=self.body,auth=(self._username,self._password))
            response=response.json()
            if response is None:
                raise ValueError("response is missing")
            sid  = response['_scroll_id']
            hits = response['hits']
            total= hits["total"]
            if total is None:
                raise ValueError("total hits from ELK is none")
            total_val=int(total['value'])
            url = self._scrollUrl
            if url is None:
                raise ValueError("scroll url is missing")
            #start scrolling 
            while(total_val>0):
                #keep search context alive for 2m
                scroll = '2m'
                scroll_query={"scroll" : scroll, "scroll_id" : sid }
                response1=requests.post(url,json=scroll_query,auth=(self._username,self._password))
                response1=response1.json()
                # The result from the above request includes a scroll_id, which should be passed to the scroll API in order to retrieve the next batch of results
                sid = response1['_scroll_id']
                hits=response1['hits']
                data=response1['hits']['hits']
                if len(data)>0:
                    cleanDataFrame=self.DataClean(data)
                    dataFrame=dataFrame.append(cleanDataFrame)
                total_val=len(data)
            print('Total records received from ELK =', len(dataFrame))
            return dataFrame
        except Exception as e:
            logging.error('Error while getting the data from elk', exc_info=e)
            sys.exit()

