Post a Message to Elastic Beanstalk Worker Environment via SQS

I have a Docker application on Elastic Beanstalk with a web-server environment and a worker environment.

The worker environment currently runs scheduled jobs via cron. I'm trying to connect the server to the worker to achieve the following:

  1. Client sends a request to the server (/trigger_job)
  2. Server offloads the job to the worker by sending a JSON message to an SQS queue (/perform_job)
  3. Worker performs the job by reading the message from SQS

I haven't been able to find documentation on what the JSON message should look like. The official documentation mentions some HTTP headers, but none of them specifies the desired endpoint in the worker environment.

# server.py
from bottle import post, HTTPResponse


@post('/trigger_job')
def trigger_worker_job():
    # should send a JSON message to sqs to trigger the '/perform_job'
    # Need help with what the JSON message looks like
    return HTTPResponse(status=200, body={'Msg': 'Sent message'})

# worker.py
from bottle import post, HTTPResponse


@post('/perform_job')
def perform_job():
    # job is performed in the worker environment

    return HTTPResponse(status=200, body={'Msg': 'Success'})
Opposable answered 15/8, 2018 at 18:54 Comment(2)
Did you ever find a solution to this? (Gannes)
Jeez stuck here as well. I'm wondering if they maybe wrap it in something like "body": etc. (Syphilis)

In Python, you can see how this works in the Python sample application, which is linked from step 4 (Deploy a New Application Version) of this AWS doc.

You can configure the SQS queue in the Elastic Beanstalk worker environment console: Configuration > Worker > Select a Worker Queue

# for example: 
environ['HTTP_X_AWS_SQSD_TASKNAME']
environ['HTTP_X_AWS_SQSD_SCHEDULED_AT']
logger.info("environ X-Aws-Sqsd-Queue %s" % environ['HTTP_X_AWS_SQSD_QUEUE'])

# Regarding your message attributes: if the attribute name is Email,
# you can extract it via environ['HTTP_X_AWS_SQSD_ATTR_EMAIL'].
# Make sure that the attribute name in the environ key is in all capitals.
logger.info("environ X-Aws-Sqsd-Attr Email %s" % environ['HTTP_X_AWS_SQSD_ATTR_EMAIL'])

The message will contain the information shown in the image. You can read more in the AWS docs: AWS Elastic Beanstalk Worker Environments

enter image description here
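
To make the header lookups above a bit more concrete, here is a minimal sketch of two helpers that read the sqsd values from a WSGI environ dict. The names sqsd_headers and sqsd_attribute are made up for illustration; the only thing taken from the snippet above is that the X-Aws-Sqsd-* headers show up as HTTP_X_AWS_SQSD_* keys.

```python
SQSD_PREFIX = 'HTTP_X_AWS_SQSD_'


def sqsd_headers(environ):
    """Return all sqsd-related entries of a WSGI environ, without the prefix."""
    return {
        key[len(SQSD_PREFIX):]: value
        for key, value in environ.items()
        if key.startswith(SQSD_PREFIX)
    }


def sqsd_attribute(environ, name, default=None):
    """Look up one message attribute, e.g. 'Email' -> HTTP_X_AWS_SQSD_ATTR_EMAIL."""
    # WSGI upper-cases header names and turns hyphens into underscores.
    key = SQSD_PREFIX + 'ATTR_' + name.upper().replace('-', '_')
    return environ.get(key, default)
```

Using environ.get() with a default avoids a KeyError for messages that were sent without the attribute.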

Behling answered 20/9, 2018 at 19:7 Comment(0)

Looking at some of my failed cron jobs in the dead letter queue, I found that the messages carried the following three attributes:

  • "beanstalk.sqsd.path"
  • "beanstalk.sqsd.task_name"
  • "beanstalk.sqsd.scheduled_time"

which are similar to the attributes set on the cron job.

You can manually create a new message in the SQS queue for the worker environment and set those attributes to match the cron job you wish to execute, though that is tedious and I much prefer to do so by code.

I'm using the boto3 library in Python, though hopefully it is a similar process in other languages.

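from datetime import datetime, timezone

import boto3

# URL of the worker environment's SQS queue (placeholder, fill in your own)
url = 'https://<my worker queue url>'


def send_elastic_beanstalk_message(name, path, message):
    client = boto3.client('sqs')
    return client.send_message(
        QueueUrl=url,
        # boto3 requires a MessageBody argument
        MessageBody=message,
        # these attributes mimic what sqsd sets for a cron.yaml task
        MessageAttributes={
            "beanstalk.sqsd.path": {
                "DataType": "String",
                "StringValue": path
            },
            "beanstalk.sqsd.task_name": {
                "DataType": "String",
                "StringValue": name
            },
            "beanstalk.sqsd.scheduled_time": {
                "DataType": "String",
                "StringValue": datetime.now(timezone.utc).strftime(
                    "%Y-%m-%d %H:%M:%S UTC"
                )
            }
        }
    )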

This will create a new message in the SQS queue that will be parsed by the worker daemon and trigger a cron job.

Unfortunately, the message body does not seem to be passed along when the message is parsed, so triggering the job is the only part I was able to get working.
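
If you want to check the attribute format without touching AWS, the attribute dict can be built by a small pure function. This is only a sketch: build_sqsd_attributes is a hypothetical helper name, and the attribute names and the timestamp format are simply the ones I saw on the dead letter queue messages, not something I found documented.

```python
from datetime import datetime, timezone


def build_sqsd_attributes(task_name, path, scheduled_at=None):
    """Build MessageAttributes that mimic a cron-triggered sqsd message."""
    if scheduled_at is None:
        scheduled_at = datetime.now(timezone.utc)
    # Normalise to UTC so the literal "UTC" suffix below is truthful.
    scheduled_at = scheduled_at.astimezone(timezone.utc)

    def string_attr(value):
        return {"DataType": "String", "StringValue": value}

    return {
        "beanstalk.sqsd.path": string_attr(path),
        "beanstalk.sqsd.task_name": string_attr(task_name),
        "beanstalk.sqsd.scheduled_time": string_attr(
            scheduled_at.strftime("%Y-%m-%d %H:%M:%S UTC")
        ),
    }
```

The result can be passed as MessageAttributes to send_message().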

Monck answered 11/12, 2020 at 0:45 Comment(1)
fwiw: currently (aws-sqsd/3.0.4), cron tasks send message body "elasticbeanstalk scheduled job", and boto3 send_message() requires a MessageBody argument. (Chadburn)

From the OP:

I haven't been able to find documentation on what the JSON message should look like. There are some HTTP headers mentioned in the official documentation. But there's no mention of header to specify the desired endpoint in the worker environment.

summary

Use SQS.Client.send_message() from boto3 to send messages to the SQS queue.

  • The SQS message can look like anything you want (not necessarily JSON).
  • You do not need to worry about the HTTP headers, unless you want to.
  • You can specify a single endpoint in the worker environment configuration using HTTP path. Afaik only tasks from a cron.yaml can specify alternative endpoints using the path value. At the endpoint you can apply logic based on the request body and/or headers to distinguish tasks.
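
As a rough sketch of the last point, the single endpoint could dispatch on a field in the message body. Everything here is an assumption for illustration: the JSON body with a "task" field, the handler names and the HANDLERS table are made up, and sqsd itself knows nothing about them.

```python
import json


def send_report(message):
    # hypothetical job: pretend to send a report
    return 'report sent to ' + message['email']


def cleanup(message):
    # hypothetical job: pretend to clean up old data
    return 'cleaned up ' + str(message.get('days', 30)) + ' days'


# maps the (made-up) "task" field of the message body to a handler
HANDLERS = {'send_report': send_report, 'cleanup': cleanup}


def dispatch(raw_body):
    """Parse the POSTed body as a JSON object and run the matching handler."""
    message = json.loads(raw_body)
    task = message.get('task')
    handler = HANDLERS.get(task)
    if handler is None:
        raise ValueError('unknown task: %r' % (task,))
    return handler(message)
```

The worker endpoint would call dispatch() with the request body and turn a ValueError into a non-200 response if the message should be retried.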

details

The worker docs are indeed sparse and a bit confusing.

Here's what I've been able to deduce (disclaimer: there may be some inaccuracies):

HTTP headers and message content

The docs mention a lot of HTTP headers, and these are also described in the answers by user3643324 and Jun.

These HTTP headers are set by the SQS Daemon (sqsd). You can use them in your perform_job(), if you want, but you do not have to.

More importantly, you do not need to set these headers yourself when you send a custom message to the worker's SQS queue in your trigger_worker_job().

In fact, the sqsd picks up all messages from the worker queue, regardless of content. The content does not need to be JSON either.

Here's a relevant snippet from the docs:

[...] When the daemon pulls an item from the queue, it sends an HTTP POST request locally to http://localhost/ on port 80 with the contents of the queue message in the body. All that your application needs to do is perform the long-running task in response to the POST. You can configure the daemon to post to a different path, use a MIME type other than application/JSON, connect to an existing queue, or [...]

normal messages

Basically, any message sent to the SQS queue, by anyone, will be pulled down by the worker's sqsd. Note that an SQS message has a body and optional attributes.

In the simple case of a message without any attributes, the message body is posted to localhost at the path specified under HTTP path in the worker environment configuration. The default path is /, but in the OP's case that would become /perform_job.

If the SQS message does contain attributes, those are added to the POST request as custom HTTP headers, as mentioned in the docs and the other answers.
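
On the sending side, a body plus attributes could be put together roughly like this. It's a sketch under a few assumptions: send_job is a made-up helper, the body is serialized as JSON (any string would do), and all attributes are sent with DataType String. The client is passed in so the function can be tried with a stand-in object instead of a real boto3 client.

```python
import json


def send_job(sqs_client, queue_url, payload, attributes=None):
    """Send `payload` as a JSON body, with optional string message attributes."""
    kwargs = {
        'QueueUrl': queue_url,
        'MessageBody': json.dumps(payload),
    }
    if attributes:
        # each attribute should reach the worker as an X-Aws-Sqsd-Attr-<name> header
        kwargs['MessageAttributes'] = {
            name: {'DataType': 'String', 'StringValue': str(value)}
            for name, value in attributes.items()
        }
    return sqs_client.send_message(**kwargs)
```

With a real client this would be called as send_job(boto3.client('sqs'), queue_url, {...}, {'Email': '...'}).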

messages scheduled via cron.yaml

If your app source includes a cron.yaml, that's a special case. Each task in cron.yaml specifies a path, which can be different from the worker environment's HTTP path value. The sqsd adds this path as an attribute to the SQS message when sending to the SQS queue. If the same (or another) sqsd pulls that message down from the SQS queue, it will POST the message body to the specified path, with the cron-related HTTP headers mentioned in the docs.

simplified example

So, the OP's example can be adjusted as follows, using send_message() from boto3:


import boto3
from bottle import post, HTTPResponse

sqs_client = boto3.client('sqs')

@post('/trigger_job')
def trigger_worker_job():
    # send a message to SQS; the worker's sqsd will POST it to '/perform_job'
    result = sqs_client.send_message(
        QueueUrl='https://<my worker queue url>',
        MessageBody='Would you please perform a job for me?',
    )
    # check `result` if necessary...
    ...
    return HTTPResponse(status=200, body={'Msg': 'Sent message'})

and on the worker side, assuming HTTP path for the worker env is configured as /perform_job:

from bottle import post, request, HTTPResponse

@post('/perform_job')
def perform_job():
    # sqsd POSTs the SQS message body as the request body
    request_body = request.body.read().decode('utf-8')
    if request_body == 'Would you please perform a job for me?':
        # perform the actual job
        ...
    return HTTPResponse(status=200, body={'Msg': 'Success'})

If necessary, you can adjust the logic to test for certain HTTP headers.

Also see how it's done in the elastic beanstalk python sample app: python.zip

Chadburn answered 21/8 at 11:41 Comment(0)
