我认为你提出了一些非常好的问题,突显了SWF作为服务的帮助性。简而言之,您不需要告诉您的服务器彼此协调工作。您的决策者将在SWF服务的帮助下为您编排所有这些内容。
您的工作流实现将按以下步骤进行:
1. 使用服务注册您的工作流及其活动(一次性)。
2. 实现决策者和工作者。
3. 让您的工作者和决策者运行。
4. 启动新的工作流。
有许多方法可以将凭据提供给boto.swf的代码。针对此练习,我建议在运行以下代码之前将它们导出到环境中。
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1)注册域名,执行以下工作流和活动:
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) 实现并运行决策者和工作者。
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
print history
if 'events' in history:
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
result = last_event['activityTaskCompletedEventAttributes'].get('result')
if activity_name == ACTIVITY1:
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
工人们更加简单,如果您不想使用继承,则无需使用它。
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) 运行您的决策器和工作者。 您的决策器和工作者可以从不同的主机运行,也可以从同一台计算机运行。打开四个终端并运行您的操作者:
首先是您的决策器
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
那么 A 工作者,你可以从服务器 A 上执行此操作:
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
然后是工作人员B,可能来自服务器B,但如果您将它们全部从笔记本电脑运行,也可以正常工作:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) 最后,启动工作流程。
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
切换回去查看您的演员会发生什么。他们可能在一分钟的不活动后断开与服务的连接。如果发生这种情况,请按箭头向上+Enter重新进入轮询循环。
现在您可以转到AWS管理控制台的SWF面板,查看执行情况并查看其历史记录。或者,您可以通过命令行查询。
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
那只是一个串行执行活动的工作流程示例,但是决策者也可以
安排和协调并行执行活动。
我希望这至少能让您入门。对于稍微复杂一些的串行工作流示例,我建议
查看此内容。