Python cassandra-driver OperationTimedOut on every query in Celery task

I have a problem with every insert query (a small query) executed asynchronously in Celery tasks. When I run the insert synchronously everything works fine, but when it is executed via apply_async() I get this:

OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',)

Traceback:

Traceback (most recent call last):
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications
    send_new_comment_reply_notifications_method(comment_id)
  File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications
    comment_type='comment_reply'
  File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add
    CommentsFeed(**kwargs).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save
    consistency=self.__consistency__).save()
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save
    self._execute(insert)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute
    tmp = execute(q, consistency_level=self._consistency)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute
    result = session.execute(query, params)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute
    result = future.result(timeout)
  File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host)
OperationTimedOut: errors={}, last_host=***.***.*.***

Does anyone have any ideas about this problem?

I found that OperationTimedOut means the client-side timeout expired while cassandra-driver was executing the query, but my query is very small and the problem occurs only in Celery tasks.
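For context, the failing pattern is roughly this (reconstructed from the traceback above, so the imports and decorator are assumptions; comment_id is a placeholder):

# tasks.py, simplified from the traceback
from app import celery  # assumed location of the Celery app instance
from app.mods.news_feed.methods import (
    send_new_comment_reply_notifications as send_new_comment_reply_notifications_method)

@celery.task()
def send_new_comment_reply_notifications(comment_id):
    send_new_comment_reply_notifications_method(comment_id)

# Called directly (synchronously), the insert succeeds:
send_new_comment_reply_notifications(comment_id)

# Dispatched to a worker, it raises OperationTimedOut:
send_new_comment_reply_notifications.apply_async(args=(comment_id,))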

UPDATE:

I made a test task and it raises this error too.

@celery.task()
def test_task_with_cassandra():
    from app import cassandra_session
    cassandra_session.execute('use news_feed')
    return 'Done'

UPDATE 2: I tried this:

@celery.task()
def test_task_with_cassandra():
    from cqlengine import connection
    connection.setup(app.config['CASSANDRA_SERVERS'], port=app.config['CASSANDRA_PORT'],
                     default_keyspace='test_keyspace')
    from .models import Feed
    Feed.objects.count()
    return 'Done'

Got this:

NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)})

From the shell I can connect to it without problems.
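For reference, a direct check like this works from a plain Python shell (host, port and keyspace are placeholders for my app.config values):

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'], port=9042)   # replace with CASSANDRA_SERVERS / CASSANDRA_PORT
session = cluster.connect('news_feed')        # the application keyspace
for row in session.execute('SELECT release_version FROM system.local'):
    print(row.release_version)
cluster.shutdown()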

UPDATE 3: From a deleted thread on a GitHub issue (I found this in my emails; it worked for me too). Here's how, in substance, CQLengine is plugged into Celery:

from celery import Celery
from celery.signals import worker_process_init, beat_init
from cqlengine import connection
from cqlengine.connection import (
    cluster as cql_cluster, session as cql_session)

def cassandra_init(*args, **kwargs):
    """ Initialize a clean Cassandra connection. """
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    connection.setup()

# Initialize worker context for both standard and periodic tasks.
worker_process_init.connect(cassandra_init)
beat_init.connect(cassandra_init)

app = Celery()

This is crude, but it works. Should we add this snippet to the FAQ?
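In my setup, connection.setup() still needs the host list and keyspace, so the handler effectively becomes this (a sketch using the values from app.config shown earlier; adjust to wherever your configuration lives):

def cassandra_init(*args, **kwargs):
    """Re-create the CQLengine connection in each worker process."""
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    connection.setup(app.config['CASSANDRA_SERVERS'],
                     port=app.config['CASSANDRA_PORT'],
                     default_keyspace='news_feed')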

Anchusin answered 16/7, 2014 at 15:54 Comment(5)
Does your celery user have permissions to run the queries?Tears
Hm, I didn't set any auth providers for it. Or what did you mean? Where should I look? I've updated my question.Anchusin
Have you solved this? I got the same problem and have no idea what's going on!Rugose
@haifzhan, especially for you I added this rare info :))) see update 3.Anchusin
Thanks for your post @EllochkaCannibal. I checked my script and found out that if multiple processes share one session, it raises the OperationTimedOut exception; after I created one session for each process, the problem was solved.Rugose

I had a similar issue. It seemed to be related to sharing the Cassandra session between tasks. I solved it by creating a session per thread. Make sure you call get_session() from your tasks, and then do this:

import threading

from cassandra.cluster import Cluster

# `settings` refers to your project's settings/config module that defines
# CASSANDRA_HOSTS and CASSANDRA_KEYSPACE.
thread_local = threading.local()


def get_session():
    # Reuse the session already created for this thread, if any.
    if hasattr(thread_local, "cassandra_session"):
        return thread_local.cassandra_session

    # Otherwise open a new cluster connection and session for this thread only.
    cluster = Cluster(settings.CASSANDRA_HOSTS)
    session = cluster.connect(settings.CASSANDRA_KEYSPACE)

    thread_local.cassandra_session = session

    return session
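A task can then grab its own session on demand; a hypothetical example (the task and the events table are made up):

from celery import shared_task


@shared_task
def store_event(event_id):
    # Each thread lazily creates, then reuses, its own Cassandra session.
    session = get_session()
    session.execute(
        "INSERT INTO events (id) VALUES (%s)",
        (event_id,),
    )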
Herl answered 31/10, 2014 at 14:53 Comment(0)

Inspired by Ron's answer, I came up with the following code to put in tasks.py:

import threading

from django.conf import settings
from cassandra.cluster import Cluster
from celery.signals import worker_process_init, worker_process_shutdown

thread_local = threading.local()


@worker_process_init.connect
def open_cassandra_session(*args, **kwargs):
    # Open one session per worker process, right after the process starts.
    cluster = Cluster([settings.DATABASES["cassandra"]["HOST"]], protocol_version=3)
    session = cluster.connect(settings.DATABASES["cassandra"]["NAME"])
    thread_local.cassandra_session = session


@worker_process_shutdown.connect
def close_cassandra_session(*args, **kwargs):
    # Close the per-process session when the worker process shuts down.
    session = thread_local.cassandra_session
    session.shutdown()
    thread_local.cassandra_session = None

This neat solution automatically opens/closes the Cassandra session when a Celery worker process starts and stops.

Side note: protocol_version=3, because Cassandra 2.1 only supports protocol versions 3 and lower.
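A task then just reads the session from thread_local; a hypothetical example (the task and the feed table are made up):

import uuid

from celery import shared_task


@shared_task
def add_feed_entry(text):
    # Uses the per-process session opened by the worker_process_init handler above.
    session = thread_local.cassandra_session
    session.execute(
        "INSERT INTO feed (id, text) VALUES (%s, %s)",
        (uuid.uuid4(), text),
    )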

Bellbottoms answered 28/1, 2016 at 6:41 Comment(1)
I upvoted this answer, but I changed my mind - this didn't actually work for me. I posted a different answer.Oriya

The other answers didn't work for me, but the question's 'update 3' did. Here's what I ended up with (small updates to the suggestion within the question):

from celery.signals import worker_process_init
from django.conf import settings  # settings.DATABASES is assumed to come from Django settings
from cassandra.cqlengine import connection
from cassandra.cqlengine.connection import (
    cluster as cql_cluster, session as cql_session)


def cassandra_init(*args, **kwargs):
    """ Initialize a clean Cassandra connection. """
    if cql_cluster is not None:
        cql_cluster.shutdown()
    if cql_session is not None:
        cql_session.shutdown()
    connection.setup(settings.DATABASES["cassandra"]["HOST"].split(','),
                     settings.DATABASES["cassandra"]["NAME"])


# Initialize worker context (only standard tasks)
worker_process_init.connect(cassandra_init)
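Once this handler has run in each worker process, cqlengine models work from tasks as usual; a hypothetical sketch (the Feed import path is made up):

from celery import shared_task

from myapp.models import Feed  # a cqlengine Model, as in the question's updates


@shared_task
def count_feed_entries():
    return Feed.objects.count()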
Oriya answered 13/11, 2016 at 9:43 Comment(2)
You saved my day, my friendBoulder
again you saved a day for another friend... thank you.Went

Using django-cassandra-engine, the following resolved the issue for me:

from celery.signals import worker_process_init, worker_shutdown
from django.db import connections

db_connection = connections['cassandra']


@worker_process_init.connect
def connect_db(**_):
    # Re-establish the Cassandra connection in each freshly started worker process.
    db_connection.reconnect()


@worker_shutdown.connect
def disconnect(**_):
    db_connection.connection.close_all()

Look here.

Sissified answered 12/9, 2020 at 2:22 Comment(0)
