boto3 and SWF example needed

Amazon are promoting boto3 for future development but do not provide enough documentation for the new boto3.

Does anybody have any example code of using SWF with boto3 that they would care to share?

Premise answered 22/9, 2015 at 16:15 Comment(1)
Did you ever find one? - Jaquez

This is the only example I have found so far:

https://github.com/jhludwig/aws-swf-boto3

The process overview looks like this (pulled directly from the link above, with some additional notes and arranged to follow the flow).

It should be noted that SWF operates on the names of things. It's up to your code to give those names an execution meaning. For example, your Decider polls and uses the task names to decide what happens next.
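
To make the "names only" point concrete, here is a minimal sketch of how a worker might map activity names to local functions. The handlers and the HANDLERS table are hypothetical and not part of the linked repo; SWF itself only ever sees the strings.

```python
# Hypothetical handlers; SWF never sees these functions, only their names.
def resize_image(payload):
    return "resized:" + payload


def send_email(payload):
    return "emailed:" + payload


# Map activity type names (as registered with SWF) to local functions.
HANDLERS = {
    "ResizeImage": resize_image,
    "SendEmail": send_email,
}


def run_activity(activity_name, payload):
    """Look up the handler for an activity name and run it on the payload."""
    try:
        handler = HANDLERS[activity_name]
    except KeyError:
        raise ValueError("No handler registered for activity %r" % activity_name)
    return handler(payload)
```

In a real worker the name would come from task['activityType']['name'] and the payload from task['input'] in the poll_for_activity_task response.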

There are some things I am not exactly certain about. I believe TASKLIST references are a kind of namespacing: not really a list of things, but a way of isolating things by name. I could be totally wrong about that, but from my basic understanding that's what the docs are saying.

You can run your Decider and Workers from ANYWHERE. Since they only make outbound connections to AWS, they will have access as long as your firewall allows 0.0.0.0/0 egress.

The AWS Docs also mention you can run a lambda, but I have not found out how to trigger that.
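
For what it's worth, the SWF API does have a ScheduleLambdaFunction decision type, so one way to trigger a Lambda is for the decider to return that decision instead of ScheduleActivityTask. The sketch below only builds the decision dict; the helper name and the default timeout are my own assumptions, and the workflow execution also needs an IAM role that allows SWF to invoke the function (the lambdaRole parameter of start_workflow_execution).

```python
import uuid


def schedule_lambda_decision(function_name, payload="", timeout_seconds=300):
    """Build a ScheduleLambdaFunction decision (hypothetical helper).

    The returned dict is meant to be passed in the `decisions` list of
    respond_decision_task_completed.
    """
    return {
        "decisionType": "ScheduleLambdaFunction",
        "scheduleLambdaFunctionDecisionAttributes": {
            "id": "lambda-" + str(uuid.uuid4()),  # identifier for this invocation
            "name": function_name,                # Lambda function name
            "input": payload,                     # string handed to the function
            "startToCloseTimeout": str(timeout_seconds),  # seconds, as a string
        },
    }
```

The decider would then see a LambdaFunctionCompleted (or LambdaFunctionFailed) event in the history on its next decision task.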

Create the boto3 swf client:

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

Create a domain

try:
  swf.register_domain(
    name=DOMAIN, # string
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
  print("Domain already exists: ", e.response.get("Error", {}).get("Code"))
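
Note that the except clause above treats every ClientError as "already exists". A minimal sketch of a stricter check follows, assuming the error codes are DomainAlreadyExistsFault and TypeAlreadyExistsFault (the exception names in the SWF API). The register_once helper is hypothetical; it catches Exception and inspects .response so that it runs without botocore installed, whereas with boto3 you would catch ClientError.

```python
# Error codes assumed to mean "this was registered before".
ALREADY_EXISTS_CODES = {"DomainAlreadyExistsFault", "TypeAlreadyExistsFault"}


def register_once(register_call, **kwargs):
    """Call a register_* method once.

    Returns True if the resource was created, False if it already existed.
    Any other error is re-raised instead of being swallowed.
    """
    try:
        register_call(**kwargs)
        return True
    except Exception as e:
        # botocore's ClientError carries the parsed error in e.response
        response = getattr(e, "response", None) or {}
        code = response.get("Error", {}).get("Code")
        if code in ALREADY_EXISTS_CODES:
            return False
        raise
```

It would be called as register_once(swf.register_domain, name=DOMAIN, workflowExecutionRetentionPeriodInDays="10"), and the same helper works for register_workflow_type and register_activity_type.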

With the domain created we now register the workflow:

Register Workflow

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Test workflow created!")
except ClientError as e:
  print("Workflow already exists: ", e.response.get("Error", {}).get("Code"))

With our Workflow registered, we can now begin assigning tasks.

Assign Tasks to the Workflow.

You can assign N tasks. Remember, these are mainly strings; your code will give them execution meaning.

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print("Worker created!")
except ClientError as e:
  print("Activity already exists: ", e.response.get("Error", {}).get("Code"))

Start the Workflow

With our Domain, Workflow, and Task created, we can now begin a workflow.

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN, # string
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW,# string
    "version": VERSION # string
  },
  taskList={
      'name': TASKLIST
  },
  input=''
)

print("Workflow requested: ", response)

Note the workflowId: this is a custom identifier, for example str(uuid.uuid4()). From the docs:

The user defined identifier associated with the workflow execution. You can use this to associate a custom identifier with the workflow execution. You may specify the same identifier if a workflow execution is logically a restart of a previous execution. You cannot have two open workflow executions with the same workflowId at the same time.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution
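
As a small illustration of the workflowId rule quoted above, here is a sketch of two ways to pick an identifier. Both helper names and the "order" prefix are hypothetical; the only point is that a random ID never collides, while a deterministic one lets a restart reuse the same ID.

```python
import uuid


def new_workflow_id(prefix="test"):
    """Random ID: every call produces a distinct workflowId."""
    return "%s-%s" % (prefix, uuid.uuid4())


def workflow_id_for(order_number):
    """Deterministic ID: a logical restart of the same job reuses the ID."""
    return "order-%s" % order_number
```

With the deterministic form, starting a second execution while the first is still open is rejected by SWF, which can be a handy guard against duplicate runs.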

At this point, nothing will happen because we have neither a Decider nor any Workers running. Let's see what those look like.

Decider

Our decider polls for decision tasks and decides what should happen next:

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Note the timeout settings above. You can reference this PR to see the rationale behind it:

https://github.com/boto/botocore/pull/634

From the Boto3 SWF docs:

Workers should set their client side socket timeout to at least 70 seconds (10 seconds higher than the maximum time service may hold the poll request).

That PR is what made it possible to set these timeouts in boto3.

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print("Listening for Decision Tasks")

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN,
    taskList={'name': TASKLIST }, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  # An empty poll result carries no taskToken (or an empty one)
  if not newTask.get('taskToken'):
    print("Poll timed out, no new task.  Repoll")

  elif 'events' in newTask:

    # Skip the Decision* bookkeeping events to find the last meaningful event
    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print("Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType'])
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string, the registered activity name, e.g. "DoSomething"
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print("Task Dispatched:", newTask['taskToken'])

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print("Task Completed!")

Note that at the end of this snippet, we check if we have ActivityTaskCompleted and we respond with the decision CompleteWorkflowExecution to let SWF know we are done.
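
One way to keep the decider testable is to pull the decision logic into a pure function that takes the event list and returns the decisions. The sketch below is my own restructuring, not code from the linked repo. It mirrors the two branches of the decider and adds a hypothetical third branch that fails the workflow when the activity fails or times out.

```python
import uuid


def decide(events, activity_name, activity_version, task_list):
    """Return the list of SWF decisions for the given event history."""
    # Ignore the Decision* bookkeeping events, as the decider loop does.
    history = [e for e in events if not e["eventType"].startswith("Decision")]
    if not history:
        return []
    last = history[-1]["eventType"]

    if last == "WorkflowExecutionStarted":
        return [{
            "decisionType": "ScheduleActivityTask",
            "scheduleActivityTaskDecisionAttributes": {
                "activityType": {"name": activity_name, "version": activity_version},
                "activityId": "activityid-" + str(uuid.uuid4()),
                "input": "",
                "scheduleToCloseTimeout": "NONE",
                "scheduleToStartTimeout": "NONE",
                "startToCloseTimeout": "NONE",
                "heartbeatTimeout": "NONE",
                "taskList": {"name": task_list},
            },
        }]
    if last == "ActivityTaskCompleted":
        return [{
            "decisionType": "CompleteWorkflowExecution",
            "completeWorkflowExecutionDecisionAttributes": {"result": "success"},
        }]
    if last in ("ActivityTaskFailed", "ActivityTaskTimedOut"):
        # Assumption: give up on the whole workflow instead of retrying.
        return [{
            "decisionType": "FailWorkflowExecution",
            "failWorkflowExecutionDecisionAttributes": {"reason": last},
        }]
    return []  # nothing to decide for this event
```

The polling loop would then call swf.respond_decision_task_completed(taskToken=newTask['taskToken'], decisions=decide(newTask['events'], TASKNAME, VERSION, TASKLIST)), so every decision task gets a response even when there is nothing to schedule.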

That's our decider. What does the worker look like?

Worker

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

Note that, again, we set the read_timeout:

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

Now we start our worker polling:

print("Listening for Worker Tasks")

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN, # string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  # An empty poll result carries no taskToken (or an empty one)
  if not task.get('taskToken'):
    print("Poll timed out, no new task.  Repoll")

  else:
    print("New task arrived")

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print("Task Done")

Again we signal SWF that we have completed our work.
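
The worker above always reports success. If you do real work, you probably also want to tell SWF when it fails, which is what respond_activity_task_failed is for. A minimal sketch follows, assuming a hypothetical handler callable that takes the task input string; the helper name is mine.

```python
def handle_activity_task(swf, task, handler):
    """Run handler on the task input and report the outcome to SWF.

    Returns True if the task was reported as completed, False if failed.
    """
    try:
        result = handler(task.get("input", ""))
    except Exception as exc:
        swf.respond_activity_task_failed(
            taskToken=task["taskToken"],
            reason=type(exc).__name__[:256],  # SWF limits reason to 256 chars
            details=str(exc)[:32768],         # and details to 32768 chars
        )
        return False
    swf.respond_activity_task_completed(
        taskToken=task["taskToken"],
        result=str(result),
    )
    return True
```

Inside the polling loop this would replace the direct respond_activity_task_completed call, e.g. handle_activity_task(swf, task, do_something), where do_something is your own function.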

Jaquez answered 22/3, 2016 at 18:25 Comment(4)
Whilst this may theoretically answer the question, it would be preferable to include the essential parts of the answer here, and provide the link for reference. - Acerate
Totally get why, I'm editing it now, pulling the relevant parts out of the link with some additional notes. - Jaquez
Answer is very thorough now. Mods should remove the downvote for lack of information. - Leone
It doesn't do anything, it just ACKs that the task is complete. The process is: notify SWF to start -> SWF notifies your decider -> the decider either schedules an activity task (ScheduleActivityTask) or ends the workflow (CompleteWorkflowExecution). If it schedules a task, the worker receives the task along with its input. In the example, the worker just says "I'm done" immediately via respond_activity_task_completed, which kicks it back to the decider, which then sends CompleteWorkflowExecution to SWF. github.com/blitzagency/flowbee is a wrapper; there are more on PyPI. - Jaquez

The link to the official documentation is [here][1].

There are a lot of code samples out there; just follow the link above or [this][2] one. The available services section lists all the services that boto3 now supports, along with detailed examples.

One example is using boto3 to get the count of open executions of an SWF workflow type:

import boto3
import datetime
import dateutil.tz

domainName = 'NAME_OF_YOUR_SWF_DOMAIN'  # placeholder, replace with your SWF domain

def lambda_handler(event, context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(1)  # one day back

    fullTextResponse = swfClient.count_open_workflow_executions(
        domain=domainName,
        startTimeFilter={
            'oldestDate': oldestDate,
            'latestDate': latestDate
        },
        typeFilter={
            'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
            'version': 'VERSION_NUMBER'
        }
    )
    print("the count is " + str(fullTextResponse['count']))
    print(fullTextResponse)

This is what I used in my case to get the count of running executions of an SWF workflow type. The request format is well defined in the documentation mentioned above.

To use boto3 and SWF together, start by importing boto3 in the Python Lambda function, then import Python's datetime module. A call to boto3.client('swf') then creates the client through which we interact with SWF.
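
The time window passed as startTimeFilter can be built separately, which makes it easier to change. A minimal sketch, using UTC rather than a named time zone so it needs only the standard library; the helper name and its parameters are hypothetical.

```python
import datetime


def start_time_filter(hours=24, now=None):
    """Build a startTimeFilter dict covering the last `hours` hours.

    `now` can be injected for testing; by default the current UTC time is used.
    """
    latest = now or datetime.datetime.now(datetime.timezone.utc)
    oldest = latest - datetime.timedelta(hours=hours)
    return {"oldestDate": oldest, "latestDate": latest}
```

The result can be passed directly as startTimeFilter=start_time_filter(hours=24) to count_open_workflow_executions.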

Another example would be:

history = swf.get_workflow_execution_history(
    domain=domainName,
    execution={
        'workflowId': workflowId,
        'runId': runId
    },
)
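
Long histories come back in pages, with a nextPageToken in the response when more events are available. Here is a rough sketch of collecting every page and summarising the event types; both helper names are my own, and swf is assumed to be a boto3 SWF client (or anything with the same method).

```python
from collections import Counter


def fetch_all_events(swf, domain, workflow_id, run_id):
    """Collect the events from every page of an execution's history."""
    kwargs = {
        "domain": domain,
        "execution": {"workflowId": workflow_id, "runId": run_id},
    }
    events = []
    while True:
        page = swf.get_workflow_execution_history(**kwargs)
        events.extend(page.get("events", []))
        token = page.get("nextPageToken")
        if not token:
            return events
        kwargs["nextPageToken"] = token  # ask for the next page


def count_event_types(events):
    """Count how often each eventType occurs in a list of history events."""
    return Counter(e["eventType"] for e in events)
```

For example, count_event_types(fetch_all_events(swf, domainName, workflowId, runId)) gives a quick overview of what happened in an execution.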

Hope this one helps you!

[1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
[2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html

Wisecrack answered 6/7, 2020 at 11:25 Comment(2)
It is a better idea to add some samples and explanations than just copying a URL. - Mallorie
@Mallorie Thanks for the feedback. I have updated my answer. Cheers. - Wisecrack

© 2022 - 2024 — McMap. All rights reserved.