pika, stop_consuming does not work
Asked Answered
C

2

12

I'm new to rabbitmq and pika, and is having trouble with stopping consuming.

channel and queue setting:

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

Basically, consumer and producer are like this:

consumer:

def task(task_id):
    def callback(channel, method, properties, body):
        if body != "quit":
            print(body)
        else:
            print(body)
            channel.stop_consuming(task_id)

    channel.basic_consume(callback, queue=task_id, no_ack=True)
    channel.start_consuming()
    print("finish")
    return "finish"

producer:

proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None:  # running
    line = proc.stdout.readline()
    if line:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body=line
        )
    else:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body="quit"
        )
        break

consumer task gave me output:

# ... output from sample.sh, as expected

quit
�}q(UstatusqUSUCCESSqU  tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.

However, "finish" didn't get printed, so I'm guessing it's because channel.stop_consuming(task_id) didn't stop consuming. If so, what is the correct way to do? Thank you.

Clingy answered 23/12, 2014 at 16:21 Comment(3)
Are you sure that stop_consuming gets called?Camerlengo
@Camerlengo Yes I'm sure.Clingy
Ok. Are you sure you are passing the correct ID to stop_consuming? Try simply using channel.stop_consuming()Camerlengo
M
8

I had the same problem. It seems to be caused by the fact that internally, start_consuming calls self.connection.process_data_events(time_limit=None). This time_limit=None makes it hang.

I managed to workaround this problem by replacing the call to channel.start_consuming() with its implemenation, hacked:

while channel._consumer_infos:
    channel.connection.process_data_events(time_limit=1) # 1 second
Mountebank answered 14/2, 2016 at 13:35 Comment(3)
I don't have the condition to testify your answer now, anyway, I'll accept it.Clingy
shx2, you're right I think. At least, that's what I also understood by looking at the source. And it also seems to fix the problem. I'll open a ticket.Bautzen
Even with this code, polling did not stop!Georgiegeorgina
K
2

I have a class defined with member variables of channel and connection. These are initialized by a seperate thread. The consumer of MyClient Class uses the close() method and the the connection and consumer is stopped!

class MyClient:

    def __init__(self, unique_client_code):
        self.Channel = None
        self.Conn: pika.BlockingConnection = None  
        self.ClientThread = self.init_client_driver()

    def _close_callback(self):
        self.Channel.stop_consuming()
        self.Channel.close()
        self.Conn.close()
    def _client_driver_thread(self, tmout=None):
        print("Starting Driver Thread...")
        self.Conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
        self.Channel = self.Conn.channel()

    def init_client_driver(self, tmout=None):
        kwargs = {'tmout': tmout}
        t = threading.Thread(target=self._client_driver_thread, kwargs=kwargs)
        t.daemon = True
        t.start()
        return t

    def close(self):
        self.Conn.add_callback_threadsafe(self._close_callback)
        self.ClientThread.join()
Kenney answered 25/3, 2022 at 10:42 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.