RabbitMQ Pika connection reset: (-1, ConnectionResetError(104, 'Connection reset by peer'))

I searched through Stack Overflow and am posting this question because no solution worked for me, and my problem might be different from the other questions.

I am writing a script that gets an article from a RabbitMQ queue, processes it to count words and extract keywords, and dumps the result in a DB. My script works fine, but after running for some time I get this exception:
(-1, "ConnectionResetError(104, 'Connection reset by peer')")

I have no idea why I am getting this. I have tried many of the solutions available on Stack Overflow and none of them works for me. I have written my script in two different ways; both work fine, but after some time the same exception occurs.

Here is my first version:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    # Edit 4
    def pika_connect():
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
        channel = connection.channel()
        print ("In pika connect")
        Logger.log_message('Setting up input queue consumer')
        channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
        channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

        Logger.log_message('Starting loop')
        channel.start_consuming()

    #########

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Edit 5 starting 10 threads to listen to pika 

    for th in range(qthreads):
        Logger.log_message('Starting thread: '+str(th))
        try:
            t = Thread(target=pika_connect, args=())
            t.start()
        except Exception as e:
            Logger.error_message("Exception in starting threads " + str(e))



try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))

Here is my second version:

def app_main():

    global channel, results, speedvars
    Logger.log_message('Starting app main')

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()

    connection = pika.BlockingConnection(pika.ConnectionParameters(
             host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()
    print ("In app main")
    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)
    channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)

    Logger.log_message('Starting loop')

    try:
        channel.start_consuming()
    except Exception as e:
        Logger.error_message("Exception in start_consuming in main " + str(e))
        raise e


try:
    app_main()
except Exception as e:
    Logger.error_message("Exception in APP MAIN " + str(e))


In my first version I used threading because I want to speed up the processing of articles.
This is my callback function:
def on_message(ch, method, properties, message):
    Logger.log_message("Starting parsing new msg ")
    handle_message(message)

EDIT: Full Code

import os
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)

from Modules import Logger
import pika
import Config
import json
import pickle
import Pipeline
import sys
import time
import datetime
import threading
import queue
import functools

from pid.decorator import pidfile

Logger.log_init(Config.AMQ_DAEMONS['consumer']['log-ident'])
#qthreads = Config.AMQ_DAEMONS['consumer']['threads']
results = queue.Queue()
channel = None
speedvars = None

SPD_RECEIVED = 'received'
SPD_DISCARDED = 'discarded'
SPD_SENT = 'sent'

class SpeedVars(object):
    """Thread-safe named counters shared by the consumer threads."""

    def __init__(self):
        # Per-instance dict: a class-level dict would be shared by every instance
        self.vars = {}
        self.lock = threading.Lock()

    def inc(self, var):
        with self.lock:
            self.vars[var] = self.vars.get(var, 0) + 1

    def dec(self, var):
        with self.lock:
            if var in self.vars:
                self.vars[var] -= 1
            else:
                Logger.error_message('Cannot decrement ' + var + ', not tracked')

    def get(self, var):
        with self.lock:
            out = self.vars.get(var)
        if out is None:
            Logger.error_message('Cannot get ' + var + ', not tracked')
        return out

    def get_all(self):
        with self.lock:
            return self.vars.copy()


class SpeedTracker(threading.Thread):
    speedvars = None
    start_ts = None
    last_vars = {}

    def __init__(self, speedvars):
        super(SpeedTracker, self).__init__()
        self.start_ts = time.time()
        self.speedvars = speedvars
        Logger.log_message('Setting up speed tracker')

    def run(self):
        while True:
            time.sleep(Config.AMQ_DAEMONS['consumer']['speed-tracking-interval'])
            prev = self.last_vars
            cur = self.speedvars.get_all()
            now = time.time()
            if len(prev) > 0:
                q = {}
                for key in cur:
                    # A counter may appear for the first time in this interval
                    qty = cur[key] - prev.get(key, 0)
                    avg = qty / Config.AMQ_DAEMONS['consumer']['speed-tracking-interval']
                    overall_avg = cur[key] / (now - self.start_ts)
                    Logger.log_message('Speed-tracking (' + key + '): total ' + str(cur[key])
                                       + ', delta ' + str(qty) + ', speed ' + '%0.2f' % avg + '/sec'
                                       + ', overall speed ' + '%0.2f' % overall_avg + '/sec')
                # Counters that were never incremented are missing from the dict
                pending = cur.get(SPD_RECEIVED, 0) - cur.get(SPD_DISCARDED, 0) - cur.get(SPD_SENT, 0)
                pending_avg = pending / (now - self.start_ts)
                Logger.log_message('Speed-tracking (pending): total ' + str(pending)
                                   + ', overall speed ' + '%0.2f' % pending_avg + '/sec')
            self.last_vars = cur


class ResultsSender(threading.Thread):
    channel = None
    results = None
    speedvars = None

    def __init__(self, results, speedvars):
        super(ResultsSender, self).__init__()
        connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=Config.AMQ_DAEMONS['base']['amq-host']))
        self.channel = connection.channel()
        Logger.log_message('Setting up output exchange')
        self.channel.exchange_declare(exchange=Config.AMQ_DAEMONS['consumer']['output'], exchange_type='direct')
        self.results = results
        self.speedvars = speedvars

    def run(self):
        while True:
            item = self.results.get()
            self.channel.basic_publish(
                exchange=Config.AMQ_DAEMONS['consumer']['output'],
                routing_key='',
                body=item)
            self.speedvars.inc(SPD_SENT)

def parse_message(message):
    try:
        bodytxt = message.decode('UTF-8')
        body = json.loads(bodytxt)
        return body
    except Exception as e:
        Logger.error_message("Cannot parse message - " + str(e))
        raise e

def get_body_elements(body):
    try:
        artid = str(body.get('article_id'))
        article_dt = datetime.datetime.fromtimestamp(body.get('pubTime'))
        date = article_dt.strftime(Config.DATE_FORMAT)
        article = "\n".join([body.get('title', ''), body.get('subheading', ''), body.get('content', '')])
        return (artid, date, article)
    except Exception as e:
        Logger.error_message("Cannot retrieve article attributes " + str(e))
        raise e

def process_article(id, date, text):
    global results, speedvars
    try:
        Logger.log_message('Processing article ' + id)
        keywords = Pipeline.extract_keywords(text)
        send_data = {"id": id, "date": date, "keywords": keywords}
        results.put(pickle.dumps(send_data))
        # print('Queue Size:',results.qsize())
    except Exception as e:
        Logger.error_message("Problem processing article " + str(e))
        raise e

def ack_message(ch, delivery_tag):
    """Note that `ch` must be the same pika channel instance via which
    the message being ACKed was retrieved (AMQP protocol constraint).
    """
    if ch.is_open:
        ch.basic_ack(delivery_tag)
    else:
        # Channel is already closed, so we can't ACK this message;
        # log and/or do something that makes sense for your app in this case.
        Logger.error_message("Channel is already closed, so we can't ACK message " + str(delivery_tag))

def handle_message(connection, ch, delivery_tag, message):
    global speedvars
    start = time.time()
    thread_id = threading.get_ident()

    try:
        speedvars.inc(SPD_RECEIVED)
        body = parse_message(message)
        (id, date, text) = get_body_elements(body)
        words = len(text.split())
        if words <= Config.AMQ_DAEMONS['consumer']['word-count-limit']:
            process_article(id, date, text)
        else:
            Logger.log_message('Ignoring article, over word count limit')
            speedvars.inc(SPD_DISCARDED)

    except Exception as e:
        Logger.error_message("Could not process message - " + str(e))

    cb = functools.partial(ack_message, ch, delivery_tag)
    connection.add_callback_threadsafe(cb)

    Logger.log_message("Thread id: "+str(thread_id)+" Delivery tag: "+str(delivery_tag)) 
    Logger.log_message("TOtal time taken to handle message : "+ str(time.time()-start))

# CALL BACK    
## def on_message(ch, method, properties, message):
##    global executor
##    executor.submit(handle_message, message)

def on_message(ch, method, header_frame, message, args):
    (connection, threads) = args
    delivery_tag = method.delivery_tag
    t = threading.Thread(target=handle_message, args=(connection, ch, delivery_tag, message))
    t.start()
    threads.append(t)


####################################################
@pidfile(piddir=Config.AMQ_DAEMONS['base']['pid-dir'], pidname=Config.AMQ_DAEMONS['consumer']['pid-file'])
def app_main():
    global channel, results, speedvars

    speedvars = SpeedVars()
    speedtracker = SpeedTracker(speedvars)
    speedtracker.start()

    sender = ResultsSender(results, speedvars)
    sender.start()


    # Pika Connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                host=Config.AMQ_DAEMONS['base']['amq-host']))
    channel = connection.channel()

    Logger.log_message('Setting up input queue consumer')
    channel.queue_declare(Config.AMQ_DAEMONS['consumer']['input'], durable=True)

    #channel.basic_consume(on_message, queue=Config.AMQ_DAEMONS['consumer']['input'], no_ack=True)
    channel.basic_qos(prefetch_count=1)
    threads = []
    on_message_callback = functools.partial(on_message, args=(connection, threads))
    channel.basic_consume(on_message_callback, Config.AMQ_DAEMONS['consumer']['input'])

    Logger.log_message('Starting loop')
    ## channel.start_consuming()
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for all to complete
    for thread in threads:
        thread.join()

    connection.close()


app_main()  

Handling a message takes very little time, yet I still face the connection reset issue:
TOtal time taken to handle message : 0.0005991458892822266

Mozell answered 2/1, 2019 at 8:46 Comment(3)
What does the RabbitMQ log contain? - Mayramays
@LukeBakken The RabbitMQ log says:
=ERROR REPORT==== 1-Jan-2019::12:45:17 ===
closing AMQP connection <0.13654.58> ([::1]:44022 -> [::1]:5672):
{writer,send_failed,{error,timeout}}
=ERROR REPORT==== 1-Jan-2019::12:48:19 ===
closing AMQP connection <0.13560.58> ([::1]:44006 -> [::1]:5672):
missed heartbeats from client, timeout: 60s
I don't understand why it is missing heartbeats; my script dequeues articles from the RabbitMQ queue every 2 or 3 seconds. - Mozell
Does this answer your question? Handling long running tasks in pika / RabbitMQ - Ruthenium

Your handle_message method is blocking heartbeats because all of your code, including the Pika I/O loop, is running on the same thread. Check out this example of how to run your work (handle_message) on a separate thread from Pika's I/O loop and then acknowledge messages correctly.
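A minimal sketch of that pattern, assuming Pika's `BlockingConnection.add_callback_threadsafe` (which the question's full code already uses). As a swapped-in technique, it uses a bounded `concurrent.futures.ThreadPoolExecutor` instead of starting a new `Thread` per message and appending it to a `threads` list; `do_work`, `work_fn` and `max_workers=4` are hypothetical choices for illustration. The callback returns immediately so the I/O loop can keep servicing heartbeats, and the ack is scheduled back onto the connection's thread because Pika channels are not thread-safe.

```python
import functools
from concurrent.futures import ThreadPoolExecutor

# Bounded worker pool (assumption: 4 workers; tune for your workload).
# Threads are reused, so nothing accumulates while the consumer runs forever.
executor = ThreadPoolExecutor(max_workers=4)


def ack_message(ch, delivery_tag):
    """Runs on the I/O-loop thread; `ch` must be the channel that delivered the message."""
    if ch.is_open:
        ch.basic_ack(delivery_tag)


def do_work(connection, ch, delivery_tag, body, work_fn):
    """Runs on a pool thread, so the Pika I/O loop is never blocked by processing."""
    try:
        work_fn(body)
    finally:
        # Ack even if work_fn raised (as the question's code does), but never
        # call basic_ack from this worker thread: hand it to the connection's thread.
        connection.add_callback_threadsafe(
            functools.partial(ack_message, ch, delivery_tag))


def on_message(ch, method, properties, body, connection, work_fn):
    """Consumer callback: submit the message to the pool and return immediately."""
    executor.submit(do_work, connection, ch, method.delivery_tag, body, work_fn)
```

To register it, bind the extra arguments with `functools.partial`; with Pika 1.x that would be `channel.basic_consume(queue=Config.AMQ_DAEMONS['consumer']['input'], on_message_callback=functools.partial(on_message, connection=connection, work_fn=handle_article))`, where `handle_article` stands for your own processing function. Pika 0.x, as used in the question, takes the callback as the first positional argument instead. After `start_consuming()` returns, `executor.shutdown(wait=True)` takes the place of the join loop.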

Mayramays answered 3/1, 2019 at 13:13 Comment(8)
Using this example, the connection error has been solved, but now memory usage increases slowly and keeps increasing. I don't know what's causing this. - Mozell
It's probably an issue in your code, but since you don't provide your code, it is impossible for anyone to continue to help. - Mayramays
I have added the full code of my consumer; can you check the new edit? Do let me know if you can help. Thanks in advance :) My code processes articles continuously and I never want to stop it. - Mozell
The example code for using threads is just that, an example, and is not meant to be considered "production code". What do you think happens in your application as you keep appending to the threads list? - Mayramays
I tried it without appending as well, because I thought that might be the issue, since my consumer never stops and never executes the join() part. But that is not the problem: something somewhere is eating memory, and I still do not know what's wrong with my threads. - Mozell
I tried my code without threading and the connection again dropped after 5 minutes. handle_message() does not take long to process an article, but Pika still misses heartbeats. "TOtal time taken to handle message : 0.0005991458892822266" is the time it takes to process one message. Is there anything you can suggest? - Mozell
Considering the number of people successfully using Pika without this issue in production, I can only conclude your code is still somehow responsible. If you can provide a working set of code to reproduce what you are seeing, open an issue here and provide the code as well as the versions of Pika, Python, RabbitMQ, and Erlang you are using. By "working set of code" I mean something I can download and run. Thanks. - Mayramays
Thanks @LukeBakken for the solution; I was struggling for 3 days before I read it. :-) I should have gone to Stack Overflow on the first attempt. - Rein
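If you keep the thread-per-message design from the full code, one way to stop the `threads` list from growing without bound is to drop threads that have already finished before appending a new one. `reap_finished` is a hypothetical helper; this is a minimal sketch and only addresses the list itself, not every possible source of memory growth.

```python
def reap_finished(threads):
    """Remove finished threads from `threads` in place; return how many were removed."""
    before = len(threads)
    # Keep only threads that are still running; finished ones can be garbage-collected.
    threads[:] = [t for t in threads if t.is_alive()]
    return before - len(threads)
```

In the question's `on_message`, this would be a call to `reap_finished(threads)` just before `threads.append(t)`. Since Pika invokes consumer callbacks on the I/O thread, only that thread mutates the list while consuming.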

I was getting the same issue. Increasing the heartbeat and connection timeout settings didn't work for me. I finally figured out that if you have already created a channel and don't publish anything on it for several minutes (20 minutes in my case), you get this error.
The solutions that worked for me:

  1. Create the channel immediately before publishing any message, or

  2. Use try/except, and if you get an exception, create a new connection and channel and republish, i.e.:

     try:
         channel.basic_publish(exchange='', routing_key='abcd', body=data)
     except Exception:
         # The publish failed (e.g. the idle connection was dropped):
         # open a new connection and channel, then republish the same message.
         connection = pika.BlockingConnection(pika.ConnectionParameters(host='1.128.0.3', credentials=credentials))
         channel = connection.channel()
         channel.basic_publish(exchange='', routing_key='abcd', body=data)
    

This will at least keep things running and prevent losing data. I'm not an expert in this, but I hope this helps someone!
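The same idea may apply to the question's `ResultsSender`, which keeps a `BlockingConnection` open but spends most of its time blocked in `results.get()`; a `BlockingConnection` only sends and checks heartbeats while its owning thread is inside a Pika call, so an idle publisher connection can go silent. A minimal sketch, assuming a hypothetical `connect()` factory that returns a `(connection, channel)` pair, that calls `process_data_events(time_limit=0)` whenever there is nothing to publish, and falls back to this answer's reconnect-and-republish approach when a publish fails:

```python
import queue


def publish_loop(results, connect, exchange, stop, idle_timeout=1.0):
    """Publish items from the `results` queue until `stop` (a threading.Event) is set.

    `connect` is a hypothetical zero-argument factory returning (connection, channel),
    e.g. a wrapper around pika.BlockingConnection(...).channel().
    """
    connection, channel = connect()
    while not stop.is_set():
        try:
            item = results.get(timeout=idle_timeout)
        except queue.Empty:
            # Idle: let Pika process pending I/O so heartbeats keep flowing.
            try:
                connection.process_data_events(time_limit=0)
            except Exception:
                connection, channel = connect()
            continue
        try:
            channel.basic_publish(exchange=exchange, routing_key='', body=item)
        except Exception:
            # Assume the connection/channel is dead: reconnect and republish once.
            connection, channel = connect()
            channel.basic_publish(exchange=exchange, routing_key='', body=item)
```

`time_limit=0` processes whatever I/O is pending and returns without waiting, so `idle_timeout` should stay well below the broker's heartbeat timeout (60 s in the log above). With a single retry, a second failure propagates to the caller; how hard to retry, and how to tolerate possible duplicate publishes, depends on the application.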

Radiochemistry answered 30/4, 2021 at 10:10 Comment(0)

I also faced the same issue and resolved it by increasing the heartbeat and blocked-connection timeouts.

Many thanks to @LukeBakken who has actually identified the root cause.

Here is how you can configure the timeouts:

import pika


def main():

    # NOTE: These parameters work with all Pika connection types
    params = pika.ConnectionParameters(heartbeat=600, blocked_connection_timeout=300)

    conn = pika.BlockingConnection(params)

    chan = conn.channel()

    chan.basic_publish('', 'my-alphabet-queue', "abc")

    # If publish causes the connection to become blocked, then this conn.close()
    # would hang until the connection is unblocked, if ever. However, the
    # blocked_connection_timeout connection parameter would interrupt the wait,
    # resulting in ConnectionClosed exception from BlockingConnection (or the
    # on_connection_closed callback call in an asynchronous adapter)
    conn.close()


if __name__ == '__main__':
    main()

Reference: https://pika.readthedocs.io/en/stable/examples/heartbeat_and_blocked_timeouts.html

Rein answered 7/10, 2020 at 12:59 Comment(0)
