Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Milti-queue server #19

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"require": {
"php": ">=8.0",
"utopia-php/cli": "0.15.*",
"utopia-php/framework": "0.*.*"
"utopia-php/framework": "0.33.*",
"utopia-php/balancing": "0.4.*"
},
"require-dev": {
"swoole/ide-helper": "4.8.8",
Expand Down
223 changes: 139 additions & 84 deletions composer.lock

Large diffs are not rendered by default.

17 changes: 15 additions & 2 deletions src/Queue/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,30 @@
abstract class Adapter
{
public int $workerNum;
public string $queue;

/**
* @var array<string> $queues
*/
public array $queues;
public string $namespace;
public Connection $connection;

public function __construct(int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
$this->workerNum = $workerNum;
$this->queue = $queue;
$this->queues = [$queue];
$this->namespace = $namespace;
}

public function addQueue(string $queue): self
{
if (!(\in_array($queue, $this->queues))) {
$this->queues[] = $queue;
}

return $this;
}

/**
* Starts the Server.
* @return self
Expand Down
41 changes: 24 additions & 17 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
use Throwable;
use Utopia\CLI\Console;
use Exception;
use Utopia\Balancer\Algorithm\Random;
use Utopia\Balancer\Algorithm\RoundRobin;
use Utopia\Balancer\Balancer;
use Utopia\Balancer\Option;
use Utopia\Hook;
use Utopia\Validator;

Expand Down Expand Up @@ -187,20 +191,23 @@ public function init(): Hook
public function start(): self
{
try {
$this->adapter->workerStart(function (string $workerId) {
Console::success("[Worker] Worker {$workerId} is ready!");
$balancer = new Balancer(new RoundRobin(-1));
foreach ($this->adapter->queues as $queue) {
$balancer->addOption(new Option([ 'queue' => $queue ]));
}

$this->adapter->workerStart(function (string $workerId) use ($balancer) {
$queue = $balancer->run()->getState('queue');
Console::success("[Worker] Worker {$workerId} is ready for queue: " . $queue);
if (!is_null($this->workerStartHook)) {
call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook));
}

while (true) {
/**
* Waiting for next Job.
*/
$nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$this->adapter->queue}", 5);

if (!$nextMessage) {
continue;
}
$nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$queue}", 5);

$nextMessage['timestamp'] = (int)$nextMessage['timestamp'];

