Elasticsearch 部分批量更新

16

我需要在ElasticSearch中更新6k个数据,并且必须使用PHP。我在文档中搜寻后找到了批量索引,但这并不会保留先前的数据。

我的数据结构如下:

[
  {
    'name': 'Jonatahn',
    'age' : 21
  }
]

我要更新的代码片段:

$params =[
    "index" => "customer",
    "type" => "doc",
    "body" => [
        [
            "index" => [
                "_index" => "customer",
                "_type" => "doc",
                "_id" => "09310451939"
            ]
        ],
        [
            "name" => "Jonathan"
        ]
    ]
];

$client->bulk($params);

当我发送 ['name' => 'Jonathan'] 时,我期望name将被更新并保留age,但是age被删除了。 当然,我仍然可以逐个更新数据,但这需要很长时间,有没有更好的方法呢?

4个回答

12

我的错误在于使用了"index",但正确的方式是使用"update"来实现我想要的功能。

最终代码如下:

$params =[
"index" => "customer",
"type" => "doc",
"body" => [
    [
        "update" => [
    //   ^^^^^^ Here I change from index to update
            "_index" => "customer",
            "_type" => "doc",
            "_id" => "09310451939"
        ]
    ],
    [
        "doc" => [
            "name" => "Jonathan"
        ]
    ]
]
];

$client->bulk($params);

使用上述代码,我的数据保留先前的数据并仅更新我在参数中传递的数据。

响应:

Array
(
    [took] => 7
    [timed_out] =>
    [_shards] => Array
        (
            [total] => 5
            [successful] => 5
            [skipped] => 0
            [failed] => 0
        )

    [hits] => Array
        (
            [total] => 1
            [max_score] => 1
            [hits] => Array
                (
                    [0] => Array
                        (
                            [_index] => customer
                            [_type] => doc
                            [_id] => 09310451939
                            [_score] => 1
                            [_source] => Array
                                (
                                    [name] => Jonathan
                                    [age] => 23
                                )

                        )

                )

        )

)

4
根据文档,Bulk API 的可行操作有索引、创建、删除和更新更新需要在下一行指定部分文档、 upsert 和脚本及其选项。
POST _bulk
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }

我该如何使用PHP实现这个?我使用了上面回答中的代码,但并非所有文档都被更新了,有什么线索吗?谢谢,这是我的问题 https://github.com/elastic/elasticsearch-php/issues/785 - user1642018

3
这是我的最终代码。
<?php

require_once('../elasticsearch.php');

//initialize elasticsearch
$params = array();

$params['index'] = $elastcsearch_index;
$params['type']  = $elastcsearch_type;

///////////////////////////////////////////////////
//update seeders n leechers in elasticsearch 

//get updated records
$get_updated_records = mysqli_query($conn, "SELECT content_id, seeders, leechers FROM content WHERE is_updated = '1' order by seeders DESC") ;

//create blank array
$results = array();

while($row = mysqli_fetch_assoc($get_updated_records)){
    //put all results in array
    $results[] = $row;

}   

//from https://www.elastic.co/guide/en/elasticsearch/client/php-api/current/_indexing_documents.html

$params = ['body' => []];

for($i = 0; $i < count($results); $i++) {

    $params["body"][]= [
            "update" => [
                "_index" => $elastcsearch_index,
                "_type" => $elastcsearch_type,
                "_id" => $results[$i]['content_id']
            ]
        ];

    $params["body"][]= [
            "doc" => [
                "seeders" => intval($results[$i]['seeders']) ,
                "leechers" => intval($results[$i]['leechers']) ,
            ]
        ];

    // Every 1000 documents stop and send the bulk request
     if ($i % 1000 == 0) {
        $responses = $elasticsearch->bulk($params);

        // erase the old bulk request
        $params = ['body' => []];

        // unset the bulk response when you are done to save memory
        unset($responses);
    } 
}

// Send the last batch if it exists
if (!empty($params['body'])) {
    $responses = $elasticsearch->bulk($params);
}

实际上,_type现在已经被弃用了。您不再需要添加它。默认文档类型将被使用。 - Samir Mammadhasanov

0

$batch_elastics 是结果的数组 每次我都会从行中取消这两个值…… 因为在插入或更新时我不需要这个值

unset($batch_row['type']);

unset($batch_row['diamonds_id']);

代码从这里开始...

    if(count($batch_elastics)){
        // echo 'hi';die;
        $params = array();                
        $params = ['body' => []]; 
        $i=1;       
        foreach($batch_elastics as $batch_row){
            $type=$batch_row['type'];
            $id=$batch_row['diamonds_id'];
            unset($batch_row['type']);
            unset($batch_row['diamonds_id']); 
            if($type=="create"){                                    
                $params["body"][]= [
                        "create" => [
                            "_index" => 'diamonds',                                                        
                            "_id" => $id,
                        ]
                    ];        
                    $params["body"][]= $batch_row;                             
                if ($i % 1000 == 0) {
                    $responses = $client->bulk($params);                                
                    $params = ['body' => []];                                
                    unset($responses);
                }
            } 
            $i=$i+1;
        }
        
        // Send the last batch if it exists
        if (!empty($params['body'])) {
            $responses = $client->bulk($params);
        }
        $params = array();                
        $params = ['body' => []]; 
        $i=1; 
        foreach($batch_elastics as $batch_row){
            $type=$batch_row['type'];
            $id=$batch_row['diamonds_id'];
            unset($batch_row['type']);
            unset($batch_row['diamonds_id']); 
            if($type=="update"){                                    
                $params["body"][]= [
                        "update" => [
                            "_index" => 'diamonds',                                                        
                            "_id" => $id,
                        ]
                    ];        
                $params["body"][]= [
                    "doc"=>$batch_row
                ];                           
                if ($i % 1000 == 0) {
                    $responses = $client->bulk($params);                                
                    $params = ['body' => []];                                
                    unset($responses);
                }
            } 
            $i=$i+1;
        }
        
        // Send the last batch if it exists
        if (!empty($params['body'])) {
            $responses = $client->bulk($params);
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接