弹性搜索滚动(扫描)到Pandas DataFrame

3
我需要从Elasticsearch(es)中获取大量数据,因此我使用的是scan命令,该命令是本地esscroll命令的一个包装。结果将得到以下生成器对象:<generator object scan at 0x000001BF5A25E518>。此外,我想将所有数据插入到一个PandasDataFrame对象中,以便我可以轻松处理它。
代码如下:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan as escan
import pandas as pd

es = Elasticsearch(dpl_server, verify_certs=False)

body = {
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
response = escan(client=es,
                 index="index-*,
                 query=body, request_timeout=30, size=1000)

print(response)
#<generator object scan at 0x000001BF5A25E518>

我想要做的是将所有结果放入Pandas DataFrame中。如果我按照以下方式打印生成器中的每个元素:

for res in response:
    print(res['_source'])
# { .... }
# { .... }
# { .... }

我将得到很多词典。至今为止,我的一个天真的解决方案是逐个添加它们,如下所示:
df = None
for res in response:
    if (df is None):
        df = pd.DataFrame([res['_source']])
    else:
        df = pd.concat([df, pd.DataFrame([res['_source']])], sort=True)

我希望知道是否有更好的方法(首先是从速度和代码清晰度两方面考虑)。例如,累加所有生成器的结果到一个列表中,然后构建完整的DataFrame是否更好?

1个回答

2
你可以使用panda的json_normalize函数。
from pandas.io.json import json_normalize
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan as escan
import pandas as pd

es = Elasticsearch(dpl_server, verify_certs=False)

body = {
  "size": 1000,
  "query": {
    "match_all": {}
  }
}
response = escan(client=es,
                 index="index",
                 query=body, request_timeout=30, size=1000)

# Initialize a double ended queue
output_all = deque()
# Extend deque with iterator
output_all.extend(response)
# Convert deque to DataFrame
output_df = json_normalize(output_all)

这里可以找到关于双端队列的更多信息。


这是一个很好的答案!但它也引发了一些问题。我们为什么选择deque()?是否有更好(更快)的数据结构可用?此外,我想添加res['_source'](对于每个“hit”,我想将对象添加到'_source'下),而不是整个响应对象。 - Eran Moshe
1
@EranMoshe deque() 只是为了更快地进行附加/扩展。为了仅选择属于 _source 的字段,我有一个小函数,只选择以 "_source." 开头的列,类似于 output_df = output_df[[x for x in output_df.columns if "_source." in x]]。 - Michele Tonutti
嘿,如果你想到了更好的解决方案,比如多进程或多线程之类的,请告诉我!如果我开发出一个新的解决方案,我也会告诉你。 - Eran Moshe
我尝试了你的解决方案和答案的解决方案,但在两种情况下,我都陷入了无限循环,我无法打破for循环,尽管响应中只有很少的元素(大约500个)。对此有什么线索吗? - Rami

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接