Falcon Python example with Celery

import falcon
import json
from tasks import add
from waitress import serve


class tasksresource:
    def on_get(self, req, resp):
        """Handles GET requests"""
        self.result = add.delay(1, 2)
        self.context = {'ID': self.result.id, 'final result': self.result.ready()}
        resp.body = json.dumps(self.context)



api = falcon.API()
api.add_route('/result', tasksresource())
# api.add_route('/result/task', taskresult())
if __name__ == '__main__':
    serve(api, host='127.1.0.1', port=5555)

How do I get the task id from the JSON payload (POST data) and add a route for it?

Grumble answered 17/1, 2017 at 12:30 Comment(2)
As I understand it, you want to send the task id to another route after starting the task. Is that right? - Shellieshellproof
Yes. Also, there is the term "polling", which I don't understand. - Grumble

Here is a small example. File structure:

/project
      __init__.py
      app.py # routes, falcon etc.
      tasks.py # celery 
example.py # script for demonstration how it works 

app.py:

import json

import falcon
from tasks import add
from celery.result import AsyncResult


class StartTask(object):

    def on_get(self, req, resp):
        # start task
        task = add.delay(4, 4)
        resp.status = falcon.HTTP_200
        # return task_id to client
        result = {'task_id': task.id}
        resp.body = json.dumps(result)


class TaskStatus(object):

    def on_get(self, req, resp, task_id):
        # get result of task by task_id and generate content to client
        task_result = AsyncResult(task_id)
        result = {'status': task_result.status, 'result': task_result.result}
        resp.status = falcon.HTTP_200
        resp.body = json.dumps(result)


app = falcon.API()

# registration of routes
app.add_route('/start_task', StartTask())
app.add_route('/task_status/{task_id}', TaskStatus())
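
The question also asks about reading the task id from a JSON POST body rather than from the URL. A minimal sketch of such a resource follows, assuming Falcon 1.3 or later, where req.media and resp.media handle JSON parsing and serialization. The class name TaskStatusFromBody, the get_status callable and the /task_status route are hypothetical names chosen for illustration. In this project get_status could wrap the AsyncResult lookup used in TaskStatus.

```python
class TaskStatusFromBody(object):
    """Reads the task id from a JSON POST body instead of the URL."""

    def __init__(self, get_status):
        # get_status is a hypothetical callable: task_id -> dict,
        # e.g. a function that wraps the AsyncResult lookup.
        self._get_status = get_status

    def on_post(self, req, resp):
        # req.media is the request body parsed as JSON by Falcon
        payload = req.media
        task_id = payload.get('task_id') if isinstance(payload, dict) else None

        if not task_id:
            # same string as falcon.HTTP_400
            resp.status = '400 Bad Request'
            resp.media = {'error': 'task_id is required'}
            return

        # same string as falcon.HTTP_200
        resp.status = '200 OK'
        resp.media = self._get_status(task_id)


# Registration would look like this (assumed route, not executed here):
# app.add_route('/task_status', TaskStatusFromBody(lookup))
```

A client would then send something like {"task_id": "<id>"} as the POST body to /task_status instead of putting the id in the URL.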

tasks.py:

from time import sleep

import celery


app = celery.Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def add(x, y):
    """
    :param int x:
    :param int y:
    :return: int
    """
    # sleep just for demonstration
    sleep(5)

    return x + y

Now start the Celery worker. From the project folder, run:

celery -A tasks worker --loglevel=info

Next, start the Falcon application. From the project folder, run:

gunicorn app:app

Ok. Everything is ready.

example.py is a small client script that shows how the pieces fit together:

from time import sleep

import requests

# start a new task and read its id from the JSON response
task_info = requests.get('http://127.0.0.1:8000/start_task').json()

while True:
    # poll: ask for the task status by task_id until the task has finished
    result = requests.get('http://127.0.0.1:8000/task_status/' + task_info['task_id'])
    task_status = result.json()

    print(task_status)

    # check the status only, so a falsy result such as 0 still ends the loop
    if task_status['status'] == 'SUCCESS':
        print('Task with id = %s is finished' % task_info['task_id'])
        print('Result: %s' % task_status['result'])
        break
    # sleep and check the status one more time
    sleep(1)

Run python ./example.py and you should see something like this:

{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'SUCCESS', u'result': 8}
Task with id = 76542904-6c22-4536-99d9-87efd66d9fe7 is finished
Result: 8
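
The loop in example.py is what "polling" means: the client keeps asking the server for the task status until the task reaches a final state. A minimal sketch of a reusable polling helper for Python 3 follows. The names poll_task and fetch_status are hypothetical, and fetch_status is assumed to return a dict shaped like the /task_status response. Unlike the loop above, it also stops on the Celery states FAILURE and REVOKED and gives up after a timeout, so a failed task does not keep the client waiting forever.

```python
import time

# Celery states after which the task will not change any more.
FINAL_STATES = {'SUCCESS', 'FAILURE', 'REVOKED'}


def poll_task(fetch_status, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Call fetch_status() until the task reaches a final state.

    fetch_status is a hypothetical callable that returns a dict such as
    {'status': 'PENDING', 'result': None}.
    """
    deadline = time.monotonic() + timeout
    while True:
        info = fetch_status()
        if info['status'] in FINAL_STATES:
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'task still %s after %s seconds' % (info['status'], timeout))
        # wait before asking again
        sleep(interval)
```

With requests, fetch_status could be something like lambda: requests.get(status_url).json(), where status_url is the /task_status/ URL for the task id returned by /start_task.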

Hope this helps you.

Shellieshellproof answered 1/2, 2017 at 21:2 Comment(2)
@DevJalla Good luck in development! - Shellieshellproof
@DevJalla Sorry, but I don't know anything about Java OAuth servers. - Shellieshellproof

The above example by Danila Ganchar is great and very helpful. I'm using celery version 4.3.0 with Python 3, and one of the errors I received from using the example above is on this line:

task_result = AsyncResult(task_id)

The error I would receive is:

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

This may be a recent change, but result.AsyncResult (or just AsyncResult in this example, since it is imported from celery.result) does not know which backend you are using. There are two ways to solve this:

1) Call AsyncResult on the task itself, add.AsyncResult(task_id), because the add task already has the backend configured through the @app.task decorator. The downside is that the Falcon endpoint should be able to return the result of any task given only its task_id, so tying the lookup to one task is limiting.

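2) The preferred method is to pass the Celery application to AsyncResult through the app parameter:

task_result = AsyncResult(task_id, app=celery_app)

Here celery_app is the Celery instance defined in tasks.py (for example, imported with from tasks import app as celery_app), because in app.py the name app already refers to the Falcon application.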

Hope this helps!

Dissemble answered 15/10, 2019 at 18:17 Comment(0)
