Skip to content
Advertisement

Symfony Messenger: is it possible to not throw the exception on last retry?

We’re using Symfony Messenger, and have these transports:

framework:
    messenger:
        failure_transport: failed

        transports:
            failed:
                dsn: 'doctrine://default?queue_name=failed'
                options:
                    table_name: 'MessengerMessages'
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                retry_strategy:
                    max_retries: 3
                    delay: 5000
                    multiplier: 2
                    max_delay: 0
            asyncLowPriority:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
                retry_strategy:
                    max_retries: 5
                    delay: 3600000
                    multiplier: 2
                    max_delay: 0
            sync: 'sync://'

When we send a message to the async queue, and the last retry fails with an exception, the exception is logged to the MessengerMessages table, and the exception bubbles up (goes to Sentry in our case). This is what we want.

When we send a message to the asyncLowPriority queue however, we would like failed messages to:

  • not got to the failed transport
  • not make the exception bubble up

Basically, the exception should be dropped.

Is this possible, and how?

The reason is that we’re using this queue for downloading images asynchronously, and we already log each failure in a dedicated database table in the command handler.

Advertisement

Answer

I managed to do this with a middleware:

use SymfonyComponentMessengerEnvelope;
use SymfonyComponentMessengerExceptionHandlerFailedException;
use SymfonyComponentMessengerExceptionUnrecoverableMessageHandlingException;
use SymfonyComponentMessengerMiddlewareMiddlewareInterface;
use SymfonyComponentMessengerMiddlewareStackInterface;
use SymfonyComponentMessengerStampReceivedStamp;
use SymfonyComponentMessengerStampRedeliveryStamp;
use SymfonyComponentMessengerStampSentToFailureTransportStamp;
use Throwable;

final class BypassFailureTransportMiddleware implements MiddlewareInterface
{
    public function __construct(
        private string $transportName,
        private int $maxRetries,
    ) {
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            return $stack->next()->handle($envelope, $stack);
        } catch (HandlerFailedException $exception) {
            $nestedException = $this->getNestedException($exception);

            if ($nestedException === null) {
                throw $exception;
            }

            /** @var ReceivedStamp|null $receivedStamp */
            $receivedStamp = $envelope->last(ReceivedStamp::class);

            if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) {
                throw $exception;
            }

            if (!$this->isLastRetry($envelope, $nestedException)) {
                throw $exception;
            }

            return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
        }
    }

    private function getNestedException(HandlerFailedException $exception): ?Throwable
    {
        $nestedExceptions = $exception->getNestedExceptions();

        if (count($nestedExceptions) === 1) {
            return $nestedExceptions[0];
        }

        return null;
    }

    private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
    {
        if ($nestedException instanceof UnrecoverableMessageHandlingException) {
            return true;
        }

        /** @var RedeliveryStamp|null $redeliveryStamp */
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

        if ($redeliveryStamp === null) {
            return false;
        }

        return $redeliveryStamp->getRetryCount() === $this->maxRetries;
    }
}

It must be configured with the name of the transport and the configured max_retries of this transport:

parameters:
    async_allow_failure_transport_name: 'asyncAllowFailure'
    async_allow_failure_max_retries: 5

services:
  command.bus.bypass_failure_transport_middleware:
    class: AppInfrastructureCommandBusMiddlewareBypassFailureTransportMiddleware
    arguments:
      $transportName: '%async_allow_failure_transport_name%'
      $maxRetries: '%async_allow_failure_max_retries%'

framework:
    messenger:
        transports:
            - name: '%async_allow_failure_transport_name%'
              dsn: '...'
              retry_strategy:
                  max_retries: '%async_allow_failure_max_retries%'
                  delay: 1000
                  multiplier: 2
                  max_delay: 0

        buses:
            command.bus:
                middleware:
                  - 'command.bus.bypass_failure_transport_middleware'
User contributions licensed under: CC BY-SA
10 People found this is helpful
Advertisement