Skip to content

Commit

Permalink
Add missing header
Browse files Browse the repository at this point in the history
  • Loading branch information
bakura10 committed Dec 30, 2016
1 parent 17c5934 commit 45e893c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 55 deletions.
52 changes: 7 additions & 45 deletions src/Middleware/WorkerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,15 @@ public function __invoke(
// name as the message name and continue the process

if ($request->hasHeader('X-Aws-Sqsd-Taskname')) {
return $this->processPeriodicTask($request, $response, $out);
// The full message is set as part of the body
$name = $request->getHeaderLine('X-Aws-Sqsd-Taskname');
$payload = [];
} else {
return $this->processTask($request, $response, $out);
// The full message is set as part of the body
$body = json_decode($request->getBody(), true);
$name = $body['name'];
$payload = $body['payload'];
}
}

/**
* @param ServerRequestInterface $request
* @param ResponseInterface $response
* @param callable|null $out
* @return ResponseInterface
*/
private function processTask(
ServerRequestInterface $request,
ResponseInterface $response,
callable $out = null
): ResponseInterface {
// The full message is set as part of the body
$body = json_decode($request->getBody(), true);
$name = $body['name'];
$payload = $body['payload'];

// Let's create a middleware pipeline of mapped middlewares
$pipeline = new Pipeline($this->container, $this->getMiddlewaresForMessage($name), $out);
Expand All @@ -122,32 +110,6 @@ private function processTask(
return $response->withHeader('X-Handled-By', 'ZfrEbWorker');
}

/**
* @param ServerRequestInterface $request
* @param ResponseInterface $response
* @param callable|null $out
* @return ResponseInterface
*/
private function processPeriodicTask(
ServerRequestInterface $request,
ResponseInterface $response,
callable $out = null
): ResponseInterface {
// The full message is set as part of the body
$name = $request->getHeaderLine('X-Aws-Sqsd-Taskname');

// Let's create a middleware pipeline of mapped middlewares
$pipeline = new Pipeline($this->container, $this->getMiddlewaresForMessage($name), $out);

// For periodic tasks, Elastic Beanstalk set different headers than for normal tasks
$request = $request->withAttribute(self::MESSAGE_NAME_ATTRIBUTE, $name);

/** @var ResponseInterface $response */
$response = $pipeline($request, $response);

return $response->withHeader('X-Handled-By', 'ZfrEbWorker');
}

/**
* @param string $messageName
*
Expand Down
22 changes: 12 additions & 10 deletions test/Middleware/WorkerMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,18 @@ public function testDispatchesMappedMiddlewaresFor($mappedMiddlewares, int $expe

$out = function ($request, ResponseInterface $response) use ($expectedCounter, $isPeriodicTask) {
$this->assertEquals('message-name', $request->getAttribute(WorkerMiddleware::MESSAGE_NAME_ATTRIBUTE));
$this->assertEquals('default-queue', $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE));
$this->assertEquals('123abc', $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE));
$this->assertEquals($expectedCounter, $request->getAttribute('counter', 0));
$this->assertEquals($expectedCounter, $response->hasHeader('counter') ? $response->getHeaderLine('counter') : 0);

if (!$isPeriodicTask) {
$this->assertEquals('default-queue', $request->getAttribute(WorkerMiddleware::MATCHED_QUEUE_ATTRIBUTE));
$this->assertEquals('123abc', $request->getAttribute(WorkerMiddleware::MESSAGE_ID_ATTRIBUTE));
if ($isPeriodicTask) {
// Elastic Beanstalk never push any body inside a periodic task
$this->assertEquals([], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
} else {
$this->assertEquals(['id' => 123], $request->getAttribute(WorkerMiddleware::MESSAGE_PAYLOAD_ATTRIBUTE));
}

$this->assertEquals($expectedCounter, $request->getAttribute('counter', 0));
$this->assertEquals($expectedCounter, $response->hasHeader('counter') ? $response->getHeaderLine('counter') : 0);

return $response->withAddedHeader('foo', 'bar');
};

Expand Down Expand Up @@ -139,13 +141,13 @@ private function createRequest(bool $isPeriodicTask = false): ServerRequestInter
{
$request = new ServerRequest();

$request = $request->withHeader('X-Aws-Sqsd-Queue', 'default-queue');
$request = $request->withHeader('X-Aws-Sqsd-Msgid', '123abc');
$request = $request->withBody(new Stream('php://temp', 'w'));

if ($isPeriodicTask) {
$request = $request->withHeader('X-Aws-Sqsd-Taskname', 'message-name');
} else {
$request = $request->withHeader('X-Aws-Sqsd-Queue', 'default-queue');
$request = $request->withHeader('X-Aws-Sqsd-Msgid', '123abc');
$request = $request->withBody(new Stream('php://temp', 'w'));

$request->getBody()->write(json_encode(['name' => 'message-name', 'payload' => ['id' => 123]]));
}

Expand Down

0 comments on commit 45e893c

Please sign in to comment.