Expand All @@ -213,19 +220,19 @@ public function start(): self
/**
* Move Job to Jobs and it's PID to the processing list.
*/
$this->adapter->connection->setArray("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}", $nextMessage);
$this->adapter->connection->leftPush("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid());
$this->adapter->connection->setArray("{$this->adapter->namespace}.jobs.{$queue}.{$message->getPid()}", $nextMessage);
$this->adapter->connection->leftPush("{$this->adapter->namespace}.processing.{$queue}", $message->getPid());

/**
* Increment Total Jobs Received from Stats.
*/
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.total");
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.total");

try {
/**
* Increment Processing Jobs from Stats.
*/
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing");
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.processing");

if ($this->job->getHook()) {
foreach ($this->initHooks as $hook) { // Global init hooks
Expand All @@ -250,12 +257,12 @@ public function start(): self
/**
* Remove Jobs if successful.
*/
$this->adapter->connection->remove("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}");
$this->adapter->connection->remove("{$this->adapter->namespace}.jobs.{$queue}.{$message->getPid()}");

/**
* Increment Successful Jobs from Stats.
*/
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.success");
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.success");

if ($this->job->getHook()) {
foreach ($this->shutdownHooks as $hook) { // Global init hooks
Expand All @@ -280,12 +287,12 @@ public function start(): self
/**
* Move failed Job to Failed list.
*/
$this->adapter->connection->leftPush("{$this->adapter->namespace}.failed.{$this->adapter->queue}", $message->getPid());
$this->adapter->connection->leftPush("{$this->adapter->namespace}.failed.{$queue}", $message->getPid());

/**
* Increment Failed Jobs from Stats.
*/
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.failed");
$this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.failed");

Console::error("[Job] ({$message->getPid()}) failed to run.");
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
Expand All @@ -298,12 +305,12 @@ public function start(): self
/**
* Remove Job from Processing.
*/
$this->adapter->connection->listRemove("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid());
$this->adapter->connection->listRemove("{$this->adapter->namespace}.processing.{$queue}", $message->getPid());

/**
* Decrease Processing Jobs from Stats.
*/
$this->adapter->connection->decrement("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing");
$this->adapter->connection->decrement("{$this->adapter->namespace}.stats.{$queue}.processing");
}

$this->resources = [];
Expand Down
49 changes: 42 additions & 7 deletions tests/Queue/E2E/Adapter/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public function setUp(): void
/**
* @return Client
*/
abstract protected function getClient(): Client;
abstract protected function getClient(string $suffix = ''): Client;

public function testEvents(): void
{
Expand All @@ -66,8 +66,8 @@ public function testEvents(): void
foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}

sleep(1);
sleep(3);

$this->assertEquals(7, $client->countTotalJobs());
$this->assertEquals(0, $client->getQueueSize());
Expand All @@ -87,7 +87,7 @@ protected function testConcurrency(): void
$this->assertTrue($client->enqueue($payload));
}

sleep(1);
sleep(3);

$this->assertEquals(7, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
Expand Down Expand Up @@ -122,7 +122,7 @@ public function testRetry(): void
'id' => 4
]);

sleep(1);
sleep(3);

$this->assertEquals(4, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
Expand All @@ -133,7 +133,7 @@ public function testRetry(): void

$client->retry();

sleep(1);
sleep(3);

// Retry will retry ALL failed jobs regardless of if they are still tracked in stats
$this->assertEquals(4, $client->countTotalJobs());
Expand All @@ -145,11 +145,46 @@ public function testRetry(): void

$client->retry(2);

sleep(1);
sleep(3);

$this->assertEquals(2, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(2, $client->countFailedJobs());
$this->assertEquals(0, $client->countSuccessfulJobs());
}

public function testMultiQueueServer(): void
{
$client = $this->getClient();
$client->resetStats();

$this->assertTrue($client->enqueue([
'type' => 'test_string',
'value' => 'lorem ipsum'
]));

sleep(3);

$this->assertEquals(1, $client->countTotalJobs());
$this->assertEquals(0, $client->getQueueSize());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(0, $client->countFailedJobs());
$this->assertEquals(1, $client->countSuccessfulJobs());

$client = $this->getClient('_v2');
$client->resetStats();

$this->assertTrue($client->enqueue([
'type' => 'test_string',
'value' => 'lorem ipsum'
]));

sleep(3);

$this->assertEquals(1, $client->countTotalJobs());
$this->assertEquals(0, $client->getQueueSize());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(0, $client->countFailedJobs());
$this->assertEquals(1, $client->countSuccessfulJobs());
}
}
4 changes: 2 additions & 2 deletions tests/Queue/E2E/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

class SwooleTest extends Base
{
protected function getClient(): Client
protected function getClient(string $suffix = ''): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('swoole', $connection);
$client = new Client('swoole' . $suffix, $connection);

return $client;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/Queue/E2E/Adapter/WorkermanTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

class WorkermanTest extends Base
{
protected function getClient(): Client
protected function getClient(string $suffix = ''): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('workerman', $connection);
$client = new Client('workerman' . $suffix, $connection);

return $client;
}
Expand Down
1 change: 1 addition & 0 deletions tests/Queue/servers/Swoole/worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

$connection = new Queue\Connection\Redis('redis');
$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole');
$adapter->addQueue('swoole_v2');
$server = new Queue\Server($adapter);

$server->job()
Expand Down
1 change: 1 addition & 0 deletions tests/Queue/servers/Workerman/worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

$connection = new Queue\Connection\Redis('redis');
$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman');
$adapter->addQueue('workerman_v2');
$server = new Queue\Server($adapter);
$server->job()
->inject('message')
Expand Down
Loading