Skip to content

Commit

Permalink
Merge pull request #4 from tompedals/feature/poller
Browse files Browse the repository at this point in the history
Add non-blocking poller
  • Loading branch information
rcwsr authored Mar 8, 2017
2 parents b7e2517 + cee3d13 commit 09f6112
Show file tree
Hide file tree
Showing 16 changed files with 785 additions and 131 deletions.
14 changes: 14 additions & 0 deletions src/Broker/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,20 @@ public function consume(callable $callback = null)
$this->getAMQPQueue()->consume($callback);
}

/**
* @return Message|null
*/
public function pop()
{
$envelope = $this->getAMQPQueue()->get();

if ($envelope instanceof AMQPEnvelope) {
return Message::createFromEnvelope($envelope);
}

return null;
}

public function cancel()
{
$this->getAMQPQueue()->cancel();
Expand Down
30 changes: 30 additions & 0 deletions src/Broker/QueueCollection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@

class QueueCollection implements ConsumableInterface
{
/**
* @var Queue[]
*/
protected $queues;

/**
* @var int
*/
private $queueCounter = 0;

public function __construct(array $queues = [])
{
$this->queues = [];
Expand Down Expand Up @@ -47,6 +55,28 @@ public function consume(callable $callback = null)
$this->cancel();
}

/**
* @return Message|null
*/
public function pop()
{
$keys = array_keys($this->queues);
if (!isset($keys[$this->queueCounter])) {
$this->queueCounter = 0;
}

$queue = $this->queues[$keys[$this->queueCounter]];

$message = $queue->pop();
$this->queueCounter++;

if ($message !== null) {
return $message;
}

return null;
}

public function cancel()
{
foreach ($this->queues as $queue) {
Expand Down
33 changes: 33 additions & 0 deletions src/Broker/QueueLoader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Radish\Broker;

class QueueLoader
{
/**
* @var QueueRegistry
*/
private $queueRegistry;

/**
* @param QueueRegistry $queueRegistry
*/
public function __construct(QueueRegistry $queueRegistry)
{
$this->queueRegistry = $queueRegistry;
}
/**
* @param array $queueNames
* @return QueueCollection
*/
public function load(array $queueNames)
{
$queues = new QueueCollection();

foreach ($queueNames as $queueName) {
$queues->add($this->queueRegistry->get($queueName));
}

return $queues;
}
}
63 changes: 35 additions & 28 deletions src/Consumer/ConsumerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,55 @@
namespace Radish\Consumer;

use Radish\Broker\QueueCollection;
use Radish\Broker\QueueLoader;
use Radish\Broker\QueueRegistry;
use Radish\Middleware\ConfigurableInterface;
use Radish\Middleware\MiddlewareLoader;
use Radish\Middleware\MiddlewareRegistry;
use Psr\Log\LoggerInterface;
use Radish\Poller\Poller;
use Symfony\Component\OptionsResolver\OptionsResolver;

class ConsumerFactory implements ConsumerFactoryInterface
{
protected $queueRegistry;
protected $middlewareRegistry;
/**
* @var LoggerInterface|null
*/
protected $logger;

public function __construct(QueueRegistry $queueRegistry, MiddlewareRegistry $middlewareRegistry, LoggerInterface $logger = null)
/**
* @var MiddlewareLoader
*/
private $middlewareLoader;
/**
* @var QueueLoader
*/
private $queueLoader;

/**
* @param QueueLoader $queueLoader
* @param MiddlewareLoader $middlewareLoader
* @param LoggerInterface|null $logger
*/
public function __construct(QueueLoader $queueLoader, MiddlewareLoader $middlewareLoader, LoggerInterface $logger = null)
{
$this->queueRegistry = $queueRegistry;
$this->middlewareRegistry = $middlewareRegistry;
$this->queueLoader = $queueLoader;
$this->middlewareLoader = $middlewareLoader;
$this->logger = $logger;
}

/**
* @param array $queueNames
* @param array $middlewareOptions
* @param array $workers
* @return Consumer
*/
public function create(array $queueNames, array $middlewareOptions, array $workers)
{
$queues = new QueueCollection();
foreach ($queueNames as $queueName) {
$queues->add($this->queueRegistry->get($queueName));
}

$middlewares = [];
foreach ($middlewareOptions as $middlewareName => $options) {
$middleware = $this->middlewareRegistry->get($middlewareName);
if ($middleware instanceof ConfigurableInterface) {
if (!is_array($options)) {
$options = [];
}

$optionsResolver = new OptionsResolver();
$middleware->configureOptions($optionsResolver);
$middleware->setOptions($optionsResolver->resolve($options));
}

$middlewares[] = $middleware;
}

return new Consumer($queues, $middlewares, $workers, $this->logger);
return new Consumer(
$this->queueLoader->load($queueNames),
$this->middlewareLoader->load($middlewareOptions),
$workers,
$this->logger
);
}
}
122 changes: 122 additions & 0 deletions src/Consumer/Poller.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
<?php

namespace Radish\Consumer;

use Psr\Log\LoggerInterface;
use Radish\Broker\QueueCollection;
use Radish\Middleware\InitializableInterface;
use Radish\Middleware\MiddlewareInterface;
use Radish\Middleware\Next;
use Radish\Middleware\SleepyMiddlewareInterface;
use RuntimeException;

class Poller implements ConsumerInterface
{
protected $queues;
protected $middlewares;
protected $workers;
protected $logger;
protected $interval;
protected $waiting = false;

/**
* @param QueueCollection $queues
* @param MiddlewareInterface[] $middlewares
* @param array $workers
* @param int $interval
* @param LoggerInterface|null $logger
*/
public function __construct(QueueCollection $queues, array $middlewares, array $workers, $interval = 10, LoggerInterface $logger = null)
{
$this->queues = $queues;
$this->middlewares = $middlewares;
$this->workers = $workers;
$this->interval = $interval;
$this->logger = $logger;
}

/**
* {@inheritdoc}
*/
public function consume()
{
if ($this->logger) {
$this->logger->debug('Starting poller');
}

foreach ($this->middlewares as $middleware) {
if ($middleware instanceof InitializableInterface) {
$middleware->initialize();
}
}

while (true) {
if ($this->process() === false) {
break;
}

if ($this->sleep() === false) {
break;
}
}

if ($this->logger) {
$this->logger->debug('Stopping poller');
}
}

/**
* @return bool
*/
private function process()
{
while (($message = $this->queues->pop()) !== null) {

$queue = $this->queues->get($message->getRoutingKey());

// Process the message using the worker and middleware
$worker = $this->getWorker($queue->getName());
$next = new Next($this->middlewares, $worker);

$result = $next($message, $queue);

if ($result === false) {
return false;
}
}

$this->waiting = !isset($message);
return true;
}

/**
* @return bool
*/
private function sleep()
{
foreach ($this->middlewares as $middleware) {
if ($middleware instanceof SleepyMiddlewareInterface && $middleware->sleep() === false) {
return false;
}
}

if ($this->waiting) {
// Sleep between queue polls when the queue is empty
usleep($this->interval * 1000000);
}
return true;
}

/**
* @param string $name
* @return callable
*/
private function getWorker($name)
{
if (!isset($this->workers[$name])) {
throw new RuntimeException(sprintf('Worker not defined for queue "%s"', $name));
}

return $this->workers[$name];
}
}
53 changes: 53 additions & 0 deletions src/Consumer/PollerFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

namespace Radish\Consumer;

use Psr\Log\LoggerInterface;
use Radish\Broker\QueueLoader;
use Radish\Middleware\MiddlewareLoader;

class PollerFactory
{
/**
* @var QueueLoader
*/
protected $queueLoader;
/**
* @var MiddlewareLoader
*/
private $middlewareLoader;
/**
* @var LoggerInterface|null
*/
protected $logger;

/**
* @param QueueLoader $queueLoader
* @param MiddlewareLoader $middlewareLoader
* @param LoggerInterface|null $logger
*/
public function __construct(QueueLoader $queueLoader, MiddlewareLoader $middlewareLoader, LoggerInterface $logger = null)
{
$this->queueLoader = $queueLoader;
$this->middlewareLoader = $middlewareLoader;
$this->logger = $logger;
}

/**
* @param array $queueNames
* @param array $middlewareOptions
* @param array $workers
* @param int $interval
* @return Poller
*/
public function create(array $queueNames, array $middlewareOptions, array $workers, $interval)
{
return new Poller(
$this->queueLoader->load($queueNames),
$this->middlewareLoader->load($middlewareOptions),
$workers,
$interval,
$this->logger
);
}
}
Loading

0 comments on commit 09f6112

Please sign in to comment.