Handling long running tasks in pika / RabbitMQ

7

77

We're trying to set up a basic directed queue system where a producer will generate several tasks and one or more consumers will grab a task at a time, process it, and acknowledge the message.

The problem is, the processing can take 10-20 minutes, and we're not responding to messages at that time, causing the server to disconnect us.

Here's some pseudo code for our consumer:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

After the first task completes, an exception is thrown somewhere deep inside of BlockingConnection, complaining that the socket was reset. In addition, the RabbitMQ logs show that the consumer was disconnected for not responding in time (why it resets the connection rather than sending a FIN is strange, but we won't worry about that).

We searched around a lot because we believed this was the normal use case for RabbitMQ (having a lot of long running tasks that should be split up among many consumers), but it seems like nobody else really had this issue. Finally we stumbled upon a thread where it was recommended to use heartbeats and to spawn the long_running_task() in a separate thread.

So the code has become:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

And this seems to work, but it's very messy. Are we sure that the ch object is thread safe? In addition, imagine that long_running_task() is using that connection parameter to add a task to a new queue (i.e. the first part of this long process is done, let's send the task on to the second part). So, the thread is using the connection object. Is that thread safe?

More to the point, what's the preferred way of doing this? I feel like this is very messy and possibly not thread safe, so maybe we're not doing it right. Thanks!

Yarrow answered 28/1, 2013 at 21:58 Comment(1)
I am having the same problem. The docs say a pika connection is not thread safe: pika.readthedocs.org/en/latest/faq.html - Gran
F
41

For now, your best bet is to turn off heartbeats; this will keep RabbitMQ from closing the connection if you're blocking for too long. I am experimenting with running pika's core connection management and IO loop in a background thread, but it's not stable enough to release.

In pika v1.1.0 this is ConnectionParameters(heartbeat=0)

Fortunato answered 22/4, 2013 at 19:29 Comment(4)
As @Gavin mentioned, the best bet as of now is to turn off the heartbeat in pika while setting up the connection: connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='TestVirtualHost', credentials=credentials, heartbeat_interval=0, port=5672)) - Wrightson
Pika 0.12.0 has a better solution, please see this answer - Vance
Thanks, it works. If it's still not working for you, note that the heartbeat parameter should be set on both peers (consumer and producer). That's what happened in my case. - Straphanger
Setting ConnectionParameters(heartbeat=0) is safe, because when you kill the process the connection is closed immediately. You can go to rabbit_mq_server:15672/#/connections to verify it. - Porterfield
V
22

Please don't disable heartbeats!

As of Pika 0.12.0, please use the technique described in this example code to run your long-running task on a separate thread and then acknowledge the message from that thread.
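For readers who want the idea inline, here is a minimal sketch of that technique, not a copy of the linked example. It assumes pika 1.x (for the basic_consume keyword arguments) and relies on BlockingConnection.add_callback_threadsafe, which was added in 0.12.0. The helper names ack_message, do_work and on_message, the task argument, and the host and queue values are placeholders chosen for illustration.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Runs on the connection's own thread, so using the channel is safe."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, task):
    """Worker thread: run the slow task, then hand the ack back to pika."""
    task(body)
    # Do not call channel methods from this thread; schedule them instead.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection, task, threads):
    """Consumer callback: returns at once so heartbeats keep being serviced."""
    worker = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body, task))
    worker.start()
    threads.append(worker)


def main(task, queue="task_queue"):
    # Imported here so the helpers above can be exercised without pika.
    import pika

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host="localhost", heartbeat=20))
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    channel.basic_consume(
        queue=queue,
        on_message_callback=functools.partial(
            on_message, connection=connection, task=task, threads=threads))
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    for worker in threads:
        worker.join()
    connection.close()
```

Call main(your_task_function) to start consuming. With prefetch_count=1 and the ack sent only when the worker finishes, the broker should not deliver a second message while one is in flight, so only one worker thread is active at a time.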

Vance answered 23/10, 2018 at 14:46 Comment(7)
Why is it a bad thing to disable heartbeats? - Aeciospore
Neither RabbitMQ nor your application will detect a lost TCP connection until the next operation is attempted on that connection. - Vance
I get that, but depending on the use case, that's not necessarily a bad thing. In my case I'd rather have an error when trying to ack a message because the connection was lost, than having an error when trying to ack a message because the connection was closed due to my processing taking too long. That's why a general warning not to disable heartbeats seems unjustified IMO. It all depends on the use case, so I believe it would be more productive to say why you maybe should consider not disabling them instead of going "disabling = bad" without any further information. - Aeciospore
@LukeBakken Is there any way to use this method with channel.basic_get? I need my consumer to consume one message, acknowledge it and then die/quit. I can get it to consume only one message with basic_get, but then I cannot get it to acknowledge the (long-running) message. - Sully
This is the best and correct solution. Thanks @LukeBakken. - Iroquoian
If a large number of items are queued up and the average time to process each one is very high, then spawning an individual thread for each item leads to an explosion of active threads, which finally ends in an OOM error. I have experienced this. - Foiled
Stack Overflow answers should be self-contained. In addition to linking to example code, include the example code inline. - Ednaedny
L
11

I encountered the same problem you had.
My solution is:

  1. turn off the heartbeat on the server side
  2. estimate the maximum time the task can possibly take
  3. set the client heartbeat timeout to the time obtained in step 2

Why this?

I tested the following cases:

case one
  1. server heartbeat turned on, 1800s
  2. client unset

I still get an error when the task runs for a very long time (more than 1800s).

case two
  1. turn off server heartbeat
  2. turn off client heartbeat

There is no error on the client side, except for one problem: when the client crashes (my OS restarts on some faults), the TCP connection can still be seen in the RabbitMQ Management plugin, which is confusing.

case three
  1. turn off server heartbeat
  2. turn on client heartbeat, set to the foreseen maximum run time

In this case, I can dynamically change the heartbeat on each individual client. In fact, I set a heartbeat on the machines that crash frequently. Moreover, I can see offline machines through the RabbitMQ Management plugin.

Environment

OS: centos x86_64
pika: 0.9.13
rabbitmq: 3.3.1

Lavettelavigne answered 29/5, 2014 at 11:17 Comment(3)
How do you turn on the client heartbeat? I can't find anything about how to do it. - Geochronology
You could try something like this: params = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials, heartbeat_interval=<your-interval-in-seconds>) - Corm
I should have tried your approach first. Saved me tons of headaches and hair. Thank you for your helpful insight. - Marinara
S
10
  1. You can periodically call connection.process_data_events() in your long_running_task(connection). Each call lets pika send heartbeats to the server and keeps the client from being closed.
  2. Set the heartbeat value in your pika BlockingConnection to be greater than the interval between calls to connection.process_data_events().
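The two points above can be sketched roughly as follows, assuming the work can be split into chunks that each finish well within the heartbeat timeout. The names chunks and process_chunk are hypothetical stand-ins for your own workload, and the function is meant to run on the thread that owns the connection (for example, inside the consumer callback).

```python
def long_running_task(connection, chunks, process_chunk):
    """Process work in chunks, servicing pika I/O between chunks."""
    done = 0
    for chunk in chunks:
        process_chunk(chunk)
        # Let pika read and write pending frames, heartbeats included.
        # time_limit=0 returns as soon as pending events are handled.
        connection.process_data_events(time_limit=0)
        done += 1
    return done
```

If a single chunk can take longer than the heartbeat timeout, this approach is not enough on its own and one of the thread-based answers is probably a better fit.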
Strawser answered 8/5, 2018 at 8:34 Comment(2)
connection.process_data_events() helped me - Omniumgatherum
My long-running tasks were largely waiting/sleeping, so this helped a ton. I'm rather surprised how buried and non-intuitive "please tell the server I'm still alive" functionality is in pika, but glad to have finally found it. - Postremogeniture
S
7

Don't disable heartbeats.
The best solution is to run the task in a separate thread and set prefetch_count to 1, so that the consumer only gets one unacknowledged message at a time, using something like channel.basic_qos(prefetch_count=1).

Said answered 16/8, 2017 at 7:9 Comment(0)
L
0

You can also set up a new thread, process the message in that thread, and call .sleep on the connection while the thread is alive to avoid missing heartbeats. Here is a sample code block taken from @gmr on GitHub, and a link to the issue for future reference.

import re
import json
import threading

from google.cloud import bigquery
import pandas as pd
import pika
from unidecode import unidecode

def process_export(url, tablename):
    df = pd.read_csv(url, encoding="utf-8")
    print("read in the csv")
    columns = list(df)
    ascii_only_name = [unidecode(name) for name in columns]
    cleaned_column_names = [re.sub("[^a-zA-Z0-9_ ]", "", name) for name in ascii_only_name]
    underscored_names = [name.replace(" ", "_") for name in cleaned_column_names]
    valid_gbq_tablename = "test." + tablename
    df.columns = underscored_names

    # try:
    df.to_gbq(valid_gbq_tablename, "some_project", if_exists="append", verbose=True, chunksize=10000)
    # print("Finished Exporting")
    # except Exception as error:
    #     print("unable to export due to: ")
    #     print(error)
    #     print()

def data_handler(channel, method, properties, body):
    body = json.loads(body)

    thread = threading.Thread(target=process_export, args=(body["csvURL"], body["tablename"]))
    thread.start()
    while thread.is_alive():  # Loop while the thread is processing
        channel._connection.sleep(1.0)
    print('Back from thread')
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    params = pika.ConnectionParameters(host='localhost', heartbeat=60)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="some_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(data_handler, queue="some_queue")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    channel.close()

if __name__ == '__main__':
    main()

The link: https://github.com/pika/pika/issues/930#issuecomment-360333837

Lieselotteliestal answered 21/2, 2019 at 12:54 Comment(1)
Please refer to the solution provided by Luke Bakken. It's thread safe and refers to an official example from the pika documentation. - Iroquoian
D
0

Here's one more simple way to handle this with threads. It is particularly useful if the consumer app should not consume another job until the current job is finished. The ack can be sent at any time; in this case I choose to send it only when the job is done (the thread is no longer alive).

Start the long-running process in its own thread, and then monitor that thread in a loop with calls to connection.process_data_events(). Keep the reference to the connection object in the main thread, since it is not threadsafe. Essentially:

import time
import pika
from threading import Thread
from functools import partial

def run_process(body):
    # do the long-running thing
    time.sleep(10)

def launch_process(conn, ch, method, properties, body):
    # args must be a tuple; passing body directly would unpack its bytes
    runthread = Thread(target=run_process, args=(body,))
    runthread.start()
    while runthread.is_alive():
        time.sleep(2)
        # service heartbeats from the thread that owns the connection
        conn.process_data_events()
    ch.basic_ack(delivery_tag=method.delivery_tag)

# the callbacks are defined above so that partial() can reference them
rmqconn = pika.BlockingConnection( ... )
rmqchan = rmqconn.channel()
rmqchan.basic_consume(
    queue='test',
    on_message_callback=partial(launch_process, rmqconn)
)
rmqchan.start_consuming()
Donetsk answered 24/6, 2021 at 21:7 Comment(0)
