查询大量ndb实体的最佳实践

62
我在使用App Engine数据存储时遇到了一个有趣的限制。我正在创建一个处理程序,帮助我们分析生产服务器上的某些使用数据。为了执行分析,我需要查询并汇总从数据存储中提取的10,000多个实体。计算并不困难,只是对使用样本的特定过滤器的项目的直方图。我遇到的问题是,在达到查询截止时间之前,我无法快速地从数据存储中获取数据以进行任何处理。
我已经尝试了我所能想到的一切方法来将查询分成并行RPC调用以提高性能,但根据appstats,我似乎无法使查询实际上并行执行。无论我尝试什么方法(见下文),它似乎总是RPC回退到顺序下一个查询的瀑布流。
注意:查询和分析代码确实有效,只是因为我无法快速地从数据存储中获取数据而运行缓慢。
背景:
我没有可以分享的实时版本,但这是我所谈论的系统部分的基本模型:
class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

您可以将样本视为用户使用给定名称的功能的时间。(例如:“systemA.feature_x”)。标签基于客户详细信息、系统信息和功能。例如:['winxp','2.5.1','systemA','feature_x','premium_account'])。因此,标签形成了一组非规范化的令牌,可以用来查找感兴趣的样本。
我尝试进行的分析包括选择一个日期范围,并询问每天(或每小时)每个客户帐户(公司而非用户)使用了多少次一个特定的功能集合(也许是所有功能)。
因此,处理程序的输入应该是以下内容:
- 开始日期 - 结束日期 - 标签
输出将会是:
[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

查询的通用代码

以下是所有查询通用的一些代码。处理程序的一般结构是使用webapp2的简单get处理程序,设置查询参数,运行查询,处理结果,创建要返回的数据。

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)

尝试过的方法

我已经尝试了多种方法,以尽可能快速且并行地从数据存储中获取数据。目前为止,我尝试过以下方法:

A. 单次迭代

这是一个更简单的基础案例,用于与其他方法进行比较。我只需构建查询并遍历所有项,让ndb依次提取它们。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. 大型获取

这里的想法是尝试进行单个非常大的获取。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. 跨时间范围的异步获取数据

这里的想法是要认识到样本在时间上分布得相当均匀,因此可以创建一组独立的查询,将整个时间区域分成若干块,并尝试使用异步并行运行每个查询:

# split up timestamp space into 20 equal parts and async query each of them
ts_delta       = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. 异步映射

我尝试使用这种方法是因为文档描述Query.map_async方法时暗示ndb可以自动利用一些并行处理。

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

结果

我尝试了一个例子查询,以收集总响应时间和appstats跟踪。结果如下:

A. 单次迭代

真实时间:15.645秒

这个选项按顺序逐个获取批次,然后从memcache检索每个会话。

Method A appstats

B. 大型获取

真实时间:12.12秒

与选项A基本相同,但由于某种原因速度更快。

Method B appstats

C. 跨时间范围异步获取

真实时间:15.251秒

似乎提供了更多的并行性,但在迭代结果期间调用next的一系列调用似乎减慢了速度。此外,似乎无法将会话memcache查找与挂起的查询重叠。

Method C appstats

D. 异步映射

真实时间:13.752秒

对我来说,这是最难理解的。它看起来有很多重叠,但所有内容似乎都呈瀑布状而不是并行。

Method D appstats

建议

基于这一切,我错过了什么?我是否只是在触及App Engine的限制,还是有更好的方法以并行方式获取大量实体?

我对下一步该尝试什么感到困惑。我考虑重新编写客户端,以并行方式向app engine发出多个请求,但这似乎相当暴力。我真的希望app engine能够处理此用例,因此我猜测我可能错过了什么。

更新

最终,我发现选项C是最适合我的情况的。我能够将其优化为6.1秒内完成。虽然还不完美,但要好得多。

在得到几位专业人士的建议后,我发现以下几点至关重要:

  • 多个查询可以并行运行
  • 一次只能有10个RPC正在进行
  • 尝试去规范化,使没有二次查询
  • 这种类型的任务最好交给映射减少和任务队列,而不是实时查询

所以我做了什么来使它更快:

  • 我从一开始就基于时间对查询空间进行了分区。(提示:在返回的实体方面,分区越均等,效果越好)
  • 我进一步去规范化数据,以消除对二次会话查询的需求。
  • 我利用ndb异步操作和wait_any()来重叠查询和处理。

