将5千万条记录快速添加到AWS ElasticSearch的最快方法

3
我正在尝试找到一种方法,可以将数据加载到Elasticsearch中。AWS-ES提供了批量API,但是AWS对有效负载大小设置了限制。
请问有没有最快的方法从DB2中提取数据并将其放入AWS-ES中?记录数约为5000万条,每条记录的有效负载可能在1到3KB之间。
我已经尝试过使用Java模块,并通过API调用将数据放入ES,但速度非常慢。
是否有任何ETL工具或任何服务可用,可以读取JSON或CSV并将数据放入ES?

你考虑过使用Logstash吗?你确定每行的重量在1到3mb之间吗?这不会太多了吗?即使是1mb的行也会变成50tb,这是一个很大的问题。 - Michael Dz
更新了大小。谢谢!Logstash - 我还没有尝试过。 - Divyesh Kalbhor
2个回答

4

你可以使用一个简单的nodejs小应用程序。

我尝试了许多解决方案(包括logstash),但我发现最实用的方法是编写一个使用nodejs的小代码。

我在本地机器上进行了测试(性能较低),使用了10000条记录的JSON文件,总大小为35MB。

要求:nodejs,npm

  1. 创建一个新的工作文件夹。
  2. 进入创建的文件夹。
  3. 输入 npm install fs
  4. 输入 npm install etl
  5. 输入 npm install JSONStream
  6. 输入 npm install elasticsearch
  7. 创建一个新的index.js文件,并粘贴以下代码与您的数据(elasticsearch服务器,json文件)。在这种情况下,我将json数据文件放在同一文件夹中

将以下代码粘贴到index.js中

var etl = require('etl');

var fs = require('fs');

var JSONStream = require('JSONStream');

var elasticsearch = require('elasticsearch');

//change with your data
var client = new elasticsearch.Client({
  host: 'localhost:9200', 
  log: 'trace'
});


var readStream = fs.createReadStream('file.json') //change with your filename

readStream    
  .pipe(JSONStream.parse('*'))    
  .pipe(etl.collect(100))    
  .pipe(etl.elastic.index(client,'testindex','testtype')) //testindex(your index)- testtype your es type

运行 node index.js

它是如何工作的?

  1. 声明所需的模块

  2. 创建ES客户端并连接

  3. 读取JSON文件并创建流

  4. 将流导入,解析每个JSON对象(我的文件包含10K个对象)

  5. 使用ETL收集100个对象

  6. 将收集到的100个对象添加到Elasticsearch索引中

使用ETL还可以导入CSV(和其他格式)

更多信息和规范:ETLJSONStreamElasticsearch(nodejs)


在ETL中存在一个问题,即在索引过程中无法更改属性的数据类型。默认情况下,它会为所有字段添加文本。 - Kashif Saleem

2
您可以使用Logstash从数据库中一次性或连续地获取数据到Elasticsearch。请按照 说明 安装Logstash,然后只需要为您的数据库获取一个JDBC jar文件和Logstash的配置文件即可。配置文件模板如下:
input {
  jdbc {
    jdbc_driver_library => "LOCATION_OF_db2jcc4.jar"
    jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver"
    jdbc_connection_string => "jdbc:db2://_DB_CONNECTION_DETAILS"
    jdbc_user => "user"
    jdbc_password => "pass"
    jdbc_paging_enabled => "true" #Useful for big data sets
    jdbc_fetch_size => "50000"
    jdbc_page_size => 100000
    #schedule => "* * * * *" #Uncomment if you want to run your query continuously 
    statement => "SELECT * from table" #Query that selects data which you want to download
  }
}
output{
    elasticsearch {
    index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
    hosts => ["localhost:9200"]
}

创建配置文件后,启动Logstash,它将开始从数据库导入数据。导入大量数据可能会导致一些问题,因此您应该为Logstash分配至少5 GB的RAM,更多的RAM将更好。如果出现问题,则应调整jdbc_fetch_sizejdbc_page_size参数。
如果您想持续地从数据库中下载数据,例如仅下载最新的数据,请阅读有关sql_last_value参数的信息。
编辑:
您还可以使用Amazon Elasticsearch输出插件将索引输出到AWS ES,因此您无需配置端点,您可以在这里了解如何安装插件。 使用插件的输出配置:
output {
    amazon_es {
        hosts => ["foo.us-east-1.es.amazonaws.com"]
        region => "us-east-1"
        aws_access_key_id => 'ACCESS_KEY'
        aws_secret_access_key => 'SECRET_KEY'
        index => "YOUR_INDEX_NAME-%{+YYYY.MM.dd}"
        }
}

一段说明如何使用该插件集成Logstash的视频: https://www.oreilly.com/learning/how_do_i_integrate_logstash_with_amazons_elasticsearch_service_es

这能在AWS ES上使用吗?AWS-ES有10MB,100MB的插入限制,具体取决于群集大小。 - Divyesh Kalbhor
它应该可以无任何问题地工作,我已经更新了答案,并添加了一个输出选项。我没有听说过任何插入限制,你能提供来源吗? - Michael Dz
嘿,迈克尔,我尝试了你提到的一切,但是当我尝试运行命令时,它会给我一个错误。 C:\ logstash-6.4.0 \ bin> logstash.bat -e C:\ logstash-6.4.0 \ config \ POC.conf启动Logstash {"logstash.version" =>“6.4.0”} [2018-09-10T17:16:21,405] [ERROR] [logstash.agent]无法执行操作{:action=>LogStash :: PipelineAction :: Create / pipeline_id:main,:exception =>“LogStash :: ConfigurationError”,:message =>“在第3行,第1列(字节76)之后期望#,input,filter,output之一”。 - Divyesh Kalbhor
看起来你的配置文件不正确。你应该为这个错误提出一个新问题。 - Michael Dz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接