Skip to content

Commit

Permalink
Merge pull request #32 from zf-fr/periodic-tasks
Browse files Browse the repository at this point in the history
Add support for periodic tasks
  • Loading branch information
danizord authored Jan 9, 2017
2 parents a75aeb0 + 45e893c commit 730086d
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 46 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 5.3.0

* Add support for periodic tasks

# 5.2.0

* Always add a `X-Handled-By` to the response at the end of the worker pipeline. This allows to make sure that the original response is modified, as
Expand Down
53 changes: 38 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,31 +103,31 @@ First, make sure to configure the ZfrEbWorker library by adding this config:
],

'messages' => [
'project.created' => SendCampaignMiddleware::class,
'image.saved' => ProcessImageMiddleware::class
'project.created' => SendCampaignListener::class,
'image.saved' => ProcessImageListener::class
]
```

The `queues` is an associative array of queue name and queue URL hosted on AWS SQS, while `messages` is an associative array that map
a message name to a specific middleware.
a message name to a specific listeners (each listener is just a standard middleware).

You can also attach multiple middlewares to a given message, hence allowing to do different actions based on a message:
You can also attach multiple listeners to a given message, hence allowing to do different actions based on a message:

```php
'zfr_eb_worker' => [
'messages' => [
'image.saved' => [
OptimizeImageMiddleware::class,
UploadImageMiddleware::class
OptimizeImageListener::class,
UploadImageListener::class
]
]
```

### Configuring Elastic Beanstalk

Then, you should configure your Elastic Beanstalk worker environment to push messages to "/internal/worker" URL (this is the
default URL configured if you use Zend Expressive). You could even add a pre-routing middleware to do additional security check
on this URL.
default URL configured if you use Zend Expressive). By default, ZfrEbWorker do additional security checks to ensure that the request
is coming from localhost (as the daemon is installed on EC2 instances directly and push the messages locally):

### Pushing message

Expand Down Expand Up @@ -179,18 +179,22 @@ ZfrEbWorker will automatically dispatch the incoming request to the middleware s
stored inside various request attributes, as shown below:

```php
use ZfrEbWorker\Middleware\WorkerMiddleware;

class MyEventMiddleware
{
public function __invoke($request, $response, $out)
{
$queue = $request->getAttribute('worker.matched_queue');
$messageId = $request->getAttribute('worker.message_id');
$messagePayload = $request->getAttribute('worker.message_payload');
$name = $request->getAttribute('worker.message_name');
$queue = $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE);
$messageId = $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE);
$messagePayload = $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE);
$name = $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE);
}
}
```

> Note: for a periodic task, only the `Middleware::MESSAGE_NAME_ATTRIBUTE` is available.
### How to silently ignore some message?

When ZfrEbWorker don't find a mapped middleware to handle a message, it throws a `RuntimeException`, which makes Elastic
Expand All @@ -206,9 +210,28 @@ Beanstalk to retry it later, you should map SilentFailingListener to the message

### How to use periodic tasks?

Elastic Beanstalk also supports periodic tasks through the usage of `cron.yaml` file. However, this is actually easier as you can
specify a different URL on a task-basis. Therefore, you can dispatch to the URL of your choice and immediately be re-routed to the
correct middleware.
Elastic Beanstalk also supports periodic tasks through the usage of `cron.yaml` file ([more info](http://docs.aws.amazon.com/elasticbeanstalk/latest/dg/using-features-managing-env-tiers.html#worker-periodictasks)).
ZfrEbWorker supports this use case in the same, unified way.

Simply redirect all your periodic tasks to the same "/internal/worker" route, and make sure that the task name you use is part of your config. For instance,
here is a task called "image.backup" that will run every 12 hours:

```yaml
version: 1
cron:
- name: "image.backup"
url: "/internal/worker"
schedule: "0 */12 * * *"
```
Then, in your ZfrEbWorker config, just configure it like any other messages:
```php
'zfr_eb_worker' => [
'messages' => [
'image.backup' => ImageBackupListener::class,
]
```

## CLI commands

Expand Down
2 changes: 1 addition & 1 deletion config/routes.global.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
WorkerMiddleware::class
],
'allowed_methods' => ['POST'],
],
]
],
];
20 changes: 15 additions & 5 deletions src/Middleware/WorkerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,20 @@ public function __invoke(
ResponseInterface $response,
callable $out = null
): ResponseInterface {
// The full message is set as part of the body
$body = json_decode($request->getBody(), true);
$name = $body['name'];
$payload = $body['payload'];
// Two types of messages can be dispatched: either a periodic task or a normal task. For periodic tasks,
// the worker daemon automatically adds the "X-Aws-Sqsd-Taskname" header. When we find it, we simply use this
// name as the message name and continue the process

if ($request->hasHeader('X-Aws-Sqsd-Taskname')) {
// The full message is set as part of the body
$name = $request->getHeaderLine('X-Aws-Sqsd-Taskname');
$payload = [];
} else {
// The full message is set as part of the body
$body = json_decode($request->getBody(), true);
$name = $body['name'];
$payload = $body['payload'];
}

// Let's create a middleware pipeline of mapped middlewares
$pipeline = new Pipeline($this->container, $this->getMiddlewaresForMessage($name), $out);
Expand All @@ -97,7 +107,7 @@ public function __invoke(
/** @var ResponseInterface $response */
$response = $pipeline($request, $response);

return $response->withHeader('X-HANDLED-BY', 'ZfrEbWorker');
return $response->withHeader('X-Handled-By', 'ZfrEbWorker');
}

/**
Expand Down
61 changes: 36 additions & 25 deletions test/Middleware/WorkerMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ public function testThrowsExceptionIfNoMappedMiddleware()
{
$middleware = new WorkerMiddleware([], $this->prophesize(ContainerInterface::class)->reveal());

$this->setExpectedException(
RuntimeException::class,
'No middleware was mapped for message "message-name". Did you fill the "zfr_eb_worker" configuration?'
);
$this->expectException(RuntimeException::class);
$this->expectExceptionMessage('No middleware was mapped for message "message-name". Did you fill the "zfr_eb_worker" configuration?');

$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
Expand All @@ -48,10 +46,8 @@ public function testThrowsExceptionIfInvalidMappedMiddlewareType()
{
$middleware = new WorkerMiddleware(['message-name' => 10], $this->prophesize(ContainerInterface::class)->reveal());

$this->setExpectedException(
InvalidArgumentException::class,
'Mapped middleware must be either a string or an array of strings, integer given.'
);
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Mapped middleware must be either a string or an array of strings, integer given.');

$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
Expand All @@ -62,10 +58,8 @@ public function testThrowsExceptionIfInvalidMappedMiddlewareClass()
{
$middleware = new WorkerMiddleware(['message-name' => new \stdClass()], $this->prophesize(ContainerInterface::class)->reveal());

$this->setExpectedException(
InvalidArgumentException::class,
'Mapped middleware must be either a string or an array of strings, stdClass given.'
);
$this->expectException(InvalidArgumentException::class);
$this->expectExceptionMessage('Mapped middleware must be either a string or an array of strings, stdClass given.');

$middleware($this->createRequest(), new Response(), function() {
$this->fail('$next should not be called');
Expand All @@ -77,12 +71,13 @@ public function testThrowsExceptionIfInvalidMappedMiddlewareClass()
*
* @param array|string $mappedMiddlewares
* @param int $expectedCounter
* @param bool $isPeriodicTask
*/
public function testDispatchesMappedMiddlewares($mappedMiddlewares, int $expectedCounter)
public function testDispatchesMappedMiddlewaresFor($mappedMiddlewares, int $expectedCounter, bool $isPeriodicTask)
{
$container = $this->prophesize(ContainerInterface::class);
$middleware = new WorkerMiddleware(['message-name' => $mappedMiddlewares], $container->reveal());
$request = $this->createRequest();
$request = $this->createRequest($isPeriodicTask);
$response = new Response();

if (is_string($mappedMiddlewares)) {
Expand All @@ -93,22 +88,28 @@ public function testDispatchesMappedMiddlewares($mappedMiddlewares, int $expecte
$container->get($mappedMiddleware)->shouldBeCalled()->willReturn([$this, 'incrementMiddleware']);
}

$out = function ($request, ResponseInterface $response) use ($expectedCounter) {
$out = function ($request, ResponseInterface $response) use ($expectedCounter, $isPeriodicTask) {
$this->assertEquals('message-name', $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE));
$this->assertEquals('default-queue', $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE));
$this->assertEquals('123abc', $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE));
$this->assertEquals('message-name', $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE));
$this->assertEquals(['id' => 123], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
$this->assertEquals($expectedCounter, $request->getAttribute('counter', 0));
$this->assertEquals($expectedCounter, $response->hasHeader('counter') ? $response->getHeaderLine('counter') : 0);

if ($isPeriodicTask) {
// Elastic Beanstalk never push any body inside a periodic task
$this->assertEquals([], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
} else {
$this->assertEquals(['id' => 123], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
}

return $response->withAddedHeader('foo', 'bar');
};

/** @var ResponseInterface $returnedResponse */
$returnedResponse = $middleware($request, $response, $out);

$this->assertEquals('bar', $returnedResponse->getHeaderLine('foo'), 'Make sure that $out was called');
$this->assertEquals('ZfrEbWorker', $returnedResponse->getHeaderLine('X-HANDLED-BY'), 'Make sure that it adds the X-HANDLED-BY header');
$this->assertEquals('ZfrEbWorker', $returnedResponse->getHeaderLine('X-Handled-By'), 'Make sure that it adds the X-Handled-By header');
}

public function incrementMiddleware(ServerRequestInterface $request, ResponseInterface $response, callable $next): ResponseInterface
Expand All @@ -123,22 +124,32 @@ public function incrementMiddleware(ServerRequestInterface $request, ResponseInt
public function mappedMiddlewaresProvider(): array
{
return [
[[], 0],
['FooMiddleware', 1],
[['FooMiddleware'], 1],
[['FooMiddleware', 'BarMiddleware'], 2],
[['FooMiddleware', 'BarMiddleware', 'BazMiddleware'], 3],
[[], 0, false],
['FooMiddleware', 1, false],
[['FooMiddleware'], 1, false],
[['FooMiddleware', 'BarMiddleware'], 2, false],
[['FooMiddleware', 'BarMiddleware', 'BazMiddleware'], 3, false],
[[], 0, true],
['FooMiddleware', 1, true],
[['FooMiddleware'], 1, true],
[['FooMiddleware', 'BarMiddleware'], 2, true],
[['FooMiddleware', 'BarMiddleware', 'BazMiddleware'], 3, true],
];
}

private function createRequest(): ServerRequestInterface
private function createRequest(bool $isPeriodicTask = false): ServerRequestInterface
{
$request = new ServerRequest();

$request = $request->withHeader('X-Aws-Sqsd-Queue', 'default-queue');
$request = $request->withHeader('X-Aws-Sqsd-Msgid', '123abc');
$request = $request->withBody(new Stream('php://temp', 'w'));

$request->getBody()->write(json_encode(['name' => 'message-name', 'payload' => ['id' => 123]]));
if ($isPeriodicTask) {
$request = $request->withHeader('X-Aws-Sqsd-Taskname', 'message-name');
} else {
$request->getBody()->write(json_encode(['name' => 'message-name', 'payload' => ['id' => 123]]));
}

return $request;
}
Expand Down

0 comments on commit 730086d

Please sign in to comment.