I am running this worker:

php -d memory_limit=-1 bin/console messenger:consume sqs_channel_manager -vv

And the worker fails:

14:29:32 INFO      [messenger] Received message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived"]
14:29:32 WARNING   [messenger] Error thrown while handling message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived. Sending for retry #1 using 922 ms delay. Error: "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: " ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived","message_id" => null,"retryCount" => 1,"delay" => 922,"error" => "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: ","exception" => Symfony\Component\Messenger\Exception\HandlerFailedException^ { …}]
14:29:37 INFO      [messenger] Received message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived"]
14:29:37 WARNING   [messenger] Error thrown while handling message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived. Sending for retry #3 using 3676 ms delay. Error: "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: " ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived","message_id" => null,"retryCount" => 3,"delay" => 3676,"error" => "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: ","exception" => Symfony\Component\Messenger\Exception\HandlerFailedException^ { …}]
14:30:08 INFO      [messenger] Received message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived"]
14:30:08 CRITICAL  [messenger] Error thrown while handling message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived. Removing from transport after 5 retries. Error: "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: " ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived","message_id" => null,"retryCount" => 5,"error" => "Handling "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived" failed: ","exception" => Symfony\Component\Messenger\Exception\HandlerFailedException^ { …}]
14:30:08 INFO      [messenger] Rejected message App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived will be sent to the failure transport Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport. ["class" => "App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived","transport" => "Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport"]

The transport sqs_channel_manager is configured like this:

framework:
    messenger:
        failure_transport: failed
        transports:
            failed: 'doctrine://default?table_name=failed_messages'
            sqs_channel_manager:
              dsn: 'https://sqs.eu-north-1.amazonaws.com/XXXXXX/sqs_channel_manager_test'
              serializer: App\Infrastructure\Messenger\ChannelManagerSerializer
              options:
                  access_key: '%env(AWS_ACCESS_KEY_ID)%'
                  secret_key: '%env(AWS_SECRET_ACCESS_KEY)%'
                  region: '%env(AWS_REGION)%'
                  queue_name: '%env(CHANNEL_MANAGER_QUEUE_NAME)%'

It is an SQS queue. The queue has a redrive policy:

aws sqs get-queue-attributes --queue-url https://sqs.eu-north-1.amazonaws.com/XXXXXX/sqs_channel_manager_test --attribute-names RedrivePolicy
{
    "Attributes": {
        "RedrivePolicy": "{\"deadLetterTargetArn\":\"arn:aws:sqs:eu-north-1:XXXXX:sqs_channel_manager_test_dlq\",\"maxReceiveCount\":1}"
    }
}

But upon failure, the message is not placed on the dead letter queue:

aws sqs get-queue-attributes --queue-url https://sqs.eu-north-1.amazonaws.com/XXXXXXXX/sqs_channel_manager_test_dlq  --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "0"
    }
}

Does failure_transport prevent the message from being placed on the DLQ? Setting failure_transport to null on the transport has no effect:

            sqs_channel_manager:
              failure_transport: null
              dsn: '%env(SQS_CHANNEL_MANAGER_TRANSPORT_DSN)%'
              serializer: App\Infrastructure\Messenger\ChannelManagerSerializer
              options:
                  access_key: '%env(AWS_ACCESS_KEY_ID)%'
                  secret_key: '%env(AWS_SECRET_ACCESS_KEY)%'
                  region: '%env(AWS_REGION)%'
                  queue_name: '%env(CHANNEL_MANAGER_QUEUE_NAME)%'
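
The "Sending for retry" lines in the log come from Messenger's own retry strategy rather than from SQS redelivering the message. To separate the two mechanisms, the retries could be switched off for this transport. The fragment below is only a sketch, assuming the standard `retry_strategy` option with `max_retries`. I have not confirmed that it changes whether the message reaches the DLQ:

```yaml
framework:
    messenger:
        transports:
            sqs_channel_manager:
                dsn: '%env(SQS_CHANNEL_MANAGER_TRANSPORT_DSN)%'
                serializer: App\Infrastructure\Messenger\ChannelManagerSerializer
                # Assumption: with max_retries set to 0 the worker should not
                # re-send a failed message to the transport itself.
                retry_strategy:
                    max_retries: 0
```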

Also, I am using a custom serializer:

<?php
declare(strict_types=1);

namespace App\Infrastructure\Messenger;

use App\Domain\Event\ChannelManager\ChannelManagerEventHasReceived;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class ChannelManagerSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        $data = json_decode($encodedEnvelope['body'], true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new \InvalidArgumentException(
                'Invalid JSON received from SQS: ' . json_last_error_msg()
            );
        }

        $envelope = new Envelope(new ChannelManagerEventHasReceived($data));

        if (isset($encodedEnvelope['headers']['stamps'])) {
            $stamps = unserialize(base64_decode($encodedEnvelope['headers']['stamps']));
            foreach ($stamps as $stamp) {
                $envelope = $envelope->with($stamp);
            }
        }

        return $envelope;
    }

    public function encode(Envelope $envelope): array
    {
        $event = $envelope->getMessage();

        $stampsToSerialize = [];

        foreach ($envelope->all() as $stampArray) {
            foreach ($stampArray as $stamp) {
                if ($stamp instanceof RedeliveryStamp || $stamp instanceof TransportMessageIdStamp) {
                    $stampsToSerialize[] = $stamp;
                }
            }
        }

        return [
            'body' => json_encode($event->channelManagerData, JSON_THROW_ON_ERROR),
            'headers' => [
                'stamps' => base64_encode(serialize($stampsToSerialize)),
            ],
        ];
    }
}

Does it affect whether a message enters the DLQ? I am using Symfony 7.2. Is this a known issue?
