如何从Google App Engine的延迟任务中返回数据

5

原问题

我有一个正在升级的web应用程序的工作版本,我遇到了一个问题:在单个HTTP请求期间执行的任务需要太长的时间。该应用程序通过HTTP Post操作从JavaScript前端获取JSON列表,并返回该列表的已排序/已切片版本。随着输入列表的长度增加,排序操作的执行时间变得更长(显然),因此在长度合适的输入列表上,我会遇到60秒的HTTP请求超时,导致应用程序失败。

我想开始使用deferred库来执行排序任务,但我不清楚如何在执行该任务后存储/检索数据。以下是我的当前代码:

class getLineups(webapp2.RequestHandler):
  def post(self):
    jsonstring = self.request.body
    inputData = json.loads(jsonstring)
    playerList = inputData["pList"]
    positions = ["QB","RB","WR","TE","DST"]

    playersPos = sortByPos(playerList,positions)
    rosters, playerUse = getNFLRosters(playersPos, positions)
    try:
      # This step is computationally expensive, it will fail on large player lists.
      lineups = makeLineups(rosters,playerUse,50000)

      self.response.headers["Content-Type"] = "application/json"
      self.response.out.write(json.dumps(lineups))
    except:
      logging.error("60 second timeout reached on player list of length:", len(playerList))
      self.response.headers["Content-Type"] = "text/plain"
      self.response.set_status(504)

app = webapp2.WSGIApplication([
  ('/lineup',getLineups),
], debug = True)

理想情况下,我希望用延迟任务库调用来替换整个try/except块:
deferred.defer(makeLineups,rosters,playerUse,50000)

但我不确定如何从该操作中获取结果。 我认为我必须将其存储在Datastore中,然后检索它,但是我的JavaScript前端如何知道操作何时完成? 我已经阅读了Google网站上的文档,但仍然不清楚如何完成此任务。

我的解决方法

使用接受答案中的基本概述,以下是我解决这个问题的方法:

def solveResult(result_key):
  result = result_key.get()

  playersPos = sortByPos(result.playerList, result.positions)
  rosters, playerUse = getNFLRosters(playersPos,result.positions)

  lineups = makeLineups(rosters,playerUse,50000)
  storeResult(result_key,lineups)

@ndb.transactional
def storeResult(result_key,lineups):
  result = result_key.get()
  result.lineups = lineups
  result.solveComplete = True
  result.put()

class Result(ndb.Model):
  playerList = ndb.JsonProperty()
  positions = ndb.JsonProperty()
  solveComplete = ndb.BooleanProperty()

class getLineups(webapp2.RequestHandler):
  def post(self):
    jsonstring = self.request.body
    inputData = json.loads(jsonstring)

    deferredResult = Result(
      playerList = inputData["pList"],
      positions = ["QB","RB","WR","TE","DST"],
      solveComplete = False
    )

    deferredResult_key = deferredResult.put()

    deferred.defer(solveResult,deferredResult_key)

    self.response.headers["Content-Type"] = "text/plain"
    self.response.out.write(deferredResult_key.urlsafe())

class queryResults(webapp2.RequestHandler):
  def post(self):
    safe_result_key = self.request.body
    result_key = ndb.Key(urlsafe=safe_result_key)

    result = result_key.get()
    self.response.headers["Content-Type"] = "application/json"

    if result.solveComplete:
      self.response.out.write(json.dumps(result.lineups))
    else:
      self.response.out.write(json.dumps([]))

Javascript前端会在固定的时间内轮询queryLineups URL,如果时间限制到期或接收到数据,则停止轮询。希望这对那些试图解决类似问题的人有所帮助。如果出现问题,我还需要做一些工作来使其优雅地失败,但这个方法有效,只需要改进。

您是否可以分享JavaScript轮询代码? - Shan
3个回答

4
我不熟悉GAE,但这是一个非常普遍的问题,我可以给你一些建议。您的总体想法是正确的,因此我将在此基础上进行扩展。您的工作流程可能如下所示:
1.您收到创建阵容的请求。您在数据存储中为其创建一个新实体。它应包含ID(稍后需要检索结果)和状态(PENDING|DONE|FAILED)。如果有用的话,您也可以保存请求中的数据。
2.您延迟计算并立即返回响应。该响应将包含任务的ID。当计算完成时,它将在数据存储中保存任务的结果并更新任务的状态。该结果将包含任务ID,以便我们可以轻松地找到它。
3.一旦前端接收到ID,它就开始轮询结果。使用setTimeout或setInterval,您向服务器发送带有任务ID的请求(这是一个单独的端点)。服务器检查任务的状态,如果完成,则返回结果(如果失败则返回错误)。
4.前端获取数据并停止轮询。

