-
Notifications
You must be signed in to change notification settings - Fork 134
Worker API
- All workers used in your application should be created in the
setup_workers()
method. - By convention, the alias you give a worker should begin with an Upper-Case letter.
- You should always specify a plausible timeout value. See the How Workers Work guide for in-depth technical reasoning behind this.
- You should take time to estimate the size of arguments passed-to and return arguments passed-from your workers. Add both of these together and multiply by 20 to determine how much memory you should allocate to the worker using its
malloc()
method. If you've under-allocated memory you will almost certainly experience reduced stability in your application. If the system thinks you've under-allocated it will write a warning to you application log.
To create a worker, use the Core_Daemon::worker($alias, $object)
method. It will look and feel exactly like creating a plugin: This is intentional. Behind the scenes Workers are just a special type of plugin.
In this example from Examples/LongPoll
a worker with the alias API
is being created
protected function setup_workers()
{
$this->worker('Api', new API);
$this->Api->workers(1);
$this->Api->timeout(120);
}
All objects created by the worker()
method ($this->Api
in this case) are instances of Core_Worker_Mediator
. The Mediator class exposes a small set of functionality to your application:
-
workers($count)
/** * Set the number of concurrent workers in the pool. No limit is enforced, but processes are expensive and you should use * the minimum number of workers necessary. Too few workers will result in high latency situations and bigger risk * that if your application needs to be restarted you'll lose buffered calls. * * In lazy forking strategy, the processes are forked one-by-one, as needed. This is avoided when your loop_interval * is very short (we don't want to be forking processes if you need to loop every half second, for example) but it's * the most ideal setting. Read more about the forking strategy for more information. * * Part of the Daemon API - Use from your daemon to set the number of concurrent asynchronous worker processes. * * @param int $workers * @throws Exception */
-
timeout($seconds)
As mentioned above, it's very important to set a realistic timeout. Time-outs are part of the Worker API message processing guarantee. In one sentence: If your worker dies with a fatal error, the only way your application will know about it is when the timeout is reached for that call. The Worker API will then be able to pass the failed call to your
onTimeout
callback giving you the option to retry. Further, setting an irrationally high timeout value could leave the workers frozen with messages piling up, never being called.Timeouts are very useful: In what other situation can you call a function and cancel it easily if it takes too long? Use them.
/** * Set the timeout for methods called on this worker. When a timeout happens, the onTimeout() callback is called. * * Part of the Daemon API - Use from your daemon to set a timeout for all worker calls. * * @param $timeout * @throws Exception */
-
malloc($bytes)
Set a memory allocation (in bytes). See the notes above or, for more in-depth technical details, the How Workers Work guide.
/** * Allocate the total size of shared memory that will be allocated for passing arguments and return values to/from the * worker processes. Should be sufficient to hold the working set of each worker pool. * * This is can be calculated roughly as: * ([Max Size Of Arguments Passed] + [Max Size of Return Value]) * ([Number of Jobs Running Concurrently] + [Number of Jobs Queued, Waiting to Run]) * * The memory used by a job is freed after a worker ack's the job as complete and the onReturn handler is called. * The total pool of memory allocated here is freed when: * 1) The daemon is stopped and no messages are left in the queue. * 2) The daemon is restarted without the --recoverworkers flag (In this case the memory is freed and released and then re-allocated. * This is useful if you need to resize the shared memory the worker uses or you just want to purge any stale messages) * * Part of the Daemon API - Use from your Daemon to allocate shared memory used among all worker processes. * * @default 1 MB * @param $bytes * @throws Exception * @return int */
-
is_idle()
If the method returns
true
you'll know that you can call a worker method and it will be processed immediately and not be buffered. You can also use this to create a system where the worker is always running in the background -- as soon as it returns you can call it again. You can see an example of this in theExamples/LongPoll
application./** * Does the worker have at least one idle process? * * Part of the Daemon API - Use from your daemon to determine if any of your daemon's worker processes are idle * * @example Use this to implement a pattern where there is always a background worker working. Suppose your daemon writes results to a file * that you want to upload to S3 continuously. You could create a worker to do the upload and set ->workers(1). In your execute() method * if the worker is idle, call the upload() method. This way it should, at all times, be uploading the latest results. * * @return bool */
-
status($call_id)
When you call a worker method, a
$call_id
is returned. If you have a need in your application, you can pass the id to thestatus()
method to determine the current state. The status will be returned as an integer. You can see the statuses in the "The Call Struct" section of the How Workers Work guide./** * Determine the status of a given call. Call ID's are returned when a job is called. Important to note that * call ID's are only unique within this worker and this execution. * * Part of the Daemon API - Use from your daemon to determine the status of a given call * * @param integer $call_id * @return int Return a status int - See status constants in this class */
-
retry($call_struct)
The only time a serialized call struct itself is passed to your code is in your
onTimeout
andonReturn
callbacks. In any event, if you happen to have a$call_struct
laying around, you can retry it by passing it to this method. Doing so will increment theretries
property and wipe-out any state that was accumulated the previous time around./** * Re-run a previous call by passing in the call's struct. * Note: When calls are re-run a retry=1 property is added, and that is incremented for each re-call. You should check * that value to avoid re-calling failed methods in an infinite loop. * * Part of the Daemon API - Use from your daemon to retry a given call * * @example You set a timeout handler using onTimeout. The worker will pass the timed-out call to the handler as a * stdClass object. You can re-run it by passing the object here. * @param stdClass $call * @return bool */
-
onReturn($callable)
andonTimeout($callable)
Like an Ajax request, your daemon will be notified when a worker call returns or times-out. This is done by calling the onReturn and onTimeout callbacks, respectively. Like the worker itself, you can pass any Closure or Callback to the
onReturn()
andonTimeout()
methods. When your callbacks are called, they are passed two arguments: AstdClass
Call Struct and a logging closure that you can use to easily write to your application log from within your callback.An example onReturn callback from the
Examples/PrimeNumbers
application$this->PrimeNumbers->onReturn(function($call, $log) { $log("Job {$call->id} to {$call->method}() Complete"); switch($call->method) { case "sieve": $log(sprintf('Return: There are %s items in the resultset, from %s to %s.', count($call->return), $call->return[0], $call->return[count($call->return)-1]) ); break; case "primes_among": $log(sprintf('Return. Among [%s], Primes Are [%s]', implode(', ', $call->args[0]), implode(', ', $call->return))); break; } });
As mentioned elsewhere, and similar to Task API, when you create a worker you can pass either a Callable or an instance of Core_IWorker. When using a Callable, your worker has just a single function. You can use the worker object accordingly:
// GetFactors is created from a closure
$this->GetFactors(mt_rand(1000,10000));
As always, the Mediator intercepts the call, serializes it, and passes it to the closure you supplied, now running smoothly in a background process.
However, when you use a Core_IWorker object, all of that objects methods will be exposed through the Mediator as worker objects that you can run in the background process. In the Examples/Prime_Numbers
application, for example, the Primes
class exposes 3 methods: is_prime
, primes_among
, and sieve
. You can call any of these methods. For example:
$this->PrimeNumbers->sieve(1000, 2000); // Find all the prime numbers between 1000 and 2000
It's possible that you could want to call a worker method inline, in your Application process. This of course would mean that it's blocking: Your application doesn't do anything until the call returns. And of course the result, if you return one, will be returned directly from the call. All of the callbacks and timeouts and serialization, all of it is bypassed.
I suppose you could have some methods in a Core_IWorker object you intend to call in your Application, or you could perhaps use logic to choose at any given instance whether you want to call a function inline or in the background.
In any event, it's very easy. Here are the 2 examples used in the previous section as inline calls:
GetFactors:
// GetFactors is created from a closure
$this->GetFactors->inline(mt_rand(1000,10000));
PrimeNumbers:
$this->PrimeNumbers->inline()->sieve(1000, 2000); // Find all the prime numbers between 1000 and 2000
As mentioned elsewhere, and similar to Task API, when you create a worker you can pass either a Callable or an instance of Core_IWorker. When using a Callable, there is no supplied way to interact with the mediator or your application.
Using a Closure
As always, applications you build by extending Core_Daemon
are singletons and you can grab an instance to your application using YourApplicationClass::getInstance()
. Aside from that, it's anticipated that if you're passing a closure as a worker, it will be a somewhat straightforward and simple task.
Using a Callback
A common scenario is to have a method in your Application class that you want to run in a background process. You can see this pattern implemented as the task_sleep
method in the Examples/Tasks
application. In that case, task_sleep
is used with the Task API but, as we've learned by now, it's interchangable with the Worker API.
While you can also use a Callback on any other object or function, this is a common pattern beacuse, of course, that method can access the "main" class of your daemon application as $this
.
Using a Core_IWorker Object
When you use a Core_IWorker
object, a reference to the mediator object will be set as $this->mediator
. This gives you access to several methods in the mediator that are designed to be used by worker objects, as well as a way to access the application object.
-
log()
/** * Write do the Daemon's event log * * Part of the Worker API - Use from your workers to log events to the Daemon event log * * @param $message * @return void */
-
error()
/** * Dispatch ON_ERROR event and write an error message to the Daemon's event log * * Part of the Worker API - Use from your workers to log an error message. * * @param $message * @return void */
-
fatal_error()
/** * Dispatch ON_ERROR event, write an error message to the event log, and restart the worker. * * Part of the Worker API - Use from your worker to log a fatal error message and restart the current process. * * @param $message * @return void */
-
daemon()
/** * Access daemon properties from within your workers * * Part of the Worker API - Use from your worker to access data set on your Daemon class * * @example [inside a worker class] $this->mediator->daemon('dbconn'); * @example [inside a worker class] $ini = $this->mediator->daemon('ini'); $ini['database']['password'] * @param $property * @return mixed */