Symfony messenger queues with binding key - retry strategy
Asked Answered
S

1

7

I'm implementing messenger in company which I work for. I found problem with routing key.

I want to to send one message to two queues. Two other apps will process this queues. Everything works well, but I found problem when handler throws an exception. It doubles message sending one it two retry queues, because retry queues are matching by binding key, which is the same for this queues.

Finally with 3 retries I have 16 messages on my dlqs. Could you help me with this problem? Is it possible to create retry strategy based maybe on queue, not routing key?

My config looks like:

messenger:
    failure_transport: failed
    default_bus: command.bus
    transports:
        async:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                retry_strategy:
                    max_retries: 3
                    delay: 1000
                    multiplier: 2
                    max_delay: 0
                exchange:
                    name: olimp
                    type: topic
                queues:
                    create_miniature_v1:
                        binding_keys:
                            - first
                    create_miniature_v2:
                        binding_keys:
                            - first
        failed:
            dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
            options:
                exchange:
                    name: olimp_dead
                    type: topic
                queues:
                    create_miniature_v1_dlq:
                        binding_keys:
                            - first
                    create_miniature_v2_dlq:
                        binding_keys:
                            - first

    routing:
        'Olimp\Messenger\TestEvent': async

    buses:
        command.bus:
            middleware:
                - Olimp\Shared\Application\Message\Middleware\EventDispatcher
                - doctrine_close_connection
                - doctrine_transaction

        event.bus:
            default_middleware: allow_no_handlers

        query.bus: ~

I dispatch event with stamp like that:

class MessengerTestCommand extends Command
{
    protected static $defaultName = 'app:messenger-test';
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;

        parent::__construct();
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $this->bus->dispatch(
            new TestEvent(), [
                new AmqpStamp('first')
            ]
        );

        $io->success('Done');

        return 0;
    }
}

Handler:

class TestEventHandler implements MessageHandlerInterface
{
    public function __invoke(TestEvent $event)
    {
        dump($event->id);

        throw new \Exception('Boom');
    }
}

What I found on rabbit: Rabbit

Now I was trying config like that:

framework:
    messenger:
        failure_transport: failed
        default_bus: command.bus
        transports:
            async:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v1:
                            binding_keys:
                                - first
            async1:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    retry_strategy:
                        max_retries: 3
                        delay: 1000
                        multiplier: 2
                        max_delay: 0
                    exchange:
                        name: olimp
                        type: topic
                    queues:
                        create_miniature_v2:
                            binding_keys:
                                - first
            failed:
                dsn: amqp://rabbitmq:rabbitmq@rabbitmq:5672
                options:
                    exchange:
                        name: olimp_dead
                        type: topic
                    queues:
                        create_miniature_v1_dlq:
                            binding_keys:
                                - first
                        create_miniature_v2_dlq:
                            binding_keys:
                                - first

        routing:
            'Olimp\Messenger\TestEvent': [async, async1]

and with two running console commands:

bin/console messenger:consume async
bin/console messenger:consume async1

But it works the same.

Shultz answered 18/4, 2020 at 20:29 Comment(0)
S
6

Ok, I found answer myself.

I created new retry strategy. I changed queue_name_pattern to %routing_key%_%delay% and created my own SendFailedMessageForRetryListener. To retry envelope I added stamp new AmqpStamp($envelope->last(AmqpReceivedStamp::class)->getQueueName()) which is used to create proper routing key for delay queue. So instead of creating queue based on exchange name I have it created based on queue name.

Two more things:

Binding keys in queue looks like:

queues:
    create_miniature_v1:
        binding_keys:
            - create_miniature_v1
            - first
    create_miniature_v2:
        binding_keys:
            - create_miniature_v2
            - first

and failed queues:

queues:
    create_miniature_v1_dlq:
        binding_keys:
            - create_miniature_v1
    create_miniature_v2_dlq:
        binding_keys:
            - create_miniature_v2
Shultz answered 23/4, 2020 at 9:5 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.