How to detect dead RabbitMQ connection?

I have a RabbitMQ consumer script in Go. It is a simple script from the RabbitMQ tutorial that uses the streadway/amqp library.

The problem is that if the RabbitMQ server is stopped, the consumer script does not exit, and when the RabbitMQ server is restarted, the consumer no longer receives messages.

Is there a way to detect that the consumer connection is dead and reconnect, or at least terminate the consumer script?

I know that the library sets a default 10-second heartbeat interval for the connection. Is it possible to use that somehow?

package main

import (
    "bytes"
    "log"
    "time"

    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "test_task_queue", // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        // This loop ends when msgs is closed, but main keeps blocking on forever.
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            d.Ack(false)
            dotCount := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dotCount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}
Siblee answered 1/2, 2017 at 23:43

amqp.Connection has a method NotifyClose() that returns a channel signalling a transport or protocol error. So something like:

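for { // reconnection loop
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") // setup
    if err != nil {
        log.Println(err)
        time.Sleep(5 * time.Second) // wait before retrying
        continue
    }
    notify := conn.NotifyClose(make(chan *amqp.Error, 1)) // error channel (buffered)
...
    ch, err := conn.Channel()
    msgs, err := ch.Consume(
...
receive:
    for { // receive loop
        select { // check connection
        case amqpErr := <-notify:
            if amqpErr == nil {
                return // conn.Close() was called on purpose: do not reconnect
            }
            // work with error
            break receive // reconnect; a bare break would only leave the select
        case d, ok := <-msgs:
            if !ok {
                break receive // delivery channel closed: reconnect
            }
            // work with message d
        ...
        }
    }
    conn.Close() // release the old connection before dialing again
}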
Guimond answered 2/2, 2017 at 1:23
It should be noted that if someone calls Close on the connection, the NotifyClose channel yields nil. This allows differentiating between errors (which would likely warrant reconnection) and normal app termination. – Return
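The loop logic above can be checked without a broker. In this stdlib-only sketch, `closeErr` and `session` are hypothetical stand-ins for `*amqp.Error` and one Dial/Channel/Consume cycle, and `consume` is an illustrative helper. It shows the labeled `break` (a bare `break` inside `select` only exits the `select`) and the nil check from the comment: an error or a closed delivery channel leads to a new dial, while a nil from a deliberate close stops the loop.

```go
package main

import "errors"

// closeErr is a hypothetical stand-in for *amqp.Error.
type closeErr struct{ Reason string }

// session is a hypothetical stand-in for one Dial + Channel + Consume cycle.
type session struct {
    notify <-chan *closeErr // plays the role of conn.NotifyClose(...)
    msgs   <-chan string    // plays the role of the Consume delivery channel
}

// consume runs the reconnection loop and returns the messages handled
// and how many sessions were dialled.
func consume(dial func() (session, error), maxDials int) ([]string, int, error) {
    var handled []string
    for dials := 1; dials <= maxDials; dials++ { // reconnection loop
        s, err := dial()
        if err != nil {
            return handled, dials, err
        }
    receive:
        for { // receive loop
            select {
            case e := <-s.notify:
                if e == nil {
                    return handled, dials, nil // deliberate Close: stop
                }
                break receive // error: dial again
            case m, ok := <-s.msgs:
                if !ok {
                    break receive // delivery channel closed: dial again
                }
                handled = append(handled, m)
            }
        }
    }
    return handled, maxDials, errors.New("gave up reconnecting")
}
```

In real code the `dial` hook would wrap `amqp.Dial`, `conn.NotifyClose` and `ch.Consume`; a nil channel field (as in a zero `session`) is simply never ready in `select`.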

There are a couple of ways of doing this: checking whether the delivery channel is closed, or using Channel.NotifyClose.

Checking the delivery channel

After starting the consumer, you will receive from the delivery channel. As you know, the receive operation may take the special form x, ok := <-ch, where ok is false when x is a zero value because the channel is closed (and empty):

conn, _ := amqp.Dial(url) // error handling omitted for brevity
ch, _ := conn.Channel()

delivery, _ := ch.Consume(
    queueName,
    consumerName,
    true,  // auto ack
    false, // exclusive
    false, // no local
    true,  // no wait
    nil,   // args
)

for {
    payload, ok := <-delivery
    if !ok {
        // ... channel closed
        return
    }
    log.Printf("received: %s", payload.Body)
}

This works because the Go channel <-chan amqp.Delivery will be closed when the AMQP channel is closed or an error occurs:

[It] continues deliveries to the returned chan Delivery until Channel.Cancel, Connection.Close, Channel.Close, or an AMQP exception occurs.
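The comma-ok behaviour can be checked without a broker. In this sketch, `delivery` is a hypothetical stand-in for `amqp.Delivery` (only `Body` is modelled) and `drain` is an illustrative helper; the loop ends exactly when the channel has been closed and drained.

```go
package main

// delivery is a hypothetical stand-in for amqp.Delivery.
type delivery struct{ Body []byte }

// drain receives until the channel is closed and returns the bodies it saw.
func drain(deliveries <-chan delivery) []string {
    var bodies []string
    for {
        d, ok := <-deliveries
        if !ok {
            // Closed and empty: in the library this follows Cancel, Close,
            // or an AMQP exception.
            return bodies
        }
        bodies = append(bodies, string(d.Body))
    }
}
```

`for d := range deliveries` is equivalent when you only need to know that the loop ended; the explicit `ok` form lets you run cleanup or trigger a reconnect at that exact point.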

Using Channel.NotifyClose

This is straightforward, and the principle is the same:

NotifyClose registers a listener for when the server sends a channel or connection exception in the form of a Connection.Close or Channel.Close method.

The channel returned by NotifyClose is the same one you pass as an argument; the method only registers it internally, so you can just do:

errC := ch.NotifyClose(make(chan *amqp.Error, n))

where n is a non-zero buffer size. Make sure to pass a buffered channel to NotifyClose; otherwise, depending on how your code is structured, the library may block on send.
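To make the registration and buffering points concrete, here is a simplified model. `notifier`, `closeErr` and `shutdown` are hypothetical; the sketch assumes the library sends a non-nil error to each registered listener and then closes it, which is also why a deliberate `conn.Close()` yields nil. It is not the library's actual code.

```go
package main

// closeErr is a hypothetical stand-in for *amqp.Error.
type closeErr struct {
    Code   int
    Reason string
}

// notifier is a hypothetical model of how close listeners are stored.
type notifier struct{ closes []chan *closeErr }

// NotifyClose registers c and returns the very same channel.
func (n *notifier) NotifyClose(c chan *closeErr) chan *closeErr {
    n.closes = append(n.closes, c)
    return c
}

// shutdown sends synchronously: with an unbuffered channel and no active
// reader this send would block, which is why a buffered channel is advised.
func (n *notifier) shutdown(err *closeErr) {
    for _, c := range n.closes {
        if err != nil {
            c <- err
        }
        close(c) // a graceful close (err == nil) only closes the channel
    }
}
```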

Then you can receive on the errC channel and take action depending on the type of error you get. In short, the error can be:

  • a connection error, usually unrecoverable
  • a channel error, also called a soft exception, usually recoverable by resetting the connection
  • nil if the program calls conn.Close() on purpose

To know whether the error is recoverable or not, you can inspect the amqp.Error's Code field and/or the Recover field, which is set to true in case of soft exceptions.
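A minimal sketch of the "just check Error.Recover" approach. `closeErr` mirrors the `Code`, `Reason`, `Server` and `Recover` fields of `amqp.Error`, and `decide` and the `action` values are hypothetical names; the mapping follows the list above (nil means a deliberate close, `Recover == true` a soft exception, anything else a connection error).

```go
package main

// closeErr mirrors the amqp.Error fields used here (hypothetical local type).
type closeErr struct {
    Code    int
    Reason  string
    Server  bool
    Recover bool
}

type action int

const (
    stop     action = iota // nil: conn.Close() was called on purpose
    retry                  // Recover == true: soft exception
    escalate               // Recover == false: connection error
)

// decide maps a value received from a NotifyClose channel to an action.
func decide(err *closeErr) action {
    switch {
    case err == nil:
        return stop
    case err.Recover:
        return retry
    default:
        return escalate
    }
}
```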

The following function shows how error codes can be distinguished; it is provided as additional insight. For the general case, just check Error.Recover:

const (
    ConnectionError = 1
    ChannelError    = 2
)

func isConnectionError(err *amqp.Error) bool {
    return errorType(err.Code) == ConnectionError
}

func isChannelError(err *amqp.Error) bool {
    return errorType(err.Code) == ChannelError
}

func errorType(code int) int {
    switch code {
    case
        amqp.ContentTooLarge,    // 311
        amqp.NoConsumers,        // 313
        amqp.AccessRefused,      // 403
        amqp.NotFound,           // 404
        amqp.ResourceLocked,     // 405
        amqp.PreconditionFailed: // 406
        return ChannelError

    case
        amqp.ConnectionForced, // 320
        amqp.InvalidPath,      // 402
        amqp.FrameError,       // 501
        amqp.SyntaxError,      // 502
        amqp.CommandInvalid,   // 503
        amqp.ChannelError,     // 504
        amqp.UnexpectedFrame,  // 505
        amqp.ResourceError,    // 506
        amqp.NotAllowed,       // 530
        amqp.NotImplemented,   // 540
        amqp.InternalError:    // 541
        fallthrough

    default:
        return ConnectionError
    }
}
Featly answered 26/8, 2021 at 20:48

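This may help someone. The code below waits on conn.NotifyClose in a goroutine; when the connection is lost, it closes any channel and connection that are still open and calls ConnectToRMQ again every 5 seconds until it succeeds.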

// MAIN PACKAGE - "cmd/my-project-name/main.go"

package main

import (
    "log"

    "my-project-name/rmq"
)

func main() {
    // RMQ
    if err := rmq.ConnectToRMQ(); err != nil {
        log.Fatal(err)
    }
    select {} // keep the process alive so the reconnect goroutine can run
}

// RMQ PACKAGE - "rmq"
package rmq

import (
    "fmt"
    "log"
    "time"

    amqp "github.com/rabbitmq/amqp091-go"
)

const (
    rmqCredentials string = "amqp://user:pswd@localhost:5672"
    rmqQueue       string = "golang-queue:new"
    rmqExchange    string = "" // default exchange (was constants.RMQ_DIRECT_EXCHANGE)
    rmqContentType string = "application/json"
)

// Note: conn and chann are replaced by the reconnect goroutine without
// locking; guard them with a mutex if SendMessage runs concurrently.
var conn *amqp.Connection
var chann *amqp.Channel

func hasError(err error, msg string) {
    if err != nil {
        log.Printf("%s: %s", msg, err)
    }
}

func ConnectToRMQ() (err error) {
    conn, err = amqp.Dial(rmqCredentials)
    if err != nil {
        return fmt.Errorf("connection error: %w", err)
    }

    chann, err = conn.Channel()
    if err != nil {
        conn.Close()
        return fmt.Errorf("failed to open channel: %w", err)
    }

    q, err := chann.QueueDeclare(
        rmqQueue, // name
        true,     // durable
        false,    // delete when unused
        false,    // exclusive
        false,    // no-wait
        nil,      // arguments
    )
    if err != nil {
        conn.Close()
        return fmt.Errorf("failed to declare queue %q: %w", rmqQueue, err)
    }

    log.Printf("Connected to queue: %v\n", q.Name)

    observeConnection()

    return nil
}

func observeConnection() {
    // Register before starting the goroutine; buffer 1 so the library
    // never blocks when sending the close error.
    notify := conn.NotifyClose(make(chan *amqp.Error, 1))
    go func() {
        closeErr := <-notify
        if closeErr == nil {
            return // conn.Close() was called on purpose: do not reconnect
        }
        log.Printf("Connection lost: %s\n", closeErr)
        log.Printf("Trying to reconnect to RMQ\n")

        closeActiveConnections()

        for err := ConnectToRMQ(); err != nil; err = ConnectToRMQ() {
            log.Println(err)
            time.Sleep(5 * time.Second)
        }
    }()
}

// Can also be used for graceful shutdowns
func closeActiveConnections() {
    if chann != nil && !chann.IsClosed() {
        if err := chann.Close(); err != nil {
            log.Println(err.Error())
        }
    }

    if conn != nil && !conn.IsClosed() {
        if err := conn.Close(); err != nil {
            log.Println(err.Error())
        }
    }
}

// SendMessage - message without response
func SendMessage(body string) {
    err := chann.Publish(
        rmqExchange, // exchange
        rmqQueue,    // routing key
        false,       // mandatory
        false,       // immediate
        amqp.Publishing{
            ContentType:  rmqContentType,
            DeliveryMode: amqp.Persistent, // was constants.RMQ_PERSISTENT_MSG
            Body:         []byte(body),
        })

    if err != nil {
        log.Printf("%s\n %s\n", "Failed to publish message", err)
        log.Println(body)
    }
}

Assuntaassur answered 10/6, 2022 at 21:6
How may it help someone? Does it answer the question? Provide some explanation; answers that are just code dumps are likely to be downvoted and deleted. – Shoemake

As far as I can tell, the Go amqp library does not handle disconnection and reconnection of its connections for you.
There is an open-source wrapper around amqp on GitHub that supports reconnecting after a disconnect and after abnormal errors. It is relatively simple to use, and each service gets its own connection and channel.

Source Code here

Example Code:

package main
 
import (
    "go-rabbit/rabbit"
)
 
/*
    Supports disconnection handling and reconnection,
    and re-sending on failure
    @author : Bill
*/
func main() {
    var(
        addr = "amqp://guest:guest@localhost:5672/"
        queue = "testQueue"
        exchange = "test_exchange"
        routerKey = "/test"
        msg = "test1!"
 
        //delay
        delayQueue = "delay_queue"
        delayExchange = "delay_exchange"
        delayRouterKey = "delay_exchange"
        prefix = "v1_prefix"
        sep = "_"
        eType = "F"
        _ttl = 60 * 1000
    )
 
    var rabbitProduct1 = rabbit.NewRabbitProduct(addr,_ttl,prefix,sep,delayExchange,delayQueue,delayRouterKey)
    // register recycle
    go rabbitProduct1.InitDefdelay(false)
    go rabbitProduct1.InitDefdelay(true)
    go rabbitProduct1.RegisterDelayWithPreFix("delay_queue","delay_exchange","delay_exchange")
 
    // ttl is dead recycle time if ttl > 0 then recycle
    rabbitProduct1.PubMessage(true,eType,queue,exchange,routerKey,msg,rabbitProduct1.GetBool(1),rabbitProduct1.GetBool(0),_ttl)
 
}

I hope this helps or gives you some ideas.

Deepsix answered 20/7, 2020 at 6:18

© 2022 - 2024 — McMap. All rights reserved.