我已经尝试了我所能想到的一切方法来将查询分成并行RPC调用以提高性能,但根据appstats,我似乎无法使查询实际上并行执行。无论我尝试什么方法(见下文),它似乎总是RPC回退到顺序下一个查询的瀑布流。
注意:查询和分析代码确实有效,只是因为我无法快速地从数据存储中获取数据而运行缓慢。
背景:
我没有可以分享的实时版本,但这是我所谈论的系统部分的基本模型:
class Session(ndb.Model):
""" A tracked user session. (customer account (company), version, OS, etc) """
data = ndb.JsonProperty(required = False, indexed = False)
class Sample(ndb.Model):
name = ndb.StringProperty (required = True, indexed = True)
session = ndb.KeyProperty (required = True, kind = Session)
timestamp = ndb.DateTimeProperty(required = True, indexed = True)
tags = ndb.StringProperty (repeated = True, indexed = True)
您可以将样本视为用户使用给定名称的功能的时间。(例如:“systemA.feature_x”)。标签基于客户详细信息、系统信息和功能。例如:['winxp','2.5.1','systemA','feature_x','premium_account'])。因此,标签形成了一组非规范化的令牌,可以用来查找感兴趣的样本。
我尝试进行的分析包括选择一个日期范围,并询问每天(或每小时)每个客户帐户(公司而非用户)使用了多少次一个特定的功能集合(也许是所有功能)。
因此,处理程序的输入应该是以下内容:
- 开始日期 - 结束日期 - 标签
输出将会是:
[{
'company_account': <string>,
'counts': [
{'timeperiod': <iso8601 date>, 'count': <int>}, ...
]
}, ...
]
查询的通用代码
以下是所有查询通用的一些代码。处理程序的一般结构是使用webapp2的简单get处理程序,设置查询参数,运行查询,处理结果,创建要返回的数据。
# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500 # Bring in large groups of entities
q = Sample.query()
q = q.order(Sample.timestamp)
# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))
def handle_sample(sample):
session_obj = sample.session.get() # Usually found in local or memcache thanks to ndb
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
尝试过的方法
我已经尝试了多种方法,以尽可能快速且并行地从数据存储中获取数据。目前为止,我尝试过以下方法:
A. 单次迭代
这是一个更简单的基础案例,用于与其他方法进行比较。我只需构建查询并遍历所有项,让ndb依次提取它们。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)
for sample in q_iter:
handle_sample(sample)
B. 大型获取
这里的想法是尝试进行单个非常大的获取。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)
for sample in samples:
handle_sample(sample)
C. 跨时间范围的异步获取数据
这里的想法是要认识到样本在时间上分布得相当均匀,因此可以创建一组独立的查询,将整个时间区域分成若干块,并尝试使用异步并行运行每个查询:
# split up timestamp space into 20 equal parts and async query each of them
ts_delta = (end_time - start_time) / 20
cur_start_time = start_time
q_futures = []
for x in range(ts_intervals):
cur_end_time = (cur_start_time + ts_delta)
if x == (ts_intervals-1): # Last one has to cover full range
cur_end_time = end_time
f = q.filter(Sample.timestamp >= cur_start_time,
Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
q_futures.append(f)
cur_start_time = cur_end_time
# Now loop through and collect results
for f in q_futures:
samples = f.get_result()
for sample in samples:
handle_sample(sample)
D. 异步映射
我尝试使用这种方法是因为文档描述Query.map_async方法时暗示ndb可以自动利用一些并行处理。
q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
@ndb.tasklet
def process_sample(sample):
period_ts = getPeriodTimestamp(sample.timestamp)
session_obj = yield sample.session.get_async() # Lookup the session object from cache
count_key = session_obj.data['customer']
addCountForPeriod(count_key, sample.timestamp)
raise ndb.Return(None)
q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()
结果
我尝试了一个例子查询,以收集总响应时间和appstats跟踪。结果如下:
A. 单次迭代
真实时间:15.645秒
这个选项按顺序逐个获取批次,然后从memcache检索每个会话。
B. 大型获取
真实时间:12.12秒
与选项A基本相同,但由于某种原因速度更快。
C. 跨时间范围异步获取
真实时间:15.251秒
似乎提供了更多的并行性,但在迭代结果期间调用next的一系列调用似乎减慢了速度。此外,似乎无法将会话memcache查找与挂起的查询重叠。
D. 异步映射
真实时间:13.752秒
对我来说,这是最难理解的。它看起来有很多重叠,但所有内容似乎都呈瀑布状而不是并行。
建议
基于这一切,我错过了什么?我是否只是在触及App Engine的限制,还是有更好的方法以并行方式获取大量实体?
我对下一步该尝试什么感到困惑。我考虑重新编写客户端,以并行方式向app engine发出多个请求,但这似乎相当暴力。我真的希望app engine能够处理此用例,因此我猜测我可能错过了什么。
更新
最终,我发现选项C是最适合我的情况的。我能够将其优化为6.1秒内完成。虽然还不完美,但要好得多。
在得到几位专业人士的建议后,我发现以下几点至关重要:
- 多个查询可以并行运行
- 一次只能有10个RPC正在进行
- 尝试去规范化,使没有二次查询
- 这种类型的任务最好交给映射减少和任务队列,而不是实时查询
所以我做了什么来使它更快:
- 我从一开始就基于时间对查询空间进行了分区。(提示:在返回的实体方面,分区越均等,效果越好)
- 我进一步去规范化数据,以消除对二次会话查询的需求。
- 我利用ndb异步操作和wait_any()来重叠查询和处理。
尽管目前仍未达到我所期望或希望的性能水平,但现在它可行。我只希望有一种更好的方法可以快速地将大量连续的实体拉入处理程序内存。