We’re using Symfony Messenger, and have these transports:
    framework:
        messenger:
            failure_transport: failed

            transports:
                failed:
                    dsn: 'doctrine://default?queue_name=failed'
                    options:
                        table_name: 'MessengerMessages'
                async:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                    retry_strategy:
                        max_retries: 3
                        delay: 5000
                        multiplier: 2
                        max_delay: 0
                asyncLowPriority:
                    dsn: '%env(MESSENGER_TRANSPORT_DSN)%_low_priority'
                    retry_strategy:
                        max_retries: 5
                        delay: 3600000
                        multiplier: 2
                        max_delay: 0
                sync: 'sync://'
When we send a message to the async queue and the last retry fails with an exception, the exception is logged to the MessengerMessages table, and the exception bubbles up (goes to Sentry in our case). This is what we want.
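To make the two retry strategies above concrete, here is a rough sketch of the waiting times they produce. It assumes the multiplier strategy computes each wait as `delay * multiplier ^ retryCount`, treats `max_delay: 0` as "no cap", and ignores any jitter that newer Symfony versions may add. The helper name `retryDelays` is made up for this illustration and is not part of Symfony.

```php
<?php

declare(strict_types=1);

/**
 * Waiting time in milliseconds before each retry.
 * Assumption: wait = delay * multiplier ^ n, with n starting at 0, no jitter.
 *
 * @return list<int>
 */
function retryDelays(int $maxRetries, int $delayMs, int $multiplier, int $maxDelayMs = 0): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetries; $retry++) {
        $delay = $delayMs * $multiplier ** $retry;
        // A max delay of 0 is treated as "no upper bound".
        if ($maxDelayMs > 0 && $delay > $maxDelayMs) {
            $delay = $maxDelayMs;
        }
        $delays[] = $delay;
    }

    return $delays;
}

$async = retryDelays(3, 5000, 2);
$lowPriority = retryDelays(5, 3600000, 2);

printf("async: %s ms\n", implode(', ', $async));
printf(
    "asyncLowPriority: %s ms (%d hours in total)\n",
    implode(', ', $lowPriority),
    intdiv(array_sum($lowPriority), 3600000)
);
```

Under these assumptions, the async transport retries after 5, 10 and 20 seconds, while asyncLowPriority retries after 1, 2, 4, 8 and 16 hours, so a low-priority message only reaches its final failure roughly 31 hours after the first attempt.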
When we send a message to the asyncLowPriority queue, however, we would like failed messages to:
- not go to the failed transport
- not make the exception bubble up
Basically, the exception should be dropped.
Is this possible, and how?
The reason is that we’re using this queue for downloading images asynchronously, and we already log each failure in a dedicated database table in the command handler.
Answer
I managed to do this with a middleware:
    namespace App\Infrastructure\CommandBus\Middleware;

    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Exception\HandlerFailedException;
    use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
    use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
    use Symfony\Component\Messenger\Middleware\StackInterface;
    use Symfony\Component\Messenger\Stamp\ReceivedStamp;
    use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
    use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
    use Throwable;

    final class BypassFailureTransportMiddleware implements MiddlewareInterface
    {
        public function __construct(
            private string $transportName,
            private int $maxRetries,
        ) {
        }

        public function handle(Envelope $envelope, StackInterface $stack): Envelope
        {
            try {
                return $stack->next()->handle($envelope, $stack);
            } catch (HandlerFailedException $exception) {
                $nestedException = $this->getNestedException($exception);

                if ($nestedException === null) {
                    throw $exception;
                }

                /** @var ReceivedStamp|null $receivedStamp */
                $receivedStamp = $envelope->last(ReceivedStamp::class);

                // Only act on messages consumed from the configured transport.
                if ($receivedStamp === null || $receivedStamp->getTransportName() !== $this->transportName) {
                    throw $exception;
                }

                // Let the normal retry mechanism run until the last attempt.
                if (!$this->isLastRetry($envelope, $nestedException)) {
                    throw $exception;
                }

                // Swallow the exception and mark the message as already sent to the failure transport.
                return $envelope->with(new SentToFailureTransportStamp($receivedStamp->getTransportName()));
            }
        }

        private function getNestedException(HandlerFailedException $exception): ?Throwable
        {
            $nestedExceptions = $exception->getNestedExceptions();

            // Only handle the case where exactly one handler failed.
            if (count($nestedExceptions) === 1) {
                return $nestedExceptions[0];
            }

            return null;
        }

        private function isLastRetry(Envelope $envelope, Throwable $nestedException): bool
        {
            // Unrecoverable exceptions are never retried, so this is the last attempt.
            if ($nestedException instanceof UnrecoverableMessageHandlingException) {
                return true;
            }

            /** @var RedeliveryStamp|null $redeliveryStamp */
            $redeliveryStamp = $envelope->last(RedeliveryStamp::class);

            // No redelivery stamp means this is the first attempt.
            if ($redeliveryStamp === null) {
                return false;
            }

            return $redeliveryStamp->getRetryCount() === $this->maxRetries;
        }
    }
It must be configured with the name of the transport and the configured max_retries of this transport:
    parameters:
        async_allow_failure_transport_name: 'asyncAllowFailure'
        async_allow_failure_max_retries: 5

    services:
        command.bus.bypass_failure_transport_middleware:
            class: App\Infrastructure\CommandBus\Middleware\BypassFailureTransportMiddleware
            arguments:
                $transportName: '%async_allow_failure_transport_name%'
                $maxRetries: '%async_allow_failure_max_retries%'

    framework:
        messenger:
            transports:
                - name: '%async_allow_failure_transport_name%'
                  dsn: '...'
                  retry_strategy:
                      max_retries: '%async_allow_failure_max_retries%'
                      delay: 1000
                      multiplier: 2
                      max_delay: 0

            buses:
                command.bus:
                    middleware:
                        - 'command.bus.bypass_failure_transport_middleware'
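The middleware only takes effect for messages that are actually consumed from this transport, so the relevant messages still need to be routed to it. A minimal routing sketch might look like the following; the message class `App\Message\DownloadImage` is a hypothetical name used only for illustration.

```yaml
framework:
    messenger:
        routing:
            # Hypothetical message class; replace with your own message.
            'App\Message\DownloadImage': '%async_allow_failure_transport_name%'
```

Messages routed to any other transport pass through the middleware untouched, because the ReceivedStamp check compares the transport name before anything else happens.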