Skip to content

Commit

Permalink
NEW CLI output in ProcessJobQueueTask
Browse files Browse the repository at this point in the history
It's counterintuitive to run the queue on CLI (e.g. when testing things),
get zero error output, and then discover why the job was broken by looking at a tiny text field in the admin/jobs CMS UI.

There's an interesting edge case where the logger *does* output to CLI only when
a broken job is discovered, because that uses the logger for messages *before*
adding the job-specific handlers. And in case a Monolog logger implementation doesn't have any handlers,
it'll add a php://stderr by default. Very confusing :D

Note that modifying the singleton during execution by adding job-specific handlers isn't ideal (didn't change that status quo).
But there's no clear interface for any services being executed through a task
receiving a logger *from* the task. So we have to assume they'll use the injector singleton.
Technically it means any messages after the job-specific execution (e.g. during shutdown)
would also be logged into the QueuedJobDescriptor database record,
but you could argue that's desired behaviour.

This should really be fixed by adding BuildTask->getLogger() and making it available to all tasks,
but this is the first step to fix this specific task behaviour.
See silverstripe/silverstripe-framework#9183
  • Loading branch information
chillu committed Jun 12, 2020
1 parent 6576b3f commit 798f7e5
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 43 deletions.
1 change: 1 addition & 0 deletions _config/queuedjobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SilverStripe\Core\Injector\Injector:
queueHandler: %$QueueHandler
# Change to %$DoormanRunner for async processing (requires *nix)
queueRunner: %$Symbiote\QueuedJobs\Tasks\Engines\QueueRunner
logger: %$Psr\Log\LoggerInterface

DefaultRule:
class: 'AsyncPHP\Doorman\Rule\InMemoryRule'
Expand Down
114 changes: 71 additions & 43 deletions src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class QueuedJobService
*/
private static $lock_file_path = '';

/**
* @var LoggerInterface
*/
private $logger;

/**
* @var DefaultQueueHandler
*/
Expand Down Expand Up @@ -750,6 +755,8 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
*/
public function runJob($jobId)
{
$logger = $this->getLogger();

// first retrieve the descriptor
/** @var QueuedJobDescriptor $jobDescriptor */
$jobDescriptor = DataObject::get_by_id(
Expand Down Expand Up @@ -781,7 +788,7 @@ public function runJob($jobId)

$broken = false;

$this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken) {
$this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken, $logger) {
if (!$this->grabMutex($jobDescriptor)) {
return;
}
Expand Down Expand Up @@ -853,52 +860,18 @@ public function runJob($jobId)
}

if (!$broken) {
// Inject real-time log handler
$logger = Injector::inst()->get(LoggerInterface::class);
if ($logger instanceof Logger) {
// Check if there is already a handler
$exists = false;
foreach ($logger->getHandlers() as $handler) {
if ($handler instanceof QueuedJobHandler) {
$exists = true;
break;
}
}

if (!$exists) {
// Add the handler
/** @var QueuedJobHandler $queuedJobHandler */
$queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);

// We only write for every 100 file
$bufferHandler = new BufferHandler(
$queuedJobHandler,
100,
Logger::DEBUG,
true,
true
);

$logger->pushHandler($bufferHandler);
}
} else {
if ($logger instanceof LoggerInterface) {
$logger->warning(
'Monolog not found, messages will not output while the job is running'
);
}
}
// Add job-specific logger handling. Modifies the job singleton by reference
$this->addJobHandlersToLogger($logger, $job, $jobDescriptor);

// Collect output as job messages as well as sending it to the screen after processing
$obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) {
// Collect output where jobs aren't using the logger singleton
ob_start(function ($buffer, $phase) use ($job, $jobDescriptor) {
$job->addMessage($buffer);
if ($jobDescriptor) {
$this->copyJobToDescriptor($job, $jobDescriptor);
$jobDescriptor->write();
}
return $buffer;
};
ob_start($obLogger, 256);
}, 256);

try {
$job->process();
Expand All @@ -915,7 +888,7 @@ public function runJob($jobId)
]
)
);
$this->getLogger()->error(
$logger->error(
$e->getMessage(),
[
'exception' => $e,
Expand Down Expand Up @@ -983,7 +956,7 @@ public function runJob($jobId)
$this->copyJobToDescriptor($job, $jobDescriptor);
$jobDescriptor->write();
} else {
$this->getLogger()->error(
$logger->error(
print_r(
[
'errno' => 0,
Expand Down Expand Up @@ -1368,7 +1341,17 @@ public function onShutdown()
*/
public function getLogger()
{
return Injector::inst()->get(LoggerInterface::class);
return $this->logger;
}

/**
* @param LoggerInterface $logger
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;

return $this;
}

public function enableMaintenanceLock()
Expand Down Expand Up @@ -1434,6 +1417,51 @@ protected function getWorkerExpiry(): string
return $expiry->Rfc2822();
}

/**
* Add job-specific logger functionality which has the ability to flush logs into
* the job descriptor database record. Based on the default logger set for this class,
* which means it'll also log to other channels (e.g. stdout/stderr).
*
* @param QueuedJob $job
* @param QueuedJobDescriptor $jobDescriptor
*/
private function addJobHandlersToLogger(LoggerInterface $logger, QueuedJob $job, QueuedJobDescriptor $jobDescriptor)
{
if ($logger instanceof Logger) {
// Check if there is already a handler
$exists = false;
foreach ($logger->getHandlers() as $handler) {
if ($handler instanceof QueuedJobHandler) {
$exists = true;
break;
}
}

if (!$exists) {
// Add the handler
/** @var QueuedJobHandler $queuedJobHandler */
$queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);

// Only write for every 100 messages to avoid excessive database activity
$bufferHandler = new BufferHandler(
$queuedJobHandler,
100,
Logger::DEBUG,
true,
true
);

$logger->pushHandler($bufferHandler);
}
} else {
if ($logger instanceof LoggerInterface) {
$logger->warning(
'Monolog not found, messages will not output while the job is running'
);
}
}
}

/**
* @return string
*/
Expand Down
24 changes: 24 additions & 0 deletions src/Tasks/ProcessJobQueueTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

namespace Symbiote\QueuedJobs\Tasks;

use Monolog\Handler\FilterHandler;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use SilverStripe\Control\HTTPRequest;
use SilverStripe\Core\Environment;
use SilverStripe\Dev\BuildTask;
use Symbiote\QueuedJobs\Services\QueuedJob;
use Symbiote\QueuedJobs\Services\QueuedJobService;
Expand Down Expand Up @@ -43,6 +47,26 @@ public function run($request)

$service = $this->getService();

// Ensure that log messages are visible when executing this task on CLI.
// TODO Replace with BuildTask logger: https://github.com/silverstripe/silverstripe-framework/issues/9183
if (Environment::isCli()) {
$logger = $service->getLogger();

// Assumes that general purpose logger usually doesn't already contain a stream handler.
$errorHandler = new StreamHandler('php://stderr', Logger::ERROR);
$standardHandler = new StreamHandler('php://stdout');

// Avoid double logging of errors
$standardFilterHandler = new FilterHandler(
$standardHandler,
Logger::DEBUG,
Logger::WARNING
);

$logger->pushHandler($standardFilterHandler);
$logger->pushHandler($errorHandler);
}

if ($request->getVar('list')) {
// List helper
$service->queueRunner->listJobs();
Expand Down

0 comments on commit 798f7e5

Please sign in to comment.