diff --git a/.editorconfig b/.editorconfig index 47ae637b..98628c62 100644 --- a/.editorconfig +++ b/.editorconfig @@ -10,8 +10,22 @@ indent_style = space insert_final_newline = true trim_trailing_whitespace = true -[{*.yml,package.json}] +[*.md] +trim_trailing_whitespace = false + +[*.{yml,js,json,css,scss,eslintrc,feature}] indent_size = 2 +indent_style = space + +[composer.json] +indent_size = 4 + +# Don't perform any clean-up on thirdparty files + +[thirdparty/**] +trim_trailing_whitespace = false +insert_final_newline = false -# The indent size used in the package.json file cannot be changed: -# https://github.com/npm/npm/pull/3180#issuecomment-16336516 +[admin/thirdparty/**] +trim_trailing_whitespace = false +insert_final_newline = false diff --git a/.travis.yml b/.travis.yml index c244c3b8..b15e0fa8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,3 +2,7 @@ version: ~> 1.0 import: - silverstripe/silverstripe-travis-shared:config/provision/standard-jobs-range.yml + +env: + global: + - REQUIRE_RECIPE="4.7.x-dev || " diff --git a/README.md b/README.md index 04dede27..304fd9a4 100644 --- a/README.md +++ b/README.md @@ -4,30 +4,7 @@ [![Scrutinizer](https://scrutinizer-ci.com/g/symbiote/silverstripe-queuedjobs/badges/quality-score.png)](https://scrutinizer-ci.com/g/symbiote/silverstripe-queuedjobs/) [![SilverStripe supported module](https://img.shields.io/badge/silverstripe-supported-0071C4.svg)](https://www.silverstripe.org/software/addons/silverstripe-commercially-supported-module-list/) - -## Maintainer Contact - -Marcus Nyeholt - - - -## Requirements - -* SilverStripe 4.x -* Access to create cron jobs - -## Version info - -The master branch of this module is currently aiming for SilverStripe 4.x compatibility - -* [SilverStripe 3.1+ compatible version](https://github.com/symbiote/silverstripe-queuedjobs/tree/2.9) -* [SilverStripe 3.0 compatible version](https://github.com/symbiote/silverstripe-queuedjobs/tree/1.0) -* [SilverStripe 2.4 compatible 
version](https://github.com/symbiote/silverstripe-queuedjobs/tree/ss24) - -## Documentation - -See http://github.com/symbiote/silverstripe-queuedjobs/wiki/ for more complete -documentation +## Overview The Queued Jobs module provides a framework for SilverStripe developers to define long running processes that should be run as background tasks. @@ -42,29 +19,16 @@ The module comes with * A task that is executed as a cronjob for collecting and executing jobs. * A pre-configured job to cleanup the QueuedJobDescriptor database table. -## Quick Usage Overview - -* Install the cronjob needed to manage all the jobs within the system. It is best to have this execute as the -same user as your webserver - this prevents any problems with file permissions. +## Installation ``` -*/1 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask +composer require symbiote/silverstripe-queuedjobs ``` -* If your code is to make use of the 'long' jobs, ie that could take days to process, also install another task -that processes this queue. Its time of execution can be left a little longer. +Now setup a cron job: ``` -*/15 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask queue=large -``` - -* From your code, add a new job for execution. - -```php -use Symbiote\QueuedJobs\Services\QueuedJobService; - -$publish = new PublishItemsJob(21); -QueuedJobService::singleton()->queueJob($publish); +*/1 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask ``` * To schedule a job to be executed at some point in the future, pass a date through with the call to queueJob @@ -427,509 +391,26 @@ SilverStripe\Control\Email\Email: queued_job_admin_email: support@mycompany.com ``` -**Long running jobs are running multiple times!** - -A long running job _may_ fool the system into thinking it has gone away (ie the job health check fails because -`currentStep` hasn't been incremented). 
To avoid this scenario, you can set `$this->currentStep = -1` in your job's -constructor, to prevent any health checks detecting the job. - -### Understanding job states - -It's really useful to understand how job state changes during the job lifespan as it makes troubleshooting easier. -Following chart shows the whole job lifecycle: - -![JobStatus](docs/job_status.jpg) - -* every job starts in `New` state -* every job should eventually reach either `Complete`, `Broken` or `Paused` -* `Cancelled` state is not listed here as the queue runner will never transition job to that state as it is reserved for user triggered actions -* progress indication is either state change or step increment -* every job can be restarted however there is a limit on how many times (see `stall_threshold` config) - -## Performance configuration - -By default this task will run until either 256mb or the limit specified by php\_ini('memory\_limit') is reached. +## Documentation ->NOTE: This was increased to 256MB in 4.x to handle the increase in memory usage by framework. + * [Overview](docs/en/index.md): Running and triggering jobs. Different queue types and job lifecycles. + * [Defining Jobs](docs/en/defining-jobs.md): Jobs are just PHP classes. Learn how to write your own. + * [Performance](docs/en/performance.md): Advice on job performance in large or highly concurrent setups + * [Troubleshooting](docs/en/troubleshooting.md) + * [Dependant Jobs](docs/en/dependant-jobs.md) + * [Immediate jobs](docs/en/immediate-run.md) + * [Unit Testing](docs/en/unit-testing.md) -You can adjust this with the below config change +## Show job data +In case you need easy access to additional job data via CMS for debug purposes enable the `show_job_data` option by including the configuration below.
```yaml -# Force memory limit to 256 megabytes -Symbiote\QueuedJobs\Services\QueuedJobService\QueuedJobsService: - # Accepts b, k, m, or b suffixes - memory_limit: 256m -``` - - -You can also enforce a time limit for each queue, after which the task will attempt a restart to release all -resources. By default this is disabled, so you must specify this in your project as below: - - -```yml -# Force limit to 10 minutes -Symbiote\QueuedJobs\Services\QueuedJobService\QueuedJobsService: - time_limit: 600 -``` - - -## Indexes - -```sql -ALTER TABLE `QueuedJobDescriptor` ADD INDEX ( `JobStatus` , `JobType` ) -``` - -## Unit tests - -Writing units tests for queued jobs can be tricky as it's quite a complex system. Still, it can be done. - -### Basic unit tests - -Note that you don't actually need to run your queued job via the `QueuedJobService` in your unit test in most cases. Instead, you can run it directly, like this: - -``` -$job = new YourQueuedJob($someArguments); -$job->setup(); -$job->process(); - -$this->assertTrue($job->jobFinished()); -other assertions can be placed here (job side effects, job data assertions...) -``` - -`setup()` needs to be run only once and `process()` needs to be run as many times as needed to complete the job. This depends on your job and the job data. -Usually, `process()` needs to be run once for every `step` your job completes, but this may vary per job implementation. Please avoid using `do while {jobFinished}`, you should always be clear on how many times the `process()` runs in your test job. -If you are unsure, do a test run in your application with some logging to indicate how many times it is run. - -This should cover most cases, but sometimes you need to run a job via the service. For example you may need to test if your job related extension hooks are working. - -### Advanced unit tests - -Please be sure to disable the shutdown function and the queued job handler as these two will cause you some major pain in your unit tests. 
-You can do this in multiple ways: - -* `setUp()` at the start of your unit test - -This is pretty easy, but it may be tedious to add this to your every unit test. - -* create a parent class for your unit tests and add `setUp()` function to it - -You can now have the code in just one place, but inheritance has some limitations. - -* add a test state and add `setUp()` function to it, see `SilverStripe\Dev\State\TestState` - -Create your test state like this: - -``` -registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class); - Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false); - } - - public function tearDown(SapphireTest $test) - { - } - - public function setUpOnce($class) - { - } - - public function tearDownOnce($class) - { - } -} - -``` - -Register your test state with `Injector` like this: - -``` -SilverStripe\Core\Injector\Injector: - SilverStripe\Dev\State\SapphireTestState: - properties: - States: - queuedjobs: '%$App\Dev\State\QueuedJobTestState' -``` - -This solution is great if you want to apply this change to all of your unit tests. - -Regardless of which approach you choose, the two changes that need to be inside the `setUp()` function are as follows: - -This will replace the standard logger with a dummy one. -``` -Injector::inst()->registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class); -``` - -This will disable the shutdown function completely as `QueuedJobService` doesn't work well with `SapphireTest`. - -``` -Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false); -``` - -This is how your run a job via service in your unit tests. 
- -``` -$job = new YourQueuedJob($someArguments); - -/** @var QueuedJobService $service */ -$service = Injector::inst()->get(QueuedJobService::class); - -$descriptorID = $service->queueJob($job); -$service->runJob($descriptorID); - -/** @var QueuedJobDescriptor $descriptor */ -$descriptor = QueuedJobDescriptor::get()->byID($descriptorID); -$this->assertNotEquals(QueuedJob::STATUS_BROKEN, $descriptor->JobStatus); -``` - -For example, this code snippet runs the job and checks if the job ended up in a non-broken state. - -## Advanced job setup - -This section is recommended for developers who are already familiar with basic concepts and want to take full advantage of the features in this module. - -### Job creation - -First, let's quickly summarise the lifecycle of a queued job: - -1. job is created as an object in your code -2. job is queued, the matching job descriptor is saved into the database -3. job is picked and processed up by the queue runner. - -Important thing to note is that **step 3** will create an empty job instance and populate it with data from the matching job descriptor. -Any defined params in the job constructor will not be populated in this step. -If you want to define your own job constructor and not use the inherited one, you will need to take this into account when implementing your job. -Incorrect implementation may result in the job processing losing some or all of the job data before processing starts. -To avoid this issue consider using one of the options below to properly implement your job creation. - -Suppose we have a job which needs a `string`, an `integer` and an `array` as the input. - -#### Option 1: Job data is set directly - -It's possible to completely avoid defining constructor on your job and set the job data directly to the job object. -This is a good approach for simple jobs, but more complex jobs with a lot of properties may end up using several lines of code. 
- -##### Job class constructor - -```php -// no constructor -``` - -##### Job creation - -```php -$job = new MyJob(); -// set job data -$job->string = $string; -$job->integer = $integer; -$job->array = $array; +Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor: + show_job_data: true ``` -##### Advantages - -* No need to define constructor. -* Nullable values don't need to be handled. - -##### Disadvantages - -* No strict parameter types. -* Code may not be as DRY in case you create the job in many different places. - -#### Option 2: Job constructor with specific params - -Defining your own constructor is the most intuitive approach. -We need to take into consideration that the job constructor will be called without any params by the queue runner. -The implementation needs to provide default values for all parameters and handle this special case. - -##### Job class constructor - -```php -public function __construct(?string $string = null, ?int $integer = null, ?array $array = null) -{ - if ($string === null || $integer === null || $array === null) { - // job constructor called by the queue runner - exit early - return; - } - - // job constructor called in project code - populate job data - $this->string = $string; - $this->integer = $integer; - $this->array = $array; -} -``` - -##### Job creation - -```php -$job = new MyJob($string, $integer, $array); -``` - -##### Advantages - -* Typed parameters. - -##### Disadvantages - -* Nullable values need to be provided and code handling of the nullable values has to be present. That is necessary because the queue runner calls the constructor without parameters as data will come in later from job descriptor. -* Strict type is not completely strict because nullable values can be passed when they shouldn't be (e.g.: at job creation in your code). 
- -This approach is especially problematic on PHP 7.3 or higher as the static syntax checker may have an issue with nullable values and force you to implement additional check like `is_countable` on the job properties. - -#### Option 3: Job constructor without specific params - -The inherited constructor has a generic parameter array as the only input and we can use it to pass arbitrary parameters to our job. -This makes the job constructor match the parent constructor signature but there is no type checking. - -##### Job class constructor - -```php -public function __construct(array $params = []) -{ - if (!array_key_exists('string', $params) || !array_key_exists('integer', $params) || !array_key_exists('array', $params)) { - // job constructor called by the queue runner - exit early - return; - } - - // job constructor called in project code - populate job data - $this->string = $params['string']; - $this->integer = $params['integer']; - $this->array = $params['array']; -} -``` - -##### Job creation - -```php -$job = new MyJob(['string' => $string, 'integer' => $integer, 'array' => $array]); -``` - -##### Advantages - -* Nullable values don't need to be handled. - -##### Disadvantages - -* No strict parameter types. - -This approach is probably the simplest one but with the least parameter validation. - -#### Option 4: Separate hydrate method - -This approach is the strictest when it comes to validating parameters but requires the `hydrate` method to be called after each job creation. -Don't forget to call the `hydrate` method in your unit test as well. -This option is recommended for projects which have many job types with complex processing. Strict checking reduces the risk of input error. 
- -##### Job class constructor - -```php -// no constructor - -public function hydrate(string $string, int $integer, array $array): void -{ - $this->string = $string; - $this->integer = $integer; - $this->array = $array; -} -``` - -##### Job creation - -```php -$job = new MyJob(); -$job->hydrate($string, $integer, $array); -``` - -##### Unit tests - -```php -$job = new MyJob(); -$job->hydrate($string, $integer, $array); -$job->setup(); -$job->process(); - -$this->assertTrue($job->jobFinished()); -// other assertions can be placed here (job side effects, job data assertions...) -``` - -##### Advantages - -* Strict parameter type checking. -* No nullable values. -* No issues with PHP 7.3 or higher. - -##### Disadvantages - -* Separate method has to be implemented and called after job creation in your code. - -### Ideal job size - -How much work should be done by a single job? This is the question you should ask yourself when implementing a new job type. -There is no precise answer. This really depends on your project setup but there are some good practices that should be considered: - -* similar size — it's easier to optimise the queue settings and stack size of your project when your jobs are about the same size -* split the job work into steps — this prevents your job running for too long without an update to the job manager and it lowers the risk of the job getting labelled as crashed -* avoid jobs that are too small — jobs that are too small produce a large amount of job management overhead and are thus inefficient -* avoid jobs that are too large — jobs that are too large are difficult to execute as they may cause timeout issues during execution. - -As a general rule of thumb, one run of your job's `process()` method should not exceed 30 seconds. - -If your job is too large and takes way too long to execute, the job manager may label the job as crashed even though it's still executing. 
-If this happens you can: - -* Add job steps which help the job manager to determine if job is still being processed. -* If you're job is already divided in steps, try dividing the larger steps into smaller ones. -* If your job performs actions that can be completed independently from the others, you can split the job into several smaller dependant jobs (e.g.: there is value even if only one part is completed). - -The dependant job approach also allows you to run these jobs concurrently on some project setups. -Job steps, on the other hand, always run in sequence. - -### Job steps - -It is highly recommended to use the job steps feature in your jobs. -Correct implementation of jobs steps makes your jobs more robust. - -The job step feature has two main purposes. - -* Communicating progress to the job manager so it knows if the job execution is still underway. -* Providing a checkpoint in case job execution is interrupted for some reason. This allows the job to resume from the last completed step instead of restarting from the beginning. - -The currently executing job step can also be observed in the CMS via the _Jobs admin_ UI. This is useful mostly for debugging purposes when monitoring job execution. - -Job steps *should not* be used to determine if a job is completed or not. You should rely on the job data or the database state instead. - -For example, consider a job which accept a list of items to process and each item represents a separate step. 
- -```php -items = $items; - } - - /** - * @return string - */ - public function getTitle(): string - { - return 'My awesome job'; - } - - public function setup(): void - { - $this->remaining = $this->items; - $this->totalSteps = count($this->items); - } - - public function process(): void - { - $remaining = $this->remaining; - - // check for trivial case - if (count($remaining) === 0) { - $this->isComplete = true; - - return; - } - - $item = array_shift($remaining); - - // code that will process your item goes here - - // update job progress - $this->remaining = $remaining; - $this->currentStep += 1; - - // check for job completion - if (count($remaining) > 0) { - return; - } - - $this->isComplete = true; - } -} - -``` - -This job setup has following features: - -* one item is processed in each step -* each step will produce a checkpoint so job can be safely resumed -* job manager will be notified about job progress and is unlikely to label the job as crashed by mistake -* job uses data to determine job completion rather than the steps -* original list of items is preserved in the job data so it can be used for other purposes (dependant jobs, debug). - -Don't forget that in your unit test you must call `process()` as many times as you have items in your test data as one `process()` call handles only one item. - -### Dependant jobs - -Sometimes it makes sense to split the work to be done between several jobs. -For example, consider the following flow: - -* page gets published (generates URLs for static cache) -* page gets statically cached (generates static HTML for provided URLs) -* page flushes cache on CDN for provided URLs. - -One way to implement this flow using queued jobs is to split the work between several jobs. -Note that these actions have to be done in sequence, so we may not be able to queue all needed jobs right away. 
- -This may be because of: - -* queue processing is run on multiple threads and we can't guarantee that jobs will be run in sequence -* later actions have data dependencies on earlier actions. - -In this situation, it's recommended to use the _Dependant job_ approach. - -Use the `updateJobDescriptorAndJobOnCompletion` extension point in `QueuedJobService::runJob()` like this: - -```php -public function updateJobDescriptorAndJobOnCompletion( - QueuedJobDescriptor $descriptor, - QueuedJob $job -): void -{ - // your code goes here -} -``` - -This extension point is invoked each time a job completes successfully. -This allows you to create a new job right after the current job completes. -You have access to the job object and to the job descriptor in the extension point. If you need any data from the previous job, simply use these two variables to access the needed data. - -Going back to our example, we would use the extension point to look for the static cache job, i.e. if the completed job is not the static cache job, just exit early. -Then we would extract the URLs we need form the `$job` variable and queue a new CDN flush job with those URLs. - -This approach has a downside though. The newly created job will be placed at the end of the queue. -As a consequence, the work might end up being very fragmented and each chunk may be processed at a different time. - -Some projects do not mind this however, so this solution may still be quite suitable. +This will add Job data and Messages raw tabs to the job descriptor edit form. Displayed information is read only. 
## Contributing diff --git a/_config/queuedjobs.yml b/_config/queuedjobs.yml index ce4cda91..f0e3785b 100644 --- a/_config/queuedjobs.yml +++ b/_config/queuedjobs.yml @@ -12,6 +12,7 @@ SilverStripe\Core\Injector\Injector: queueHandler: '%$QueueHandler' # Change to %$DoormanRunner for async processing (requires *nix) queueRunner: '%$Symbiote\QueuedJobs\Tasks\Engines\QueueRunner' + logger: '%$Psr\Log\LoggerInterface' DefaultRule: class: 'AsyncPHP\Doorman\Rule\InMemoryRule' @@ -23,7 +24,7 @@ SilverStripe\Core\Injector\Injector: Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner: properties: DefaultRules: - - '%$DefaultRule' + DefaultRule: '%$DefaultRule' SilverStripe\SiteConfig\SiteConfig: extensions: diff --git a/client/styles/task-runner.css b/client/styles/task-runner.css new file mode 100644 index 00000000..d07c0b32 --- /dev/null +++ b/client/styles/task-runner.css @@ -0,0 +1,47 @@ +/* This file is manually maintained, it is not generated from SCSS sources */ + +.task { + padding-top: 10px; +} + +.task__panel { + padding: 10px 15px 15px 15px; +} + +.task__selector { + display: inline-block; + height: 42px; + margin: 0; + visibility: hidden; +} + +.task__selector:checked + .task__label .task__label-inner, +.task__selector:hover + .task__label .task__label-inner { + border-bottom-color: #43536d; +} + +.task__label { + display: inline-block; + transform: translateY(-15px); +} + +.task__label-inner { + border-bottom: 2px solid transparent; + padding-bottom: 15px; + transition: border-bottom-color 0.2s; +} + +.task__label:first-child { + margin-left: 0; +} + +.task__panel .task__item { + display: none; +} + +.task__selector--immediate:checked ~ .task__panel .task__item--immediate, +.task__selector--universal:checked ~ .task__panel .task__item--universal, +.task__selector--queue-only:checked ~ .task__panel .task__item--queue-only, +.task__selector--all:checked ~ .task__panel .task__item { + display: inline-block; +} diff --git a/composer.json b/composer.json index 
2cebb431..4a08e518 100644 --- a/composer.json +++ b/composer.json @@ -16,9 +16,9 @@ ], "require": { "php": "^7.1", - "silverstripe/framework": "^4", + "silverstripe/framework": "^4.7", "silverstripe/admin": "^1", - "asyncphp/doorman": "^3.0" + "asyncphp/doorman": "^3.1" }, "require-dev": { "sminnee/phpunit": "^5.7", @@ -29,7 +29,10 @@ "extra": { "branch-alias": { "dev-master": "4.x-dev" - } + }, + "expose": [ + "client/styles" + ] }, "replace": { "silverstripe/queuedjobs": "self.version" diff --git a/docs/en/configure-runners.md b/docs/en/configure-runners.md new file mode 100644 index 00000000..69c06feb --- /dev/null +++ b/docs/en/configure-runners.md @@ -0,0 +1,55 @@ +# Configuring Runners + +## Overview + +The default runner (`Symbiote\QueuedJobs\Tasks\Engines\QueueRunner`) +isn't great for any serious queue throughput, +and causes delays before a job gets picked up. Here are some alternatives. + +You might also be interested in ways to run [immediate jobs](immediate-jobs.md) +through watchers such as `inotifyd`. + +## Using Doorman for running jobs + +Doorman is included by default, and allows for asynchronous task processing. + +This requires that you are running on a unix based system, or within some kind of environment +emulator such as cygwin. + +In order to enable this, configure the `ProcessJobQueueTask` to use this backend. + + +```yaml +--- +Name: localproject +After: '#queuedjobsettings' +--- +SilverStripe\Core\Injector\Injector: + Symbiote\QueuedJobs\Services\QueuedJobService: + properties: + queueRunner: %$DoormanRunner +``` + +See [Multi process execution in Doorman](performance.md#multi-doorman) +for more ways to increase concurrency in Doorman. 
## Using Gearman for running jobs + +* Make sure gearmand is installed +* Get the gearman module from https://github.com/nyeholt/silverstripe-gearman +* Create a `_config/queuedjobs.yml` file in your project with the following declaration + +```yaml +--- +Name: localproject +After: '#queuedjobsettings' +--- +SilverStripe\Core\Injector\Injector: + QueueHandler: + class: Symbiote\QueuedJobs\Services\GearmanQueueHandler +``` + +* Run the gearman worker using `php gearman/gearman_runner.php` in your SS root dir + +This will cause all queuedjobs to trigger immediately via a gearman worker (`src/workers/JobWorker.php`) +EXCEPT those with a StartAfter date set, for which you will STILL need the cron settings from above diff --git a/docs/en/configuring-multi-process-execution.md b/docs/en/configuring-multi-process-execution.md deleted file mode 100644 index b7866573..00000000 --- a/docs/en/configuring-multi-process-execution.md +++ /dev/null @@ -1,55 +0,0 @@ -# Configuring Multi-process Execution - -You can enable multi-process execution by selecting `doorman` as the engine: - -```yaml ---- -Name: myqueuedjobsconfig -After: '#queuedjobsettings' ---- -SilverStripe\Core\Injector\Injector: - Symbiote\QueuedJobs\Services\QueuedJobService: - properties: - queueRunner: %$Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner -``` - -By default, this will allow a single child process to complete queued jobs. 
You can increase the number of processes allowed by changing the default rule: - -```yaml ---- -Name: myqueuedjobsconfig ---- -SilverStripe\Core\Injector\Injector: - LowUsageRule: - class: 'AsyncPHP\Doorman\Rule\InMemoryRule' - properties: - Processes: 2 - MinimumProcessorUsage: 0 - MaximumProcessorUsage: 50 - MinimumMemoryUsage: 0 - MaximumMemoryUsage: 50 - MediumUsageRule: - class: 'AsyncPHP\Doorman\Rule\InMemoryRule' - properties: - Processes: 1 - MinimumProcessorUsage: 50 - MaximumProcessorUsage: 75 - MinimumMemoryUsage: 50 - MaximumMemoryUsage: 75 - HighUsageRule: - class: 'AsyncPHP\Doorman\Rule\InMemoryRule' - properties: - Processes: 0 - MinimumProcessorUsage: 75 - MaximumProcessorUsage: 100 - MinimumMemoryUsage: 75 - MaximumMemoryUsage: 100 - DoormanRunner: - properties: - DefaultRules: - - '%LowUsageRule' - - '%MediumUsageRule' - - '%HighUsageRule' -``` - -As with all parallel processing architectures, you should be aware of the race conditions that can occur. You cannot depend on a predictable order of execution, or that every process has a predictable state. Use this with caution! diff --git a/docs/en/default-jobs.md b/docs/en/default-jobs.md new file mode 100644 index 00000000..29b42e5c --- /dev/null +++ b/docs/en/default-jobs.md @@ -0,0 +1,99 @@ +# Default Jobs + +## Overview + +Some jobs should always be either running or queued to run, things like data refreshes or periodic clean up jobs, we call these Default Jobs. +Default jobs are checked for at the end of each job queue process, using the job type and any fields in the filter to create an SQL query e.g. 
```yml +ArbitraryName: + type: 'ScheduledExternalImportJob' + filter: + JobTitle: 'Scheduled import from Services' +``` + +Will become: + +```php +QueuedJobDescriptor::get()->filter([ + 'type' => 'ScheduledExternalImportJob', + 'JobTitle' => 'Scheduled import from Services' +]); +``` + +This query is checked to see if there's at least 1 healthy (new, run, wait or paused) job matching the filter. If there's not and recreate is true in the yml config we use the construct array as params to pass to a new job object e.g: + +```yml +ArbitraryName: + type: 'ScheduledExternalImportJob' + filter: + JobTitle: 'Scheduled import from Services' + recreate: 1 + construct: + repeat: 300 + contentItem: 100 + target: 157 +``` +If the above job is missing it will be recreated as: +```php +Injector::inst()->createWithArgs(ScheduledExternalImportJob::class, $construct[]) +``` + +## Pausing Default Jobs + +If you need to stop a default job from raising alerts and being recreated, set an existing copy of the job to Paused in the CMS. 
+ +Default jobs are defined in yml config the sample below covers the options and expected values + +```yaml +SilverStripe\Core\Injector\Injector: + Symbiote\QueuedJobs\Services\QueuedJobService: + properties: + defaultJobs: + # This key is used as the title for error logs and alert emails + ArbitraryName: + # The job type should be the class name of a job REQUIRED + type: 'ScheduledExternalImportJob' + # This plus the job type is used to create the SQL query REQUIRED + filter: + # 1 or more Fieldname: 'value' sets that will be queried on REQUIRED + # These can be valid ORM filter + JobTitle: 'Scheduled import from Services' + # Parameters set on the recreated object OPTIONAL + construct: + # 1 or more Fieldname: 'value' sets be passed to the constructor REQUIRED + # If your constructor needs none, put something arbitrary + repeat: 300 + title: 'Scheduled import from Services' + # A date/time format string for the job's StartAfter field REQUIRED + # The shown example generates strings like "2020-02-27 01:00:00" + startDateFormat: 'Y-m-d H:i:s' + # A string acceptable to PHP's date() function for the job's StartAfter field REQUIRED + startTimeString: 'tomorrow 01:00' + # Sets whether the job will be recreated or not OPTIONAL + recreate: 1 + # Set the email address to send the alert to if not set site admin email is used OPTIONAL + email: 'admin@example.com' + # Minimal implementation will send alerts but not recreate + AnotherTitle: + type: 'AJob' + filter: + JobTitle: 'A job' +``` + +It's possible to enable a setting which allows the pausing of the queued jobs processing. To enable it, add following code to your config YAML file: + +```yaml +Symbiote\QueuedJobs\Services\QueuedJobService: + lock_file_enabled: true + lock_file_path: '/shared-folder-path' +``` + +`Queue settings` tab will appear in the CMS settings and there will be an option to pause the queued jobs processing. 
If enabled, no new jobs will start running however, the jobs already running will be left to finish. + This is really useful in case of planned downtime like queue jobs related third party service maintenance or DB restore / backup operation. + +Note that this maintenance lock state is stored in a file. This is intentionally not using DB as a storage as it may not be available during some maintenance operations. +Please make sure that the `lock_file_path` is pointing to a folder on a shared drive in case you are running a server with multiple instances. + +One benefit of file locking is that in case of critical failure (e.g.: the site crashes and CMS is not available), you may still be able to get access to the filesystem and change the file lock manually. +This gives you some additional disaster recovery options in case running jobs are causing the issue. diff --git a/docs/en/defining-jobs.md b/docs/en/defining-jobs.md new file mode 100644 index 00000000..87d276f5 --- /dev/null +++ b/docs/en/defining-jobs.md @@ -0,0 +1,371 @@ +# Defining Jobs + +The best way to learn about defining your own jobs is by checking the examples + +* `PublishItemsJob` - A job used to publish all the children of a particular node. To create this job, run the PublishItemsTask passing in the parent as a request var (eg ?parent=1) +* `GenerateGoogleSitemapJob` - A job used to create a google sitemap. If the googlesitemaps module is installed it will include priority settings as defined there, otherwise just produces a generic structure. To create an initial instance of this job, call dev/tasks/CreateDummyJob?name=GenerateGoogleSitemapJob. This will create the initial job and queue it; once the job has been run once, it will automatically schedule itself to be run again 24 hours later. +* `CreateDummyJob` - A very simple skeleton job. 
+ +## API Overview + +The module comes with an `AbstractQueuedJob` class that defines many of the boilerplate functionality for the +job to execute within the framework. An example job can be found in queuedjobs/code/jobs/PublishItemsJob.php. + +The key points to be aware of are + +* When defining the constructor for your job, be aware that the QueuedJobService will, when +loading the job for execution, create an object of your job type without passing any parameters. Therefore, if you want to pass parameters when initially creating the job, make sure to provide defaults +(eg *__construct($param=null)*), and that their presence is detected before being used. So the base rules for `__construct`ors are + * they must have default parameters, as the JobService will re-create the job class passing through no constructor params + * you must have logic in your constructor that can determine if it's been constructed by the job service, or by user-land code, and whether the constructor params are to be used. + +The kind of logic to use in your constructor could be something like + +```php + +public function __construct($to = null) { + if ($to) { + // we know that we've been called by user code, so + // do the real initialisation work + } +} +``` + +Of course, the other alternative is to set properties on the job directly after constructing it from your own code. + +* **_Job Properties_** QueuedJobs inherited from the AbstractQueuedJob have a default mechanism for persisting values via the __set and __get mechanism that stores items in the *jobData* map, which is serialize()d between executions of the job processing. All you need to do from within your job is call `$this->myProperty = 'somevalue';`, and it will be automatically persisted for you; HOWEVER, on subsequent creation of the job object (ie, in the `__constructor()`) these properties _have not_ been loaded, so you _cannot_ rely on them at that point. 
+* **_Special Properties_** The queuedjobs framework itself expects the following properties to be set by a job to ensure that jobs execute smoothly and can be paused/stopped/restarted. **YOU MUST DEFINE THESE for this to be effectively hooked.**
+  * **totalSteps** - the total number of steps in the job
+  * **currentStep** - the current step in the job
+  * **isComplete** - whether the job is complete yet. This MUST be set to true for the job to be removed from the queue
+  * **messages** - an array that contains a list of messages about the job that can be displayed to the user via the CMS
+* **_Titles_** Make sure to return a title via *getTitle()* - this is so that users can be shown what's running in the CMS admin.
+* **_Job Signatures_** When a job is added to the job queue, it is assigned a unique key based on the parameters of the job
+(see AbstractQueuedJob->getSignature()). If a job is already in the queue with the same signature, the new job
+is NOT queued up; this prevents duplicate jobs being added to a queue, but in some cases that may be the
+intention. If so, you'll need to override the getSignature() method in your custom job class and make sure
+to return a unique signature for each instantiation of the job.
+* **_Job Type_** You can use either QueuedJob::QUEUED, which will mean the job will run within a minute (due to the cronjob), or QueuedJob::IMMEDIATE, which will execute the job as soon as possible. This forces execution of the job at the end of the currently
+processing request, OR if you have set QueuedJobService::$use_shutdown_function = false, a monitoring job to trigger the execution of the job queue (see the lsyncd config section). This job type is useful for doing small things (such as deleting a few items at a time, indexing content in a separate search indexer, etc)
+* **_queueJob()_** To actually add a job to a queue, you call QueuedJobService->queueJob(Job $job, $startAfter=null).
+This will add the job to whichever queue is relevant, with whatever 'startAfter' (a date in Y-m-d H:i:s format) +to start execution after particular datetime. +* **_Switching Users_** Jobs can be specified to run as a particular user. By default this is the user that created +the job, however it can be changed by setting the value returned by setting a user via the RunAs +relationship of the job. +* **_Job Execution_** The following sequence occurs at job execution + * The cronjob looks for jobs that need execution. + * The job is passed to QueuedJobService->runJob() + * The user to run as is set into the session + * The job is initialised. This calls *QueuedJob->setup()*. Generally, the *setup()* method should be used to provide +some initialisation of the job, in particular figuring out how many total steps will be required to execute (if it's +actually possible to determine this). Typically, the *setup()* method is used to generate a list of IDs of data +objects that are going to have some processing done to them, then each call to *process()* processes just one of +these objects. This method makes pausing and resuming jobs later quite a lot easier. +It is very important to be aware that this method is called every time a job is 'started' by a cron execution, +meaning that any time a job is paused and restarted, this code is executed. Your Job class MUST handle this in its +*setup()* method. In some cases, it won't change what happens because a restarted job should re-perform everything, +but in others it might only need to process the remainder of what is left. + * The QueuedJobService enters a loop that executes until either the job indicates it is finished +(the *QueuedJob->jobFinished()* method returns true), the job is in some way broken, or a user has paused the job +via the CMS. 
This loop repeatedly calls *QueuedJob->process()* - each time this is called, the job should execute
+code equivalent of 1 step in the overall process, updating its currentStep value each call, and finally updating
+the isComplete value if it is actually finished. After each return of *process()*, the job state is saved so that
+broken or paused jobs can be picked up again later.
+
+## Terminology
+
+The following are some key parts of the system that you should be familiar with
+
+### AbstractQueuedJob
+
+A subclass to define your own queued jobs based upon. You don't need to use it, but it takes care of a lot of stuff for you.
+
+### QueuedJobService
+
+A service for registering instances of queued jobs
+
+### QueuedJobProcessorTask
+
+The task you run to have queued jobs processed. This must be set up to run via cron.
+
+### QueuedJobDescriptor
+
+A QueuedJobDescriptor is the stored representation of a piece of work that could take a while to execute,
+because of which it is desirable to not have it executing in parallel to other jobs.
+
+A queued job should always attempt to report how many potential dataobjects will be affected by being executed;
+this will determine which queue it is placed within so that some shorter jobs can execute immediately without needing
+to wait for a potentially long running job.
+
+Note that in future this may/will be adapted to work with the messagequeue module to provide a more distributed
+approach to solving a very similar problem. The messagequeue module is a lot more generalised than this approach,
+and while this module solves a specific problem, it may in fact be better working in with the messagequeue module
+
+## Multiple Steps {#multiple-steps}
+
+It is highly recommended to use the job steps feature in your jobs.
+Job steps are required to avoid long-running jobs from being falsely detected as stale
+(see [Troubleshooting: Jobs are marked as broken when they aren't](troubleshooting.md#broken)).
+
+The job step feature has two main purposes.
+
+* Communicating progress to the job manager so it knows if the job execution is still underway.
+* Providing a checkpoint in case job execution is interrupted for some reason. This allows the job to resume from the last completed step instead of restarting from the beginning.
+
+The currently executing job step can also be observed in the CMS via the _Jobs admin_ UI. This is useful mostly for debugging purposes when monitoring job execution.
+
+Job steps *should not* be used to determine if a job is completed or not. You should rely on the job data or the database state instead.
+
+For example, consider a job which accepts a list of items to process and each item represents a separate step.
+
+```php
+<?php
+
+use Symbiote\QueuedJobs\Services\AbstractQueuedJob;
+
+class MyJob extends AbstractQueuedJob
+{
+    /**
+     * @param array $items
+     */
+    public function __construct(array $items = [])
+    {
+        $this->items = $items;
+    }
+
+    /**
+     * @return string
+     */
+    public function getTitle(): string
+    {
+        return 'My awesome job';
+    }
+
+    public function setup(): void
+    {
+        $this->remaining = $this->items;
+        $this->totalSteps = count($this->items);
+    }
+
+    public function process(): void
+    {
+        $remaining = $this->remaining;
+
+        // check for trivial case
+        if (count($remaining) === 0) {
+            $this->isComplete = true;
+
+            return;
+        }
+
+        $item = array_shift($remaining);
+
+        // code that will process your item goes here
+
+        // update job progress
+        $this->remaining = $remaining;
+        $this->currentStep += 1;
+
+        // check for job completion
+        if (count($remaining) > 0) {
+            return;
+        }
+
+        $this->isComplete = true;
+    }
+}
+
+```
+
+This job setup has the following features:
+
+* one item is processed in each step
+* each step will produce a checkpoint so job can be safely resumed
+* job manager will be notified about job progress and is unlikely to label the job as crashed by mistake
+* job uses data to determine job completion rather than the steps
+* original list of items is preserved in the job data so it can be used for other purposes (dependant jobs, debug).
+ +Don't forget that in your unit test you must call `process()` as many times as you have items in your test data as one `process()` call handles only one item. + +## Advanced Job Setup + +This section is recommended for developers who are already familiar with basic concepts and want to take full advantage of the features in this module. + +### Job creation + +First, let's quickly summarise the lifecycle of a queued job: + +1. job is created as an object in your code +2. job is queued, the matching job descriptor is saved into the database +3. job is picked and processed up by the queue runner. + +Important thing to note is that **step 3** will create an empty job instance and populate it with data from the matching job descriptor. +Any defined params in the job constructor will not be populated in this step. +If you want to define your own job constructor and not use the inherited one, you will need to take this into account when implementing your job. +Incorrect implementation may result in the job processing losing some or all of the job data before processing starts. +To avoid this issue consider using one of the options below to properly implement your job creation. + +Suppose we have a job which needs a `string`, an `integer` and an `array` as the input. + +#### Option 1: Job data is set directly + +It's possible to completely avoid defining constructor on your job and set the job data directly to the job object. +This is a good approach for simple jobs, but more complex jobs with a lot of properties may end up using several lines of code. + +##### Job class constructor + +```php +// no constructor +``` + +##### Job creation + +```php +$job = new MyJob(); +// set job data +$job->string = $string; +$job->integer = $integer; +$job->array = $array; +``` + +##### Advantages + +* No need to define constructor. +* Nullable values don't need to be handled. + +##### Disadvantages + +* No strict parameter types. 
+* Code may not be as DRY in case you create the job in many different places. + +#### Option 2: Job constructor with specific params + +Defining your own constructor is the most intuitive approach. +We need to take into consideration that the job constructor will be called without any params by the queue runner. +The implementation needs to provide default values for all parameters and handle this special case. + +##### Job class constructor + +```php +public function __construct(?string $string = null, ?int $integer = null, ?array $array = null) +{ + if ($string === null || $integer === null || $array === null) { + // job constructor called by the queue runner - exit early + return; + } + + // job constructor called in project code - populate job data + $this->string = $string; + $this->integer = $integer; + $this->array = $array; +} +``` + +##### Job creation + +```php +$job = new MyJob($string, $integer, $array); +``` + +##### Advantages + +* Typed parameters. + +##### Disadvantages + +* Nullable values need to be provided and code handling of the nullable values has to be present. That is necessary because the queue runner calls the constructor without parameters as data will come in later from job descriptor. +* Strict type is not completely strict because nullable values can be passed when they shouldn't be (e.g.: at job creation in your code). + +This approach is especially problematic on PHP 7.3 or higher as the static syntax checker may have an issue with nullable values and force you to implement additional check like `is_countable` on the job properties. + +#### Option 3: Job constructor without specific params + +The inherited constructor has a generic parameter array as the only input and we can use it to pass arbitrary parameters to our job. +This makes the job constructor match the parent constructor signature but there is no type checking. 
+ +##### Job class constructor + +```php +public function __construct(array $params = []) +{ + if (!array_key_exists('string', $params) || !array_key_exists('integer', $params) || !array_key_exists('array', $params)) { + // job constructor called by the queue runner - exit early + return; + } + + // job constructor called in project code - populate job data + $this->string = $params['string']; + $this->integer = $params['integer']; + $this->array = $params['array']; +} +``` + +##### Job creation + +```php +$job = new MyJob(['string' => $string, 'integer' => $integer, 'array' => $array]); +``` + +##### Advantages + +* Nullable values don't need to be handled. + +##### Disadvantages + +* No strict parameter types. + +This approach is probably the simplest one but with the least parameter validation. + +#### Option 4: Separate hydrate method + +This approach is the strictest when it comes to validating parameters but requires the `hydrate` method to be called after each job creation. +Don't forget to call the `hydrate` method in your unit test as well. +This option is recommended for projects which have many job types with complex processing. Strict checking reduces the risk of input error. + +##### Job class constructor + +```php +// no constructor + +public function hydrate(string $string, int $integer, array $array): void +{ + $this->string = $string; + $this->integer = $integer; + $this->array = $array; +} +``` + +##### Job creation + +```php +$job = new MyJob(); +$job->hydrate($string, $integer, $array); +``` + +##### Unit tests + +```php +$job = new MyJob(); +$job->hydrate($string, $integer, $array); +$job->setup(); +$job->process(); + +$this->assertTrue($job->jobFinished()); +// other assertions can be placed here (job side effects, job data assertions...) +``` + +##### Advantages + +* Strict parameter type checking. +* No nullable values. +* No issues with PHP 7.3 or higher. 
+
+##### Disadvantages
+
+* Separate method has to be implemented and called after job creation in your code.
diff --git a/docs/en/dependant-jobs.md b/docs/en/dependant-jobs.md
new file mode 100644
index 00000000..218f7ec3
--- /dev/null
+++ b/docs/en/dependant-jobs.md
@@ -0,0 +1,42 @@
+# Dependant jobs
+
+Sometimes it makes sense to split the work to be done between several jobs.
+For example, consider the following flow:
+
+* page gets published (generates URLs for static cache)
+* page gets statically cached (generates static HTML for provided URLs)
+* page flushes cache on CDN for provided URLs.
+
+One way to implement this flow using queued jobs is to split the work between several jobs.
+Note that these actions have to be done in sequence, so we may not be able to queue all needed jobs right away.
+
+This may be because of:
+
+* queue processing is run on multiple threads and we can't guarantee that jobs will be run in sequence
+* later actions have data dependencies on earlier actions.
+
+In this situation, it's recommended to use the _Dependant job_ approach.
+
+Use the `updateJobDescriptorAndJobOnCompletion` extension point in `QueuedJobService::runJob()` like this:
+
+```php
+public function updateJobDescriptorAndJobOnCompletion(
+    QueuedJobDescriptor $descriptor,
+    QueuedJob $job
+): void
+{
+    // your code goes here
+}
+```
+
+This extension point is invoked each time a job completes successfully.
+This allows you to create a new job right after the current job completes.
+You have access to the job object and to the job descriptor in the extension point. If you need any data from the previous job, simply use these two variables to access the needed data.
+
+Going back to our example, we would use the extension point to look for the static cache job, i.e. if the completed job is not the static cache job, just exit early.
+Then we would extract the URLs we need from the `$job` variable and queue a new CDN flush job with those URLs.
+ +This approach has a downside though. The newly created job will be placed at the end of the queue. +As a consequence, the work might end up being very fragmented and each chunk may be processed at a different time. + +Some projects do not mind this however, so this solution may still be quite suitable. diff --git a/docs/en/immediate-jobs.md b/docs/en/immediate-jobs.md new file mode 100644 index 00000000..8fb249bc --- /dev/null +++ b/docs/en/immediate-jobs.md @@ -0,0 +1,137 @@ +# Immediate + +## Overview + +Queued jobs can be executed "immediately", which happens through +a PHP shutdown function by default ([details](index.md#immediate-jobs)). +Below are more robust ways to achieve immediate execution, +in addition to the standard queueing behaviour. + +When using these approaches, remember to disable the PHP shutdown behaviour: + +```yml +Symbiote\QueuedJobs\Services\QueuedJobService: + use_shutdown_function: false +``` + +## inotifywait + +The `inotifywait` system utility monitors a folder for changes (by +default this is SILVERSTRIPE_CACHE_DIR/queuedjobs) and triggers the `ProcessJobQueueTask` +with `job=$filename` as the argument. An example script is in `queuedjobs/scripts` that will run +inotifywait and then call the task when a new job is ready to run. + +```sh +#!/bin/sh + +# This script is an EXAMPLE ONLY. You must copy this into your system's script +# execution framework (init.d, service etc) and run it as a daemon AFTER +# editing the relevant paths for your system roots. + +SILVERSTRIPE_ROOT=/path/to/silverstripe +SILVERSTRIPE_CACHE=/path/to/silverstripe-cache + +inotifywait --monitor --event attrib --format "php $SILVERSTRIPE_ROOT/vendor/bin/sake dev/tasks/ProcessJobQueueTask job=%f" $SILVERSTRIPE_CACHE/queuedjobs | sh +``` + +You can also turn this into an `init.d` service: + +``` +#!/bin/bash +# +# /etc/init.d/queue_processor +# +# Service that watches for changes in queued_jobs targets. Defined targets will spawn an instance +# of inotitywait. 
+# +# Currently only tested on Centos5.6 (x86_64) +# +# Depends: inotify-tools (tested with Centos Package inotify-tools-3.14-1.el5) +# +# Usage: - Ensure that inotify-tools is installed. +# - Silverstripe cache paths are expected to be in $webroot/silverstripe-cache rather than /tmp +# - SILVERSTRIPE_ROOT is a space separated Array of Silvestripe installations +# +# - Copy this script to /etc/init.d/queue_processor +# - Update the SILVERSTRIPE_ROOT to reflect your installations +# - execute /etc/init.d/queue_processor start + +PATH=/bin:/usr/bin:/sbin:/usr/sbin +export PATH + +# Source function library. +. /etc/init.d/functions + +# Define a couple of base variables + +# list all the silverstripe root directories that you want monitored here +SILVERSTRIPE_ROOT=(/var/www/deployer/ /home/other/public-html/deployer) + + +start() { + echo -n "Starting queue_processor: " + for PATH in ${SILVERSTRIPE_ROOT[@]}; + do + INOTIFY_OPTS="--monitor --event attrib -q" + INOTIFY_ARGS="--format 'php ${PATH}/vendor/bin/sake dev/tasks/ProcessJobQueueTask job=%f' ${PATH}/silverstripe-cache/queuedjobs | /bin/sh" + daemon --user apache /usr/bin/inotifywait ${INOTIFY_OPTS} ${INOTIFY_ARGS} & + /bin/touch /var/lock/subsys/queue_processor + done + + return 0 +} + +stop() { + echo -n "Shutting down queue_processor: " + killproc inotifywait + rm -f /var/lock/subsys/queue_processor + return 0 +} + +case "$1" in + start) + start + ;; + stop) + stop + ;; + restart) + stop + start + ;; + *) + echo "Usage: queue_processor {start|stop|reload|}" + exit 1 + ;; +esac +exit $? +``` + +## lsyncd + +Similar concept to `inotifywait`, but with the `lsyncd` system utility. 
+ +The following is an example config `/etc/lsyncd.conf` + +``` +-- Queue Processor configuration, typically placed in /etc/lsyncd.conf + +settings = { + logfile = "/var/log/lsyncd/lsyncd.log", + statusFile = "/var/run/lsyncd.status", + nodaemon = true, +} + +-- Define the command and path for the each system being monitored here, where webuser is the user your webserver +-- runs as +runcmd = "/sbin/runuser webuser -c \"/usr/bin/php /var/www/sitepath/framework/cli-script.php dev/tasks/ProcessJobQueueTask job=\"$1\" /var/www/sitepath/framework/silverstripe-cache/queuedjobs\"" + +site_processor = { + onCreate = function(event) + log("Normal", "got an onCreate Event") + spawnShell(event, runcmd, event.basename) + end, +} + +sync{site_processor, source="/var/www/sitepath/silverstripe-cache/queuedjobs"} +``` diff --git a/docs/en/index.md b/docs/en/index.md new file mode 100644 index 00000000..e4218f86 --- /dev/null +++ b/docs/en/index.md @@ -0,0 +1,202 @@ +# Overview + + * [Installation](#installation) + * [Triggering jobs](#triggering-jobs) + * [Defining Jobs](#defining-jobs) + * [Choosing a Runner](#choosing-a-runner) + * [Long-running Jobs](#long-running-jobs) + * [Immediate Jobs](#immediate-jobs) + * [Logging and Error Reporting](#logging-and-reporting) + * [Default Jobs](#default-jobs) + * [Job States](#job-states) + +## Installation {#installation} + +Install the cronjob needed to manage all the jobs within the system. It is best to have this execute as the +same user as your webserver - this prevents any problems with file permissions. + +``` +*/1 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask +``` + +To test things are working, run the following command to create a dummy task + +``` +vendor/bin/sake dev/tasks/CreateQueuedJobTask +``` + +Every job is tracked as a database record, through `QueuedJobDescriptor` objects. +This means jobs can also be managed via the CMS. +Open up `/admin/queuedjobs` in a browser. 
+You should see the new job with `Status=New`. +Now either wait for your cron to execute, or trigger execution manually. + +``` +vendor/bin/sake dev/tasks/ProcessJobQueueTask +``` + +The job should now be marked with `Status=Completed`. + +## Triggering jobs {#triggering-jobs} + +```php +$publish = new PublishItemsJob(21); +singleton('QueuedJobService')->queueJob($publish); +``` + +To schedule a job to be executed at some point in the future, pass a date through with the call to queueJob + +```php +// Runs a day from now +$publish = new PublishItemsJob(21); +singleton('QueuedJobService')->queueJob($publish, date('Y-m-d H:i:s', time() + 86400)); +``` + +## Defining jobs {#defining-jobs} + +Jobs are just PHP classes. They are stored as database records, +and can have different states, as well as multiple steps. +See [Defining Jobs](defining-jobs.md) for more information. + +## Choosing a runner {#choosing-a-runner} + +The default runner (`Symbiote\QueuedJobs\Tasks\Engines\QueueRunner`) +for queued (rather than immediate) jobs +will pick up one job every time it executes. Since that usually happens through +a cron task, and crons can only run once per minute, +this is quite limiting. + +The modules comes with more advanced approaches to speed up queues: + + * [Doorman](configure-runners.md): Spawns child PHP processes. Does not have any system dependencies. + * [Gearman](configure-runners.md): Triggered through a `gearmand` system process + +Note: If you're running a hosting-specific recipe such as +[cwp/cwp-core](https://github.com/silverstripe/cwp-core), +a runner might already be preconfigured for you - in this case Doorman. + +## Cleaning up job entries {#cleanup} + +Every job is a database record, and this table can fill up fast! +While it's great to have a record of when and how each job has run, +this can lead to delays in reading and writing from the `QueuedJobDescriptor` table. +The module has an optional `CleanupJob` for this purpose. 
You can enable +it through your config: + +```yaml +Symbiote\QueuedJobs\Jobs\CleanupJob: + is_enabled: true +``` + +You will need to trigger the first run manually in the UI. After that the CleanupJob is run once a day. + +You can configure this job to clean up based on the number of jobs, or the age of the jobs. This is +configured with the `cleanup_method` setting - current valid values are "age" (default) and "number". +Each of these methods will have a value associated with it - this is an integer, set with `cleanup_value`. +For "age", this will be converted into days; for "number", it is the minimum number of records to keep, sorted by LastEdited. +The default value is 30, as we are expecting days. + +You can determine which JobStatuses are allowed to be cleaned up. The default setting is to clean up "Broken" and "Complete" jobs. All other statuses can be configured with `cleanup_statuses`. You can also define `query_limit` to limit the number of rows queried/deleted by the cleanup job (defaults to 100k). + +The default configuration looks like this: + +```yaml +Symbiote\QueuedJobs\Jobs\CleanupJob: + is_enabled: false + query_limit: 100000 + cleanup_method: "age" + cleanup_value: 30 + cleanup_statuses: + - Broken + - Complete +``` + +## Long-Running Jobs {#long-running-jobs} + +If your code is to make use of the 'long' jobs, ie that could take days to process, also install another task +that processes this queue. Its time of execution can be left a little longer. + +``` +*/15 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask queue=large +``` + +From your code, add a new job for execution. + +```php +use Symbiote\QueuedJobs\Services\QueuedJobService; + +$publish = new PublishItemsJob(21); +QueuedJobService::singleton()->queueJob($publish); +``` + +To schedule a job to be executed at some point in the future, pass a date through with the call to queueJob +The following will run the publish job in 1 day's time from now. 
+
+```php
+use SilverStripe\ORM\FieldType\DBDatetime;
+use Symbiote\QueuedJobs\Services\QueuedJobService;
+
+$publish = new PublishItemsJob(21);
+QueuedJobService::singleton()
+    ->queueJob($publish, DBDatetime::create()->setValue(DBDatetime::now()->getTimestamp() + 86400)->Rfc2822());
+```
+
+## Immediate Jobs {#immediate-jobs}
+
+Jobs can be declared to run "immediately", rather than being queued.
+What that means in practice depends on your configuration.
+
+By default, these jobs are run on PHP shutdown,
+in the same process which queued the job
+(see `QueuedJobService::$use_shutdown_function`).
+Code run after shutdown does not affect the response to the requesting client,
+but does continue to consume resources on the server.
+In a worker-based environment such as Apache or Nginx,
+this can have side effects on request processing.
+
+So while this is the easiest setup (zero config or system dependencies),
+there are more robust alternatives:
+
+ * [inotifywait](immediate-jobs.md): Works based on watching files. Useful for immediate jobs.
+ * [lsyncd](immediate-jobs.md#lsyncd): Alternative based on watching files.
+
+## Logging and Error Reporting {#logging-and-reporting}
+
+Just like any other code in Silverstripe, jobs can create log entries and errors.
+You should use the global `LoggerInterface` singleton
+as outlined in the [framework docs on error handling](https://docs.silverstripe.org/en/4/developer_guides/debugging/error_handling/).
+
+Any log handlers which are configured within your application
+(e.g. services like Sentry or Raygun) will also pick up logging
+within your jobs, to the reporting level you've specified for them.
+
+Additionally, messages handled through `LoggerInterface`
+as well as any exceptions thrown in a job will be logged
+to the database record for the job in the `QueuedJobDescriptor.SavedJobMessages`
+column.
This makes it easier to associate messages to specific job runs, +particularly when running multiple jobs concurrently. + +Immediate jobs run through `ProcessJobQueueTask` will also +log to stderr and stdout when run through the command-line (incl. cron execution). +Queued jobs run this way may not log consistently to stdout and stderr, +see [troubleshooting](troubleshooting.md#cant-see-errors) + + +## Default Jobs {#default-jobs} + +Some jobs should always be either running or queued to run, things like data refreshes or periodic clean up jobs, we call these Default Jobs. +See [Default Jobs](default-jobs.md) for information on how to +disable or pause these jobs. + +## Job states {#job-states} + +It's really useful to understand how job state changes during the job lifespan as it makes troubleshooting easier. +Following chart shows the whole job lifecycle: + +![JobStatus](../job_status.jpg) + +* every job starts in `New` state +* every job should eventually reach either `Complete`, `Broken` or `Paused` +* `Cancelled` state is not listed here as the queue runner will never transition job to that state as it is reserved for user triggered actions +* progress indication is either state change or step increment +* every job can be restarted however there is a limit on how many times (see `stall_threshold` config) diff --git a/docs/en/performance.md b/docs/en/performance.md new file mode 100644 index 00000000..972015fa --- /dev/null +++ b/docs/en/performance.md @@ -0,0 +1,123 @@ +# Performance + +## Increase concurrent execution through runners + +The default runner only executes one job per minute +if it's set up via `cron`. That's not great. +See [alternative runners](configure-runners.md) +to speed this up, as well as +[Multi Process Execution in Doorman](#multi-doorman). + +## Clean up jobs database + +Every job is recorded in the database via the `QueuedJobDescriptor` table. +If you're running a lot of them, this table can quickly grow! 
+This can affect job execution due to slow lookups.
+The easiest way around this is to
+[clear out old job entries](index.md#cleanup)
+regularly.
+
+## Time and Memory Limits
+
+By default tasks will run until either 256mb or the limit specified by php\_ini('memory\_limit') is reached.
+For some jobs you might need to increase this value:
+
+
+```yaml
+Symbiote\QueuedJobs\Services\QueuedJobService:
+  # Accepts b, k, m, or g suffixes
+  memory_limit: 512m
+```
+
+
+You can also enforce a time limit for each queue, after which the task will attempt a restart to release all
+resources. By default this is disabled, so you must specify this in your project as below:
+
+
+```yml
+# Force limit to 10 minutes
+Symbiote\QueuedJobs\Services\QueuedJobService:
+  time_limit: 600
+```
+
+## Indexes
+
+```sql
+ALTER TABLE `QueuedJobDescriptor` ADD INDEX ( `JobStatus` , `JobType` )
+```
+
+## Multi Process Execution in Doorman {#multi-doorman}
+
+The Doorman runner (`Symbiote\QueuedJobs\Tasks\Engines\DoormanRunner`)
+supports multi process execution through the
+[asyncphp/doorman](https://github.com/asyncphp/doorman/) library.
+It works by spawning child processes within the main PHP execution
+triggered through a cron job.
+
+The default configuration is limited to a single process.
+If you want to allocate more server capacity to running queues, +you can increase the number of processes allowed by changing the default rule: + +```yaml +--- +Name: myqueuedjobsconfig +--- +SilverStripe\Core\Injector\Injector: + LowUsageRule: + class: 'AsyncPHP\Doorman\Rule\InMemoryRule' + properties: + Processes: 2 + MinimumProcessorUsage: 0 + MaximumProcessorUsage: 50 + MinimumMemoryUsage: 0 + MaximumMemoryUsage: 50 + MediumUsageRule: + class: 'AsyncPHP\Doorman\Rule\InMemoryRule' + properties: + Processes: 1 + MinimumProcessorUsage: 50 + MaximumProcessorUsage: 75 + MinimumMemoryUsage: 50 + MaximumMemoryUsage: 75 + HighUsageRule: + class: 'AsyncPHP\Doorman\Rule\InMemoryRule' + properties: + Processes: 0 + MinimumProcessorUsage: 75 + MaximumProcessorUsage: 100 + MinimumMemoryUsage: 75 + MaximumMemoryUsage: 100 + DoormanRunner: + properties: + DefaultRules: + - '%LowUsageRule' + - '%MediumUsageRule' + - '%HighUsageRule' +``` + +As with all parallel processing architectures, you should be aware of the race conditions that can occur. You cannot depend on a predictable order of execution, or that every process has a predictable state. Use this with caution! + + +## Ideal job size + +How much work should be done by a single job? This is the question you should ask yourself when implementing a new job type. +There is no precise answer. 
This really depends on your project setup but there are some good practices that should be considered:
+
+* similar size — it's easier to optimise the queue settings and stack size of your project when your jobs are about the same size
+* split the job work into steps — this prevents your job running for too long without an update to the job manager and it lowers the risk of the job getting labelled as crashed
+* avoid jobs that are too small — jobs that are too small produce a large amount of job management overhead and are thus inefficient
+* avoid jobs that are too large — jobs that are too large are difficult to execute as they may cause timeout issues during execution.
+
+As a general rule of thumb, one run of your job's `process()` method should not exceed 30 seconds.
+
+If your job is too large and takes way too long to execute, the job manager may label the job as crashed even though it's still executing.
+If this happens you can:
+
+* Add job steps which help the job manager to determine if the job is still being processed.
+* If your job is already divided in steps, try dividing the larger steps into smaller ones.
+* If your job performs actions that can be completed independently from the others, you can split the job into several smaller dependent jobs (e.g.: there is value even if only one part is completed).
+
+The dependent job approach also allows you to run these jobs concurrently on some project setups.
+Job steps, on the other hand, always run in sequence.
+
+Read [Defining Jobs](defining-jobs.md) for different ways to create jobs.
diff --git a/docs/en/troubleshooting.md b/docs/en/troubleshooting.md
new file mode 100644
index 00000000..725ac45d
--- /dev/null
+++ b/docs/en/troubleshooting.md
@@ -0,0 +1,93 @@
+# Troubleshooting
+
+## Jobs aren't running
+
+To make sure your job works, you can first try to execute the job directly outside the framework of the
+queues - this can be done by manually calling the *setup()* and *process()* methods.
If it works fine
+under these circumstances, try having *getJobType()* return *QueuedJob::IMMEDIATE* to have execution
+work immediately, without being persisted or executed via cron. If this works, next make sure your
+cronjob is configured and executing correctly.
+
+If defining your own job classes, be aware that when the job is started on the queue, the job class
+is constructed _without_ parameters being passed; this means if you accept constructor args, you
+_must_ detect whether they're present or not before using them. See [this issue](https://github.com/symbiote/silverstripe-queuedjobs/issues/35)
+and [this wiki page](https://github.com/symbiote/silverstripe-queuedjobs/wiki/Defining-queued-jobs) for
+more information.
+
+If defining your own jobs, please ensure you follow PSR conventions, i.e. use "YourVendor" rather than "SilverStripe".
+
+Ensure that notifications are configured so that you can get updates on stalled or broken jobs. You can
+set the notification email address in your config as below:
+
+
+```yaml
+SilverStripe\Control\Email\Email:
+  queued_job_admin_email: support@mycompany.com
+```
+
+## Jobs are broken but I can't see errors {#cant-see-errors}
+
+Make sure that you've got the right loggers configured.
+
+Check for messages on the job database record (`SavedJobMessages`).
+
+When using the Doorman runner, messages are only recorded on the job,
+and not visible on the command line (see [bug report](https://github.com/asyncphp/doorman/issues/23)).
+
+## Jobs are executed more than once
+
+A long running job _may_ fool the system into thinking it has gone away (ie the job health check fails because
+`currentStep` hasn't been incremented). To avoid this scenario, you can set `$this->currentStep = -1` in your job's
+constructor, to prevent any health checks detecting the job.
+
+## Jobs are marked as broken when they aren't {#broken}
+
+Jobs track their execution in steps - as the job runs it increments the "steps" that have been run.
Periodically jobs
+are checked to ensure they are healthy. This asserts the count of steps on a job is always increasing between health
+checks. By default health checks are performed when a worker starts running a queue.
+
+In a multi-worker environment this can cause issues when health checks are performed too frequently. You can disable the
+automatic health check with the following configuration:
+
+```yaml
+Symbiote\QueuedJobs\Services\QueuedJobService:
+  disable_health_check: true
+```
+
+Job health is checked automatically in queue processing.
+You might also need to disable the `CheckJobHealthTask` if it's set up as a cron job.
+
+Alternatively, you can increase the TTL before jobs are considered stalled:
+
+```yaml
+Symbiote\QueuedJobs\Services\QueuedJobService:
+  worker_ttl: 'PT120M'
+```
+
+The `RunBuildTaskJob` is excluded from these health checks because it can't use steps,
+so you'll need to find other ways to ensure this type of job stays healthy when using it.
+
+## HTTP_HOST not set errors
+
+```
+Director::protocolAndHost() lacks sufficient information - HTTP_HOST not set.
+```
+
+The CLI execution environment doesn't know about your domains by default.
+If anything in your jobs relies on this, you'll need to add
+an `SS_BASE_URL` to your `.env` file:
+
+```
+SS_BASE_URL="http://localhost/"
+```
+
+## php command not found
+
+If you are setting up the crons under Plesk 10, you might receive an email:
+
+_-: php: command not found_
+
+This restriction is a security feature coming with Plesk 10.
+At around page 150 of the Plesk Administrator Guide you will find a solution to enable scheduled tasks which use the command line. (But the latest Guide for 10.3.1 mentions /usr/local/psa/admin/bin/server_pref -u -crontab-secure-shell "/bin/sh" although "server_pref" doesn't exist.)
+Since we are using a dedicated server for only one customer, we defined the crons under "Server Management"->"Tools & Utilities"->"Scheduled Tasks"->"root".
The security restrictions of Plesk are not involved then.
+
diff --git a/docs/en/unit-testing.md b/docs/en/unit-testing.md
new file mode 100644
index 00000000..aaff7a71
--- /dev/null
+++ b/docs/en/unit-testing.md
@@ -0,0 +1,118 @@
+## Unit tests
+
+Writing unit tests for queued jobs can be tricky as it's quite a complex system. Still, it can be done.
+
+### Overview
+
+Note that you don't actually need to run your queued job via the `QueuedJobService` in your unit test in most cases. Instead, you can run it directly, like this:
+
+```
+$job = new YourQueuedJob($someArguments);
+$job->setup();
+$job->process();
+
+$this->assertTrue($job->jobFinished());
+// other assertions can be placed here (job side effects, job data assertions...)
+```
+
+`setup()` needs to be run only once and `process()` needs to be run as many times as needed to complete the job. This depends on your job and the job data.
+Usually, `process()` needs to be run once for every `step` your job completes, but this may vary per job implementation. Please avoid using `do while {jobFinished}`, you should always be clear on how many times the `process()` runs in your test job.
+If you are unsure, do a test run in your application with some logging to indicate how many times it is run.
+
+This should cover most cases, but sometimes you need to run a job via the service. For example you may need to test if your job related extension hooks are working.
+
+### Advanced Usage
+
+Please be sure to disable the shutdown function and the queued job handler as these two will cause you some major pain in your unit tests.
+You can do this in multiple ways:
+
+* `setUp()` at the start of your unit test
+
+This is pretty easy, but it may be tedious to add this to every one of your unit tests.
+
+* create a parent class for your unit tests and add `setUp()` function to it
+
+You can now have the code in just one place, but inheritance has some limitations.
+
+* add a test state and add `setUp()` function to it, see `SilverStripe\Dev\State\TestState`
+
+Create your test state like this:
+
+```
+<?php
+
+namespace App\Dev\State;
+
+use SilverStripe\Core\Config\Config;
+use SilverStripe\Core\Injector\Injector;
+use SilverStripe\Dev\SapphireTest;
+use SilverStripe\Dev\State\TestState;
+use Symbiote\QueuedJobs\Services\QueuedJobHandler;
+use Symbiote\QueuedJobs\Services\QueuedJobService;
+
+class QueuedJobTestState implements TestState
+{
+    public function setUp(SapphireTest $test)
+    {
+        Injector::inst()->registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class);
+        Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false);
+    }
+
+    public function tearDown(SapphireTest $test)
+    {
+    }
+
+    public function setUpOnce($class)
+    {
+    }
+
+    public function tearDownOnce($class)
+    {
+    }
+}
+
+```
+
+Register your test state with `Injector` like this:
+
+```
+SilverStripe\Core\Injector\Injector:
+  SilverStripe\Dev\State\SapphireTestState:
+    properties:
+      States:
+        queuedjobs: '%$App\Dev\State\QueuedJobTestState'
+```
+
+This solution is great if you want to apply this change to all of your unit tests.
+
+Regardless of which approach you choose, the two changes that need to be inside the `setUp()` function are as follows:
+
+This will replace the standard logger with a dummy one.
+```
+Injector::inst()->registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class);
+```
+
+This will disable the shutdown function completely as `QueuedJobService` doesn't work well with `SapphireTest`.
+
+```
+Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false);
+```
+
+This is how you run a job via the service in your unit tests.
+
+```
+$job = new YourQueuedJob($someArguments);
+
+/** @var QueuedJobService $service */
+$service = Injector::inst()->get(QueuedJobService::class);
+
+$descriptorID = $service->queueJob($job);
+$service->runJob($descriptorID);
+
+/** @var QueuedJobDescriptor $descriptor */
+$descriptor = QueuedJobDescriptor::get()->byID($descriptorID);
+$this->assertNotEquals(QueuedJob::STATUS_BROKEN, $descriptor->JobStatus);
+```
+
+For example, this code snippet runs the job and checks if the job ended up in a non-broken state.
diff --git a/scripts/lsyncd-config.sample b/scripts/lsyncd-config.sample deleted file mode 100644 index f56b4108..00000000 --- a/scripts/lsyncd-config.sample +++ /dev/null @@ -1,21 +0,0 @@ --- Queue Processor configuration, typically placed in /etc/lsyncd.conf --- Remember to set QueuedJobService::$use_shutdown_function = false; in local.conf.php - -settings = { - logfile = "/var/log/lsyncd/lsyncd.log", - statusFile = "/var/run/lsyncd.status", - nodaemon = true, -} - --- Define the command and path for the each system being monitored here, where webuser is the user your webserver --- runs as -runcmd = "/sbin/runuser webuser -c \"/usr/bin/php /var/www/sitepath/vendor/bin/sake dev/tasks/ProcessJobQueueTask job=\"$1\" /var/www/sitepath/framework/silverstripe-cache/queuedjobs\"" - -site_processor = { - onCreate = function(event) - log("Normal", "got an onCreate Event") - spawnShell(event, runcmd, event.basename) - end, -} - -sync{site_processor, source="/var/www/sitepath/silverstripe-cache/queuedjobs"} diff --git a/scripts/queue_processor.init b/scripts/queue_processor.init deleted file mode 100644 index 5f17e1e4..00000000 --- a/scripts/queue_processor.init +++ /dev/null @@ -1,74 +0,0 @@ -#!/bin/bash -# -# -# -# /etc/init.d/queue_processor -# -# Service that watches for changes in queued_jobs targets. Defined targets will spawn an instance -# of inotitywait. -# -# Currently only tested on Centos5.6 (x86_64) -# -# Depends: inotify-tools (tested with Centos Package inotify-tools-3.14-1.el5) -# -# Usage: - Ensure that inotify-tools is installed. -# - Silverstripe cache paths are expected to be in $webroot/silverstripe-cache rather than /tmp -# - SILVERSTRIPE_ROOT is a space separated Array of Silvestripe installations -# -# - Copy this script to /etc/init.d/queue_processor -# - Update the SILVERSTRIPE_ROOT to reflect your installations -# - execute /etc/init.d/queue_processor start - - - - - -PATH=/bin:/usr/bin:/sbin:/usr/sbin -export PATH - -# Source function library. -. 
/etc/init.d/functions - -# Define a couple of base variables - -# list all the silverstripe root directories that you want monitored here -SILVERSTRIPE_ROOT=(/var/www/deployer/ /home/other/public-html/deployer) - - -start() { - echo -n "Starting queue_processor: " - for PATH in ${SILVERSTRIPE_ROOT[@]}; - do - INOTIFY_OPTS="--monitor --event attrib -q" - INOTIFY_ARGS="--format 'php ${PATH}/vendor/bin/sake dev/tasks/ProcessJobQueueTask job=%f' ${PATH}/silverstripe-cache/queuedjobs | /bin/sh" - daemon --user apache /usr/bin/inotifywait ${INOTIFY_OPTS} ${INOTIFY_ARGS} & - /bin/touch /var/lock/subsys/queue_processor - done - - return 0 -} - -stop() { - echo -n "Shutting down queue_processor: " - killproc inotifywait - rm -f /var/lock/subsys/queue_processor - return 0 -} - -case "$1" in - start) - start - ;; - stop) - stop - ;; - restart) - stop - start - ;; - *) - echo "Usage: queue_processor {start|stop|reload|}" - exit 1 - ;; -esac -exit $? diff --git a/scripts/run-queued-jobs.sh b/scripts/run-queued-jobs.sh deleted file mode 100644 index 1a47f9c8..00000000 --- a/scripts/run-queued-jobs.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh - -# This script is an EXAMPLE ONLY. You must copy this into your system's script -# execution framework (init.d, service etc) and run it as a daemon AFTER -# editing the relevant paths for your system roots. 
- -SILVERSTRIPE_ROOT=/path/to/silverstripe -SILVERSTRIPE_CACHE=/path/to/silverstripe-cache - -inotifywait --monitor --event attrib --format "php $SILVERSTRIPE_ROOT/vendor/bin/sake dev/tasks/ProcessJobQueueTask job=%f" $SILVERSTRIPE_CACHE/queuedjobs | sh diff --git a/src/Controllers/QueuedTaskRunner.php b/src/Controllers/QueuedTaskRunner.php index 6d548b1a..76269284 100644 --- a/src/Controllers/QueuedTaskRunner.php +++ b/src/Controllers/QueuedTaskRunner.php @@ -10,6 +10,9 @@ use SilverStripe\Dev\BuildTask; use SilverStripe\Dev\DebugView; use SilverStripe\Dev\TaskRunner; +use SilverStripe\ORM\ArrayList; +use SilverStripe\View\ArrayData; +use SilverStripe\View\ViewableData; use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; use Symbiote\QueuedJobs\Jobs\RunBuildTaskJob; use Symbiote\QueuedJobs\Services\QueuedJobService; @@ -39,6 +42,13 @@ class QueuedTaskRunner extends TaskRunner 'queueTask', ]; + /** + * @var array + */ + private static $css = [ + 'symbiote/silverstripe-queuedjobs:client/styles/task-runner.css', + ]; + /** * Tasks on this list will be available to be run only via browser * @@ -62,6 +72,12 @@ class QueuedTaskRunner extends TaskRunner public function index() { + if (Director::is_cli()) { + // CLI mode - revert to default behaviour + return parent::index(); + } + + $baseUrl = Director::absoluteBaseURL(); $tasks = $this->getTasks(); $blacklist = (array) $this->config()->get('task_blacklist'); @@ -69,78 +85,65 @@ public function index() $backlistedTasks = []; $queuedOnlyTasks = []; - // Web mode - if (!Director::is_cli()) { - $renderer = new DebugView(); - echo $renderer->renderHeader(); - echo $renderer->renderInfo( - "SilverStripe Development Tools: Tasks (QueuedJobs version)", - Director::absoluteBaseURL() - ); - $base = Director::absoluteBaseURL(); - - echo "
"; - echo "

Queueable jobs

\n"; - echo "

By default these jobs will be added the job queue, rather than run immediately

\n"; - echo "
"; - - echo "
"; - echo "

Non-queueable tasks

\n"; - echo "

These tasks shouldn't be added the queuejobs queue, but you can run them immediately.

\n"; - echo "
"; - - echo "
"; - echo "

Queueable only tasks

\n"; - echo "

These tasks must be be added the queuejobs queue, running it immediately is not allowed.

\n"; - echo "
"; - echo $renderer->renderFooter(); + $taskList->push(ArrayData::create([ + 'QueueLink' => $baseUrl . 'dev/tasks/queue/' . $task['segment'], + 'TaskLink' => $baseUrl . 'dev/tasks/' . $task['segment'], + 'Title' => $task['title'], + 'Description' => $task['description'], + 'Type' => 'universal', + ])); + } - // CLI mode - revert to default behaviour - } else { - return parent::index(); + // Non-queueable tasks + foreach ($backlistedTasks as $task) { + $taskList->push(ArrayData::create([ + 'TaskLink' => $baseUrl . 'dev/tasks/' . $task['segment'], + 'Title' => $task['title'], + 'Description' => $task['description'], + 'Type' => 'immediate', + ])); } + + // Queue only tasks + $queueOnlyTaskList = ArrayList::create(); + + foreach ($queuedOnlyTasks as $task) { + $taskList->push(ArrayData::create([ + 'QueueLink' => $baseUrl . 'dev/tasks/queue/' . $task['segment'], + 'Title' => $task['title'], + 'Description' => $task['description'], + 'Type' => 'queue-only', + ])); + } + + $renderer = DebugView::create(); + $header = $renderer->renderHeader(); + $header = $this->addCssToHeader($header); + + $data = [ + 'Tasks' => $taskList, + 'Header' => $header, + 'Footer' => $renderer->renderFooter(), + 'Info' => $renderer->renderInfo('SilverStripe Development Tools: Tasks (QueuedJobs version)', $baseUrl), + ]; + + return ViewableData::create()->renderWith(static::class, $data); } diff --git a/src/DataObjects/QueuedJobDescriptor.php b/src/DataObjects/QueuedJobDescriptor.php index 911454e2..9bca9d08 100644 --- a/src/DataObjects/QueuedJobDescriptor.php +++ b/src/DataObjects/QueuedJobDescriptor.php @@ -2,13 +2,22 @@ namespace Symbiote\QueuedJobs\DataObjects; +use DateInterval; +use DateTime; use SilverStripe\Assets\Filesystem; use SilverStripe\Core\Config\Config; use SilverStripe\Core\Convert; +use SilverStripe\Forms\CheckboxField; +use SilverStripe\Forms\DatetimeField; use SilverStripe\Forms\DropdownField; use SilverStripe\Forms\FieldList; +use SilverStripe\Forms\HeaderField; use 
SilverStripe\Forms\LiteralField; +use SilverStripe\Forms\NumericField; +use SilverStripe\Forms\TextareaField; +use SilverStripe\Forms\TextField; use SilverStripe\ORM\DataObject; +use SilverStripe\ORM\FieldType\DBDatetime; use SilverStripe\ORM\FieldType\DBField; use SilverStripe\Security\Member; use SilverStripe\Security\Permission; @@ -124,6 +133,14 @@ class QueuedJobDescriptor extends DataObject */ private static $default_sort = 'Created DESC'; + /** + * Show job data and raw messages in the edit form + * + * @config + * @var bool + */ + private static $show_job_data = false; + public function requireDefaultRecords() { parent::requireDefaultRecords(); @@ -310,6 +327,21 @@ public function getJobTypeString() return isset($map[$this->JobType]) ? $map[$this->JobType] : '(Unknown)'; } + /** + * @return string|null + */ + public function getSavedJobDataPreview() + { + return $this->SavedJobData; + } + + /** + * @return string|null + */ + public function getMessagesRaw() + { + return $this->SavedJobMessages; + } /** * Return a map of numeric JobType values to localisable string representations. 
@@ -325,16 +357,13 @@ public function getJobTypeValues() } /** - * @return FieldList + * List all possible job statuses, useful for forms and filters + * + * @return array */ - public function getCMSFields() + public function getJobStatusValues(): array { - $fields = parent::getCMSFields(); - $fields->replaceField( - 'JobType', - new DropdownField('JobType', $this->fieldLabel('JobType'), $this->getJobTypeValues()) - ); - $statuses = [ + return [ QueuedJob::STATUS_NEW, QueuedJob::STATUS_INIT, QueuedJob::STATUS_RUN, @@ -344,18 +373,186 @@ public function getCMSFields() QueuedJob::STATUS_CANCELLED, QueuedJob::STATUS_BROKEN, ]; - $fields->replaceField( + } + + /** + * @return FieldList + */ + public function getCMSFields() + { + $fields = parent::getCMSFields(); + $statuses = $this->getJobStatusValues(); + $runAs = $fields->fieldByName('Root.Main.RunAsID'); + + $fields->removeByName([ + 'Expiry', + 'Implementation', + 'JobTitle', + 'JobFinished', + 'JobRestarted', + 'JobType', + 'JobStarted', 'JobStatus', - DropdownField::create('JobStatus', $this->fieldLabel('JobStatus'), array_combine($statuses, $statuses)) + 'LastProcessedCount', + 'NotifiedBroken', + 'ResumeCounts', + 'RunAs', + 'RunAsID', + 'SavedJobData', + 'SavedJobMessages', + 'Signature', + 'StepsProcessed', + 'StartAfter', + 'TotalSteps', + 'Worker', + 'WorkerCount', + ]); + + // Main + $fields->addFieldsToTab('Root.Main', [ + LiteralField::create( + 'JobProgressReportIntro', + sprintf( + '

%3$0.2f%% completed

%3$0.2f%%

', + $this->StepsProcessed, + $this->TotalSteps, + $this->TotalSteps > 0 ? ($this->StepsProcessed / $this->TotalSteps) * 100 : 0 + ) + ), + $jobTitle = TextField::create('JobTitle', 'Title'), + $status = DropdownField::create('JobStatus', 'Status', array_combine($statuses, $statuses)), + $jobType = DropdownField::create('JobType', 'Queue type', $this->getJobTypeValues()), + $runAs, + $startAfter = DatetimeField::create('StartAfter', 'Scheduled Start Time'), + HeaderField::create('JobTimelineTitle', 'Timeline'), + LiteralField::create( + 'JobTimelineIntro', + sprintf( + '

%s

', + 'It is recommended to avoid editing these fields' + . ' as they are managed by the Queue Runner / Service.' + ) + ), + $jobStarted = DatetimeField::create('JobStarted', 'Started (initial)'), + $jobRestarted = DatetimeField::create('JobRestarted', 'Started (recent)'), + $jobFinished = DatetimeField::create('JobFinished', 'Completed'), + ]); + + $jobFinished->setDescription('Job completion time.'); + $jobRestarted->setDescription('Most recent attempt to run the job.'); + $jobStarted->setDescription('First attempt to run the job.'); + $jobType->setDescription('Type of Queue which the jobs belongs to.'); + $status->setDescription('Represents current state within the job lifecycle.'); + + $jobTitle->setDescription( + 'This field can be used to hold user comments about specific jobs (no functional impact).' + ); + + $startAfter->setDescription( + 'Used to prevent the job from starting earlier than the specified time.' + . ' Note that this does not guarantee that the job will start' + . ' exactly at the specified time (it will start the next time the cron job runs).' + ); + + $runAs + ->setTitle('Run With User') + ->setDescription( + 'Select a user to be used to run this job.' + . ' This should be used in case the changes done by this job' + . ' have to look like the specified user made them.' + ); + + // Advanced + $fields->addFieldsToTab('Root.Advanced', [ + HeaderField::create('AdvancedTabTitle', 'Advanced fields', 1), + LiteralField::create( + 'AdvancedTabIntro', + sprintf( + '

%s

', + 'It is recommended to avoid editing these fields' + . ' as they are managed by the Queue Runner / Service.' + ) + ), + $implementation = TextField::create('Implementation', 'Job Class'), + $signature = TextField::create('Signature', 'Job Signature'), + $notifiedBroken = CheckboxField::create('NotifiedBroken', 'Broken job notification sent'), + HeaderField::create('AdvancedTabProgressTitle', 'Progression metadata'), + LiteralField::create( + 'AdvancedTabProgressIntro', + sprintf( + '

%s

', + 'Job progression mechanism related fields which are used to' + . ' ensure that stalled jobs are paused / resumed.' + ) + ), + $totalSteps = NumericField::create('TotalSteps', 'Steps Total'), + $stepsProcessed = NumericField::create('StepsProcessed', 'Steps Processed'), + $lastProcessCount = NumericField::create('LastProcessedCount', 'Steps Processed (previous)'), + $resumeCount = NumericField::create('ResumeCounts', 'Resume Count'), + HeaderField::create('AdvancedTabLockTitle', 'Lock metadata'), + LiteralField::create( + 'AdvancedTabLockTitleIntro', + sprintf( + '

%s

', + 'Job locking mechanism related fields which are used to' + . ' ensure that every job gets executed only once at any given time.' + ) + ), + $worker = TextField::create('Worker', 'Worker Signature'), + $workerCount = NumericField::create('WorkerCount', 'Worker Count'), + $expiry = DatetimeField::create('Expiry', 'Lock Expiry'), + ]); + + $implementation->setDescription('Class name which is used to execute this job.'); + $notifiedBroken->setDescription('Indicates if a broken job notification was sent (this happens only once).'); + $totalSteps->setDescription('Number of steps which is needed to complete this job.'); + $stepsProcessed->setDescription('Number of steps processed so far.'); + $workerCount->setDescription('Number of workers (processes) used to execute this job overall.'); + $worker->setDescription( + 'Used by a worker (process) to claim this job which prevents any other process from claiming it.' + ); + + $lastProcessCount->setDescription( + 'Steps Processed value from previous execution of this job' + . ', used to compare against current state of the steps to determine the difference (progress).' + ); + + $signature->setDescription( + 'Usualy derived from the job data, prevents redundant jobs from being created to some degree.' 
+ ); + + $resumeCount->setDescription( + sprintf( + 'Number of times this job stalled and was resumed (limit of %d time(s)).', + QueuedJobService::singleton()->config()->get('stall_threshold') + ) ); - $fields->removeByName('SavedJobData'); - $fields->removeByName('SavedJobMessages'); + $expiry->setDescription( + sprintf( + 'Specifies when the lock is released (lock expires %d seconds after the job is claimed).', + $this->getWorkerExpiry() + ) + ); if (strlen($this->SavedJobMessages)) { $fields->addFieldToTab('Root.Messages', LiteralField::create('Messages', $this->getMessages())); } + if ($this->config()->get('show_job_data')) { + $fields->addFieldsToTab('Root.JobData', [ + $jobDataPreview = TextareaField::create('SavedJobDataPreview', 'Job Data'), + ]); + + $jobDataPreview->setReadonly(true); + + $fields->addFieldsToTab('Root.MessagesRaw', [ + $messagesRaw = TextareaField::create('MessagesRaw', 'Messages Raw'), + ]); + + $messagesRaw->setReadonly(true); + } + if (Permission::check('ADMIN')) { return $fields; } @@ -363,4 +560,17 @@ public function getCMSFields() // Readonly CMS view is a lot more useful for debugging than no view at all return $fields->makeReadonly(); } + + private function getWorkerExpiry() + { + $now = DBDatetime::now(); + $time = new DateTime($now->Rfc2822()); + $timeToLive = QueuedJobService::singleton()->config()->get('worker_ttl'); + + if ($timeToLive) { + $time->add(new DateInterval($timeToLive)); + } + + return $time->getTimestamp() - $now->getTimestamp(); + } } diff --git a/src/DataObjects/QueuedJobRule.php b/src/DataObjects/QueuedJobRule.php index b86ad6e8..bac8b104 100644 --- a/src/DataObjects/QueuedJobRule.php +++ b/src/DataObjects/QueuedJobRule.php @@ -48,7 +48,7 @@ class QueuedJobRule extends DataObject implements Rule */ public function getProcesses() { - if ($this->getField('Processes')) { + if ($this->getField('Processes') !== null) { return $this->getField('Processes'); } @@ -74,7 +74,7 @@ public function getHandler() */ public 
function getMinimumProcessorUsage() { - if ($this->getField('MinimumProcessorUsage')) { + if ($this->getField('MinimumProcessorUsage') !== null) { return $this->getField('MinimumProcessorUsage'); } @@ -88,7 +88,7 @@ public function getMinimumProcessorUsage() */ public function getMaximumProcessorUsage() { - if ($this->getField('MaximumProcessorUsage')) { + if ($this->getField('MaximumProcessorUsage') !== null) { return $this->getField('MaximumProcessorUsage'); } @@ -102,7 +102,7 @@ public function getMaximumProcessorUsage() */ public function getMinimumMemoryUsage() { - if ($this->getField('MinimumMemoryUsage')) { + if ($this->getField('MinimumMemoryUsage') !== null) { return $this->getField('MinimumMemoryUsage'); } @@ -114,7 +114,7 @@ public function getMinimumMemoryUsage() */ public function getMaximumMemoryUsage() { - if ($this->getField('MaximumMemoryUsage')) { + if ($this->getField('MaximumMemoryUsage') !== null) { return $this->getField('MaximumMemoryUsage'); } @@ -128,7 +128,7 @@ public function getMaximumMemoryUsage() */ public function getMinimumSiblingProcessorUsage() { - if ($this->getField('MinimumSiblingProcessorUsage')) { + if ($this->getField('MinimumSiblingProcessorUsage') !== null) { return $this->getField('MinimumSiblingProcessorUsage'); } @@ -142,7 +142,7 @@ public function getMinimumSiblingProcessorUsage() */ public function getMaximumSiblingProcessorUsage() { - if ($this->getField('MaximumSiblingProcessorUsage')) { + if ($this->getField('MaximumSiblingProcessorUsage') !== null) { return $this->getField('MaximumSiblingProcessorUsage'); } @@ -156,7 +156,7 @@ public function getMaximumSiblingProcessorUsage() */ public function getMinimumSiblingMemoryUsage() { - if ($this->getField('MinimumSiblingMemoryUsage')) { + if ($this->getField('MinimumSiblingMemoryUsage') !== null) { return $this->getField('MinimumSiblingMemoryUsage'); } @@ -170,7 +170,7 @@ public function getMinimumSiblingMemoryUsage() */ public function getMaximumSiblingMemoryUsage() { - 
if ($this->getField('MaximumSiblingMemoryUsage')) { + if ($this->getField('MaximumSiblingMemoryUsage') !== null) { return $this->getField('MaximumSiblingMemoryUsage'); } diff --git a/src/Jobs/RunBuildTaskJob.php b/src/Jobs/RunBuildTaskJob.php index 6502efab..59d3c839 100644 --- a/src/Jobs/RunBuildTaskJob.php +++ b/src/Jobs/RunBuildTaskJob.php @@ -10,9 +10,16 @@ use Symbiote\QueuedJobs\Services\QueuedJob; /** - * A job used to delete a data object. Typically used for deletes that need to happen on - * a schedule, or where the delete may have some onflow affect that takes a while to - * finish the deletion. + * A convenience wrapper for running BuildTask implementations. + * These are usually executed via synchronous web request + * or synchronous CLI execution (under dev/tasks/*). + * + * Caution: This job can't increment steps. This is a signal + * for job health checks that a job should be considered stale + * after a (short) timeout. If you expect a build task to run + * for more than a few minutes, create it as a job with steps, + * increase timeouts, or disable health checks. + * See "Defining Jobs" in the docs for details. 
* * @author marcus@symbiote.com.au * @license BSD License http://silverstripe.org/bsd-license/ diff --git a/src/Services/ProcessManager.php b/src/Services/ProcessManager.php new file mode 100644 index 00000000..cd3e8a4f --- /dev/null +++ b/src/Services/ProcessManager.php @@ -0,0 +1,67 @@ +config()->get('persistent_child_process')) { + // Prevent background tasks from being killed when this script finishes + // this is an override for the default behaviour of killing background tasks + return; + } + + parent::__destruct(); + } +} diff --git a/src/Services/QueuedJobHandler.php b/src/Services/QueuedJobHandler.php index 316948e2..2450dc45 100644 --- a/src/Services/QueuedJobHandler.php +++ b/src/Services/QueuedJobHandler.php @@ -2,6 +2,7 @@ namespace Symbiote\QueuedJobs\Services; +use Monolog\Formatter\LineFormatter; use Monolog\Handler\AbstractProcessingHandler; use SilverStripe\Core\Injector\Injectable; use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; @@ -56,11 +57,21 @@ protected function write(array $record) public function handleBatch(array $records) { - foreach ($records as $record) { - $this->job->addMessage($record['message'], $record['level_name'], $record['datetime']); + foreach ($records as $i => $record) { + $records[$i] = $this->processRecord($records[$i]); + $records[$i]['formatted'] = $this->getFormatter()->format($records[$i]); + $this->job->addMessage($records[$i]['formatted'], $records[$i]['level_name'], $records[$i]['datetime']); }; $this->jobDescriptor->SavedJobMessages = serialize($this->job->getJobData()->messages); $this->jobDescriptor->write(); } + + /** + * Ensure that exception context is retained. Similar logic to SyslogHandler. 
+ */ + protected function getDefaultFormatter() + { + return new LineFormatter('%message% %context% %extra%'); + } } diff --git a/src/Services/QueuedJobService.php b/src/Services/QueuedJobService.php index 018a180e..271f197f 100644 --- a/src/Services/QueuedJobService.php +++ b/src/Services/QueuedJobService.php @@ -124,6 +124,15 @@ class QueuedJobService */ private static $worker_ttl = 'PT5M'; + /** + * Duration for TTL of initialising state based on ISO 8601 duration specification. + * if a job is stuck in this state longer than this value it's considered stalled + * + * @var string + * @config + */ + private static $initialising_state_ttl = 'PT2M'; + /** * Timestamp (in seconds) when the queue was started * @@ -175,6 +184,11 @@ class QueuedJobService */ private static $lock_file_path = ''; + /** + * @var LoggerInterface + */ + private $logger; + /** * @var DefaultQueueHandler */ @@ -373,7 +387,7 @@ protected function copyDescriptorToJob($jobDescriptor, $job) * * @param string $type Job type * - * @return QueuedJobDescriptor|false + * @return QueuedJobDescriptor|null */ public function getNextPendingJob($type = null) { @@ -420,6 +434,7 @@ public function getNextPendingJob($type = null) * * @param int $queue The queue to check against * @return array stalled job and broken job IDs + * @throws Exception */ public function checkJobHealth($queue = null) { @@ -438,13 +453,19 @@ public function checkJobHealth($queue = null) // If no steps have been processed since the last run, consider it a broken job // Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs. 
// Only check jobs that are past expiry to ensure another process isn't currently executing the job - $now = DBDatetime::now()->Rfc2822(); $stalledJobs = $runningJobs ->filter([ 'LastProcessedCount:GreaterThanOrEqual' => 0, - 'Expiry:LessThanOrEqual' => $now, ]) - ->where('"StepsProcessed" = "LastProcessedCount"'); + ->where('"StepsProcessed" = "LastProcessedCount"') + ->whereAny([ + // either job lock is expired + '"Expiry" <= ?' => DBDatetime::now()->Rfc2822(), + // or job lock was never assigned (maybe there were not enough server resources to kick off the process) + // fall back to LastEdited time and only restart those jobs that were left untouched for a small while + // this covers the situation where a process is still going to pick up the job + '"Expiry" IS NULL AND "LastEdited" <= ?' => $this->getInitStateExpiry() + ]); /** @var QueuedJobDescriptor $stalledJob */ foreach ($stalledJobs as $stalledJob) { @@ -487,16 +508,7 @@ public function checkJobHealth($queue = null) } $this->getLogger()->error( - print_r( - [ - 'errno' => 0, - 'errstr' => 'Broken jobs were found in the job queue', - 'errfile' => __FILE__, - 'errline' => __LINE__, - 'errcontext' => [], - ], - true - ), + 'Broken jobs were found in the job queue', [ 'file' => __FILE__, 'line' => __LINE__, @@ -600,8 +612,7 @@ public function checkDefaultJobs($queue = null) */ protected function restartStalledJob($stalledJob) { - // release job lock on the descriptor so it can run again - $stalledJob->Worker = null; + $this->releaseJobLock($stalledJob); if ($stalledJob->ResumeCounts < static::config()->get('stall_threshold')) { $stalledJob->restart(); @@ -735,7 +746,7 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor) }); return true; - } catch (Exception $e) { + } catch (\Throwable $e) { // note that error here may not be an issue as failing to acquire a job lock is a valid state // which happens when other process claimed the job lock first $this->getLogger()->debug( @@ -770,6 +781,8 @@ 
protected function grabMutex(QueuedJobDescriptor $jobDescriptor) */ public function runJob($jobId) { + $logger = $this->getLogger(); + // first retrieve the descriptor /** @var QueuedJobDescriptor $jobDescriptor */ $jobDescriptor = DataObject::get_by_id( @@ -801,7 +814,7 @@ public function runJob($jobId) $broken = false; - $this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken) { + $this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken, $logger) { if (!$this->grabMutex($jobDescriptor)) { return; } @@ -853,17 +866,8 @@ public function runJob($jobId) ); if (!$jobDescriptor || !$jobDescriptor->exists()) { $broken = true; - $this->getLogger()->error( - print_r( - [ - 'errno' => 0, - 'errstr' => 'Job descriptor ' . $jobId . ' could not be found', - 'errfile' => __FILE__, - 'errline' => __LINE__, - 'errcontext' => [], - ], - true - ), + $logger->error( + 'Job descriptor ' . $jobId . ' could not be found', [ 'file' => __FILE__, 'line' => __LINE__, @@ -871,9 +875,13 @@ public function runJob($jobId) ); break; } + + // Add job-specific logger handling. Modifies the job singleton by reference + $this->addJobHandlersToLogger($logger, $job, $jobDescriptor); + if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) { // we've been paused by something, so we'll just exit - $job->addMessage(_t( + $logger->warning(_t( __CLASS__ . 
'.JOB_PAUSED', 'Job paused at {time}', ['time' => DBDatetime::now()->Rfc2822()] @@ -882,83 +890,24 @@ public function runJob($jobId) } if (!$broken) { - // Inject real-time log handler - $logger = Injector::inst()->get(LoggerInterface::class); - if ($logger instanceof Logger) { - // Check if there is already a handler - $exists = false; - foreach ($logger->getHandlers() as $handler) { - if ($handler instanceof QueuedJobHandler) { - $exists = true; - break; - } - } - - if (!$exists) { - // Add the handler - /** @var QueuedJobHandler $queuedJobHandler */ - $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor); - - // We only write for every 100 file - $bufferHandler = new BufferHandler( - $queuedJobHandler, - 100, - Logger::DEBUG, - true, - true - ); - - $logger->pushHandler($bufferHandler); - } - } else { - if ($logger instanceof LoggerInterface) { - $logger->warning( - 'Monolog not found, messages will not output while the job is running' - ); - } - } - - // Collect output as job messages as well as sending it to the screen after processing - $obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) { + // Collect output where jobs aren't using the logger singleton + ob_start(function ($buffer, $phase) use ($job, $jobDescriptor) { $job->addMessage($buffer); if ($jobDescriptor) { $this->copyJobToDescriptor($job, $jobDescriptor); $jobDescriptor->write(); } return $buffer; - }; - ob_start($obLogger, 256); + }, 256); try { $job->process(); - } catch (Exception $e) { - // okay, we'll just catch this exception for now - $job->addMessage( - _t( - __CLASS__ . 
'.JOB_EXCEPT', - 'Job caused exception {message} in {file} at line {line}', - [ - 'message' => $e->getMessage(), - 'file' => $e->getFile(), - 'line' => $e->getLine(), - ] - ) - ); - $this->getLogger()->error( - $e->getMessage(), - [ - 'exception' => $e, - ] - ); + } catch (\Throwable $e) { + $logger->error($e->getMessage(), ['exception' => $e]); $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; $this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e); } - // Write any remaining batched messages at the end - if (isset($bufferHandler)) { - $bufferHandler->flush(); - } - ob_end_flush(); // now check the job state @@ -969,40 +918,38 @@ public function runJob($jobId) if ($stallCount > static::config()->get('stall_threshold')) { $broken = true; - $job->addMessage( - _t( - __CLASS__ . '.JOB_STALLED', - 'Job stalled after {attempts} attempts - please check', - ['attempts' => $stallCount] - ) - ); + $logger->error(_t( + __CLASS__ . '.JOB_STALLED', + 'Job stalled after {attempts} attempts - please check', + ['attempts' => $stallCount] + )); $jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN; } // now we'll be good and check our memory usage. If it is too high, we'll set the job to // a 'Waiting' state, and let the next processing run pick up the job. if ($this->isMemoryTooHigh()) { - $job->addMessage( - _t( - __CLASS__ . '.MEMORY_RELEASE', - 'Job releasing memory and waiting ({used} used)', - ['used' => $this->humanReadable($this->getMemoryUsage())] - ) - ); + $logger->warning(_t( + __CLASS__ . '.MEMORY_RELEASE', + 'Job releasing memory and waiting ({used} used)', + ['used' => $this->humanReadable($this->getMemoryUsage())] + )); if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; + $this->releaseJobLock($jobDescriptor); } $broken = true; } // Also check if we are running too long if ($this->hasPassedTimeLimit()) { - $job->addMessage(_t( + $logger->warning(_t( __CLASS__ . 
'.TIME_LIMIT', 'Queue has passed time limit and will restart before continuing' )); if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; + $this->releaseJobLock($jobDescriptor); } $broken = true; } @@ -1012,17 +959,8 @@ public function runJob($jobId) $this->copyJobToDescriptor($job, $jobDescriptor); $jobDescriptor->write(); } else { - $this->getLogger()->error( - print_r( - [ - 'errno' => 0, - 'errstr' => 'Job descriptor has been set to null', - 'errfile' => __FILE__, - 'errline' => __LINE__, - 'errcontext' => [], - ], - true - ), + $logger->error( + 'Job descriptor has been set to null', [ 'file' => __FILE__, 'line' => __LINE__, @@ -1044,15 +982,29 @@ public function runJob($jobId) $this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job); } - } catch (Exception $e) { - // PHP 5.6 exception handling - $this->handleBrokenJobException($jobDescriptor, $job, $e); - $broken = true; } catch (\Throwable $e) { // PHP 7 Error handling) $this->handleBrokenJobException($jobDescriptor, $job, $e); $broken = true; } + + // Write any remaining batched messages at the end. + if ($logger instanceof Logger) { + foreach ($logger->getHandlers() as $handler) { + if ($handler instanceof BufferHandler) { + $handler->flush(); + } + } + } + + // If using a global singleton logger here, + // any messages added after this point will be auto-flushed on PHP shutdown through the handler. + // This causes a database write, and assumes the database and table will be available at this point. 
+ if ($logger instanceof Logger) { + $logger->setHandlers(array_filter($logger->getHandlers(), function ($handler) { + return !($handler instanceof BufferHandler); + })); + } }); $this->unsetRunAsUser($runAsUser, $originalUser); @@ -1397,9 +1349,25 @@ public function onShutdown() */ public function getLogger() { + // Enable dependency injection + if ($this->logger) { + return $this->logger; + } + + // Fall back to implicitly created service return Injector::inst()->get(LoggerInterface::class); } + /** + * @param LoggerInterface $logger + */ + public function setLogger(LoggerInterface $logger) + { + $this->logger = $logger; + + return $this; + } + public function enableMaintenanceLock() { if (!$this->config()->get('lock_file_enabled')) { @@ -1463,6 +1431,84 @@ protected function getWorkerExpiry(): string return $expiry->Rfc2822(); } + /** + * Get expiry time for a INIT state of a queued job + * this helps to identify jobs that have stalled more accurately + * + * @return string + * @throws Exception + */ + protected function getInitStateExpiry(): string + { + $now = DBDatetime::now()->Rfc2822(); + $time = new DateTime($now); + $timeToLive = $this->config()->get('initialising_state_ttl'); + + if ($timeToLive) { + $time->sub(new DateInterval($timeToLive)); + } + + /** @var DBDatetime $expiry */ + $expiry = DBField::create_field('Datetime', $time->getTimestamp()); + + return $expiry->Rfc2822(); + } + + /** + * Add job-specific logger functionality which has the ability to flush logs into + * the job descriptor database record. Based on the default logger set for this class, + * which means it'll also log to other channels (e.g. stdout/stderr). 
+ * + * @param QueuedJob $job + * @param QueuedJobDescriptor $jobDescriptor + */ + private function addJobHandlersToLogger(LoggerInterface $logger, QueuedJob $job, QueuedJobDescriptor $jobDescriptor) + { + if ($logger instanceof Logger) { + // Check if there is already a handler + $exists = false; + foreach ($logger->getHandlers() as $handler) { + if ($handler instanceof QueuedJobHandler) { + $exists = true; + break; + } + } + + if (!$exists) { + // Add the handler + /** @var QueuedJobHandler $queuedJobHandler */ + $queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor); + + // Only write for every 100 messages to avoid excessive database activity + $bufferHandler = new BufferHandler( + $queuedJobHandler, + 100, + Logger::DEBUG, + true, + true + ); + + $logger->pushHandler($bufferHandler); + } + } else { + if ($logger instanceof LoggerInterface) { + $logger->warning( + 'Monolog not found, messages will not output while the job is running' + ); + } + } + } + + /** + * Release job lock on the descriptor so it can run again + * + * @param QueuedJobDescriptor $descriptor + */ + protected function releaseJobLock(QueuedJobDescriptor $descriptor): void + { + $descriptor->Worker = null; + } + /** * @return string */ diff --git a/src/Tasks/Engines/DoormanRunner.php b/src/Tasks/Engines/DoormanRunner.php index 502620ab..08a4e6c8 100644 --- a/src/Tasks/Engines/DoormanRunner.php +++ b/src/Tasks/Engines/DoormanRunner.php @@ -2,12 +2,13 @@ namespace Symbiote\QueuedJobs\Tasks\Engines; -use AsyncPHP\Doorman\Manager\ProcessManager; +use SilverStripe\Core\ClassInfo; +use SilverStripe\Core\Config\Configurable; use SilverStripe\Core\Environment; use SilverStripe\Core\Injector\Injector; -use SilverStripe\ORM\FieldType\DBDatetime; use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor; use Symbiote\QueuedJobs\Jobs\DoormanQueuedJobTask; +use Symbiote\QueuedJobs\Services\ProcessManager; use Symbiote\QueuedJobs\Services\QueuedJob; use 
Symbiote\QueuedJobs\Services\QueuedJobService; @@ -16,6 +17,33 @@ */ class DoormanRunner extends BaseRunner implements TaskRunnerEngine { + use Configurable; + + /** + * How many ticks are executed per one @see runQueue method call + * set 0 for unlimited ticks + * + * @config + * @var int + */ + private static $max_ticks = 0; + + /** + * How many seconds between ticks + * + * @config + * @var int + */ + private static $tick_interval = 1; + + /** + * Name of the dev task used to run the child process + * + * @config + * @var string + */ + private static $child_runner = 'ProcessJobQueueChildTask'; + /** * @var string[] */ @@ -48,10 +76,13 @@ public function getDefaultRules() */ public function runQueue($queue) { - // check if queue can be processed $service = QueuedJobService::singleton(); + $logger = $service->getLogger(); + + // check if queue can be processed if ($service->isAtMaxJobs()) { - $service->getLogger()->info('Not processing queue as jobs are at max initialisation limit.'); + $logger->info('Not processing queue as jobs are at max initialisation limit.'); + return; } @@ -60,68 +91,113 @@ public function runQueue($queue) /** @var ProcessManager $manager */ $manager = Injector::inst()->create(ProcessManager::class); $manager->setWorker( - BASE_PATH . 
"/vendor/silverstripe/framework/cli-script.php dev/tasks/ProcessJobQueueChildTask" + sprintf( + '%s/vendor/silverstripe/framework/cli-script.php dev/tasks/%s', + BASE_PATH, + $this->getChildRunner() + ) ); + $logPath = Environment::getEnv('SS_DOORMAN_LOGPATH'); + if ($logPath) { $manager->setLogPath($logPath); } // Assign default rules $defaultRules = $this->getDefaultRules(); + if ($defaultRules) { foreach ($defaultRules as $rule) { + if (!$rule) { + continue; + } + $manager->addRule($rule); } } - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); + $tickCount = 0; + $maxTicks = $this->getMaxTicks(); + $descriptor = $service->getNextPendingJob($queue); while ($manager->tick() || $descriptor) { - if (QueuedJobService::singleton()->isMaintenanceLockActive()) { - $service->getLogger()->info('Skipped queued job descriptor since maintenance log is active.'); + if ($service->isMaintenanceLockActive()) { + $logger->info('Skipped queued job descriptor since maintenance lock is active.'); + return; } - $this->logDescriptorStatus($descriptor, $queue); - - if ($descriptor instanceof QueuedJobDescriptor) { - $descriptor->JobStatus = QueuedJob::STATUS_INIT; - $descriptor->write(); + if ($maxTicks > 0 && $tickCount >= $maxTicks) { + $logger->info(sprintf('Tick count has hit max ticks (%d)', $maxTicks)); - $manager->addTask(new DoormanQueuedJobTask($descriptor)); + return; } - sleep(1); + if ($service->isAtMaxJobs()) { + $logger->info( + sprintf( + 'Not processing queue as all job are at max limit. 
%s', + ClassInfo::shortName($service) + ) + ); + } elseif ($descriptor) { + $logger->info(sprintf('Next pending job is: %d', $descriptor->ID)); + $this->logDescriptorStatus($descriptor, $queue); + + if ($descriptor instanceof QueuedJobDescriptor) { + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->write(); + + $manager->addTask(new DoormanQueuedJobTask($descriptor)); + } + } else { + $logger->info('Next pending job could NOT be found or lock could NOT be obtained.'); + } - $descriptor = $this->getNextJobDescriptorWithoutMutex($queue); + $tickCount += 1; + sleep($this->getTickInterval()); + $descriptor = $service->getNextPendingJob($queue); } } /** - * @param string $queue - * @return null|QueuedJobDescriptor + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int */ - protected function getNextJobDescriptorWithoutMutex($queue) + protected function getMaxTicks(): int { - $list = QueuedJobDescriptor::get() - ->filter('JobType', $queue) - ->sort('ID', 'ASC'); + return (int) $this->config()->get('max_ticks'); + } - $descriptor = $list - ->filter('JobStatus', QueuedJob::STATUS_WAIT) - ->first(); + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return int + */ + protected function getTickInterval(): int + { + return (int) $this->config()->get('tick_interval'); + } - if ($descriptor) { - return $descriptor; - } + /** + * Override this method if you need a dynamic value for the configuration, for example CMS setting + * + * @return string + */ + protected function getChildRunner(): string + { + return (string) $this->config()->get('child_runner'); + } - return $list - ->filter('JobStatus', QueuedJob::STATUS_NEW) - ->where(sprintf( - '"StartAfter" < \'%s\' OR "StartAfter" IS NULL', - DBDatetime::now()->getValue() - )) - ->first(); + /** + * @param string $queue + * @return QueuedJobDescriptor|null + * @deprecated 5.0 + */ + 
protected function getNextJobDescriptorWithoutMutex($queue) + { + return $this->getService()->getNextPendingJob($queue); } } diff --git a/src/Tasks/ProcessJobQueueTask.php b/src/Tasks/ProcessJobQueueTask.php index 460187b4..6df09121 100644 --- a/src/Tasks/ProcessJobQueueTask.php +++ b/src/Tasks/ProcessJobQueueTask.php @@ -2,7 +2,11 @@ namespace Symbiote\QueuedJobs\Tasks; +use Monolog\Handler\FilterHandler; +use Monolog\Handler\StreamHandler; +use Monolog\Logger; use SilverStripe\Control\HTTPRequest; +use SilverStripe\Core\Environment; use SilverStripe\Dev\BuildTask; use Symbiote\QueuedJobs\Services\QueuedJob; use Symbiote\QueuedJobs\Services\QueuedJobService; @@ -43,6 +47,26 @@ public function run($request) $service = $this->getService(); + // Ensure that log messages are visible when executing this task on CLI. + // Could be replaced with BuildTask logger: https://github.com/silverstripe/silverstripe-framework/issues/9183 + if (Environment::isCli()) { + $logger = $service->getLogger(); + + // Assumes that general purpose logger usually doesn't already contain a stream handler. + $errorHandler = new StreamHandler('php://stderr', Logger::ERROR); + $standardHandler = new StreamHandler('php://stdout'); + + // Avoid double logging of errors + $standardFilterHandler = new FilterHandler( + $standardHandler, + Logger::DEBUG, + Logger::WARNING + ); + + $logger->pushHandler($standardFilterHandler); + $logger->pushHandler($errorHandler); + } + if ($request->getVar('list')) { // List helper $service->queueRunner->listJobs(); diff --git a/templates/Symbiote/QueuedJobs/Controllers/QueuedTaskRunner.ss b/templates/Symbiote/QueuedJobs/Controllers/QueuedTaskRunner.ss new file mode 100644 index 00000000..fb11b192 --- /dev/null +++ b/templates/Symbiote/QueuedJobs/Controllers/QueuedTaskRunner.ss @@ -0,0 +1,50 @@ +$Header.RAW +$Info.RAW + +
+ + + + + + + + + + + + +
+ <% if $Tasks.Count > 0 %> +
+ <% loop $Tasks %> +
+
+

$Title

+
$Description
+
+
+ <% if $TaskLink %> + Run task + <% end_if %> + + <% if $QueueLink %> + Queue job + <% end_if %> +
+
+ <% end_loop %> +
+ <% end_if %> +
+
+ +$Footer.RAW diff --git a/tests/QueuedJobRuleTest.php b/tests/QueuedJobRuleTest.php new file mode 100644 index 00000000..e1ed7574 --- /dev/null +++ b/tests/QueuedJobRuleTest.php @@ -0,0 +1,60 @@ +{$property} = $value; + + $this->assertSame($expected, $rule->{$property}); + } + + public function ruleGetterProvider(): array + { + return [ + ['Processes', null, 1], + ['Processes', 0, 0], + ['Processes', 1, 1], + ['Processes', 2, 2], + ['Handler', null, null], + ['Handler', '', null], + ['Handler', 'Test', 'Test'], + ['MinimumProcessorUsage', null, null], + ['MinimumProcessorUsage', 0, 0], + ['MinimumProcessorUsage', 1, 1], + ['MaximumProcessorUsage', null, null], + ['MaximumProcessorUsage', 0, 0], + ['MaximumProcessorUsage', 1, 1], + ['MinimumMemoryUsage', null, null], + ['MinimumMemoryUsage', 0, 0], + ['MinimumMemoryUsage', 1, 1], + ['MaximumMemoryUsage', null, null], + ['MaximumMemoryUsage', 0, 0], + ['MaximumMemoryUsage', 1, 1], + ['MinimumSiblingProcessorUsage', null, null], + ['MinimumSiblingProcessorUsage', 0, 0], + ['MinimumSiblingProcessorUsage', 1, 1], + ['MaximumSiblingProcessorUsage', null, null], + ['MaximumSiblingProcessorUsage', 0, 0], + ['MaximumSiblingProcessorUsage', 1, 1], + ['MinimumSiblingMemoryUsage', null, null], + ['MinimumSiblingMemoryUsage', 0, 0], + ['MinimumSiblingMemoryUsage', 1, 1], + ['MaximumSiblingMemoryUsage', null, null], + ['MaximumSiblingMemoryUsage', 0, 0], + ['MaximumSiblingMemoryUsage', 1, 1], + ]; + } +} diff --git a/tests/QueuedJobsTest.php b/tests/QueuedJobsTest.php index 9cf21066..6fb2f207 100644 --- a/tests/QueuedJobsTest.php +++ b/tests/QueuedJobsTest.php @@ -493,6 +493,38 @@ public function testJobHealthCheck() ); } + public function testJobHealthCheckForStuckInitJobs() + { + $svc = $this->getService(); + $logger = $svc->getLogger(); + $job = new TestQueuedJob(QueuedJob::IMMEDIATE); + $id = $svc->queueJob($job); + + /** @var QueuedJobDescriptor $descriptor */ + $descriptor = QueuedJobDescriptor::get()->byID($id); + + // 
Kick off job processing - this is before job has a worker allocated + DBDatetime::set_mock_now('2017-01-01 16:00:00'); + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->LastProcessedCount = 0; + $descriptor->StepsProcessed = 0; + $descriptor->write(); + + // Check that valid jobs are left untouched + DBDatetime::set_mock_now('2017-01-01 16:01:59'); + $svc->checkJobHealth(QueuedJob::IMMEDIATE); + + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals(QueuedJob::STATUS_INIT, $descriptor->JobStatus); + + // Check that init jobs which are considered stuck are handled + DBDatetime::set_mock_now('2017-01-01 16:02:00'); + $svc->checkJobHealth(QueuedJob::IMMEDIATE); + + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals(QueuedJob::STATUS_WAIT, $descriptor->JobStatus); + } + public function testExceptionWithMemoryExhaustion() { $svc = $this->getService();