非常好的通用答案。避免轮询的替代方法是使用WebSockets或一些等效技术,例如Channel API - 对于前者,在GAE中,您将在MVM上运行一个模块(以获取对完整网络堆栈的访问权限) - 对于后者,可能是手动或基本缩放模块。但在这种情况下,轮询可能更简单,更可靠,除非服务器的负载非常高,否则会成为问题。 - Alex Martelli
1
或者,您可以使用通道API而不是轮询,并在任务结束时接收更新。因此,在第2步中,您打开一个通道并跳过第3步。 - Dzmitry Lazerka
谢谢您提供的一般想法。我认为我可以实现类似的东西,我会尝试这种方法。 - Patrick Gray
如果您不喜欢通道API用于带外(事后通知),则还有其他选择。XMPP可以使用,或者可以使用其他类型的发布/订阅服务,例如具有REST API的PubNub。 - Tim Hoffman
@DzmitryLazerka 我尝试使用你提到的延迟通道API,但似乎并没有起作用。你介意看一下这个问题吗: http://stackoverflow.com/questions/35348050/debugging-channel-api-appengine - Shan

0

通常情况下,由于那个原始请求的上下文会消失,您无法回复那个原始请求。也许如果您在不回复的情况下从请求处理程序中返回,并且如果以某种方式不会终止来自客户端的连接,并如果您能够保持处理程序对象,以便稍后在另一个(内部)请求中恢复它,并使用恢复的副本来回复原始请求... 最好只是一次性的尝试。

另一种选择是将操作拆分为序列: -第一个请求开始操作 -之后的一个或多个轮询请求,直到操作完成并且结果可用

如果昂贵的操作主要是在调用操作时可用的数据上执行,可能还有另一种方法。您可以重新组织应用逻辑,以便在相应的数据变得可用时立即计算部分结果,因此当请求最终操作时,它仅对预先计算的部分结果进行操作。一个类比,如果你愿意,就是Google搜索请求立即收到来自预先计算索引的数据回复,而不必等待实际的网页搜索执行。


0
首先,让用户等待1分钟直到页面加载已经很糟糕了。一般来说,面向用户的HTTP请求不应该超过1秒钟。GAE给出的60秒已经太慷慨了,对于关键情况而言。
我有几个建议,但我不知道你的应用程序需要什么。
  1. 预先计算。在用户请求之前加载、计算和存储lineups的值。为此,您可以利用GAE后端实例,其运行时间可以比60秒长得多。
  2. 用户真的需要那么多数据吗?通常,如果有太多数据使计算机难以对其进行排序,那么向用户显示它已经太多了。也许您的用户只需要看到其中的一小部分(例如前10名球员或一些聚合统计数据)。这时,改进makeLineups()算法就可以解决问题。
  3. 推迟执行。如果不能执行1或2,则选项是将计算推迟到任务API中。对于此功能,您的前端应该:
  4. 使用任务队列将任务排队:https://cloud.google.com/appengine/docs/python/taskqueue/
    • 使用通道API打开用户通道:https://cloud.google.com/appengine/docs/python/channel/
    • 将该用户的channel_id保存到Datastore中。
    • 完成调用。在用户界面上显示消息,如“请等待,我们正在计算数据”。
    • 同时,GAE后端执行您排队的任务。任务计算makeLineups()的值。完成后,任务将从Datastore获取channel_id,并将计算出的lineups值发送到该通道。
    • 用户前端接收到值后,使用户感到满意。
  5. 除了任务API外,还有新的后台线程可用于您的情况:https://cloud.google.com/appengine/docs/python/modules/#Python_Background_threads 基本上,您只需调用background_thread.BackgroundThread()而不是将任务排队,其余部分保持不变。 更新 这仅适用于后端模块(基本或手动缩放,而不是自动)。在前端(默认)模块上,自定义线程不能存活超过HTTP请求,并受到60秒的限制。

如果有帮助,请告诉我。


我很感激你的建议,以下是我的回应:
  1. 这是一个有趣的想法,我以前考虑过。我仍在研究如何预先计算大量数据,在当前设置中,无法将单个长步骤分解为更小的步骤。
  2. 不幸的是,是的,这是一个旨在生成非常大数据集的应用程序。该算法正在解决计算密集型的背包问题。我正在研究改进它的方法。 3-5. 这些建议非常有帮助(实际上所有5条都很有用),我非常感谢您的帮助。
- Patrick Gray
1
前端任务的后台线程不会有所帮助,因为它们无法超越请求的结束,并且受限于60秒(由于请求截止时间)。 - Tim Hoffman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接