From 6c267bcec14dc6fee02df27db3ef97a0c7909747 Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Fri, 12 Jan 2024 16:29:11 +0100 Subject: [PATCH] feat: scheduler improvements --- benchmarks/Classifiers/RandomForestBench.php | 30 +-- src/Backends/Swoole.php | 162 ++++++++++++++++ src/Backends/Swoole/Coroutine.php | 102 ---------- src/Backends/Swoole/IgbinarySerializer.php | 16 -- src/Backends/Swoole/PhpSerializer.php | 16 -- src/Backends/Swoole/Process.php | 178 ------------------ src/Backends/Swoole/Serializer.php | 10 - src/Serializers/Igbinary.php | 87 +++++++++ .../SwooleExtensionIsLoaded.php | 31 +++ tests/Backends/Swoole/CoroutineTest.php | 88 --------- .../ProcessTest.php => SwooleTest.php} | 13 +- tests/DataProvider/BackendProviderTrait.php | 15 +- 12 files changed, 309 insertions(+), 439 deletions(-) create mode 100644 src/Backends/Swoole.php delete mode 100644 src/Backends/Swoole/Coroutine.php delete mode 100644 src/Backends/Swoole/IgbinarySerializer.php delete mode 100644 src/Backends/Swoole/PhpSerializer.php delete mode 100644 src/Backends/Swoole/Process.php delete mode 100644 src/Backends/Swoole/Serializer.php create mode 100644 src/Serializers/Igbinary.php create mode 100644 src/Specifications/SwooleExtensionIsLoaded.php delete mode 100644 tests/Backends/Swoole/CoroutineTest.php rename tests/Backends/{Swoole/ProcessTest.php => SwooleTest.php} (80%) diff --git a/benchmarks/Classifiers/RandomForestBench.php b/benchmarks/Classifiers/RandomForestBench.php index 4d524cd73..c1b02b817 100644 --- a/benchmarks/Classifiers/RandomForestBench.php +++ b/benchmarks/Classifiers/RandomForestBench.php @@ -2,7 +2,8 @@ namespace Rubix\ML\Benchmarks\Classifiers; -use Rubix\ML\Backends\Swoole\Process as SwooleProcessBackend; +use Rubix\ML\Backends\Amp; +use Rubix\ML\Backends\Swoole as SwooleBackend; use Rubix\ML\Classifiers\RandomForest; use Rubix\ML\Datasets\Generators\Blob; use Rubix\ML\Classifiers\ClassificationTree; @@ -46,7 +47,8 @@ public function setUpContinuous() : void $this->testing = $generator->generate(self::TESTING_SIZE); $this->estimator = new RandomForest(new ClassificationTree(30)); - // $this->estimator->setBackend(new SwooleProcessBackend()); + $this->estimator->setBackend(new SwooleBackend()); + // $this->estimator->setBackend(new Amp()); } public function setUpCategorical() : void @@ -80,16 +82,16 @@ public function continuous() : void $this->estimator->predict($this->testing); } - /** - * @Subject - * @Iterations(5) - * @BeforeMethods({"setUpCategorical"}) - * @OutputTimeUnit("seconds", precision=3) - */ - public function categorical() : void - { - $this->estimator->train($this->training); - - $this->estimator->predict($this->testing); - } + // /** + // * @Subject + // * @Iterations(5) + // * @BeforeMethods({"setUpCategorical"}) + // * @OutputTimeUnit("seconds", precision=3) + // */ + // public function categorical() : void + // { + // $this->estimator->train($this->training); + + // $this->estimator->predict($this->testing); + // } } diff --git a/src/Backends/Swoole.php b/src/Backends/Swoole.php new file mode 100644 index 000000000..36946e593 --- /dev/null +++ b/src/Backends/Swoole.php @@ -0,0 +1,162 @@ +cpus = swoole_cpu_num(); + + if ($serializer) { + $this->serializer = $serializer; + } else { + if (ExtensionIsLoaded::with('igbinary')->passes()) { + $this->serializer = new Igbinary(); + } else { + $this->serializer = new Native(); + } + } + } + + /** + * Queue up a deferred task for backend processing. + * + * @internal + * + * @param \Rubix\ML\Backends\Tasks\Task $task + * @param callable(mixed,mixed):void $after + * @param mixed $context + */ + public function enqueue(Task $task, ?callable $after = null, $context = null) : void + { + $this->queue[] = function () use ($task, $after, $context) { + $result = $task(); + + if ($after) { + $after($result, $context); + } + + return $result; + }; + } + + /** + * Process the queue and return the results. + * + * @internal + * + * @return mixed[] + */ + public function process() : array + { + $results = []; + + $finishedTasksAtomic = new Atomic(); + $waitingAtomic = new Atomic(); + + $scheduledTasksTotal = count($this->queue); + $workerProcesses = []; + + $currentCpu = 0; + + while (($queueItem = array_shift($this->queue))) { + $workerProcess = new Process( + callback: function (Process $worker) use ($finishedTasksAtomic, $queueItem, $scheduledTasksTotal, $waitingAtomic) { + try { + $worker->exportSocket()->send(igbinary_serialize($queueItem())); + } finally { + $finishedTasksAtomic->add(1); + + if ($scheduledTasksTotal <= $finishedTasksAtomic->get()) { + $waitingAtomic->wakeup(); + } + } + }, + enable_coroutine: true, + pipe_type: 1, + redirect_stdin_and_stdout: false, + ); + + $workerProcess->setAffinity([ + $currentCpu, + ]); + $workerProcess->setBlocking(false); + $workerProcess->start(); + + $workerProcesses[] = $workerProcess; + + $currentCpu = ($currentCpu + 1) % $this->cpus; + } + + $waitingAtomic->wait(-1); + + run(function () use (&$results, $workerProcesses) { + foreach ($workerProcesses as $workerProcess) { + $receivedData = $workerProcess->exportSocket()->recv(); + $unserialized = igbinary_unserialize($receivedData); + + $results[] = $unserialized; + } + }); + + return $results; + } + + /** + * Flush the queue + */ + public function flush() : void + { + $this->queue = []; + } + + /** + * Return the string representation of the object. + * + * @internal + * + * @return string + */ + public function __toString() : string + { + return 'Swoole\\Process'; + } +} diff --git a/src/Backends/Swoole/Coroutine.php b/src/Backends/Swoole/Coroutine.php deleted file mode 100644 index 18c25e188..000000000 --- a/src/Backends/Swoole/Coroutine.php +++ /dev/null @@ -1,102 +0,0 @@ - - */ - protected array $queue = [ - // - ]; - - /** - * Queue up a deferred task for backend processing. - * - * @internal - * - * @param \Rubix\ML\Backends\Tasks\Task $task - * @param callable(mixed,mixed):void $after - * @param mixed $context - */ - public function enqueue(Task $task, ?callable $after = null, $context = null) : void - { - $this->queue[] = function () use ($task, $after, $context) { - $result = $task(); - - if ($after) { - $after($result, $context); - } - - return $result; - }; - } - - /** - * Process the queue and return the results. - * - * @internal - * - * @return mixed[] - */ - public function process() : array - { - /** - * Swoole promises that all the coroutines added to the root of the - * scheduler will be executed in parallel. - */ - $scheduler = new Scheduler(); - - $results = []; - - foreach ($this->queue as $callback) { - $scheduler->add(function () use ($callback, &$results) { - $results[] = $callback(); - }); - } - - if (!$scheduler->start()) { - throw new RuntimeException('Not all coroutines finished successfully'); - } - - $this->queue = []; - - return $results; - } - - /** - * Flush the queue - */ - public function flush() : void - { - $this->queue = []; - } - - /** - * Return the string representation of the object. - * - * @internal - * - * @return string - */ - public function __toString() : string - { - return 'Swoole\\Coroutine'; - } -} diff --git a/src/Backends/Swoole/IgbinarySerializer.php b/src/Backends/Swoole/IgbinarySerializer.php deleted file mode 100644 index 6bdcf01d4..000000000 --- a/src/Backends/Swoole/IgbinarySerializer.php +++ /dev/null @@ -1,16 +0,0 @@ -cpus = swoole_cpu_num(); - - if ($serializer) { - $this->serializer = $serializer; - } else { - if (extension_loaded('igbinary')) { - $this->serializer = new IgbinarySerializer(); - } else { - $this->serializer = new PhpSerializer(); - } - } - - $this->serialiedRowLength = $serialiedRowLength; - } - - /** - * Queue up a deferred task for backend processing. - * - * @internal - * - * @param \Rubix\ML\Backends\Tasks\Task $task - * @param callable(mixed,mixed):void $after - * @param mixed $context - */ - public function enqueue(Task $task, ?callable $after = null, $context = null) : void - { - $this->queue[] = function () use ($task, $after, $context) { - $result = $task(); - - if ($after) { - $after($result, $context); - } - - return $result; - }; - } - - /** - * Process the queue and return the results. - * - * @internal - * - * @return mixed[] - */ - public function process() : array - { - $resultsTable = new Table(count($this->queue), self::HASH_COLLISIONS_ALLOWED); - $resultsTable->column('result', Table::TYPE_STRING, $this->serialiedRowLength); - $resultsTable->create(); - - $workersTable = new Table($this->cpus, self::HASH_COLLISIONS_ALLOWED); - $workersTable->column('working', Table::TYPE_INT); - $workersTable->create(); - - $pool = new Pool($this->cpus); - - $pool->on('WorkerStart', function (Pool $pool, $workerId) use ($resultsTable, $workersTable) { - try { - $process = $pool->getProcess(); - - // Worker id is an integer that goes from 0 to the number of - // workers. There are "$this->cpu" workers spawned, so worker - // id should correspond to a specific core. - $process->setAffinity([$workerId]); - - if (!$workersTable->exist($workerId)) { - if (!$workersTable->set($workerId, [ - 'working' => 1, - ])) { - throw new RuntimeException('Unable to store worker status in the shared memory table'); - } - - for ($i = $workerId; $i < count($this->queue); $i += $this->cpus) { - if (!$resultsTable->exist($i)) { - $result = $this->queue[$i](); - $serialized = $this->serializer->serialize($result); - - if (!$resultsTable->set($i, [ - 'result' => $serialized, - ])) { - throw new RuntimeException('Unable to store task result in the shared memory table'); - } - } - } - } - } finally { - // Shuts down only the current worker. Tells Pool to not - // create a new worker - $pool->shutdown(); - } - }); - - // This is blocking, waits until all processes finish. - $pool->start(); - - $results = []; - - for ($i = 0; $i < count($this->queue); $i += 1) { - $serialized = $resultsTable->get($i, 'result'); - $unserialized = $this->serializer->unserialize($serialized); - - if (false === $unserialized) { - // Task needs to be repeated due to hash collision in the Table - // That should be at most HASH_COLLISIONS_ALLOWED, usually less - // - // If 'false' was serialized, then the task will be redone - // unnecessarily. That is the price we have to pay for the lack - // of proper error handling in 'unserialize'. If you disagree - // or have some better idea, please open an issue on GitHub. ;) - $results[] = $this->queue[$i](); - } else { - $results[] = $unserialized; - } - } - - return $results; - } - - /** - * Flush the queue - */ - public function flush() : void - { - $this->queue = []; - } - - /** - * Return the string representation of the object. - * - * @internal - * - * @return string - */ - public function __toString() : string - { - return 'Swoole\\Process'; - } -} diff --git a/src/Backends/Swoole/Serializer.php b/src/Backends/Swoole/Serializer.php deleted file mode 100644 index 711dead90..000000000 --- a/src/Backends/Swoole/Serializer.php +++ /dev/null @@ -1,10 +0,0 @@ -check(); + } + + /** + * Serialize a persistable object and return the data. + * + * @internal + * + * @param \Rubix\ML\Persistable $persistable + * @throws \Rubix\ML\Exceptions\RuntimeException + * @return \Rubix\ML\Encoding + */ + public function serialize(Persistable $persistable) : Encoding + { + $data = igbinary_serialize($persistable); + + if (!$data) { + throw new RuntimeException('Could not serialize data.'); + } + + return new Encoding($data); + } + + /** + * Deserialize a persistable object and return it. + * + * @internal + * + * @param \Rubix\ML\Encoding $encoding + * @throws \Rubix\ML\Exceptions\RuntimeException + * @return \Rubix\ML\Persistable + */ + public function deserialize(Encoding $encoding) : Persistable + { + $persistable = igbinary_unserialize($encoding); + + if (!is_object($persistable)) { + throw new RuntimeException('deserialized data must be an object.'); + } + + if ($persistable instanceof __PHP_Incomplete_Class) { + throw new RuntimeException('Missing class for object data.'); + } + + if (!$persistable instanceof Persistable) { + throw new RuntimeException('deserialized object must' + . ' implement the Persistable interface.'); + } + + return $persistable; + } + + /** + * Return the string representation of the object. + * + * @return string + */ + public function __toString() : string + { + return 'Igbinary'; + } +} \ No newline at end of file diff --git a/src/Specifications/SwooleExtensionIsLoaded.php b/src/Specifications/SwooleExtensionIsLoaded.php new file mode 100644 index 000000000..d2179ef51 --- /dev/null +++ b/src/Specifications/SwooleExtensionIsLoaded.php @@ -0,0 +1,31 @@ +passes() + || ExtensionIsLoaded::with('openswoole')->passes() + ) { + return; + } + + throw new MissingExtension('swoole'); + } +} diff --git a/tests/Backends/Swoole/CoroutineTest.php b/tests/Backends/Swoole/CoroutineTest.php deleted file mode 100644 index c3cb1ab7a..000000000 --- a/tests/Backends/Swoole/CoroutineTest.php +++ /dev/null @@ -1,88 +0,0 @@ -markTestSkipped( - 'Swoole/OpenSwoole extension is not available.' - ); - } - - $this->backend = new SwooleCoroutineBackend(); - } - - /** - * @after - */ - protected function tearDown() : void - { - Event::wait(); - } - - /** - * @test - */ - public function build() : void - { - $this->assertInstanceOf(SwooleCoroutineBackend::class, $this->backend); - $this->assertInstanceOf(Backend::class, $this->backend); - } - - /** - * @test - */ - public function enqueueProcess() : void - { - for ($i = 0; $i < 10; ++$i) { - $this->backend->enqueue(new Task([self::class, 'foo'], [$i])); - } - - $results = $this->backend->process(); - - $this->assertCount(10, $results); - $this->assertEquals([ - 0, - 2, - 4, - 6, - 8, - 10, - 12, - 14, - 16, - 18, - ], $results); - } -} diff --git a/tests/Backends/Swoole/ProcessTest.php b/tests/Backends/SwooleTest.php similarity index 80% rename from tests/Backends/Swoole/ProcessTest.php rename to tests/Backends/SwooleTest.php index cc80b012c..ece9e41a6 100644 --- a/tests/Backends/Swoole/ProcessTest.php +++ b/tests/Backends/SwooleTest.php @@ -1,11 +1,12 @@ passes()) { $this->markTestSkipped( 'Swoole/OpenSwoole extension is not available.' ); } - $this->backend = new SwooleProcessBackend(); + $this->backend = new SwooleBackend(); } /** @@ -56,7 +57,7 @@ protected function tearDown() : void */ public function build() : void { - $this->assertInstanceOf(SwooleProcessBackend::class, $this->backend); + $this->assertInstanceOf(SwooleBackend::class, $this->backend); $this->assertInstanceOf(Backend::class, $this->backend); } diff --git a/tests/DataProvider/BackendProviderTrait.php b/tests/DataProvider/BackendProviderTrait.php index 7207532df..f33d72a6a 100644 --- a/tests/DataProvider/BackendProviderTrait.php +++ b/tests/DataProvider/BackendProviderTrait.php @@ -4,9 +4,9 @@ use Rubix\ML\Backends\Backend; use Rubix\ML\Backends\Serial; -use Rubix\ML\Backends\Amp as AmpBackend; -use Rubix\ML\Backends\Swoole\Coroutine as SwooleCoroutineBackend; -use Rubix\ML\Backends\Swoole\Process as SwooleProcessBackend; +use Rubix\ML\Backends\Amp; +use Rubix\ML\Backends\Swoole; +use Rubix\ML\Specifications\SwooleExtensionIsLoaded; trait BackendProviderTrait { @@ -20,14 +20,11 @@ public static function provideBackends() : array $serialBackend = new Serial(); $backends[(string) $serialBackend] = [$serialBackend]; - $ampBackend = new AmpBackend(); + $ampBackend = new Amp(); $backends[(string) $ampBackend] = [$ampBackend]; - if (extension_loaded('swoole') || extension_loaded('openswoole')) { - $swooleCoroutineBackend = new SwooleCoroutineBackend(); - $backends[(string) $swooleCoroutineBackend] = [$swooleCoroutineBackend]; - - $swooleProcessBackend = new SwooleProcessBackend(); + if (SwooleExtensionIsLoaded::create()->passes()) { + $swooleProcessBackend = new Swoole(); $backends[(string) $swooleProcessBackend] = [$swooleProcessBackend]; }