读取CSV并将数据上传到Elasticsearch

3

我正在逐行迭代csv文件的行,并希望将其插入es中。我对Python和Elasticsearch都不熟悉。如何将一个CSV行转换并逐个插入到ES中?

import csv
import json

from elasticsearch import Elasticsearch

es = Elasticsearch(
  [{'host': 'localhost', 'port': 9200}])
 print(es)


def csv_reader(file_obj, delimiter=','):
   reader = csv.reader(file_obj)
   i = 1
   results = []
   for row in reader:
    print(row)
    es.index(index='product', doc_type='prod', id=i, 
   body=json.dump([row for row in reader], file_obj))
    i = i + 1
    results.append(row)
    print(row)


 if __name__ == "__main__":
  with open("/home/Documents/csv/acsv.csv") as f_obj:
    csv_reader(f_obj)

但是我遇到了以下错误:

Traceback (most recent call last):

File "/home/PycharmProjects/CsvReaderForSyncEs/csvReader.py", line 25, in csv_reader(f_obj)

File "/home/PycharmProjects/CsvReaderForSyncEs/csvReader.py", line 17, in csv_reader

es.index(index='product', doc_type='prod', id=i, body=json.dump([row for row in reader], file_obj))

File "/usr/lib/python2.7/json/init.py", line 190, in dump fp.write(chunk)

IOError: 文件未打开以进行写操作

注:该段文本涉及IT技术。
3个回答

9

尝试使用批量API。

import csv
from elasticsearch import helpers, Elasticsearch

def csv_reader(file_name):
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    with open(file_name, 'r') as outfile:
        reader = csv.DictReader(outfile)
        helpers.bulk(es, reader, index="index_name", doc_type="type")

请参考以下链接了解有关批量API的更多信息:https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

这正是我正在寻找的。顺便问一下,你知道为什么上面的函数会在我的csv的第一列标题名称后附加一个点吗?即 ".CurrencyType": "Cash" - Walter U.
如何逐块处理?我的 CSV 文件太大了,导致内存不足。 - Murtaza Haji
首先将CSV文件分成块,然后使用代码片段。 - Ashwani Shakya
请问能否举个例子?@AshwaniShakya - Murtaza Haji
请遵循 https://gist.github.com/ashshakya/92ecb784c49809f0efda70c6f151cf63 将大型CSV文件分割成更小的部分。 - Ashwani Shakya
我认为需要进行2021年的更新。我刚尝试了一下,结果出现了异常,提到“TransportError(406, 'Content-Type header [] is not supported')”这个问题。我在网上看到有关通过设置命令行调用时的内容类型标头来处理此问题的内容,但我想像你在这里一样将其嵌入到我的代码中。 - James T Snell

0
问题在于您将file_obj作为json.dump的参数进行传递,但是该文件只被打开用于读取。请检查open函数的模式参数,详情请见链接
同时,请检查json.dump函数的第一个参数,[row for row in reader]获取了csv文件中的所有行,但可能你只想传递一行数据,因此参数应该是row
此外,json.dump是用于将数据写入文件的,您可能应该使用json.dumps函数,详情请见这里

我已经更改为es.index(index='product', doc_type='prod', id=i, body=json.dumps(row)),但仍然出现解析错误。 - agxcv
@agxcv 我猜问题在于 csv.reader 返回的是 Python 列表,而对于正文你需要一个 Python 字典,请参考这个例子。也许你可以使用 csv DictReader - piedra

0

你可以试试这个。 将reader更改为DictReader并使用json.dumps(row)。 DictReader使输入数据成为Python字典。对于in循环中的每一行,你只需要尝试推送row就足够了。

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print(es)

def csv_reader(file_obj, delimiter=','):
    reader = csv.DictReader(file_obj)
    i = 1
    results = []
    for row in reader:
        print(row)
        es.index(index='product', doc_type='prod', id=i,
                         body=json.dumps(row))
        i = i + 1

        results.append(row)
        print(row)

if __name__ == "__main__":
    with open("/home/Documents/csv/acsv.csv") as f_obj:
        csv_reader(f_obj)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接