AnyEvent::RabbitMQ issues with closed channels
Asked Answered
S

1

6

I'm writing a master program for publishing message into a message queue (RabbitMQ). The program is written in Perl 5 and is using AnyEvent::RabbitMQ for the communication to RabbitMQ.

The following minimal example (for the issue I ran into) will fail on a second command send via the same channel with the error "Channel closed".

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',
    port            => 5672,
    user            => 'guest',
    pass            => 'guest',
    vhost           => '/',
    timeout         => 1,
    tls             => 0,
    on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
  );
  $condvar->recv;
  $ar->close;
  return;
}

############################################################################
sub _on_connect_success {
  my ( $condvar, $ar, $new_ar ) = @_;
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar, $new_ar );
  return;
}

############################################################################
sub _open_channel {
  my ( $condvar, $ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  _declare_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _declare_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',
    auto_delete => 1,
    passive     => 0,
    durable     => 0,
    exclusive   => 0,
    no_ack      => 1,
    ticket      => 0,
    on_success =>
      sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_declare_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _bind_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',
    exchange    => '',
    routing_key => '',
    on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_bind_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
  return;
}

############################################################################
sub _publish_message {
  my ( $condvar, $ar, $channel, $message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',
    exchange    => '',
    routing_key => '',
    body        => $message,
    header      => {},
    mandatory   => 0,
    immediate   => 0,
    on_success =>
      sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
    on_ack          => sub { _error( $condvar, $ar, 'ack',          @_ ) },
    on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
  );
  return;
}

############################################################################
sub _on_publish_message_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  sleep 1;
  _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
  return;
}

############################################################################
sub _error {
  my ( $condvar, $ar, $type, @error ) = @_;
  _log( error => sprintf '%s - %s', $type, join ', ', @error );
  $condvar->send( $condvar, $ar, $type, @error );
  return;
}

############################################################################
sub _log {
  my ( $level, $message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;
  $time[4] += 1;
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
  return;
}

This program should:

  • connect to RabbitMQ
  • opens a RabbitMQ channel
  • declares a simpe queue (named "test")
  • bind to that queue (named "test")
  • publish a message ("Hello, world!")
  • after successfull publishing the message wait a second and publish another message

This program (master program) should not consume messages. There are other programs out there to do this job.

The minimal example (see above) will produce the following output:

2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)

Why does AnyEvent::RabbitMQ or RabbitMQ itself closes the channel (not the connection or did I miss something)?

Sarraceniaceous answered 12/8, 2015 at 13:0 Comment(0)
M
6

If you take a look at the RabbitMQ server logs you will see something like this:

{amqp_error,access_refused,"operation not permitted on the default exchange",'queue.bind'}

Apparently it doesn't let you bind a queue on the default exchange. So you need to declare and bind your own exchange first.

