signal handling pika / python
Asked Answered
B

1

10

I am using pika.BlockingConnection in a consumer which performs some tasks for each message. I have also added signal handling so that the consumer dies properly after completely performing all tasks.

While message is being processed and signal is received, I just get "signal received" from the function, but the code does not exit. So, I decided to check for signal received at the end of callback function, too. The question is, how many times do I check for the signal, as there will be many more functions in this code. Is there a better way of handling signals without overdoing things?

import signal
import sys
import pika
from time import sleep

received_signal = False
all_over = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

mq_connection = pika.BlockingConnection(pika.ConnectionParameters(my_mq_server, virtual_host='test'))
mq_channel = mq_connection.channel()

def callback(ch, method, properties, body):
    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)
    print body
    sleep(50)
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)
    print "Message consumption complete"

    if received_signal:
        print "Exiting, as a kill signal is already received"
        exit(0)

try:
    print ' [*] Waiting for messages. To exit press CTRL+C'
    mq_channel.basic_consume(callback, queue='test')
    mq_channel.start_consuming()
except Exception:
    mq_channel.close()
    exit()

This is my first question here, so let me know if any more details are required.

Bloodstone answered 30/4, 2014 at 12:13 Comment(3)
Your current code will swallow SIGTERM or SIGINT until the next message is received via the queue, at which point it should exit. Is that actually what you want? Why not have the signal_handler method just call sys.exit(0) directly?Cesarean
I want the signal handling to be done in two ways: 1) While waiting for messages, it should just die 2) While consuming a message, it should complete current work, and then die. My current code incorporates the second condition, but not the first one. This is the problem. Would this be even possible?Bloodstone
Yes, that's possible. I'll add an answer.Cesarean
C
5

I think this does what you're looking for:

#!/usr/bin/python

import signal
import sys 
import pika
from contextlib import contextmanager

received_signal = False
processing_callback = False

def signal_handler(signal, frame):
    global received_signal
    print "signal received"
    received_signal = True
    if not processing_callback:
         sys.exit()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

@contextmanager
def block_signals():
    global processing_callback
    processing_callback = True
    try:
        yield
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()

def callback(ch, method, properties, body):
    with block_signals:
        print body
        sum(xrange(0, 200050000)) # sleep gets interrupted by signals, this doesn't.
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"

if __name__ == "__main__":    
    try:
        mq_connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        mq_channel = mq_connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        mq_channel.basic_consume(callback, queue='test')
        mq_channel.start_consuming()
    except Exception as e:
        mq_channel.close()
        sys.exit()

I used a contextmanager to handle blocking the signals, so that all the logic is hidden away outside of the callback itself. This should also make it easier to reuse the code. Just to clarify how it's working, it's equivalent to this:

def callback(ch, method, properties, body):
    global processing_callback
    processing_callback = True
    try:
        print body
        sum(xrange(0, 200050000))
        mq_channel.basic_ack(delivery_tag=method.delivery_tag)
        print "Message consumption complete"
    finally:
        processing_callback = False
        if received_signal:
            sys.exit()
Cesarean answered 9/5, 2014 at 15:47 Comment(1)
Use xrange rather than range, or your memory will blow up and it will start thrashing as it pages out to disk.Carbylamine

© 2022 - 2024 — McMap. All rights reserved.