尽管目前仍未达到我所期望或希望的性能水平,但现在它可行。我只希望有一种更好的方法可以快速地将大量连续的实体拉入处理程序内存。


我已经取得了一些进展,并在不到9秒的时间内使选项C正常工作。我认为我可以进一步优化它。我发现,如果将初始查询分成40个部分,并同时发送一个包含所有会话实体的查询,那么大部分RPC时间可以重叠。目前,我最好的努力是在实际时间为9秒的情况下完成245秒的RPC总时间。我将尝试更多的选项,并在后续发布哪种方法效果最佳。与此同时,请让我知道是否有其他人有更多的想法。 - Allen
嗨,我知道这个问题很久了,但是关于D. Async Mapping,你的addCountForPeriod方法是否正在写入数据存储?如果是的话,那么我认为这可能会导致级联,因为异步数据存储操作和同步数据存储操作混合在一起。 - Rob Curtis
感谢您的出色文章。我在这里发布了一个类似的问题后发现了它:https://dev59.com/AYPba4cB1Zd3GeqPqVH0。像您一样,我很沮丧无法提高异步查询的性能。至少我想了解为什么它们如此缓慢。 - Michael Pedersen
我遇到了相同的性能问题,试图在这里 #26759950 找到一个更通用的解决方案(https://stackoverflow.com/questions/26759950/gae-aggregate-work-results-from-tasks-for-a-gae-query-performance-issue)。 - thomasf1
1
这个问题应该放在StackOverflow的常规问答部分,作为一个正确提问的示例。http://stackoverflow.com/help/how-to-ask - András Szepesházi
4个回答

8

不应该在用户请求中执行这样的大型处理,因为用户请求有60秒的时间限制。相反,应该在支持长时间运行请求的上下文中执行。任务队列支持长达10分钟的请求,并且(我相信)正常的内存限制(默认情况下,F1实例具有128MB内存)。对于更高的限制(无请求超时,1GB以上的内存),请使用后端

这里有一些尝试的方法:设置一个URL,当访问它时,会触发任务队列任务。它返回一个网页,每 ~5s 轮询另一个 URL,如果任务队列完成了,则响应 true / false。任务队列处理数据需要几十秒钟,然后将结果保存到数据存储区,作为计算出的数据或呈现的网页。一旦初始页面检测到它已经完成,用户将被重定向到页面,该页面从数据存储区获取现在计算出的结果。


我一直在考虑使用后端。我仍然希望能够在正常的截止日期内使查询工作,但如果这不起作用,我将回退到使用后端来运行它,就像你所描述的那样。由于我的瓶颈之一是将所有会话对象加载到本地缓存中,如果我可以始终将所有会话保留在内存中,那么使用后端也可能会获得性能提升的方法。 - Allen
1
那并没有回答问题。问题具体是关于数据存储应该如何工作,但它并没有正常工作。当需要获取100,000或1M个实体时,任务队列和后端也存在同样的问题。数据存储速度慢且昂贵。 - ZiglioUK
1
请查看Martin Berends的MapReduce答案。后端已被弃用。有一个很好的指南描述了迁移过程:https://cloud.google.com/appengine/docs/python/modules/converting - Josep Valls

2
新的实验性数据处理功能(一个用于MapReduce的AppEngine API)看起来非常适合解决这个问题。它会自动分片以执行多个并行工作进程。

1
我有一个类似的问题,并且在与Google支持团队合作了几周后,我可以确认至少在2017年12月还没有神奇的解决方案。
简而言之:标准SDK在B1实例上运行时,每秒可以期望处理220个实体;而在B8实例上运行经过修补的SDK时,每秒可以期望处理900个实体。
限制是与CPU相关的,更改实例类型直接影响性能。这得到了在B4和B4_1G实例上获得的类似结果的确认。
对于大约有30个字段的Expando实体,我获得的最佳吞吐量是:
标准GAE SDK
- B1实例:每秒约为220个实体 - B2实例:每秒约为250个实体 - B4实例:每秒约为560个实体 - B4_1G实例:每秒约为560个实体 - B8实例:每秒约为650个实体
修补的GAE SDK
- B1实例:每秒约为420个实体 - B8实例:每秒约为900个实体

