Skip to content

Commit

Permalink
feat: scheduler improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
mcharytoniuk committed Jan 12, 2024
1 parent bfa97d1 commit 6c267bc
Show file tree
Hide file tree
Showing 12 changed files with 309 additions and 439 deletions.
30 changes: 16 additions & 14 deletions benchmarks/Classifiers/RandomForestBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

namespace Rubix\ML\Benchmarks\Classifiers;

use Rubix\ML\Backends\Swoole\Process as SwooleProcessBackend;
use Rubix\ML\Backends\Amp;
use Rubix\ML\Backends\Swoole as SwooleBackend;
use Rubix\ML\Classifiers\RandomForest;
use Rubix\ML\Datasets\Generators\Blob;
use Rubix\ML\Classifiers\ClassificationTree;
Expand Down Expand Up @@ -46,7 +47,8 @@ public function setUpContinuous() : void
$this->testing = $generator->generate(self::TESTING_SIZE);

$this->estimator = new RandomForest(new ClassificationTree(30));
// $this->estimator->setBackend(new SwooleProcessBackend());
$this->estimator->setBackend(new SwooleBackend());
// $this->estimator->setBackend(new Amp());
}

public function setUpCategorical() : void
Expand Down Expand Up @@ -80,16 +82,16 @@ public function continuous() : void
$this->estimator->predict($this->testing);
}

/**
* @Subject
* @Iterations(5)
* @BeforeMethods({"setUpCategorical"})
* @OutputTimeUnit("seconds", precision=3)
*/
public function categorical() : void
{
$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
}
// /**
// * @Subject
// * @Iterations(5)
// * @BeforeMethods({"setUpCategorical"})
// * @OutputTimeUnit("seconds", precision=3)
// */
// public function categorical() : void
// {
// $this->estimator->train($this->training);

// $this->estimator->predict($this->testing);
// }
}
162 changes: 162 additions & 0 deletions src/Backends/Swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
<?php

namespace Rubix\ML\Backends;

use Rubix\ML\Backends\Backend;
use Rubix\ML\Backends\Tasks\Task;
use Rubix\ML\Encoding;
use Rubix\ML\Serializers\Serializer;
use Rubix\ML\Serializers\Igbinary;
use Rubix\ML\Serializers\Native;
use Rubix\ML\Specifications\ExtensionIsLoaded;
use RuntimeException;
use Swoole\Atomic;
use Swoole\Process;
use Swoole\Process\Pool;
use Swoole\Table;

use function Swoole\Coroutine\run;

/**
* Swoole
*
* Works both with Swoole and OpenSwoole.
*
* @category Machine Learning
* @package Rubix/ML
*/
class Swoole implements Backend
{
/**
* Swoole accepts values between 0.2 and 1
*/
const CONFLICT_PROPORTION = 0.25;

/**
* The queue of tasks to be processed in parallel.
*/
protected array $queue = [];

private int $cpus;

private Serializer $serializer;

public function __construct(?Serializer $serializer = null)
{
$this->cpus = swoole_cpu_num();

if ($serializer) {
$this->serializer = $serializer;
} else {
if (ExtensionIsLoaded::with('igbinary')->passes()) {
$this->serializer = new Igbinary();
} else {
$this->serializer = new Native();
}
}
}

/**
* Queue up a deferred task for backend processing.
*
* @internal
*
* @param \Rubix\ML\Backends\Tasks\Task $task
* @param callable(mixed,mixed):void $after
* @param mixed $context
*/
public function enqueue(Task $task, ?callable $after = null, $context = null) : void
{
$this->queue[] = function () use ($task, $after, $context) {
$result = $task();

if ($after) {
$after($result, $context);
}

return $result;
};
}

/**
* Process the queue and return the results.
*
* @internal
*
* @return mixed[]
*/
public function process() : array
{
$results = [];

$finishedTasksAtomic = new Atomic();
$waitingAtomic = new Atomic();

$scheduledTasksTotal = count($this->queue);
$workerProcesses = [];

$currentCpu = 0;

while (($queueItem = array_shift($this->queue))) {
$workerProcess = new Process(
callback: function (Process $worker) use ($finishedTasksAtomic, $queueItem, $scheduledTasksTotal, $waitingAtomic) {
try {
$worker->exportSocket()->send(igbinary_serialize($queueItem()));
} finally {
$finishedTasksAtomic->add(1);

if ($scheduledTasksTotal <= $finishedTasksAtomic->get()) {
$waitingAtomic->wakeup();
}
}
},
enable_coroutine: true,
pipe_type: 1,
redirect_stdin_and_stdout: false,
);

$workerProcess->setAffinity([
$currentCpu,
]);
$workerProcess->setBlocking(false);
$workerProcess->start();

$workerProcesses[] = $workerProcess;

$currentCpu = ($currentCpu + 1) % $this->cpus;
}

$waitingAtomic->wait(-1);

run(function () use (&$results, $workerProcesses) {
foreach ($workerProcesses as $workerProcess) {
$receivedData = $workerProcess->exportSocket()->recv();
$unserialized = igbinary_unserialize($receivedData);

$results[] = $unserialized;
}
});

return $results;
}

/**
* Flush the queue
*/
public function flush() : void
{
$this->queue = [];
}

/**
* Return the string representation of the object.
*
* @internal
*
* @return string
*/
public function __toString() : string
{
return 'Swoole\\Process';
}
}
102 changes: 0 additions & 102 deletions src/Backends/Swoole/Coroutine.php

This file was deleted.

16 changes: 0 additions & 16 deletions src/Backends/Swoole/IgbinarySerializer.php

This file was deleted.

16 changes: 0 additions & 16 deletions src/Backends/Swoole/PhpSerializer.php

This file was deleted.

Loading

0 comments on commit 6c267bc

Please sign in to comment.