I have a project in which I am converting a large number of .tif images into PDF documents; the file count goes into the millions.
To speed up the process I am using Amphp. Since converting the images with ImageMagick is CPU-intensive, I want to limit the maximum number of converter processes running in parallel.
My first approach works, but it could be improved by queueing the files instead of giving a fixed number of workers an array of x files each.
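Queueing the files instead of pre-splitting them into fixed arrays starts with producing the file list lazily, so millions of names are never held in memory at once. The following is a minimal sketch, assuming the files sit directly in a single directory; `filesWithExtension` is a hypothetical helper name, not part of amphp:

```php
<?php
// Hypothetical helper: lazily yields full paths of files with the given extension.
function filesWithExtension(string $dir, string $extension): Generator
{
    $handle = opendir($dir);
    if ($handle === false) {
        throw new RuntimeException("Cannot open directory: $dir");
    }
    $base = rtrim($dir, '\\/') . DIRECTORY_SEPARATOR;
    try {
        while (false !== ($entry = readdir($handle))) {
            $path = $base . $entry;
            // Skip ".", ".." and anything that is not a regular file (e.g. subdirectories).
            if (!is_file($path)) {
                continue;
            }
            // Case-insensitive match, so "scan.TIF" and "scan.tif" are both found.
            if (strtolower(pathinfo($entry, PATHINFO_EXTENSION)) === strtolower($extension)) {
                yield $path;
            }
        }
    } finally {
        // Close the handle when iteration ends or the generator is destroyed.
        closedir($handle);
    }
}
```

A consumer can then create tasks one at a time, e.g. `foreach (filesWithExtension(LOOKUP_PATH, 'xml') as $filePath) { ... }`, where the directory and extension are whatever the real script filters on.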
This is my current code, in which I tried to replicate the example:
```php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

// Collect all user-defined constants so they can be handed to each task.
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

// Note: $i counts every directory entry, not only the matching files.
while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if ($ext === 'xml') {
        $filePath = LOOKUP_PATH . 'xml' . DIRECTORY_SEPARATOR . $import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}

if (!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $coroutines = [];
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);

        // Enqueue every task at once and let the pool distribute them.
        foreach ($tasks as $task) {
            $coroutines[] = Amp\call(function () use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
        }

        $results = yield Amp\Promise\all($coroutines);

        return yield $pool->shutdown();
    });
}
```
My problem is that as soon as I enqueue more than THREAD_COUNT tasks, I get the following PHP warning and no PDFs are created:

Warning: Worker in pool exited unexpectedly with code -1

As long as I stay below the maximum pool size, everything is fine.
I am using PHP 7.4.9 on Windows 10 and amphp/parallel 1.4.0.
Answer
After some more experimenting I found a solution that seems to work. It feels a bit “hacky”, so if anyone has a better idea, please share. I expected the pool to build up a queue automatically and work through it with the maximum number of workers, but that does not seem to be the case.
I now store the coroutines returned by Amp\call in two separate arrays: one that holds all coroutines, and one that holds only the coroutines of the current loop (those enqueued since the last wait).
```php
$coroutine = Amp\call(function () use ($pool, $task) {
    return yield $pool->enqueue($task);
});
$loopRoutines[] = $coroutine;   // coroutines of the current loop
$allCoroutines[] = $coroutine;  // every coroutine
```
After enqueueing an item I check whether I have already reached the configured maximum number of threads. If the pool has the maximum number of workers and none of them is idle, I call Amp\Promise\first on my current-loop coroutines to wait until a worker becomes free.
Since first() would return immediately the next time I get there (the finished coroutine is still in my current-loop array), I clear the array afterwards.
```php
if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
    // Wait until one of the recently enqueued tasks finishes, then start a new batch.
    yield Amp\Promise\first($loopRoutines);
    $loopRoutines = [];
}
```
After the foreach I call Amp\Promise\all on the all-coroutines array, so the script waits until every task has finished.
Here is my changed code:
```php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';

// Collect all user-defined constants so they can be handed to each task.
$constants = get_defined_constants(true);
$constants = $constants['user'];
$maxFileCount = THREAD_CHUNKSIZE * THREAD_COUNT;
$i = 0;
$folder = opendir(LOOKUP_PATH);
$tasks = [];

while ($i < $maxFileCount && (false !== ($import_file = readdir($folder)))) {
    $fileParts = explode('.', $import_file);
    $ext = strtolower(end($fileParts));
    if ($ext === 'xml') {
        $filePath = LOOKUP_PATH . 'xml' . DIRECTORY_SEPARATOR . $import_file;
        $tasks[] = new ConvertPdfTask([$filePath], $constants);
    }
    $i++;
}

if (!empty($tasks)) {
    Amp\Loop::run(function () use ($tasks) {
        $allCoroutines = [];   // every task, awaited at the end
        $loopRoutines = [];    // tasks enqueued since the last wait
        $pool = new Amp\Parallel\Worker\DefaultPool(THREAD_COUNT);

        foreach ($tasks as $task) {
            $coroutine = Amp\call(function () use ($pool, $task) {
                return yield $pool->enqueue($task);
            });
            $loopRoutines[] = $coroutine;
            $allCoroutines[] = $coroutine;

            // All workers busy: wait until one of the recent tasks finishes.
            if ($pool->getWorkerCount() >= THREAD_COUNT && $pool->getIdleWorkerCount() === 0) {
                yield Amp\Promise\first($loopRoutines);
                $loopRoutines = [];
            }
        }

        // Wait for all remaining tasks, then shut the workers down.
        yield Amp\Promise\all($allCoroutines);

        return yield $pool->shutdown();
    });
}
```
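The throttling in this workaround is a form of bounded concurrency: keep at most N jobs active and start the next queued job once a slot frees up. The sketch below shows that idea as a sliding window in plain PHP, with generators standing in for amphp promises. `runBounded` and the job shape (a callable returning a Generator) are hypothetical and not part of amphp, and nothing here runs in parallel; it only illustrates the scheduling.

```php
<?php
/**
 * Hypothetical sketch (plain PHP, no amphp): work through a queue of jobs while
 * keeping at most $limit of them active. Each job is a callable returning a
 * Generator; every `yield` inside a job means "not finished yet, let others run",
 * and the generator's return value becomes the job's result.
 *
 * @return array results keyed like $jobs, in completion order
 */
function runBounded(iterable $jobs, int $limit): array
{
    if ($limit < 1) {
        throw new InvalidArgumentException('$limit must be at least 1');
    }

    // Wrap the input so arrays and iterators are both consumed one job at a time.
    $pending = (function () use ($jobs): Generator {
        yield from $jobs;
    })();

    $active = [];   // key => running job Generator
    $results = [];

    while ($pending->valid() || $active !== []) {
        // Fill free slots from the queue, never exceeding $limit.
        while (count($active) < $limit && $pending->valid()) {
            $active[$pending->key()] = ($pending->current())();
            $pending->next();
        }

        // Advance every active job by one step.
        foreach ($active as $key => $job) {
            if ($job->valid()) {   // the first valid() call starts the job
                $job->next();
            }
            if (!$job->valid()) {
                // Job finished: record its result and free only its own slot.
                $results[$key] = $job->getReturn();
                unset($active[$key]);
            }
        }
    }

    return $results;
}
```

The design difference from the batch approach is that a slot is refilled as soon as any single job finishes, and the other running jobs stay tracked, so there is no need to clear a separate "current loop" array.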