使用Amazon SWF在服务器之间进行通信

9
使用Amazon SWF在服务器之间通信?
  1. 在服务器A上运行脚本A
  2. 当脚本A完成后,向服务器B发送一条消息以运行脚本B
  3. 如果脚本B成功完成,则将其从工作流队列中清除
我很难弄清楚如何使用Boto和SWF组合来实现这一点。我不需要完整的代码,但我需要的是有人能否更详细地解释一下涉及到的内容。
  • 我该如何告诉服务器B检查脚本A的完成情况?
  • 如何确保服务器A不会捕捉到脚本A的完成并尝试运行脚本B(因为应该由服务器B运行)?
  • 我该如何通知SWF脚本A已经完成?是否有一个标志或消息?
我对所有这些都感到困惑。我应该使用哪种设计?
4个回答

17
我认为你提出了一些非常好的问题,突显了SWF作为服务的帮助性。简而言之,您不需要告诉您的服务器彼此协调工作。您的决策者将在SWF服务的帮助下为您编排所有这些内容。
您的工作流实现将按以下步骤进行:
1. 使用服务注册您的工作流及其活动(一次性)。 2. 实现决策者和工作者。 3. 让您的工作者和决策者运行。 4. 启动新的工作流。
有许多方法可以将凭据提供给boto.swf的代码。针对此练习,我建议在运行以下代码之前将它们导出到环境中。
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>

1)注册域名,执行以下工作流和活动:

# ab_setup.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

2) 实现并运行决策者和工作者。

# ab_decider.py
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class ABDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        # Print history to familiarize yourself with its format.
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                   ACTIVITY1, VERSION, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1) Get activity's event id.
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract its name.
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == ACTIVITY1:
                    # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                        ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                elif activity_name == ACTIVITY2:
                    # Server B completed activity. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

工人们更加简单,如果您不想使用继承,则无需使用它。
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class MyBaseWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = None

    def run(self):
        activity_task = self.poll()
        print activity_task
        if 'activityId' in activity_task:
            # Get input.
            # Get the method for the requested activity.
            try:
                self.activity(activity_task.get('input'))
            except Exception, error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'

    def activity(self, activity_input):
        result = str(time.time())
        print 'worker a reporting time: %s' % result
        self.complete(result=result)

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'

    def activity(self, activity_input):
        result = str(os.getpid())
        print 'worker b returning pid: %s' % result
        self.complete(result=result)

3) 运行您的决策器和工作者。 您的决策器和工作者可以从不同的主机运行,也可以从同一台计算机运行。打开四个终端并运行您的操作者:

首先是您的决策器

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass
... 

那么 A 工作者,你可以从服务器 A 上执行此操作:

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass

然后是工作人员B,可能来自服务器B,但如果您将它们全部从笔记本电脑运行,也可以正常工作:

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass
... 

4) 最后,启动工作流程。

$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>> 

切换回去查看您的演员会发生什么。他们可能在一分钟的不活动后断开与服务的连接。如果发生这种情况,请按箭头向上+Enter重新进入轮询循环。

现在您可以转到AWS管理控制台的SWF面板,查看执行情况并查看其历史记录。或者,您可以通过命令行查询。

>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ...

那只是一个串行执行活动的工作流程示例,但是决策者也可以安排和协调并行执行活动
我希望这至少能让您入门。对于稍微复杂一些的串行工作流示例,我建议查看此内容

1
是的。当工作人员轮询工作时,它会从特定任务列表中请求任务。 - oozie _at_ concourse.farm
工人为什么“在一分钟的不活动后断开与服务的连接”?这是SWF的问题还是Python的问题? - Nate
这是一个SWF的事情,正如http://aws.amazon.com/swf/faqs/#communicate_with_swf所描述的那样,长轮询机制。与传统的短暂HTTP GET请求相比,60秒实际上是非常慷慨的长时间。 - oozie _at_ concourse.farm
有没有关于Layer3的文档?我很难弄清楚如何注册或安排具有不同超时持续时间的活动... - Nate
1
截至昨晚,Layer2 API 参考文档已经可以在 http://docs.pythonboto.org/en/latest/ref/swf.html#module-boto.swf.layer2 上找到。 - oozie _at_ concourse.farm
显示剩余3条评论

5
我没有任何示例代码可以分享,但是您绝对可以使用SWF来协调两个服务器上脚本的执行。主要思路是创建三个与SWF通信的代码片段:
- 一个组件,它知道要先执行哪个脚本以及在第一个脚本执行完成后要执行什么操作。在SWF术语中称为“决策者”。 - 两个组件,每个组件都知道如何在各自的机器上执行特定的脚本。在SWF术语中称为“活动工作者”。
第一个组件(决策者)调用两个SWF API:PollForDecisionTask和RespondDecisionTaskCompleted。轮询请求将提供正在执行的工作流的当前历史记录,基本上是脚本运行程序的“我在哪里”的状态信息。您编写代码查看这些事件并确定应该执行哪个脚本。这些执行脚本的“命令”将以活动任务的形式进行调度,并作为RespondDecisionTaskCompleted的一部分返回。
您编写的第二个组件(活动工作者)各自调用两个SWF API:PollForActivityTask和RespondActivityTaskCompleted。轮询请求将向活动工作者指示它应该执行它所知道的脚本,即SWF称之为活动任务。从轮询请求返回到SWF的信息可以包括作为活动任务调度的一部分发送到SWF的单个执行特定数据。您的每个服务器都将独立地轮询SWF以指示在该主机上执行本地脚本。工作者完成脚本的执行后,通过RespondActivityTaskCompleted API回调到SWF。
从您的活动工作者到SWF的回调会导致向我已经提到的决策者组件分配新历史记录。它将查看历史记录,看到第一个脚本已完成,并安排执行第二个脚本。一旦看到第二个脚本完成,它就可以使用另一种类型的决策“关闭”工作流程。
通过调用StartWorkflowExecution API来启动在每个主机上执行脚本的整个过程。这将在SWF中创建整个过程的记录,并将第一个历史记录传递给决策过程以安排在第一个主机上执行第一个脚本。
希望这为使用SWF完成此类型的工作流程提供了更多上下文信息。如果您还没有,请查看SWF页面上的开发指南获取其他信息。

1

好的例子,

另外,如果您不想将您的凭证导出到环境中,您可以在类内部调用:

swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 

1

您可以使用 SNS,当脚本 A 完成时,它应该触发 SNS,然后 SNS 将触发 Server B 的通知。


很遗憾,SNS没有我需要的功能。 - Jimmy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接