提高Python Elasticsearch性能

4

我是Python和Elasticsearch的新手,我编写了一段Python代码,从一个非常大的JSON文件中读取数据,并将其中一些属性索引到Elasticsearch中。

import elasticsearch
import json
es = elasticsearch.Elasticsearch()  # use default of localhost, port 9200
with open('j.json') as f:
    n=0
    for line in f:
        try:
            j_content = json.loads(line)
            event_type = j_content['6000000']
            device_id = j_content['6500048']
            raw_event_msg= j_content['6000012']
            event_id = j_content["0"]
            body = {
                '6000000': str(event_type),
                '6500048': str(device_id),
                '6000012': str(raw_event_msg),
                '6000014': str(event_id),
            }
            n=n+1
            es.index(index='coredb', doc_type='json_data', body=body)
        except:
            pass

但是代码运行太慢,我的硬件资源还有很多空余。我应该如何通过使用多线程或批量处理来提高代码的性能呢?


由于您有一个for循环,多线程是一种直接的答案,前提是您的Elastic集群可以处理更重的负载。(我建议您在Python中使用多进程而不是多线程,主要是因为GIL)。然后,我建议您查看这篇文章,其中提供了增加索引速度的好建议:https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html - Adonis
你应该在Python客户端中使用批量API,而不是单个文档索引,结合批量助手 - alr
2个回答

1
你可能需要研究使用Elasticsearch的helpers,其中一个特别称为bulk,似乎你已经意识到了它,因此不要让Elasticsearch在每个循环中将数据设置到索引中,而是将结果收集到列表中,一旦该列表达到一定长度,请使用bulk函数,这将极大地提高性能。
你可以通过以下示例大致了解情况;我有一个非常大的文本文件,有72873471行,使用命令行有效地计算出来,例如:wc -l big-file.txt,并且使用你发布的相同方法,结果预计需要10天。
# Slow Method ~ 10 days
from elasticsearch import Elasticsearch
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
file = open("/path/to/big-file.txt")
with progressbar.ProgressBar(max_value=72873471) as bar:
    for idx, line in enumerate(file):
        bar.update(idx)
        clean = re.sub("\n","",line).lstrip().rstrip()
        doc = {'tag1': clean, "tag2": "some extra data"}
        es.index(index="my_index", doc_type='index_type', body=doc)

现在从Elasticsearch导入辅助程序,将时间缩短到3.5小时:
# Fast Method ~ 3.5 hours
from elasticsearch import Elasticsearch, helpers
import progressbar # pip3 install progressbar2
import re
es = Elasticsearch()
with progressbar.ProgressBar(max_value=72873471) as bar:
actions = []
file = open("/path/to/big-file.txt")
for idx, line in enumerate(file):
    bar.update(idx)
    if len(actions) > 10000:
        helpers.bulk(es, actions)
        actions = []
    clean = re.sub("\n","",line).lstrip().rstrip()
    actions.append({
        "_index": "my_index", # The index on Elasticsearch
        "_type": "index_type", # The document type
        "_source": {'tag1': clean, "tag2": "some extra data"}
    })

0

你需要的是Cython技术;)

只要为变量启用静态类型,就可以将代码加速高达20倍。

以下代码应该使用Cython进行编译,请尝试一下,你会看到巨大的改进:

try:
    j_content = json.loads(line)       # Here you might want to work with cython structs.
                                       # I can see you have a json per line, so it should be easy
    event_type = j_content['6000000']
    device_id = j_content['6500048']
    raw_event_msg= j_content['6000012']
    event_id = j_content["0"]
    body = {
        '6000000': str(event_type),
        '6500048': str(device_id),
        '6000012': str(raw_event_msg),
        '6000014': str(event_id),
    }
    n=n+1

答案与原问题无关,因为它没有提到如何使用线程或批量 API。此外,与通过网络进行多个 API 调用相比,字典初始化几乎不是一个问题。 - PyGuy
@PyGuy,虽然我的回答没有提到如何使用线程或批量API(或asyncIO或trio),但它确实与关于性能的问题有关。而且,加速字典初始化肯定会大大提高性能。 - Raydel Miranda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接