针对标准GAE SDK,我尝试了多种方法,包括多线程,但最好的方法是使用fetch_asyncwait_any。当前的NDB库已经在底层使用了异步和未来对象,因此任何试图仅使用线程来推动它的尝试都会使情况变得更糟。

我发现了两种优化的有趣方法:

Matt Faus很好地解释了这个问题:

GAE SDK提供了一个API,用于将从数据存储中派生的对象读取和写入数据。这样可以避免验证从数据存储返回的原始数据并重新打包成易于使用的对象的繁琐工作。特别地,GAE使用协议缓冲区将原始数据从存储传输到需要它的前端机器。然后,SDK负责解码此格式并向您的代码返回干净的对象。这个实用程序很棒,但有时会做比您想要的更多的工作。[...] 使用我们的分析工具,我发现完全50%的时间花费在获取这些实体期间是在protobuf-to-python-object解码阶段。这意味着前端服务器上的CPU是这些数据存储读取的瓶颈!

GAE-data-access-web-request

这两种方法都试图通过减少解码的字段数量来减少protobuf到Python解码所花费的时间。

我尝试过这两种方法,但只有Matt的成功了。自从Evan发布他的解决方案以来,SDK内部已经发生了变化。我不得不略微修改Matt在此处发布的代码,但这很容易 - 如果有兴趣,我可以发布最终代码。

对于一个具有大约30个字段的常规Expando实体,我使用了Matt的解决方案仅解码了少数字段,并获得了显着的改进。

总之,需要做好计划,不要期望能够处理超过几百个实体的“实时”GAE请求。


0

在App Engine上进行大数据操作最好使用某种mapreduce操作。

以下是描述该过程的视频,但包括BigQuery https://developers.google.com/events/io/sessions/gooio2012/307/

听起来你似乎不需要BigQuery,但你可能想要同时使用管道的Map和Reduce部分。

你正在做的事情与mapreduce情况之间的主要区别在于,你启动一个实例并通过查询进行迭代,而在mapreduce中,你将为每个查询运行一个单独的实例并行运行。 你需要一个reduce操作来“汇总”所有数据,并将结果写入某个位置。

你还面临的另一个问题是应使用游标进行迭代。 https://developers.google.com/appengine/docs/java/datastore/queries#Query_Cursors

如果迭代器使用查询偏移量,它会很低效,因为偏移量会发出相同的查询,跳过许多结果,并给出下一个集合,而游标则直接跳转到下一个集合。


你能展示一个简单的例子来说明如何使用你的方法并行获取实体吗?我以为一个 Tasklet 可以处理这个问题,但事实上好像并不是这样。 - aschmid00
我不使用游标,因为没有查询会在中途重新开始。它们都立即获取所有实体,没有偏移量。至于映射减少,我考虑过,但这不是离线分析,而是旨在成为内部用户动态更改的实时查询,以便他们探索数据。我对映射减少的理解是,它不适用于这种实时交互式用例。 - Allen
我可能做出了错误的假设,我认为C中的datastore_v3.Next调用是由于使用了一些基于偏移量的迭代器。根据我的经验,Mapreduce不适合交互式用例,因为a)您无法预测操作需要多长时间,b)通常必须将结果写入数据存储而不是接收可以放在模板上的简单结果。客户端方面会有点棘手,我认为您需要一种轮询方式来查看结果是否准备好。但是,由于并行性质,它往往比序列化查询更快。 - dragonx
同意MapReduce可以并行化。我只是希望ndb和异步操作也能够并行化,以满足我的使用情况。我不需要并行化计算,只需要并行化数据检索。我还考虑使用urlfetch编写多级处理程序,将请求分配给子处理程序以获取数据,然后在父处理程序中收集和处理数据。这似乎有更简单的方法。 - Allen
我认为你不可能在实时查询中可靠地完成这个任务,特别是当你的数据集(返回结果)变得更大时。 - Tim Hoffman
同意MapReduce是正确的选择,在我的有限经验中,我发现它的性能比我的查询要好得多,不确定为什么。遗憾的是,Google没有维护他们自己的MR包。我想知道是否有任何工作正在改进数据存储和其糟糕的性能和成本,所有的工作似乎都在为GCE和云存储而进行。 - ZiglioUK

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接