Amazon are promoting boto3 for future development but do not provide enough documentation for the new boto3.
Does anybody have any example code of using SWF with boto3 that they would care to share?
This is the only example I have found so far:
https://github.com/jhludwig/aws-swf-boto3
So the process overview looks like this (note this is pulled directly from the link above, but with some additional notes added and more of a flow).
It should be noted that SWF operates on the names of things. It's up to your code to give those names an execution meaning. For example, your Decider will poll and, using the Task names, decide what's next.
One thing I am not exactly certain about is the TASKLIST references. I believe they are a kind of namespacing: it's not really a list of things, it's more about isolating things by name. I could be totally wrong about that, but from my basic understanding, that's what I think the docs are saying.
You can run your Decider and Workers from ANYWHERE. Since they reach out and up to AWS, if your firewall allows 0.0.0.0/0 egress you will have access.
The AWS Docs also mention you can run a lambda, but I have not found out how to trigger that.
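The snippets in this answer refer to a handful of names (DOMAIN, WORKFLOW, VERSION, TASKLIST and, in the decider, TASKNAME) without defining them. A minimal sketch of how they might be set is below. The values are placeholders made up for illustration, not anything SWF requires; the only constraint assumed here is that TASKNAME matches the activity name that gets registered.

```python
# Hypothetical placeholder values; pick whatever names suit your project.
DOMAIN = "test-domain"      # SWF domain that holds the workflow and activities
WORKFLOW = "test-workflow"  # workflow type name
VERSION = "0.1"             # versions are passed to SWF as strings
TASKLIST = "test-tasklist"  # name the decider and workers poll on
TASKNAME = "DoSomething"    # should match the registered activity type name

print(DOMAIN, WORKFLOW, VERSION, TASKLIST, TASKNAME)
```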
    import boto3
    from botocore.exceptions import ClientError

    swf = boto3.client('swf')

    try:
        swf.register_domain(
            name=DOMAIN,  # string
            description="Test SWF domain",
            workflowExecutionRetentionPeriodInDays="10"  # keep history for this long
        )
    except ClientError as e:
        print("Domain already exists: ", e.response.get("Error", {}).get("Code"))
With the domain created we now register the workflow:
    try:
        swf.register_workflow_type(
            domain=DOMAIN,  # string
            name=WORKFLOW,  # string
            version=VERSION,  # string
            description="Test workflow",
            defaultExecutionStartToCloseTimeout="250",
            defaultTaskStartToCloseTimeout="NONE",
            defaultChildPolicy="TERMINATE",
            defaultTaskList={"name": TASKLIST}  # TASKLIST is a string
        )
        print("Test workflow created!")
    except ClientError as e:
        print("Workflow already exists: ", e.response.get("Error", {}).get("Code"))
With our Workflow registered, we can now begin assigning tasks.
You can assign N tasks. Remember, these are mainly strings; your code will give them execution meaning.
    try:
        swf.register_activity_type(
            domain=DOMAIN,
            name="DoSomething",
            version=VERSION,  # string
            description="This is a worker that does something",
            defaultTaskStartToCloseTimeout="NONE",
            defaultTaskList={"name": TASKLIST}  # TASKLIST is a string
        )
        print("Worker created!")
    except ClientError as e:
        print("Activity already exists: ", e.response.get("Error", {}).get("Code"))
With our Domain, Workflow, and Task created, we can now begin a workflow.
    import boto3

    swf = boto3.client('swf')

    response = swf.start_workflow_execution(
        domain=DOMAIN,  # string
        workflowId='test-1001',
        workflowType={
            "name": WORKFLOW,  # string
            "version": VERSION  # string
        },
        taskList={
            'name': TASKLIST
        },
        input=''
    )

    print("Workflow requested: ", response)
Note the workflowId: this is a custom identifier, for example str(uuid.uuid4()). From the docs:
The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.
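Since two open executions cannot share a workflowId, one option is to generate a fresh identifier per run. The helper below is only a sketch: the name new_workflow_id and the "test" prefix are made up for illustration and are not part of boto3 or SWF.

```python
import uuid


def new_workflow_id(prefix="test"):
    """Return an identifier that differs on every call, such as 'test-<uuid4>'."""
    return "{}-{}".format(prefix, uuid.uuid4())


# The result would be passed as workflowId= to start_workflow_execution.
workflow_id = new_workflow_id()
print(workflow_id)
```

If a run is logically a restart of an earlier one, you would reuse the earlier identifier instead of generating a new one, as the quoted docs describe.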
At this point, nothing will happen because we don't have a Decider running nor any Workers. Let's see what those look like.
Our decider will poll to get a decision task to make a decision about:
    import boto3
    from botocore.client import Config
    import uuid

    botoConfig = Config(connect_timeout=50, read_timeout=70)
    swf = boto3.client('swf', config=botoConfig)
Note the timeout settings above. You can reference this PR to see the rationale behind it:
https://github.com/boto/botocore/pull/634
From the Boto3 SWF docs:
Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).
That PR is what made it possible to set these timeouts in boto3.
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task
    print("Listening for Decision Tasks")

    while True:
        newTask = swf.poll_for_decision_task(
            domain=DOMAIN,
            taskList={'name': TASKLIST},  # TASKLIST is a string
            identity='decider-1',  # any identity you would like to provide, it's recorded in the history
            reverseOrder=False)

        if 'taskToken' not in newTask:
            print("Poll timed out, no new task. Repoll")

        elif 'events' in newTask:
            # Skip the Decision* bookkeeping events and look at the last real event
            eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
            lastEvent = eventHistory[-1]

            if lastEvent['eventType'] == 'WorkflowExecutionStarted':
                print("Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'])
                swf.respond_decision_task_completed(
                    taskToken=newTask['taskToken'],
                    decisions=[
                        {
                            'decisionType': 'ScheduleActivityTask',
                            'scheduleActivityTaskDecisionAttributes': {
                                'activityType': {
                                    'name': TASKNAME,  # string, the registered activity name
                                    'version': VERSION  # string
                                },
                                'activityId': 'activityid-' + str(uuid.uuid4()),
                                'input': '',
                                'scheduleToCloseTimeout': 'NONE',
                                'scheduleToStartTimeout': 'NONE',
                                'startToCloseTimeout': 'NONE',
                                'heartbeatTimeout': 'NONE',
                                'taskList': {'name': TASKLIST},  # TASKLIST is a string
                            }
                        }
                    ]
                )
                print("Task Dispatched:", newTask['taskToken'])

            elif lastEvent['eventType'] == 'ActivityTaskCompleted':
                swf.respond_decision_task_completed(
                    taskToken=newTask['taskToken'],
                    decisions=[
                        {
                            'decisionType': 'CompleteWorkflowExecution',
                            'completeWorkflowExecutionDecisionAttributes': {
                                'result': 'success'
                            }
                        }
                    ]
                )
                print("Task Completed!")
Note that at the end of this snippet, we check if we have ActivityTaskCompleted and we respond with the decision CompleteWorkflowExecution to let SWF know we are done.
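One way to make the decision logic easier to test is to pull it out of the polling loop into a function that only looks at the event list. The sketch below mirrors the two branches of the decider loop above; the function name decide and its parameters are my own, not part of boto3, and returning an empty list for any other event is an assumption about how you might want to handle unknown cases.

```python
import uuid


def decide(events, task_name, version, task_list):
    """Return the decisions list for one decision task's event history."""
    # Ignore Decision* bookkeeping events, as the polling loop does.
    history = [e for e in events if not e["eventType"].startswith("Decision")]
    if not history:
        return []
    last_type = history[-1]["eventType"]
    if last_type == "WorkflowExecutionStarted":
        return [{
            "decisionType": "ScheduleActivityTask",
            "scheduleActivityTaskDecisionAttributes": {
                "activityType": {"name": task_name, "version": version},
                "activityId": "activityid-" + str(uuid.uuid4()),
                "input": "",
                "scheduleToCloseTimeout": "NONE",
                "scheduleToStartTimeout": "NONE",
                "startToCloseTimeout": "NONE",
                "heartbeatTimeout": "NONE",
                "taskList": {"name": task_list},
            },
        }]
    if last_type == "ActivityTaskCompleted":
        return [{
            "decisionType": "CompleteWorkflowExecution",
            "completeWorkflowExecutionDecisionAttributes": {"result": "success"},
        }]
    return []  # assumption: take no action for events this sketch does not know


started = [
    {"eventType": "WorkflowExecutionStarted"},
    {"eventType": "DecisionTaskScheduled"},
    {"eventType": "DecisionTaskStarted"},
]
print(decide(started, "DoSomething", "0.1", "test-tasklist")[0]["decisionType"])
# prints ScheduleActivityTask
```

Inside the loop, the result would be passed as decisions= to swf.respond_decision_task_completed together with the task token.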
That's our decider. What does the worker look like?
http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task
Note again, we set the read_timeout:
    import boto3
    from botocore.client import Config

    botoConfig = Config(connect_timeout=50, read_timeout=70)
    swf = boto3.client('swf', config=botoConfig)
Now we start our worker polling:
    print("Listening for Worker Tasks")

    while True:
        task = swf.poll_for_activity_task(
            domain=DOMAIN,  # string
            taskList={'name': TASKLIST},  # TASKLIST is a string
            identity='worker-1')  # identity is for our history

        if 'taskToken' not in task:
            print("Poll timed out, no new task. Repoll")

        else:
            print("New task arrived")

            swf.respond_activity_task_completed(
                taskToken=task['taskToken'],
                result='success'
            )

            print("Task Done")
Again we signal SWF that we have completed our work.
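Because SWF only hands the worker names, the worker has to map each activity name to real code. A minimal sketch of that mapping is below. The handler do_something, the HANDLERS table and run_activity are hypothetical names for illustration; the only thing taken from the API is that the polled task carries activityType and input fields.

```python
def do_something(task_input):
    """Hypothetical activity implementation; replace with real work."""
    return "did something with {!r}".format(task_input)


# Maps the activity type name SWF hands back to the function implementing it.
HANDLERS = {"DoSomething": do_something}


def run_activity(task, handlers=HANDLERS):
    """Run the handler for one polled activity task and return (ok, payload)."""
    name = task["activityType"]["name"]
    handler = handlers.get(name)
    if handler is None:
        return False, "no handler for activity " + name
    try:
        return True, handler(task.get("input", ""))
    except Exception as exc:  # report any failure instead of crashing the worker
        return False, str(exc)


sample = {"activityType": {"name": "DoSomething", "version": "0.1"}, "input": "x"}
print(run_activity(sample))
```

In the polling loop, a true ok would lead to swf.respond_activity_task_completed(taskToken=..., result=payload), and a false one to swf.respond_activity_task_failed(taskToken=..., reason=payload).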
In short, the decider either schedules a task with ScheduleActivityTask or it ends the workflow with CompleteWorkflowExecution. If it schedules a task, the worker receives the request. In the example, the worker just says "I'm done" immediately with respond_activity_task_completed, which kicks it back to the decider, which then sends CompleteWorkflowExecution to SWF.
There are also wrappers such as github.com/blitzagency/flowbee, and more on PyPI.
The link to the official documentation is [here][1].
There are a lot of code samples out there; just follow that link or [this][2] one. The available services section lists all the services that boto3 now supports, along with detailed examples.
One example is using boto3 to get the execution count of an SWF workflow:
    import datetime

    import boto3
    import dateutil.tz

    def lambda_handler(event, context):
        swfClient = boto3.client('swf')
        currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
        latestDate = datetime.datetime.now(tz=currentTimeZone)
        oldestDate = latestDate - datetime.timedelta(1)  # one day earlier
        # domainName is assumed to be defined elsewhere as your SWF domain string
        fullTextResponse = swfClient.count_open_workflow_executions(
            domain=domainName,
            startTimeFilter={
                'oldestDate': oldestDate,
                'latestDate': latestDate
            },
            typeFilter={
                'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
                'version': 'VERSION_NUMBER'
            }
        )
        print("the count is " + str(fullTextResponse['count']))
        print(fullTextResponse)
This is what I have used in my case to get the count of the running SWF Workflow type. The format that I have used is well defined in the documentation mentioned above.
To use boto3 and SWF together, start by importing boto3 in the Python Lambda function. Python's datetime is imported next, and then boto3.client creates the client through which we interact with SWF.
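The startTimeFilter in the count example is just a pair of datetimes spanning the last day. A small sketch of building that window on its own is below. The function name last_day_filter is made up, and it uses UTC from the standard library rather than the Australia/Brisbane zone from dateutil used in the example, purely to keep the sketch dependency-free.

```python
import datetime


def last_day_filter(now=None):
    """Build a startTimeFilter dict covering the 24 hours before `now`."""
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "oldestDate": now - datetime.timedelta(days=1),
        "latestDate": now,
    }


window = last_day_filter()
print(window["oldestDate"], "to", window["latestDate"])
```

The returned dict would be passed as startTimeFilter= to count_open_workflow_executions.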
Other examples would be:
    history = swf.get_workflow_execution_history(
        domain=domainName,
        execution={
            'workflowId': workflowId,
            'runId': runId
        },
    )
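A single get_workflow_execution_history call may return only part of a long history, along with a nextPageToken. A hedged sketch of collecting every page is below. The helper name all_history_events is my own; it takes the client as a parameter so it can be exercised without AWS, and it relies only on the events and nextPageToken fields of the response.

```python
def all_history_events(swf, domain, workflow_id, run_id):
    """Collect every event of one execution by following nextPageToken."""
    kwargs = {
        "domain": domain,
        "execution": {"workflowId": workflow_id, "runId": run_id},
    }
    events = []
    while True:
        page = swf.get_workflow_execution_history(**kwargs)
        events.extend(page.get("events", []))
        token = page.get("nextPageToken")
        if not token:
            return events
        # Ask for the next page on the following iteration.
        kwargs["nextPageToken"] = token
```

With a real client this would be called as all_history_events(boto3.client('swf'), domainName, workflowId, runId).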
Hope this one helps you!
[1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html