I have an S3 bucket, mybucket, and I want to execute something when a new file is copied into that bucket. For the notifications, I want to use an SQS queue, notifiqueue, because my goal is to access that queue with Laravel.
Since I am creating my infrastructure in CloudFormation, the resources are created like this:
    NotificationQueue:
      Type: AWS::SQS::Queue
      Properties:
        VisibilityTimeout: 120
        QueueName: 'NotificationQueue'
    DataGateBucket:
      Type: AWS::S3::Bucket
      Properties:
        AccessControl: BucketOwnerFullControl
        BucketName: 'mybucket'
        NotificationConfiguration:
          QueueConfigurations:
            - Event: 's3:ObjectCreated:*'
              Queue: !GetAtt NotificationQueue.Arn
Each time a new file is saved on the bucket, S3 automatically creates a notification in SQS.
Sadly, the format of the payload is not compatible with the standard Laravel job payload, and if I run a worker process on the NotificationQueue I get this error:
local.ERROR: Undefined index: job {"exception":"[object] (ErrorException(code: 0): Undefined index: job at .../vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php:273)
For a more complete picture, here is what I get in the notification (after turning the JSON into a PHP array):
    array:1 [
      "Records" => array:1 [
        0 => array:9 [
          "eventVersion" => "2.1"
          "eventSource" => "aws:s3"
          "awsRegion" => "eu-central-1"
          "eventTime" => "2019-04-23T17:02:41.308Z"
          "eventName" => "ObjectCreated:Put"
          "userIdentity" => array:1 [
            "principalId" => "AWS:XXXXXXXXXXXXXXXXXX"
          ]
          "requestParameters" => array:1 [
            "sourceIPAddress" => "217.64.198.7"
          ]
          "responseElements" => array:2 [
            "x-amz-request-id" => "602CE18B8DE0BE5C"
            "x-amz-id-2" => "wA/A3Jl2XpoxBWJEgQzy11s6O28Cz9Wc6pVi6Ho1vnIrOjqsWkGozlUmqRdpYAfub0MqdF8d/YI="
          ]
          "s3" => array:4 [
            "s3SchemaVersion" => "1.0"
            "configurationId" => "0d4eaa75-5730-495e-b6d4-368bf3690f30"
            "bucket" => array:3 [
              "name" => "mybucket"
              "ownerIdentity" => array:1 [
                "principalId" => "XXXXXXXXXXXXXXXXXX"
              ]
              "arn" => "arn:aws:s3:::mybucket"
            ]
            "object" => array:4 [
              "key" => "dirName/myFile.txt"
              "size" => 1991721
              "eTag" => "824a20edad0091027b5d0fa6d78bb24f"
              "sequencer" => "005CBF452E30AAC02A"
            ]
          ]
        ]
      ]
    ]
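For illustration, the fields that matter for reacting to an upload can be pulled out of the notification body with plain PHP, independently of Laravel. This is only a minimal sketch: the function name `extractS3Objects` is hypothetical, and it assumes the message body is the raw JSON string delivered by S3. Object keys in S3 event notifications are URL-encoded, hence the `urldecode` call.

```php
<?php

/**
 * Extract bucket/key pairs from the JSON body of an S3 event notification.
 * Hypothetical helper, not part of Laravel or the AWS SDK.
 *
 * @param  string  $body  Raw JSON message body as received from SQS
 * @return array<int, array{bucket: string, key: string, event: string}>
 */
function extractS3Objects(string $body): array
{
    $payload = json_decode($body, true);

    // Anything that is not an S3 notification (e.g. a regular Laravel
    // job payload) has no "Records" list, so nothing is returned for it.
    if (!is_array($payload) || !isset($payload['Records']) || !is_array($payload['Records'])) {
        return [];
    }

    $objects = [];
    foreach ($payload['Records'] as $record) {
        if (($record['eventSource'] ?? null) !== 'aws:s3') {
            continue;
        }

        $objects[] = [
            'bucket' => $record['s3']['bucket']['name'],
            // Keys arrive URL-encoded (a space is sent as "+")
            'key'    => urldecode($record['s3']['object']['key']),
            'event'  => $record['eventName'],
        ];
    }

    return $objects;
}

// Trimmed-down example body with the same structure as the dump above
$body = '{"Records":[{"eventSource":"aws:s3","eventName":"ObjectCreated:Put",'
      . '"s3":{"bucket":{"name":"mybucket"},"object":{"key":"dirName/my+File.txt"}}}]}';

print_r(extractS3Objects($body));
```

A message without a `Records` key, such as a normal Laravel job payload, yields an empty array, which makes it easy to tell the two payload shapes apart.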
What is the working / best / right way to access the notification using Laravel, so that I can trigger some other action in response to the file upload?
Answer
I found a way to obtain the desired behavior, but I am not sure it is the best way, so I am posting it here in the hope that someone can give me feedback.
When we talk about Laravel queues, a lot of the configuration comes from app.php, and in particular from the providers section. I managed to add the behavior I needed by overriding the original QueueServiceProvider class and substituting it:
    // Here is the original Provider Class
    //Illuminate\Queue\QueueServiceProvider::class,

    // Here is the overridden Provider
    App\Providers\QueueServiceProvider::class,
The new QueueServiceProvider class is the following:
    <?php

    namespace App\Providers;

    use App\Jobs\SqsNotifications\SqsConnector;

    class QueueServiceProvider extends \Illuminate\Queue\QueueServiceProvider
    {
        /**
         * Register the Amazon SQS queue connector.
         *
         * @param  \Illuminate\Queue\QueueManager  $manager
         * @return void
         */
        protected function registerSqsNotifConnector($manager)
        {
            $manager->addConnector('sqsNotif', function () {
                return new SqsConnector();
            });
        }

        public function registerConnectors($manager)
        {
            parent::registerConnectors($manager);

            // Add the custom SQS notification connector
            $this->registerSqsNotifConnector($manager);
        }
    }
Notice the new connector sqsNotif, which needs to be added to the connections in queue.php:
    'sqsNotif' => [
        'driver' => 'sqsNotif',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX', 'https://sqs.eu-central-1.amazonaws.com/your-account'),
        'queue' => env('SQS_QUEUE', 'your-queue-name'),
        'region' => env('AWS_DEFAULT_REGION', 'eu-central-1'),
    ],
In the new QueueServiceProvider we just register an extra connector, whose code is:
    <?php

    namespace App\Jobs\SqsNotifications;

    use Aws\Sqs\SqsClient;
    use Illuminate\Support\Arr;

    class SqsConnector extends \Illuminate\Queue\Connectors\SqsConnector
    {
        /**
         * Establish a queue connection.
         *
         * @param  array  $config
         * @return \Illuminate\Contracts\Queue\Queue
         */
        public function connect(array $config)
        {
            $config = $this->getDefaultConfiguration($config);

            if ($config['key'] && $config['secret']) {
                $config['credentials'] = Arr::only($config, ['key', 'secret', 'token']);
            }

            // Return the custom SqsQueue from this namespace, not Laravel's
            return new SqsQueue(
                new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
            );
        }
    }
The SqsQueue is redefined too, in this way:
    <?php

    namespace App\Jobs\SqsNotifications;

    class SqsQueue extends \Illuminate\Queue\SqsQueue
    {
        /**
         * Pop the next job off of the queue.
         *
         * @param  string  $queue
         * @return \Illuminate\Contracts\Queue\Job|null
         */
        public function pop($queue = null)
        {
            $response = $this->sqs->receiveMessage([
                'QueueUrl' => $queue = $this->getQueue($queue),
                'AttributeNames' => ['ApproximateReceiveCount'],
            ]);

            if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
                // Wrap the message in the custom SqsJob from this namespace
                return new SqsJob(
                    $this->container, $this->sqs, $response['Messages'][0],
                    $this->connectionName, $queue
                );
            }
        }
    }
And the last missing piece is SqsJob, defined like this:
    <?php

    namespace App\Jobs\SqsNotifications;

    use Illuminate\Queue\Jobs\JobName;

    /**
     * Class SqsJob
     * @package App\Jobs\SqsNotifications
     *
     * Alternate SQS job that is used in case of S3 notifications
     */
    class SqsJob extends \Illuminate\Queue\Jobs\SqsJob
    {
        /**
         * Get the name of the queued job class.
         *
         * @return string
         */
        public function getName()
        {
            $bucketName = '';

            // Define the name of the Process based on the bucket name
            switch ($this->payload()['Records'][0]['s3']['bucket']['name']) {
                case 'mybucket':
                    $bucketName = 'NewMyBucketFileJob';
                    break;
            }

            return $bucketName;
        }

        /**
         * Fire the job.
         *
         * @return void
         */
        public function fire()
        {
            // Mimic the original behavior with a different payload
            $payload = $this->payload();

            [$class, $method] = JobName::parse('App\\Jobs\\' . $this->getName() . '@handle');

            ($this->instance = $this->resolve($class))->{$method}($payload);

            // The Job wasn't automatically deleted, so we need to delete it manually once the process went fine
            $this->delete();
        }
    }
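The `switch` in `getName()` grows by one case per bucket and silently returns an empty class name for an unknown bucket. One possible alternative, sketched here in plain PHP, is a lookup table that fails loudly instead. The function `resolveJobName` and the `$jobsByBucket` map are hypothetical names introduced only for this example; they are not part of Laravel.

```php
<?php

/**
 * Map a decoded S3 notification to the short name of the job class that
 * should handle it. Hypothetical helper; the map contents are examples.
 *
 * @param  array  $payload  Decoded S3 notification
 * @param  array<string, string>  $jobsByBucket  Bucket name => job class name
 * @return string
 *
 * @throws InvalidArgumentException when no job is registered for the bucket
 */
function resolveJobName(array $payload, array $jobsByBucket): string
{
    $bucket = $payload['Records'][0]['s3']['bucket']['name'] ?? null;

    if ($bucket === null || !isset($jobsByBucket[$bucket])) {
        throw new InvalidArgumentException(
            'No job registered for bucket: ' . var_export($bucket, true)
        );
    }

    return $jobsByBucket[$bucket];
}

// Example map: one entry per bucket that sends notifications
$jobsByBucket = ['mybucket' => 'NewMyBucketFileJob'];
$payload = ['Records' => [['s3' => ['bucket' => ['name' => 'mybucket']]]]];

echo resolveJobName($payload, $jobsByBucket), PHP_EOL; // prints "NewMyBucketFileJob"
```

Inside `getName()` the same lookup could be fed with `$this->payload()`, with the map kept in a config file, so that adding a bucket does not require touching the job class.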
At this point, I just need to define the processing job, for example like the one below, in a NewMyBucketFileJob class:
    <?php

    namespace App\Jobs;

    use Illuminate\Bus\Queueable;
    use Illuminate\Queue\SerializesModels;
    use Illuminate\Queue\InteractsWithQueue;
    use Illuminate\Contracts\Queue\ShouldQueue;
    use Illuminate\Foundation\Bus\Dispatchable;

    // The class name must match the value returned by SqsJob::getName()
    class NewMyBucketFileJob implements ShouldQueue
    {
        use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

        /**
         * Create a new job instance.
         *
         * @return void
         */
        public function __construct()
        {
        }

        /**
         * Execute the job.
         *
         * @param  array  $data  The decoded S3 notification payload
         * @return void
         */
        public function handle($data)
        {
            // Print the whole data structure
            print_r($data);

            // Or just the name of the uploaded file
            print_r($data['Records'][0]['s3']['object']['key']);
        }
    }
This process works, so it is a solution, but it involves a lot of class extensions and is quite fragile if the internal queue implementation changes in future releases. I am honestly wondering if there is something easier or more robust.