-
-
Notifications
You must be signed in to change notification settings - Fork 183
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch '2.5' into plus-plus-check
- Loading branch information
Showing
25 changed files
with
486 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,3 +6,4 @@ parameters: | |
- 'benchmarks' | ||
excludePaths: | ||
- src/Backends/Amp.php | ||
- src/Backends/Swoole.php |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,173 @@ | ||
<?php | ||
|
||
namespace Rubix\ML\Backends; | ||
|
||
use Rubix\ML\Backends\Tasks\Task; | ||
use Rubix\ML\Specifications\ExtensionIsLoaded; | ||
use Rubix\ML\Specifications\SwooleExtensionIsLoaded; | ||
use RuntimeException; | ||
use Swoole\Atomic; | ||
use Swoole\Process; | ||
|
||
use function Swoole\Coroutine\run; | ||
|
||
/** | ||
* Swoole | ||
* | ||
* Works both with Swoole and OpenSwoole. | ||
* | ||
* @category Machine Learning | ||
* @package Rubix/ML | ||
*/ | ||
class Swoole implements Backend | ||
{ | ||
/** | ||
* The queue of tasks to be processed in parallel. | ||
*/ | ||
protected array $queue = []; | ||
|
||
private int $cpus; | ||
|
||
private int $hasIgbinary; | ||
|
||
public function __construct() | ||
{ | ||
SwooleExtensionIsLoaded::create()->check(); | ||
|
||
$this->cpus = swoole_cpu_num(); | ||
$this->hasIgbinary = ExtensionIsLoaded::with('igbinary')->passes(); | ||
} | ||
|
||
/** | ||
* Queue up a deferred task for backend processing. | ||
* | ||
* @internal | ||
* | ||
* @param Task $task | ||
* @param callable(mixed,mixed):void $after | ||
* @param mixed $context | ||
*/ | ||
public function enqueue(Task $task, ?callable $after = null, $context = null) : void | ||
{ | ||
$this->queue[] = function () use ($task, $after, $context) { | ||
$result = $task(); | ||
|
||
if ($after) { | ||
$after($result, $context); | ||
} | ||
|
||
return $result; | ||
}; | ||
} | ||
|
||
/** | ||
* Process the queue and return the results. | ||
* | ||
* @internal | ||
* | ||
* @return mixed[] | ||
*/ | ||
public function process() : array | ||
{ | ||
$results = []; | ||
|
||
$maxMessageLength = new Atomic(0); | ||
$workerProcesses = []; | ||
|
||
$currentCpu = 0; | ||
|
||
foreach ($this->queue as $index => $queueItem) { | ||
$workerProcess = new Process( | ||
function (Process $worker) use ($maxMessageLength, $queueItem) { | ||
$serialized = $this->serialize($queueItem()); | ||
|
||
$serializedLength = strlen($serialized); | ||
$currentMaxSerializedLength = $maxMessageLength->get(); | ||
|
||
if ($serializedLength > $currentMaxSerializedLength) { | ||
$maxMessageLength->set($serializedLength); | ||
} | ||
|
||
$worker->exportSocket()->send($serialized); | ||
}, | ||
// redirect_stdin_and_stdout | ||
false, | ||
// pipe_type | ||
SOCK_DGRAM, | ||
// enable_coroutine | ||
true, | ||
); | ||
|
||
$workerProcess->setAffinity([$currentCpu]); | ||
$workerProcess->setBlocking(false); | ||
$workerProcess->start(); | ||
|
||
$workerProcesses[$index] = $workerProcess; | ||
|
||
$currentCpu = ($currentCpu + 1) % $this->cpus; | ||
} | ||
|
||
run(function () use ($maxMessageLength, &$results, $workerProcesses) { | ||
foreach ($workerProcesses as $index => $workerProcess) { | ||
$status = $workerProcess->wait(); | ||
|
||
if (0 !== $status['code']) { | ||
throw new RuntimeException('Worker process exited with an error'); | ||
} | ||
|
||
$socket = $workerProcess->exportSocket(); | ||
|
||
if ($socket->isClosed()) { | ||
throw new RuntimeException('Coroutine socket is closed'); | ||
} | ||
|
||
$maxMessageLengthValue = $maxMessageLength->get(); | ||
|
||
$receivedData = $socket->recv($maxMessageLengthValue); | ||
$unserialized = $this->unserialize($receivedData); | ||
|
||
$results[] = $unserialized; | ||
} | ||
}); | ||
|
||
return $results; | ||
} | ||
|
||
/** | ||
* Flush the queue | ||
*/ | ||
public function flush() : void | ||
{ | ||
$this->queue = []; | ||
} | ||
|
||
private function serialize(mixed $data) : string | ||
{ | ||
if ($this->hasIgbinary) { | ||
return igbinary_serialize($data); | ||
} | ||
|
||
return serialize($data); | ||
} | ||
|
||
private function unserialize(string $serialized) : mixed | ||
{ | ||
if ($this->hasIgbinary) { | ||
return igbinary_unserialize($serialized); | ||
} | ||
|
||
return unserialize($serialized); | ||
} | ||
|
||
/** | ||
* Return the string representation of the object. | ||
* | ||
* @internal | ||
* | ||
* @return string | ||
*/ | ||
public function __toString() : string | ||
{ | ||
return 'Swoole'; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<?php | ||
|
||
namespace Rubix\ML; | ||
|
||
/** | ||
* Wrapper | ||
* | ||
* @category Machine Learning | ||
* @package Rubix/ML | ||
* @author Ronan Giron | ||
*/ | ||
interface EstimatorWrapper extends Estimator | ||
{ | ||
/** | ||
* Return the base estimator instance. | ||
* | ||
* @return Estimator | ||
*/ | ||
public function base() : Estimator; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.