composer require dereuromark/cakephp-queue
Enable the plugin within your config/bootstrap.php (unless you use loadAll):
Plugin::load('Queue');
If you want to also access the backend controller (not just using CLI), you need to use
Plugin::load('Queue', ['routes' => true]);
Run the following command in the CakePHP console to create the tables using the Migrations plugin:
bin/cake migrations migrate -p Queue
It is also advised to have the posix
PHP extension enabled.
The plugin allows some simple runtime configuration.
You may create a file called app_queue.php
inside your config
folder (NOT the plugins config folder) to set the following values:
-
Seconds to sleep() when no executable job is found:
$config['Queue']['sleeptime'] = 10;
-
Probability in percent of an old job cleanup happening:
$config['Queue']['gcprob'] = 10;
-
Default timeout after which a job is requeued if the worker doesn't report back:
$config['Queue']['defaultworkertimeout'] = 3600;
-
Default number of retries if a job fails or times out:
$config['Queue']['defaultworkerretries'] = 3;
-
Seconds of running time after which the worker will terminate (0 = unlimited):
$config['Queue']['workermaxruntime'] = 60;
Warning: Do not use 0 if you are using a cronjob to permanantly start a new worker once in a while and if you do not exit on idle.
-
Should a Workerprocess quit when there are no more tasks for it to execute (true = exit, false = keep running):
$config['Queue']['exitwhennothingtodo'] = false;
-
Minimum number of seconds before a cleanup run will remove a completed task (set to 0 to disable):
$config['Queue']['cleanuptimeout'] = 2592000; // 30 days
-
Use a different connection:
$config['Queue']['connection'] = 'custom'; // Defaults to 'default'
Don't forget to load that config file with Configure::load('app_queue');
in your bootstrap.
You can also use Plugin::load('Queue', ['bootstrap' => true]);
which will load your app_queue.php
config file automatically.
Example app_queue.php
:
return [
'Queue' => [
'workermaxruntime' => 60,
'sleeptime' => 15,
],
];
You can also drop the configuration into an existing config file (recommended) that is already been loaded. The values above are the default settings which apply, when no configuration is found.
Finally, make sure you allow the configured pidfilepath
to be creatable and writable.
Especially on deployment some mkdir
command might be necessary.
Set it to false to use the DB here instead, as well.
You can set two main things on each task as property: timeout and retries.
/**
* Timeout for this task in seconds, after which the task is reassigned to a new worker.
*
* @var int
*/
public $timeout = 120;
/**
* Number of times a failed instance of this task should be restarted before giving up.
*
* @var int
*/
public $retries = 1;
Make sure you set the timeout high enough so that it could never run longer than this, otherwise you risk it being re-run while still being run. I recommend setting it to at least 2x the maximum possible execution length.
Set the retries to at least 1, otherwise it will never execute again after failure in the first run.
In most cases you wouldn't want to use the existing task, but just quickly build your own.
Put it into /src/Shell/Task/
as Queue{YourNameForIt}Task.php
.
You need to at least implement the run method:
namespace App\Shell\Task;
...
class QueueYourNameForItTask extends QueueTask {
/**
* @var int
*/
public $timeout = 20;
/**
* @var int
*/
public $retries = 1;
/**
* @param array $data The array passed to QueuedJobsTable::createJob()
* @param int $jobId The id of the QueuedJob entity
* @return bool Success
*/
public function run(array $data, $jobId) {
$this->loadModel('FooBars');
if (!$this->FooBars->doSth()) {
throw new RuntimeException('Couldnt do sth.');
}
return true;
}
}
Make sure it returns a boolean result (true ideally), or otherwise throws an exception with a clear error message.
Run the following using the CakePHP shell:
-
Display Help message:
bin/cake queue
-
Try to call the cli add() function on a task:
bin/cake queue add <TaskName>
Tasks may or may not provide this functionality.
-
Run a queue worker, which will look for a pending task it can execute:
bin/cake queue runworker
The worker will always try to find jobs matching its installed Tasks.
Most tasks will not be triggered from the console, but from the APP code. You will need to use the model access for QueuedJobs and the createJob() function to do this.
The createJob()
function takes three arguments.
- The first argument is the name of the type of job that you are creating.
- The second argument is optional, but if set must be an array of data and will be passed as a parameter to the
run()
function of the worker. - The third argument is options (
'notBefore'
,'priority'
,'group'
).
For sending emails, for example:
// In your controller
$this->loadModel('Queue.QueuedJobs');
$this->QueuedJobs->createJob('Email', ['to' => '[email protected]', ...]);
// Somewhere in the model or lib
TableRegistry::get('Queue.QueuedJobs')->createJob('Email',
['to' => '[email protected]', ...]);
It will use your custom APP QueueEmailTask
to send out emails via CLI.
Important: Do not forget to set your domain when sending from CLI.
You can filter "running" by group or even type:
bin/cake queue runworker -g MyGroup
bin/cake queue runworker -t MyType,AnotherType,-ThisOneToo
bin/cake queue runworker -t "-ThisOneNot"
Use -
prefix to exclude. Note that you might need to use ""
around the value then to avoid it being seen as option key.
That can be helpful when migrating servers and you only want to execute certain ones on the new system or want to test specific servers.
For some background-tasks you will want to make sure only a single instance of this type is currently run.
In your logic you can check on this using isQueued()
and a unique reference:
/**
* @return \Cake\Http\Response|null
*/
public function triggerImport()
{
$this->request->allowMethod('post');
$this->loadModel('Queue.QueuedJobs');
if ($this->QueuedJobs->isQueued('my-import')) {
$this->Flash->error('Job already running');
return $this->redirect($this->referer(['action' => 'index']));
}
$this->QueuedJobs->createJob(
'Execute',
['command' => 'bin/cake importer run'],
['reference' => 'my-import', 'priority' => 2]
);
$this->Flash->success('Job triggered, will only take few seconds :)');
return $this->redirect($this->referer(['action' => 'index']));
}
So if someone clicks on the button again before the job is finished, he will not be able to trigger a new run:
<?= $this->Form->postLink(__('Trigger Import'), ['action' => 'triggerImport'], ['confirm' => 'Sure?']) ?>
For more complex use cases, you can manually use ->find()->where()
, of course.
The createJob()
method returns the entity. So you can store the ID and at any time ask the queue about the status of this job.
// Within your regular web application
$job = $this->QueuedJobs->createJob(...);
$id = $job->id;
// Store
// Inside your Queue task, if you know the total records:
$totalRecords = count($records);
foreach ($records as $i => $record) {
$this->processImageRendering($record);
$this->QueuedJobs->updateProgress($id, ($i + 1) / $totalRecords);
}
// Get progress status in web site
$job = $this->QueuedJobs->get($id);
$progress = $job->progress; // A float from 0 to 1
echo number_format($progress * 100, 0) . '%'; // Outputs 87% for example
By default errors are always logged, and with log enabled also the execution of a job. Make sure you add this to your config:
'Log' => [
...
'queue' => [
'className' => ...,
'type' => 'queue',
'levels' => ['info'],
'scopes' => ['queue'],
],
],
When debugging (using -v) on the runworker, it will also log the worker run and end.
You can disable info logging by setting Queue.log
to false
in your config.
<TaskName>
may either be the complete classname (eg. QueueExample) or the shorthand without the leading "Queue" (e.g. Example).
Also note that you dont need to add the type ("Task"): bin/cake queue add SpecialExample
for QueueSpecialExampleTask.
Custom tasks should be placed in src/Shell/Task.
Tasks should be named QueueSomethingTask.php
and implement a "QueueSomethingTask", keeping CakePHP naming conventions intact. Custom tasks should extend the QueueTask
class (you will need to include this at the top of your custom task file: use Queue\Shell\Task\QueueTask;
).
Plugin tasks go in plugins/PluginName/src/Shell/Task.
A detailed Example task can be found in src/Shell/Task/QueueExampleTask.php inside this folder.
If you copy an example, do not forget to adapt the namespace!
As outlined in the book you can easily set up a cronjob to start a new worker.
The following example uses "crontab":
*/10 * * * * cd /full/path/to/app && bin/cake queue runworker -q
Make sure you use crontab -e -u www-data
to set it up as www-data
user, and not as root etc.
This would start a new worker every 10 minutes. If you configure your max life time of a worker to 15 minutes, you got a small overlap where two workers would run simultaneously. If you lower the 10 minutes and raise the lifetime, you get quite a few overlapping workers and thus more "parallel" processing power. Play around with it, but just don't shoot over the top.
The plugin works completely without it, by just using the CLI shell commands.
But if you want to browse the statistics via URL, you can enable the routing for it (see above) and then access /admin/queue
to see how status of your queue, statistics and settings.
Please note that this requires the Tools plugin to be loaded if you do not customize the view templates on project level.
Also make sure you loaded the helpers needed (Tools.Format, Tools.Time as Time, etc).
By default the templates should work fine in both Foundation (v5+) and Boostrap (v3+). Copy-and-paste to project level for any customization here.
If you have larger data sets, or maybe even objects/entities, do not pass those. They would not survive the json_encode/decode part and will maybe even exceed the text field in the database.
Instead, pass only the ID of the entity, and get your data in the Task itself. If you have other larger chunks of data, store them somewhere and pass the path to this file.
Instead of manually adding job every time you want to send mail you can use existing code ond change only EmailTransport and Email configurations in app.php
.
'EmailTransport' => [
'default' => [
'className' => 'Smtp',
// The following keys are used in SMTP transports
'host' => '[email protected]',
'port' => 587,
'timeout' => 30,
'username' => 'username',
'password' => 'password',
//'client' => null,
'tls' => true,
],
'queue' => [
'className' => 'Queue.Queue',
'transport' => 'default'
]
],
'Email' => [
'default' => [
'transport' => 'queue',
'from' => '[email protected]',
'charset' => 'utf-8',
'headerCharset' => 'utf-8',
],
],
This way each time you will $email->send()
it will use QueueTransport
as main to create job and worker will use 'transport'
setting to send mail.
QueueTransport
serializes whole email into the database and is useful when you have customEmail
class.SimpleQueueTransport
extracts all data from email (to, bcc, template etc.) and then uses this to recreate email inside task, this is useful when dealing with emails which serialization would overflow databasedata
field length.
The quickest and easiest way is to use the built in Email task:
$data = [
'settings' => [
'to' => $user->email,
'from' => Configure::read('Config.adminEmail'),
'subject' => $subject,
],
'content' => $content,
];
$queuedJobsTable = TableRegistry::get('Queue.QueuedJobs');
$queuedJobsTable->createJob('Email', $data);
This will sent a plain email. Each settings key must have a matching setter method on the Email class.
If you want a templated email, you need to pass view vars instead of content:
$data = [
'settings' => [
'to' => $user->email,
'from' => Configure::read('Config.adminEmail'),
'subject' => $subject,
],
'vars' => [
'myEntity' => $myEntity,
...
],
];
You can also assemble an Email object manually and pass that along as settings directly:
$data = [
'settings' => $emailObject,
'content' => $content,
];
This is the most advised way to generate your asynchronous emails.
Don't generate them directly in your code and pass them to the queue, instead just pass the minimum requirements, like non persistent data needed and the primary keys of the records that need to be included. So let's say someone posted a comment and you want to get notified.
Inside your CommentsTable class after saving the data you execute this hook:
/**
* @param Comment $comment
* @return void
*/
protected function _notifyAdmin(Comment $comment)
{
/** @var \Queue\Model\Table\QueuedJobsTable $QueuedJobs */
$QueuedJobs = TableRegistry::get('Queue.QueuedJobs');
$data = [
'settings' => [
'subject' => __('New comment submitted by {0}', $comment->name)
],
'vars' => [
'comment' => $comment->toArray()
]
];
$QueuedJobs->createJob('CommentNotification', $data);
}
And your QueueAdminEmailTask::run()
method:
$this->Email = new Email();
$this->Email->template('comment_notification');
// ...
if (!empty($data['vars'])) {
$this->Email->viewVars($data['vars']);
}
return (bool)$this->Email->send();
Make sure you got the template for it then, e.g.:
<?= $comment['name'] ?> ( <?= $comment['email'] ?> ) wrote:
<?= $comment['message'] ?>
<?= $this->Url->build(['prefix' => 'admin', 'controller' => 'Comments', 'action'=> 'view', $comment['id']], true) ?>
This way all the generation is in the specific task and template and can be tested separaretly.
First of all: Make sure you don't run workers with workermaxruntime
of 0
.
Then they would at least not run forever, and might pile up only if you start them faster then they terminate.
You can kill workers from the backend or the command line. Make sure you have set up the workers with the same user (www-data usually) as the user that tries to kill them, or it will not work.
Manually killing workers can be done using kill -15 PID
. Replace PID with the PID number (e.g. kill -15 21212
).
To find out what queue processes are currently running, use
ps aux | grep php
Then you can kill them gracefully with -15
(or forcefully with -9
, not recommended).
Locally, if you want to kill them all, usually killapp -15 php
does the trick.
Do not run this with production ones, though.
If you want to use multiple workers, please double check that all jobs have a high enough timeout (>> 2x max possible execution time of a job). Currently it would otherwise risk the jobs being run multiple times!
With IdeHelper plugin you can get typehinting and autocomplete for your createJob() calls. Especially if you use PHPStorm, this will make it possible to get support here.
Include that plugin, set up your generator config and run e.g. bin/cake phpstorm generate
.
If you use Plugin::load('Queue', ['bootstrap' => true, ...])
, the necessary config is already auto-included (recommended).
Otherwise you can manually include the Queue plugin generator tasks in your config/app.php
on project level:
use Queue\Generator\Task\QueuedJobTask;
return [
...
'IdeHelper' => [
'generatorTasks' => [
QueuedJobTask::class
],
],
];
I am looking forward to your contributions.
There are a few guidelines that I need contributors to follow:
- Coding standards (
composer cs-check
to check andcomposer cs-fix
to fix) - Passing tests (
php phpunit.phar
)