pika `pop from an empty deque`

I'm using pika in a Kubernetes cluster to consume messages from a queue; each message triggers a function that runs in a new thread. However, RabbitMQ seems to crash. These are the best logs I've found so far:

2020-12-23 10:39:10,906] WARNING - WRITE indicated on fd=9, but writer callback is None; events=0b100 {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/selector_ioloop_adapter.py:393}
(repeats to a total of n=38 times)
2020-12-23 10:39:10,908] ERROR - _AsyncBaseTransport._produce() failed, aborting connection: error=IndexError('pop from an empty deque'); sock=<socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200',  44892), raddr=('192.168.101.201', 5672)>; Caller's stack:                                                                                 
Traceback (most recent call last):                                                                                                 
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable        
    self._produce()                                                                                                                
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 822, in _produce                    
    chunk = self._tx_buffers.popleft()                                                                                             
IndexError: pop from an empty deque                                                                                                
{/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:1103}                                            
Traceback (most recent call last):                                                                                                 
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable        
    self._produce()                                                                                                                
File "/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py", line 822, in _produce                    
    chunk = self._tx_buffers.popleft()                                                                                             
IndexError: pop from an empty deque                                                                                                
2020-12-23 10:39:10,908] INFO - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=IndexError('pop from an empty deque'); <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:904}                                                                                                               
2020-12-23 10:39:10,908] INFO - Deactivating transport: state=1; <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:869}
2020-12-23 10:39:10,909] ERROR - connection_lost: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/base_connection.py:428}                                                   
2020-12-23 10:39:10,909] INFO - AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",); pending-error=None {/usr/local/lib/python3.9/site-packages/pika/connection.py:1996}
2020-12-23 10:39:10,909] INFO - Stack terminated due to StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/connection.py:2065}                                                          
2020-12-23 10:39:10,909] INFO - Closing transport socket and unlinking: state=2; <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 44892), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:882}      
2020-12-23 10:39:10,909] ERROR - Unexpected connection close detected: StreamLostError: ("Stream connection lost: IndexError('pop from an empty deque')",) {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:520} 
2020-12-23 10:39:31,416] INFO - Pika version 1.1.0 connecting to ('192.168.101.201', 5672) {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:179}                  
2020-12-23 10:39:31,417] INFO - Socket connected: <socket.socket fd=9, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('192.168.100.200', 47142), raddr=('192.168.101.201', 5672)> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/io_services_utils.py:345}                                                                                                         
2020-12-23 10:39:31,418] INFO - Streaming transport linked up: (<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60>, _StreamingProtocolShim: <SelectConnection PROTOCOL transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>>). {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:428}
2020-12-23 10:39:31,421] INFO - AMQPConnector - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:293}
2020-12-23 10:39:31,421] INFO - AMQPConnectionWorkflow - reporting success: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/utils/connection_workflow.py:725}                                                                                                                          
2020-12-23 10:39:31,421] INFO - Connection workflow succeeded: <SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x7f81b3099a60> params=<ConnectionParameters host=rabbitmq-0.rabbitmq.testing.svc.cluster.local port=5672 virtual_host=/ ssl=False>> {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:452} 
2020-12-23 10:39:31,422] INFO - Created channel=1 {/usr/local/lib/python3.9/site-packages/pika/adapters/blocking_connection.py:1247}

My consumer has the following definition:

import threading
import time

import pika


def publish_message(channel, message):
    channel.basic_publish(exchange='',
                          routing_key='my_queue',
                          body=message)


def connect_to_mq():
    credentials = pika.PlainCredentials(rabbit_user, rabbit_password)
    parameters = pika.ConnectionParameters(rabbit_host, rabbit_port, '/', credentials)
    connection = pika.BlockingConnection(parameters=parameters)
    channel = connection.channel()
    channel.queue_declare(queue='my_queue')
    return connection, channel
    
    
def on_message(channel, method_frame, header_frame, body):
    message = body.decode('utf-8')
    if message == 'do_work':
        thread = threading.Thread(target=start_processing, args=(channel,))
        thread.start()
        publish_message(channel, 'initiated thread')
    
    
def start_processing(channel):
    publish_message(channel, 'starting...')
    time.sleep(240)
    publish_message(channel, 'processing complete!')


def main():
    connection, channel = connect_to_mq()
    channel.basic_consume(queue='my_queue',
                        auto_ack=True,
                        on_message_callback=on_message)

    channel.start_consuming()

Is there anything inherently wrong with my implementation and strategy for handling messages and workloads in separate threads that is causing this to happen?

Hypogenous answered 23/12, 2020 at 11:9 Comment(0)

Pika isn't thread safe by default. You should ideally keep one connection per thread.

There are a number of example implementations here, and I have a thread-safe RPC example here that you can look at as well, but I would recommend using one of their reference implementations for threading.
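
As a rough sketch of the one-connection-per-thread idea, the worker below opens its own connection instead of reusing the consumer's channel. `connection_factory` and `spawn_worker` are hypothetical names introduced for illustration, and the queue name is taken from the question. It assumes the factory returns something that behaves like a `pika.BlockingConnection`.

```python
import threading


def publish_message(channel, message):
    channel.basic_publish(exchange='', routing_key='my_queue', body=message)


def start_processing(connection_factory, work_seconds=240):
    # connection_factory is a hypothetical zero-argument callable that opens
    # a fresh connection for this thread only. With pika it could be:
    #   lambda: pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    connection = connection_factory()
    try:
        channel = connection.channel()
        publish_message(channel, 'starting...')
        # BlockingConnection.sleep() keeps processing I/O (e.g. heartbeats)
        # while waiting, which time.sleep() would not do.
        connection.sleep(work_seconds)
        publish_message(channel, 'processing complete!')
    finally:
        connection.close()


def spawn_worker(connection_factory):
    # Each worker thread gets its own connection and channel.
    thread = threading.Thread(target=start_processing,
                              args=(connection_factory,))
    thread.start()
    return thread
```

The consumer thread keeps its own connection for `start_consuming()`, so no pika object is ever touched from two threads.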

Thenceforward answered 3/1, 2021 at 20:46 Comment(3)
Thank you. I have successfully resolved the issue by wrapping the calls to publish_message (the ones which are happening in a separate thread) in functions submitted to Connection.add_callback_threadsafe. – Hypogenous
Hey, could you provide the code example? Thanks! – Tude
github.com/pika/pika/blob/1.3.x/examples/… – Felsite
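
For readers looking for the code, the fix described in the first comment might look roughly like the sketch below. It is not the commenter's actual code: `publish_threadsafe`, `make_on_message` and the `work_seconds` parameter are names assumed here for illustration. The only pika call added is `add_callback_threadsafe`, which asks the connection to run the callback on the thread that owns it.

```python
import functools
import threading
import time


def publish_message(channel, message):
    channel.basic_publish(exchange='', routing_key='my_queue', body=message)


def publish_threadsafe(connection, channel, message):
    # Do not touch the channel from the worker thread. Instead, hand the
    # publish to the connection, which runs it on its own thread while it
    # is processing events (e.g. inside start_consuming()).
    connection.add_callback_threadsafe(
        functools.partial(publish_message, channel, message))


def start_processing(connection, channel, work_seconds=240):
    publish_threadsafe(connection, channel, 'starting...')
    time.sleep(work_seconds)  # stands in for the long-running work
    publish_threadsafe(connection, channel, 'processing complete!')


def make_on_message(connection):
    # Builds the consumer callback so that it can pass the connection on.
    def on_message(channel, method_frame, header_frame, body):
        if body.decode('utf-8') == 'do_work':
            thread = threading.Thread(target=start_processing,
                                      args=(connection, channel))
            thread.start()
            # This call runs on the connection's own thread, so it is safe.
            publish_message(channel, 'initiated thread')
    return on_message
```

In `main()` from the question, this would be wired up by passing `on_message_callback=make_on_message(connection)` to `basic_consume`.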

As eandersson said, Pika is not thread safe, and you can't share one connection between threads.

Use amqpstorm if you want thread safety.

Here is a simple example of a multithreaded application, first with amqpstorm and then with pika:

import amqpstorm
import time
import threading


def simple_consumer(conn: amqpstorm.Connection):
    with conn.channel() as channel:
        while True:
            msg = channel.basic.get('fruits')
            if msg is None:
                time.sleep(1)
                continue
            print(msg.body)
            msg.ack()
    return


def producer_task(conn: amqpstorm.Connection, counter: int):
    with conn.channel() as channel:
        while counter > 0:
            channel.queue.declare('fruits')
            print(f'Thread {counter}')
            message = amqpstorm.Message.create(
                channel,
                body=f'Hello RabbitMQ! {counter}',
                properties={
                    'content_type': 'text/plain',
                    "expiration": '5000'
                }
            )
            message.publish('fruits')
            counter -= 1
            time.sleep(1)
    print(f'end {counter}')
    return

def main():
    conn = amqpstorm.Connection('localhost', 'guest', 'guest')
    p1 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 10},
    )
    p1.start()
    
    p2 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 20}
    )
    p2.start()
    
    p3 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 30}
    )
    p3.start()
    
    p4 = threading.Thread(
        target=simple_consumer,
        kwargs={'conn': conn}
    )
    p4.start()
    
    p1.join()
    p2.join()
    p3.join()
    p4.join()
    
    print('end main')
    return

if __name__ == '__main__':
    main()


As a footnote: you also cannot share one connection between processes in amqpstorm, because each process has its own memory space.

The same app will not work using pika:

import pika
import time
import threading


def on_message(channel, method, properties, body):
    # pika passes four arguments to the consumer callback
    print("Message:", body)
    return


def consumer(conn: pika.BlockingConnection):
    with conn.channel() as channel:
        channel.queue_declare(queue='fruits')
        channel.basic_consume(queue='fruits', on_message_callback=on_message, auto_ack=True)
        
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.close()


def producer_task(conn: pika.BlockingConnection, counter: int):
    with conn.channel() as channel:
        while counter > 0:
            channel.queue_declare('fruits')
            print(f'Thread {counter}')
            channel.basic_publish(
                exchange='',
                routing_key='fruits',
                body=f'Hello RabbitMQ! {counter}',
                properties=pika.BasicProperties(expiration='5000')
            )
            counter -= 1
            time.sleep(1)
    print(f'end {counter}')
    return



def main():
    conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    p1 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 10},
    )
    p1.start()
    
    p2 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 20}
    )
    p2.start()

    p3 = threading.Thread(
        target=producer_task,
        kwargs={'conn': conn, 'counter': 30}
    )
    p3.start()

    p4 = threading.Thread(
        target=consumer,
        kwargs={'conn': conn}
    )
    p4.start()

    p1.join()
    p2.join()
    p3.join()
    p4.join()
    
    print('end main')
    return



if __name__ == '__main__':
    main()
Blain answered 15/2 at 5:12 Comment(0)