From e5727da4cfd09dbf9194379fac0d570f3da363a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Tue, 9 Apr 2024 13:50:27 +0000 Subject: [PATCH 1/2] Milti-queue server --- composer.json | 3 +- composer.lock | 223 ++++++++++++++-------- src/Queue/Server.php | 44 +++-- tests/Queue/E2E/Adapter/Base.php | 35 +++- tests/Queue/E2E/Adapter/SwooleTest.php | 4 +- tests/Queue/E2E/Adapter/WorkermanTest.php | 4 +- tests/Queue/servers/Swoole/worker.php | 2 +- tests/Queue/servers/Workerman/worker.php | 2 +- 8 files changed, 214 insertions(+), 103 deletions(-) diff --git a/composer.json b/composer.json index 3d2dcb3..faa1172 100644 --- a/composer.json +++ b/composer.json @@ -26,7 +26,8 @@ "require": { "php": ">=8.0", "utopia-php/cli": "0.15.*", - "utopia-php/framework": "0.*.*" + "utopia-php/framework": "0.33.*", + "utopia-php/balancing": "0.4.*" }, "require-dev": { "swoole/ide-helper": "4.8.8", diff --git a/composer.lock b/composer.lock index 50cd1b6..47de8c4 100644 --- a/composer.lock +++ b/composer.lock @@ -4,8 +4,55 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "f349a46dfa6a0938eb8f5ecea04a7eb7", + "content-hash": "94be0dcd84d46f528854bcbd23a7dc93", "packages": [ + { + "name": "utopia-php/balancing", + "version": "0.4.0", + "source": { + "type": "git", + "url": "https://github.com/utopia-php/balancer.git", + "reference": "b281cf22cba25429384cd792a9b0df83b5f9f938" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/utopia-php/balancer/zipball/b281cf22cba25429384cd792a9b0df83b5f9f938", + "reference": "b281cf22cba25429384cd792a9b0df83b5f9f938", + "shasum": "" + }, + "require": { + "php": ">=8.0" + }, + "require-dev": { + "laravel/pint": "1.2.*", + "phpstan/phpstan": "1.8.*", + "phpunit/phpunit": "^9.3" + }, + "type": "library", + "autoload": { + "psr-4": { + "Utopia\\Balancer\\": "src/Balancer" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "description": "A simple library to balance choices between multiple options.", + "keywords": [ + "balancer", + "balancing", + "framework", + "php", + "upf", + "utopia" + ], + "support": { + "issues": "https://github.com/utopia-php/balancer/issues", + "source": "https://github.com/utopia-php/balancer/tree/0.4.0" + }, + "time": "2023-08-07T10:26:14+00:00" + }, { "name": "utopia-php/cli", "version": "0.15.0", @@ -57,16 +104,16 @@ }, { "name": "utopia-php/framework", - "version": "0.32.0", + "version": "0.33.6", "source": { "type": "git", - "url": "https://github.com/utopia-php/framework.git", - "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225" + "url": "https://github.com/utopia-php/http.git", + "reference": "8fe57da0cecd57e3b17cd395b4a666a24f4c07a6" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/framework/zipball/ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", - "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", + "url": "https://api.github.com/repos/utopia-php/http/zipball/8fe57da0cecd57e3b17cd395b4a666a24f4c07a6", + "reference": "8fe57da0cecd57e3b17cd395b4a666a24f4c07a6", "shasum": "" }, "require": { @@ -95,10 +142,10 @@ "upf" ], "support": { - "issues": "https://github.com/utopia-php/framework/issues", - "source": "https://github.com/utopia-php/framework/tree/0.32.0" + "issues": "https://github.com/utopia-php/http/issues", + "source": "https://github.com/utopia-php/http/tree/0.33.6" }, - "time": "2023-12-26T14:18:36+00:00" + "time": "2024-03-21T18:10:57+00:00" } ], "packages-dev": [ @@ -299,25 +346,27 @@ }, { "name": "nikic/php-parser", - "version": "v4.18.0", + "version": "v5.0.2", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999" + "reference": "139676794dc1e9231bf7bcd123cfc0c99182cb13" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/1bcbb2179f97633e98bbbc87044ee2611c7d7999", - "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/139676794dc1e9231bf7bcd123cfc0c99182cb13", + "reference": "139676794dc1e9231bf7bcd123cfc0c99182cb13", "shasum": "" }, "require": { + "ext-ctype": "*", + "ext-json": "*", "ext-tokenizer": "*", - "php": ">=7.0" + "php": ">=7.4" }, "require-dev": { "ircmaxell/php-yacc": "^0.0.7", - "phpunit/phpunit": "^6.5 || ^7.0 || ^8.0 || ^9.0" + "phpunit/phpunit": "^7.0 || ^8.0 || ^9.0" }, "bin": [ "bin/php-parse" @@ -325,7 +374,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "4.9-dev" + "dev-master": "5.0-dev" } }, "autoload": { @@ -349,26 +398,27 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v4.18.0" + "source": "https://github.com/nikic/PHP-Parser/tree/v5.0.2" }, - "time": "2023-12-10T21:03:43+00:00" + "time": "2024-03-05T20:51:40+00:00" }, { "name": "phar-io/manifest", - "version": "2.0.3", + "version": "2.0.4", "source": { "type": "git", "url": "https://github.com/phar-io/manifest.git", - "reference": "97803eca37d319dfa7826cc2437fc020857acb53" + "reference": "54750ef60c58e43759730615a392c31c80e23176" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phar-io/manifest/zipball/97803eca37d319dfa7826cc2437fc020857acb53", - "reference": "97803eca37d319dfa7826cc2437fc020857acb53", + "url": "https://api.github.com/repos/phar-io/manifest/zipball/54750ef60c58e43759730615a392c31c80e23176", + "reference": "54750ef60c58e43759730615a392c31c80e23176", "shasum": "" }, "require": { "ext-dom": "*", + "ext-libxml": "*", "ext-phar": "*", "ext-xmlwriter": "*", "phar-io/version": "^3.0.1", @@ -409,9 +459,15 @@ "description": "Component for reading phar.io manifest information from a PHP Archive (PHAR)", "support": { "issues": "https://github.com/phar-io/manifest/issues", - "source": "https://github.com/phar-io/manifest/tree/2.0.3" + "source": "https://github.com/phar-io/manifest/tree/2.0.4" }, - "time": "2021-07-20T11:28:43+00:00" + "funding": [ + { + "url": "https://github.com/theseer", + "type": "github" + } + ], + "time": "2024-03-03T12:33:53+00:00" }, { "name": "phar-io/version", @@ -466,16 +522,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.10.50", + "version": "1.10.66", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4" + "reference": "94779c987e4ebd620025d9e5fdd23323903950bd" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/06a98513ac72c03e8366b5a0cb00750b487032e4", - "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/94779c987e4ebd620025d9e5fdd23323903950bd", + "reference": "94779c987e4ebd620025d9e5fdd23323903950bd", "shasum": "" }, "require": { @@ -524,20 +580,20 @@ "type": "tidelift" } ], - "time": "2023-12-13T10:59:42+00:00" + "time": "2024-03-28T16:17:31+00:00" }, { "name": "phpunit/php-code-coverage", - "version": "9.2.30", + "version": "9.2.31", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089" + "reference": "48c34b5d8d983006bd2adc2d0de92963b9155965" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/ca2bd87d2f9215904682a9cb9bb37dda98e76089", - "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/48c34b5d8d983006bd2adc2d0de92963b9155965", + "reference": "48c34b5d8d983006bd2adc2d0de92963b9155965", "shasum": "" }, "require": { @@ -594,7 +650,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/php-code-coverage/issues", "security": "https://github.com/sebastianbergmann/php-code-coverage/security/policy", - "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.30" + "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.31" }, "funding": [ { @@ -602,7 +658,7 @@ "type": "github" } ], - "time": "2023-12-22T06:47:57+00:00" + "time": "2024-03-02T06:37:42+00:00" }, { "name": "phpunit/php-file-iterator", @@ -847,16 +903,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.15", + "version": "9.6.19", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1" + "reference": "a1a54a473501ef4cdeaae4e06891674114d79db8" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/05017b80304e0eb3f31d90194a563fd53a6021f1", - "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/a1a54a473501ef4cdeaae4e06891674114d79db8", + "reference": "a1a54a473501ef4cdeaae4e06891674114d79db8", "shasum": "" }, "require": { @@ -930,7 +986,7 @@ "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", "security": "https://github.com/sebastianbergmann/phpunit/security/policy", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.15" + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.19" }, "funding": [ { @@ -946,20 +1002,20 @@ "type": "tidelift" } ], - "time": "2023-12-01T16:55:19+00:00" + "time": "2024-04-05T04:35:58+00:00" }, { "name": "sebastian/cli-parser", - "version": "1.0.1", + "version": "1.0.2", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/cli-parser.git", - "reference": "442e7c7e687e42adc03470c7b668bc4b2402c0b2" + "reference": "2b56bea83a09de3ac06bb18b92f068e60cc6f50b" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/cli-parser/zipball/442e7c7e687e42adc03470c7b668bc4b2402c0b2", - "reference": "442e7c7e687e42adc03470c7b668bc4b2402c0b2", + "url": "https://api.github.com/repos/sebastianbergmann/cli-parser/zipball/2b56bea83a09de3ac06bb18b92f068e60cc6f50b", + "reference": "2b56bea83a09de3ac06bb18b92f068e60cc6f50b", "shasum": "" }, "require": { @@ -994,7 +1050,7 @@ "homepage": "https://github.com/sebastianbergmann/cli-parser", "support": { "issues": "https://github.com/sebastianbergmann/cli-parser/issues", - "source": "https://github.com/sebastianbergmann/cli-parser/tree/1.0.1" + "source": "https://github.com/sebastianbergmann/cli-parser/tree/1.0.2" }, "funding": [ { @@ -1002,7 +1058,7 @@ "type": "github" } ], - "time": "2020-09-28T06:08:49+00:00" + "time": "2024-03-02T06:27:43+00:00" }, { "name": "sebastian/code-unit", @@ -1248,16 +1304,16 @@ }, { "name": "sebastian/diff", - "version": "4.0.5", + "version": "4.0.6", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/diff.git", - "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131" + "reference": "ba01945089c3a293b01ba9badc29ad55b106b0bc" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/74be17022044ebaaecfdf0c5cd504fc9cd5a7131", - "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131", + "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/ba01945089c3a293b01ba9badc29ad55b106b0bc", + "reference": "ba01945089c3a293b01ba9badc29ad55b106b0bc", "shasum": "" }, "require": { @@ -1302,7 +1358,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/diff/issues", - "source": "https://github.com/sebastianbergmann/diff/tree/4.0.5" + "source": "https://github.com/sebastianbergmann/diff/tree/4.0.6" }, "funding": [ { @@ -1310,7 +1366,7 @@ "type": "github" } ], - "time": "2023-05-07T05:35:17+00:00" + "time": "2024-03-02T06:30:58+00:00" }, { "name": "sebastian/environment", @@ -1377,16 +1433,16 @@ }, { "name": "sebastian/exporter", - "version": "4.0.5", + "version": "4.0.6", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/exporter.git", - "reference": "ac230ed27f0f98f597c8a2b6eb7ac563af5e5b9d" + "reference": "78c00df8f170e02473b682df15bfcdacc3d32d72" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/ac230ed27f0f98f597c8a2b6eb7ac563af5e5b9d", - "reference": "ac230ed27f0f98f597c8a2b6eb7ac563af5e5b9d", + "url": "https://api.github.com/repos/sebastianbergmann/exporter/zipball/78c00df8f170e02473b682df15bfcdacc3d32d72", + "reference": "78c00df8f170e02473b682df15bfcdacc3d32d72", "shasum": "" }, "require": { @@ -1442,7 +1498,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/exporter/issues", - "source": "https://github.com/sebastianbergmann/exporter/tree/4.0.5" + "source": "https://github.com/sebastianbergmann/exporter/tree/4.0.6" }, "funding": [ { @@ -1450,20 +1506,20 @@ "type": "github" } ], - "time": "2022-09-14T06:03:37+00:00" + "time": "2024-03-02T06:33:00+00:00" }, { "name": "sebastian/global-state", - "version": "5.0.6", + "version": "5.0.7", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/global-state.git", - "reference": "bde739e7565280bda77be70044ac1047bc007e34" + "reference": "bca7df1f32ee6fe93b4d4a9abbf69e13a4ada2c9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/bde739e7565280bda77be70044ac1047bc007e34", - "reference": "bde739e7565280bda77be70044ac1047bc007e34", + "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/bca7df1f32ee6fe93b4d4a9abbf69e13a4ada2c9", + "reference": "bca7df1f32ee6fe93b4d4a9abbf69e13a4ada2c9", "shasum": "" }, "require": { @@ -1506,7 +1562,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/global-state/issues", - "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.6" + "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.7" }, "funding": [ { @@ -1514,7 +1570,7 @@ "type": "github" } ], - "time": "2023-08-02T09:26:13+00:00" + "time": "2024-03-02T06:35:11+00:00" }, { "name": "sebastian/lines-of-code", @@ -1750,16 +1806,16 @@ }, { "name": "sebastian/resource-operations", - "version": "3.0.3", + "version": "3.0.4", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/resource-operations.git", - "reference": "0f4443cb3a1d92ce809899753bc0d5d5a8dd19a8" + "reference": "05d5692a7993ecccd56a03e40cd7e5b09b1d404e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/resource-operations/zipball/0f4443cb3a1d92ce809899753bc0d5d5a8dd19a8", - "reference": "0f4443cb3a1d92ce809899753bc0d5d5a8dd19a8", + "url": "https://api.github.com/repos/sebastianbergmann/resource-operations/zipball/05d5692a7993ecccd56a03e40cd7e5b09b1d404e", + "reference": "05d5692a7993ecccd56a03e40cd7e5b09b1d404e", "shasum": "" }, "require": { @@ -1771,7 +1827,7 @@ "type": "library", "extra": { "branch-alias": { - "dev-master": "3.0-dev" + "dev-main": "3.0-dev" } }, "autoload": { @@ -1792,8 +1848,7 @@ "description": "Provides a list of PHP built-in functions that operate on resources", "homepage": "https://www.github.com/sebastianbergmann/resource-operations", "support": { - "issues": "https://github.com/sebastianbergmann/resource-operations/issues", - "source": "https://github.com/sebastianbergmann/resource-operations/tree/3.0.3" + "source": "https://github.com/sebastianbergmann/resource-operations/tree/3.0.4" }, "funding": [ { @@ -1801,7 +1856,7 @@ "type": "github" } ], - "time": "2020-09-28T06:45:17+00:00" + "time": "2024-03-14T16:00:52+00:00" }, { "name": "sebastian/type", @@ -1956,16 +2011,16 @@ }, { "name": "theseer/tokenizer", - "version": "1.2.2", + "version": "1.2.3", "source": { "type": "git", "url": "https://github.com/theseer/tokenizer.git", - "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96" + "reference": "737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/theseer/tokenizer/zipball/b2ad5003ca10d4ee50a12da31de12a5774ba6b96", - "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96", + "url": "https://api.github.com/repos/theseer/tokenizer/zipball/737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2", + "reference": "737eda637ed5e28c3413cb1ebe8bb52cbf1ca7a2", "shasum": "" }, "require": { @@ -1994,7 +2049,7 @@ "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", "support": { "issues": "https://github.com/theseer/tokenizer/issues", - "source": "https://github.com/theseer/tokenizer/tree/1.2.2" + "source": "https://github.com/theseer/tokenizer/tree/1.2.3" }, "funding": [ { @@ -2002,20 +2057,20 @@ "type": "github" } ], - "time": "2023-11-20T00:12:19+00:00" + "time": "2024-03-03T12:36:25+00:00" }, { "name": "workerman/workerman", - "version": "v4.1.14", + "version": "v4.1.15", "source": { "type": "git", "url": "https://github.com/walkor/workerman.git", - "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e" + "reference": "afc8242fc769ab7cf22eb4ac22b97cb59d465e4e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/walkor/workerman/zipball/f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", - "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", + "url": "https://api.github.com/repos/walkor/workerman/zipball/afc8242fc769ab7cf22eb4ac22b97cb59d465e4e", + "reference": "afc8242fc769ab7cf22eb4ac22b97cb59d465e4e", "shasum": "" }, "require": { @@ -2065,7 +2120,7 @@ "type": "patreon" } ], - "time": "2023-08-09T03:37:45+00:00" + "time": "2024-02-19T02:10:39+00:00" } ], "aliases": [], diff --git a/src/Queue/Server.php b/src/Queue/Server.php index f43d6a5..65c5c18 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -5,6 +5,9 @@ use Throwable; use Utopia\CLI\Console; use Exception; +use Utopia\Balancer\Algorithm\RoundRobin; +use Utopia\Balancer\Balancer; +use Utopia\Balancer\Option; use Utopia\Hook; use Utopia\Validator; @@ -192,11 +195,30 @@ public function start(): self if (!is_null($this->workerStartHook)) { call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } + + $queues = []; + if (!(\str_contains($this->adapter->queue, ','))) { + $queues[] = $this->adapter->queue; + } else { + foreach (\explode(',', $this->adapter->queue) as $queue) { + $queues[] = $queue; + } + } + + $balancer = new Balancer(new RoundRobin(-1)); + + + foreach ($queues as $queue) { + $balancer->addOption(new Option([ 'queue' => $queue ])); + } + while (true) { + $queue = $balancer->run()->getState('queue', $this->adapter->queue); + /** * Waiting for next Job. */ - $nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$this->adapter->queue}", 5); + $nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$queue}", 5); if (!$nextMessage) { continue; @@ -213,19 +235,19 @@ public function start(): self /** * Move Job to Jobs and it's PID to the processing list. */ - $this->adapter->connection->setArray("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}", $nextMessage); - $this->adapter->connection->leftPush("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid()); + $this->adapter->connection->setArray("{$this->adapter->namespace}.jobs.{$queue}.{$message->getPid()}", $nextMessage); + $this->adapter->connection->leftPush("{$this->adapter->namespace}.processing.{$queue}", $message->getPid()); /** * Increment Total Jobs Received from Stats. */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.total"); + $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.total"); try { /** * Increment Processing Jobs from Stats. */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing"); + $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.processing"); if ($this->job->getHook()) { foreach ($this->initHooks as $hook) { // Global init hooks @@ -250,12 +272,12 @@ public function start(): self /** * Remove Jobs if successful. */ - $this->adapter->connection->remove("{$this->adapter->namespace}.jobs.{$this->adapter->queue}.{$message->getPid()}"); + $this->adapter->connection->remove("{$this->adapter->namespace}.jobs.{$queue}.{$message->getPid()}"); /** * Increment Successful Jobs from Stats. */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.success"); + $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.success"); if ($this->job->getHook()) { foreach ($this->shutdownHooks as $hook) { // Global init hooks @@ -280,12 +302,12 @@ public function start(): self /** * Move failed Job to Failed list. */ - $this->adapter->connection->leftPush("{$this->adapter->namespace}.failed.{$this->adapter->queue}", $message->getPid()); + $this->adapter->connection->leftPush("{$this->adapter->namespace}.failed.{$queue}", $message->getPid()); /** * Increment Failed Jobs from Stats. */ - $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$this->adapter->queue}.failed"); + $this->adapter->connection->increment("{$this->adapter->namespace}.stats.{$queue}.failed"); Console::error("[Job] ({$message->getPid()}) failed to run."); Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}"); @@ -298,12 +320,12 @@ public function start(): self /** * Remove Job from Processing. */ - $this->adapter->connection->listRemove("{$this->adapter->namespace}.processing.{$this->adapter->queue}", $message->getPid()); + $this->adapter->connection->listRemove("{$this->adapter->namespace}.processing.{$queue}", $message->getPid()); /** * Decrease Processing Jobs from Stats. */ - $this->adapter->connection->decrement("{$this->adapter->namespace}.stats.{$this->adapter->queue}.processing"); + $this->adapter->connection->decrement("{$this->adapter->namespace}.stats.{$queue}.processing"); } $this->resources = []; diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index ab5b6f1..e91967b 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -56,7 +56,7 @@ public function setUp(): void /** * @return Client */ - abstract protected function getClient(): Client; + abstract protected function getClient(string $suffix = ''): Client; public function testEvents(): void { @@ -152,4 +152,37 @@ public function testRetry(): void $this->assertEquals(2, $client->countFailedJobs()); $this->assertEquals(0, $client->countSuccessfulJobs()); } + + public function testMultiQueueServer(): void + { + $client = $this->getClient(); + $client->resetStats(); + + $this->assertTrue($client->enqueue([ + 'type' => 'test_string', + 'value' => 'lorem ipsum' + ])); + + sleep(1); + + $this->assertEquals(1, $client->countTotalJobs()); + $this->assertEquals(0, $client->getQueueSize()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(1, $client->countSuccessfulJobs()); + + $client = $this->getClient('_v2'); + $client->resetStats(); + + $this->assertTrue($client->enqueue([ + 'type' => 'test_string', + 'value' => 'lorem ipsum' + ])); + + $this->assertEquals(1, $client->countTotalJobs()); + $this->assertEquals(0, $client->getQueueSize()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(1, $client->countSuccessfulJobs()); + } } diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index e940c98..9717946 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -7,10 +7,10 @@ class SwooleTest extends Base { - protected function getClient(): Client + protected function getClient(string $suffix = ''): Client { $connection = new Redis('redis', 6379); - $client = new Client('swoole', $connection); + $client = new Client('swoole' . $suffix, $connection); return $client; } diff --git a/tests/Queue/E2E/Adapter/WorkermanTest.php b/tests/Queue/E2E/Adapter/WorkermanTest.php index e0cd1ed..386edd4 100644 --- a/tests/Queue/E2E/Adapter/WorkermanTest.php +++ b/tests/Queue/E2E/Adapter/WorkermanTest.php @@ -7,10 +7,10 @@ class WorkermanTest extends Base { - protected function getClient(): Client + protected function getClient(string $suffix = ''): Client { $connection = new Redis('redis', 6379); - $client = new Client('workerman', $connection); + $client = new Client('workerman' . $suffix, $connection); return $client; } diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index 972aba9..f14eaa6 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -7,7 +7,7 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole'); +$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole,swoole_v2'); $server = new Queue\Server($adapter); $server->job() diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index ffc667e..34573aa 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -7,7 +7,7 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman'); +$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman,workerman_v2'); $server = new Queue\Server($adapter); $server->job() ->inject('message') From 0205084d4c3adb14d822c59365d00c0a7cb22945 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matej=20Ba=C4=8Do?= Date: Tue, 9 Apr 2024 16:52:47 +0000 Subject: [PATCH 2/2] Finish multiqueue implementation --- src/Queue/Adapter.php | 17 ++++++++++-- src/Queue/Server.php | 35 +++++++----------------- tests/Queue/E2E/Adapter/Base.php | 16 ++++++----- tests/Queue/servers/Swoole/worker.php | 3 +- tests/Queue/servers/Workerman/worker.php | 3 +- 5 files changed, 38 insertions(+), 36 deletions(-) diff --git a/src/Queue/Adapter.php b/src/Queue/Adapter.php index 756e67a..9412aba 100644 --- a/src/Queue/Adapter.php +++ b/src/Queue/Adapter.php @@ -5,17 +5,30 @@ abstract class Adapter { public int $workerNum; - public string $queue; + + /** + * @var array $queues + */ + public array $queues; public string $namespace; public Connection $connection; public function __construct(int $workerNum, string $queue, string $namespace = 'utopia-queue') { $this->workerNum = $workerNum; - $this->queue = $queue; + $this->queues = [$queue]; $this->namespace = $namespace; } + public function addQueue(string $queue): self + { + if (!(\in_array($queue, $this->queues))) { + $this->queues[] = $queue; + } + + return $this; + } + /** * Starts the Server. * @return self diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 65c5c18..be4acbf 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -5,6 +5,7 @@ use Throwable; use Utopia\CLI\Console; use Exception; +use Utopia\Balancer\Algorithm\Random; use Utopia\Balancer\Algorithm\RoundRobin; use Utopia\Balancer\Balancer; use Utopia\Balancer\Option; @@ -190,40 +191,24 @@ public function init(): Hook public function start(): self { try { - $this->adapter->workerStart(function (string $workerId) { - Console::success("[Worker] Worker {$workerId} is ready!"); + $balancer = new Balancer(new RoundRobin(-1)); + foreach ($this->adapter->queues as $queue) { + $balancer->addOption(new Option([ 'queue' => $queue ])); + } + + $this->adapter->workerStart(function (string $workerId) use ($balancer) { + $queue = $balancer->run()->getState('queue'); + Console::success("[Worker] Worker {$workerId} is ready for queue: " . $queue); if (!is_null($this->workerStartHook)) { call_user_func_array($this->workerStartHook->getAction(), $this->getArguments($this->workerStartHook)); } - - $queues = []; - if (!(\str_contains($this->adapter->queue, ','))) { - $queues[] = $this->adapter->queue; - } else { - foreach (\explode(',', $this->adapter->queue) as $queue) { - $queues[] = $queue; - } - } - - $balancer = new Balancer(new RoundRobin(-1)); - - - foreach ($queues as $queue) { - $balancer->addOption(new Option([ 'queue' => $queue ])); - } - + while (true) { - $queue = $balancer->run()->getState('queue', $this->adapter->queue); - /** * Waiting for next Job. */ $nextMessage = $this->adapter->connection->rightPopArray("{$this->adapter->namespace}.queue.{$queue}", 5); - if (!$nextMessage) { - continue; - } - $nextMessage['timestamp'] = (int)$nextMessage['timestamp']; $message = new Message($nextMessage); diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index e91967b..2785747 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -66,8 +66,8 @@ public function testEvents(): void foreach ($this->payloads as $payload) { $this->assertTrue($client->enqueue($payload)); } - - sleep(1); + + sleep(3); $this->assertEquals(7, $client->countTotalJobs()); $this->assertEquals(0, $client->getQueueSize()); @@ -87,7 +87,7 @@ protected function testConcurrency(): void $this->assertTrue($client->enqueue($payload)); } - sleep(1); + sleep(3); $this->assertEquals(7, $client->countTotalJobs()); $this->assertEquals(0, $client->countProcessingJobs()); @@ -122,7 +122,7 @@ public function testRetry(): void 'id' => 4 ]); - sleep(1); + sleep(3); $this->assertEquals(4, $client->countTotalJobs()); $this->assertEquals(0, $client->countProcessingJobs()); @@ -133,7 +133,7 @@ public function testRetry(): void $client->retry(); - sleep(1); + sleep(3); // Retry will retry ALL failed jobs regardless of if they are still tracked in stats $this->assertEquals(4, $client->countTotalJobs()); @@ -145,7 +145,7 @@ public function testRetry(): void $client->retry(2); - sleep(1); + sleep(3); $this->assertEquals(2, $client->countTotalJobs()); $this->assertEquals(0, $client->countProcessingJobs()); @@ -163,7 +163,7 @@ public function testMultiQueueServer(): void 'value' => 'lorem ipsum' ])); - sleep(1); + sleep(3); $this->assertEquals(1, $client->countTotalJobs()); $this->assertEquals(0, $client->getQueueSize()); @@ -178,6 +178,8 @@ public function testMultiQueueServer(): void 'type' => 'test_string', 'value' => 'lorem ipsum' ])); + + sleep(3); $this->assertEquals(1, $client->countTotalJobs()); $this->assertEquals(0, $client->getQueueSize()); diff --git a/tests/Queue/servers/Swoole/worker.php b/tests/Queue/servers/Swoole/worker.php index f14eaa6..0865776 100644 --- a/tests/Queue/servers/Swoole/worker.php +++ b/tests/Queue/servers/Swoole/worker.php @@ -7,7 +7,8 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole,swoole_v2'); +$adapter = new Queue\Adapter\Swoole($connection, 12, 'swoole'); +$adapter->addQueue('swoole_v2'); $server = new Queue\Server($adapter); $server->job() diff --git a/tests/Queue/servers/Workerman/worker.php b/tests/Queue/servers/Workerman/worker.php index 34573aa..fa6fff2 100644 --- a/tests/Queue/servers/Workerman/worker.php +++ b/tests/Queue/servers/Workerman/worker.php @@ -7,7 +7,8 @@ use Utopia\Queue\Message; $connection = new Queue\Connection\Redis('redis'); -$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman,workerman_v2'); +$adapter = new Queue\Adapter\Workerman($connection, 12, 'workerman'); +$adapter->addQueue('workerman_v2'); $server = new Queue\Server($adapter); $server->job() ->inject('message')