I have a problem. I need to write a Laravel application that reads a JSON file and writes its contents to the database. The important constraint is that the application can be stopped (interrupted) at any time and must continue where it left off after a restart. The main writing part currently works with background jobs and batches, like this:
Controller:
public function fileUploadPost(Request $request)
{
    $data = json_decode(file_get_contents($request->file), true);

    // Split the file into chunks of 1000 records
    $chunks = array_chunk($data, 1000);

    // Create a background job for each chunk
    $batch = Bus::batch([])->dispatch();
    foreach ($chunks as $key => $chunk) {
        $batch->add(new ProcessFile($chunk));
    }

    // Return success status
    return back()->with('success', 'You have successfully uploaded the file.');
}
JobFile:
namespace App\Jobs;

use App\Models\Account;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\DB;
use Throwable;

class ProcessFile implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    private $data = null;

    /**
     * ProcessFile constructor.
     *
     * @param array $data
     */
    public function __construct(array $data)
    {
        $this->data = $data;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        foreach ($this->data as $value) {
            $this->saveAccount($value);
        }
    }

    public function failed(Throwable $exception)
    {
    }

    /**
     * Store the given account data in the database using a transaction.
     */
    private function saveAccount(array $value)
    {
        DB::transaction(function () use ($value) {
            // Create Account object
            $account = new Account;
            $account->name = $value['name'];
            $account->address = $value['address'];
            $account->checked = $value['checked'];
            $account->description = $value['description'];
            $account->interest = $value['interest'];
            $account->email = $value['email'];
            $account->account = $value['account'];
            $account->save();
        });
    }
}
Now, when I kill the Docker container while the application is writing the data to the database, and then restart the container, I can see that the interrupted job fails immediately with this exception:
Illuminate\Queue\MaxAttemptsExceededException: App\Jobs\ProcessFile has been attempted too many times or run too long. The job may have previously timed out. in /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php:750
Stack trace:
#0 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(504): Illuminate\Queue\Worker->maxAttemptsExceededException(Object(Illuminate\Queue\Jobs\DatabaseJob))
#1 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(418): Illuminate\Queue\Worker->markJobAsFailedIfAlreadyExceedsMaxAttempts('database', Object(Illuminate\Queue\Jobs\DatabaseJob), 1)
#2 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(378): Illuminate\Queue\Worker->process('database', Object(Illuminate\Queue\Jobs\DatabaseJob), Object(Illuminate\Queue\WorkerOptions))
#3 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(172): Illuminate\Queue\Worker->runJob(Object(Illuminate\Queue\Jobs\DatabaseJob), 'database', Object(Illuminate\Queue\WorkerOptions))
#4 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(130): Illuminate\Queue\Worker->daemon('database', 'default', Object(Illuminate\Queue\WorkerOptions))
#5 /var/www/html/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(114): Illuminate\Queue\Console\WorkCommand->runWorker('database', 'default')
#6 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\Queue\Console\WorkCommand->handle()
#7 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()
#8 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\Container\Util::unwrapIfClosure(Object(Closure))
#9 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))
#10 /var/www/html/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)
#11 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(136): Illuminate\Container\Container->call(Array)
#12 /var/www/html/vendor/symfony/console/Command/Command.php(291): Illuminate\Console\Command->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#13 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))
#14 /var/www/html/vendor/symfony/console/Application.php(989): Illuminate\Console\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#15 /var/www/html/vendor/symfony/console/Application.php(299): Symfony\Component\Console\Application->doRunCommand(Object(Illuminate\Queue\Console\WorkCommand), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#16 /var/www/html/vendor/symfony/console/Application.php(171): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#17 /var/www/html/vendor/laravel/framework/src/Illuminate/Console/Application.php(102): Symfony\Component\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#18 /var/www/html/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#19 /var/www/html/artisan(37): Illuminate\Foundation\Console\Kernel->handle(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))
#20 {main}
When I retry the failed job, it restarts the writing from the beginning, so I get duplicate entries in my database. Duplicate entries are allowed in the data itself, so a database-level solution that simply ignores duplicates is not acceptable.
Why is this job failing and how can I fix that?
Answer
In my experience with this kind of work: generate a UUID and assign it to the job when it's placed on the queue. When inserting into the database, include that request id with every record. That way you avoid writing the same request to the database twice, because you can check whether any records already exist for the request id.
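A minimal sketch of the dispatch side, assuming you add a request_id column to the accounts table and extend the job's constructor to accept it (the column name and extra constructor arguments are illustrative, not part of the original code):

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Str;

// Tag every chunk of this upload with a single request id
$requestId = (string) Str::uuid();

$batch = Bus::batch([])->dispatch();
foreach ($chunks as $key => $chunk) {
    // The job would store $requestId with each record it inserts,
    // e.g. $account->request_id = $this->requestId; in saveAccount()
    $batch->add(new ProcessFile($chunk, $requestId, $key));
}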
Since each job would be inserting multiple records, you can count the records already in the database for that request id against the number of records in the job. If they match, the job is done. If they don't, there's more work to be done.
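For example, handle() could compare counts first and resume from the first unwritten record. This is a sketch, assuming the hypothetical request_id and chunk_index columns from above and that records within a chunk are inserted in order:

public function handle()
{
    // Records this chunk already wrote during a previous, interrupted attempt
    $existing = Account::where('request_id', $this->requestId)
        ->where('chunk_index', $this->chunkIndex)
        ->count();

    // Counts match: this chunk finished before the interruption, nothing to do
    if ($existing >= count($this->data)) {
        return;
    }

    // Otherwise skip what was already saved and continue where it stopped
    foreach (array_slice($this->data, $existing) as $value) {
        $this->saveAccount($value);
    }
}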
Keep in mind there are situations where two workers can end up working on the same job, and you'd wind up with duplicates.
For this reason you might be interested in a composite primary key: a combination of the request id and the item number within the request. Then even if the job gets "stuck" in one worker and another worker tries to pick up the slack, since they'd be working on the same request id, they couldn't both write a record for the same request id and item number.
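Sketched with Laravel's schema builder and insertOrIgnore, using a unique index to stand in for the composite primary key (request_id and item_number are again illustrative names):

use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Schema;

// Migration: at most one record per (request id, item number) pair
Schema::table('accounts', function (Blueprint $table) {
    $table->uuid('request_id');
    $table->unsignedInteger('item_number');
    $table->unique(['request_id', 'item_number']);
});

// In the job: a second worker repeating this insert becomes a no-op
DB::table('accounts')->insertOrIgnore([
    'request_id'  => $this->requestId,
    'item_number' => $itemNumber,
    'name'        => $value['name'],
    // ...remaining account columns
]);

Note this only suppresses re-processing of the same item, not legitimate duplicates in the data, so it stays within the constraint from the question.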
It's also a good idea to log information about a request's status so you can see when data is being written, how much is left, what percentage is done, and when it finishes.
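Batches already expose part of this. For instance, a status check could read progress off a stored batch id; Bus::findBatch and the progress helpers are standard Laravel, while where you persist the batch id is up to you:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Log;

$batch = Bus::findBatch($batchId);

if ($batch) {
    Log::info('Import status', [
        'percent'   => $batch->progress(),       // 0-100, percentage of jobs finished
        'processed' => $batch->processedJobs(),  // jobs completed so far
        'total'     => $batch->totalJobs,
        'finished'  => $batch->finished(),
    ]);
}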
This can get complicated fast. Depending on the situation, you might already have extra information available, such as a primary key for each record, and you can use that while inserting into the database.
Also keep in mind that jobs can fail. A faulty job should be rejected after it fails, say, 3 times in a row, so it doesn't hold up the rest of the queue; alternatively, move it to the back of the line and abort after a few tries. Keeping a proper log of these failures is important so you can figure out what happened and whether to take some sort of action, for example if the database went into maintenance, ran out of connections while the job was running, or a particular user id was problematic.
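Laravel has standard hooks for exactly that policy on the job class itself; $tries, $backoff, and failed() are built in (the logging here is just a sketch):

use Illuminate\Support\Facades\Log;

class ProcessFile implements ShouldQueue
{
    // Reject the job after 3 failed attempts instead of holding up the queue
    public $tries = 3;

    // Send retries "to the back of the line": wait 10s, 30s, 60s between attempts
    public $backoff = [10, 30, 60];

    public function failed(Throwable $exception)
    {
        // Keep a proper record so you can decide what action to take later
        Log::error('ProcessFile permanently failed', [
            'error' => $exception->getMessage(),
        ]);
    }
}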