sub _declare_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ exchange...' );
  $channel->declare_exchange(
    exchange        => 'testest',
    type            => 'fanout',
    on_success =>
      sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_declare_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ exchange.' );
  _bind_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _bind_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ exchange...' );
  $channel->bind_exchange(
    source      => 'testest',
    destination => 'testest',
    routing_key => '',
    on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

Once you've set those subs up, tell your program to use this custom exchange.

sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  $channel->confirm;
  _declare_exchange( $condvar, $ar, $channel );
  return;
}

The $channel->confirm is necessary to make RabbitMQ answer with a confirmation when you send a message to the queue. If you do not do that, the success handler will never get called because there are no success responses coming back.

Then in your _bind_queue you need to add the exchange to the bind_queue() call.

  $channel->bind_queue(
    queue       => 'test',
    exchange    => 'testest', # <-- here
    routing_key => '',
    # ...
  );

The same needs to be done in the _publish_message with the publish() call. There you should also replace the on_ack handler with something that actually deals with the acknowledgement. I think you intended to do that but had a copy/paste error1.

$channel->publish(
  queue       => 'test',
  exchange    => 'testest', # <-- here
  routing_key => '',
  # ...
  on_ack          => sub { 
  _on_publish_message_success( $condvar, $ar, $channel, @_ );
  },
);

One more thing is that the sleep call in _on_publish_message_success is not a good idea when you are working with AnyEvent as that will stop the whole program. Use an AE::timer instead.

my $t; 
$t = AE::timer(1,0,sub {
  _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
  undef $t;
});

Here is the full code with all the changes.

use strictures 2;

use AnyEvent::RabbitMQ;

main();

############################################################################
sub main {
  _log( debug => 'main' );
  my $condvar = AnyEvent->condvar;
  my $ar      = AnyEvent::RabbitMQ->new;
  $ar->load_xml_spec;
  _log( debug => 'Connecting to RabbitMQ...' );
  $ar->connect(
    host            => 'localhost',
    port            => 5672,
    user            => 'guest',
    pass            => 'guest',
    vhost           => '/guest',
    timeout         => 1,
    tls             => 0,
    on_success      => sub { _on_connect_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure', @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return', @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close', @_ ) },
  );
  $condvar->recv;
  $ar->close;
  return;
}

############################################################################
sub _on_connect_success {
  my ( $condvar, $ar, $new_ar ) = @_;
  _log( debug => 'Connected to RabbitMQ.' );
  _open_channel( $condvar, $new_ar );
  return;
}

############################################################################
sub _open_channel {
  my ( $condvar, $ar ) = @_;
  _log( debug => 'Opening RabbitMQ channel...' );
  $ar->open_channel(
    on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_open_channel_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Opened RabbitMQ channel.' );
  $channel->confirm;
  _declare_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _declare_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ exchange...' );
  $channel->declare_exchange(
    exchange        => 'testest',
    type            => 'fanout',
    on_success =>
      sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_declare_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ exchange.' );
  _bind_exchange( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _bind_exchange {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ exchange...' );
  $channel->bind_exchange(
    source      => 'testest',
    destination => 'testest',
    routing_key => '',
    on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_bind_exchange_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ exchange.' );
  _declare_queue( $condvar, $ar, $channel );
  return;
}


############################################################################
sub _declare_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declaring RabbitMQ queue...' );
  $channel->declare_queue(
    queue       => 'test',
    auto_delete => 1,
    passive     => 0,
    durable     => 0,
    exclusive   => 0,
    no_ack      => 1,
    ticket      => 0,
    on_success =>
      sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_declare_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Declared RabbitMQ queue.' );
  _bind_queue( $condvar, $ar, $channel );
  return;
}

############################################################################
sub _bind_queue {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binding RabbitMQ queue...' );
  $channel->bind_queue(
    queue       => 'test',
    exchange    => 'testest',
    routing_key => '',
    on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
  );
  return;
}

############################################################################
sub _on_bind_queue_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => 'Binded RabbitMQ queue.' );
  _log( info  => 'Master ready to publish messages.' );
  _publish_message( $condvar, $ar, $channel, 'Hello, world!' );
  return;
}

############################################################################
sub _publish_message {
  my ( $condvar, $ar, $channel, $message ) = @_;
  _log( debug => "Publishing RabbitMQ message ($message)..." );
  $channel->publish(
    queue       => 'test',
    exchange    => 'testest',
    routing_key => '',
    body        => $message,
    header      => {},
    mandatory   => 0,
    immediate   => 0,
    on_success =>
      sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
    on_failure      => sub { _error( $condvar, $ar, 'failure',      @_ ) },
    on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
    on_return       => sub { _error( $condvar, $ar, 'return',       @_ ) },
    on_close        => sub { _error( $condvar, $ar, 'close',        @_ ) },
    on_ack          => sub { 
        _on_publish_message_success( $condvar, $ar, $channel, @_ );
#        _error( $condvar, $ar, 'ack',          @_ )    
    },
    on_nack         => sub { _error( $condvar, $ar, 'nack',         @_ ) },
  );
  return;
}

############################################################################
sub _on_publish_message_success {
  my ( $condvar, $ar, $channel ) = @_;
  _log( debug => "Published RabbitMQ message." );
  my $t; $t=AE::timer(1,0,sub {
      _publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
      undef $t;
  });
  return;
}

############################################################################
sub _error {
  my ( $condvar, $ar, $type, @error ) = @_;
  _log( error => sprintf '%s - %s', $type, join ', ', @error );
  $condvar->send( $condvar, $ar, $type, @error );
  return;
}

############################################################################
sub _log {
  my ( $level, $message ) = @_;
  my @time = gmtime time;
  $time[5] += 1900;
  $time[4] += 1;
  my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
  my @caller0    = caller(0);
  my @caller1    = caller(1);
  my $subroutine = $caller1[3];
  $subroutine =~ s/^$caller0[0]:://;
  print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
  return;
}

1) In some places you need to buy your colleagues a beer for those :)

Misspell answered 13/8, 2015 at 11:20 Comment(2)
Hi. Do you know how to tune heartbeat? I have tried this, but code takes value from server frame. It is 60 seconds and I do not know how to change that =(Esker
@EugenKonkov it's been years since I touched this stuff. That was two jobs ago I'm afraid. I think you should ask a new question.Misspell

© 2022 - 2024 — McMap. All rights reserved.