ElasticSearch 更新不是即时的,如何等待ElasticSearch完成更新它的索引?

41
我将尝试提高针对ElasticSearch的测试套件的性能。
由于Elasticsearch在更新后不会立即更新其索引,因此测试需要很长时间。例如,以下代码运行而不会引发断言错误。
from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

results = elasticsearch.search()
assert not results
# results are not populated

目前我们针对这个问题的临时解决方案是在代码中加入time.sleep调用,以给ElasticSearch一些时间来更新其索引。

from time import sleep
from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

# Don't want to use sleep functions
sleep(1)

results = elasticsearch.search()
assert len(results) == 1
# results are now populated

显然,这不太好,因为它相当容易失败。假设ElasticSearch更新其索引所需时间超过一秒钟,尽管这很不可能,测试将失败。而且当您运行像这样的数百个测试时,速度非常慢。

我试图解决这个问题,通过查询待处理集群任务,以查看是否有剩余任务需要完成。但是这并不起作用,代码将在没有断言错误的情况下运行。

from elasticsearch import Elasticsearch
elasticsearch = Elasticsearch('es.test')

# Asumming that this is a clean and empty elasticsearch instance
elasticsearch.update(
     index='blog',
     doc_type=,'blog'
     id=1,
     body={
        ....
    }
)

# Query if there are any pending tasks
while elasticsearch.cluster.pending_tasks()['tasks']:
    pass

results = elasticsearch.search()
assert not results
# results are not populated

基本上,回到我的最初问题,ElasticSearch 的更新不是立即执行的,你怎样等待 ElasticSearch 完成索引更新呢?
5个回答

51

从5.0.0版本开始,Elasticsearch有一个选项:

 ?refresh=wait_for

关于索引、更新、删除和批量API,可以通过这种方式使请求直到结果在ElasticSearch中可见才收到响应(耶!)

有关更多信息,请参见 https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-refresh.html

编辑:看起来这个功能已经是最新的Python Elasticsearch API的一部分了: https://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.index

将您的elasticsearch.update更改为:

elasticsearch.update(
     index='blog',
     doc_type='blog'
     id=1,
     refresh='wait_for',
     body={
        ....
    }
)

而且您不应该需要任何睡眠或轮询。


1
但是如果索引和搜索两个操作是由两个不同的非同步进程或简单地不在同一个调用栈中执行,会发生什么?我的意思是,索引文档和搜索该文档可以完全是由2个不同的线程执行的,"wait_for" 在这种情况下似乎不是一个好的解决方案。我如何确保我的搜索,在文档索引1秒后进行,将检索到此文档?我不想每次进行搜索时都显式调用ES刷新此索引。 - Rayyyz

6

对我来说似乎可以工作:

els.indices.refresh(index)
els.cluster.health(wait_for_no_relocating_shards=True,wait_for_active_shards='all')

4

Elasticsearch进行近实时搜索。更新/索引的文档不会立即可搜索,只有在下一次刷新操作后才能搜索。刷新计划每1秒钟执行一次。

要在更新/索引后检索文档,应该使用GET API。默认情况下,get API是实时的,并且不受索引刷新速率的影响。这意味着如果更新/索引正确完成,则应在GET请求的响应中看到修改。

如果您坚持使用SEARCH API来检索更新/索引后的文档。然后根据文档,有3种解决方案

  • 等待刷新间隔
  • 在索引/更新/删除请求中设置?refresh option选项
  • 使用Refresh API显式地完成索引/更新请求后的刷新(POST _refresh)。但请注意,刷新是资源密集型的。

2
如果您使用批量助手,可以按照以下方式进行操作:
from elasticsearch.helpers import bulk    
bulk(client=self.es, actions=data, refresh='wait_for')

0

如果你不想等待集群刷新间隔,你也可以调用elasticsearch.Refresh('blog')


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接