Skip to content

Commit

Permalink
fix: phpstan, socket message size
Browse files Browse the repository at this point in the history
  • Loading branch information
mcharytoniuk committed Jan 15, 2024
1 parent f2d829a commit cfd31fb
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 153 deletions.
10 changes: 9 additions & 1 deletion benchmarks/Classifiers/OneVsRestBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

namespace Rubix\ML\Benchmarks\Classifiers;

use Rubix\ML\Backends\Backend;
use Rubix\ML\Classifiers\OneVsRest;
use Rubix\ML\Datasets\Generators\Blob;
use Rubix\ML\Classifiers\LogisticRegression;
use Rubix\ML\NeuralNet\Optimizers\Stochastic;
use Rubix\ML\Datasets\Generators\Agglomerate;
use Rubix\ML\Tests\DataProvider\BackendProviderTrait;

/**
* @Groups({"Classifiers"})
* @BeforeMethods({"setUp"})
*/
class OneVsRestBench
{
use BackendProviderTrait;

protected const TRAINING_SIZE = 10000;

protected const TESTING_SIZE = 10000;
Expand Down Expand Up @@ -52,9 +56,13 @@ public function setUp() : void
* @Subject
* @Iterations(5)
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function trainPredict() : void
public function trainPredict(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
Expand Down
42 changes: 25 additions & 17 deletions benchmarks/Classifiers/RandomForestBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

namespace Rubix\ML\Benchmarks\Classifiers;

use Rubix\ML\Backends\Amp;
use Rubix\ML\Backends\Swoole as SwooleBackend;
use Rubix\ML\Backends\Backend;
use Rubix\ML\Classifiers\RandomForest;
use Rubix\ML\Datasets\Generators\Blob;
use Rubix\ML\Classifiers\ClassificationTree;
use Rubix\ML\Datasets\Generators\Agglomerate;
use Rubix\ML\Tests\DataProvider\BackendProviderTrait;
use Rubix\ML\Transformers\IntervalDiscretizer;

/**
* @Groups({"Classifiers"})
*/
class RandomForestBench
{
use BackendProviderTrait;

protected const TRAINING_SIZE = 10000;

protected const TESTING_SIZE = 10000;
Expand Down Expand Up @@ -47,8 +49,6 @@ public function setUpContinuous() : void
$this->testing = $generator->generate(self::TESTING_SIZE);

$this->estimator = new RandomForest(new ClassificationTree(30));
$this->estimator->setBackend(new SwooleBackend());
// $this->estimator->setBackend(new Amp());
}

public function setUpCategorical() : void
Expand All @@ -74,24 +74,32 @@ public function setUpCategorical() : void
* @Iterations(5)
* @BeforeMethods({"setUpContinuous"})
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function continuous() : void
public function continuous(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
}

// /**
// * @Subject
// * @Iterations(5)
// * @BeforeMethods({"setUpCategorical"})
// * @OutputTimeUnit("seconds", precision=3)
// */
// public function categorical() : void
// {
// $this->estimator->train($this->training);

// $this->estimator->predict($this->testing);
// }
/**
* @Subject
* @Iterations(5)
* @BeforeMethods({"setUpCategorical"})
* @OutputTimeUnit("seconds", precision=3)
* @ParamProviders("provideBackends")
* @param array{ backend: Backend } $params
*/
public function categorical(array $params) : void
{
$this->estimator->setBackend($params['backend']);

$this->estimator->train($this->training);

$this->estimator->predict($this->testing);
}
}
2 changes: 1 addition & 1 deletion phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
forceCoversAnnotation="true"
processIsolation="false"
processIsolation="true"
stopOnFailure="false"
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd"
>
Expand Down
85 changes: 60 additions & 25 deletions src/Backends/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
namespace Rubix\ML\Backends;

use Rubix\ML\Backends\Tasks\Task;
use Rubix\ML\Serializers\Serializer;
use Rubix\ML\Serializers\Igbinary;
use Rubix\ML\Serializers\Native;
use Rubix\ML\Specifications\ExtensionIsLoaded;
use Rubix\ML\Specifications\SwooleExtensionIsLoaded;
use RuntimeException;
use Swoole\Atomic;
use Swoole\Process;

use function Swoole\Coroutine\run;
Expand All @@ -28,21 +28,14 @@ class Swoole implements Backend

private int $cpus;

private Serializer $serializer;
private int $hasIgbinary;

public function __construct(?Serializer $serializer = null)
public function __construct()
{
$this->cpus = swoole_cpu_num();
SwooleExtensionIsLoaded::create()->check();

if ($serializer) {
$this->serializer = $serializer;
} else {
if (ExtensionIsLoaded::with('igbinary')->passes()) {
$this->serializer = new Igbinary();
} else {
$this->serializer = new Native();
}
}
$this->cpus = swoole_cpu_num();
$this->hasIgbinary = ExtensionIsLoaded::with('igbinary')->passes();
}

/**
Expand Down Expand Up @@ -78,19 +71,29 @@ public function process() : array
{
$results = [];

$maxMessageLength = new Atomic(0);
$workerProcesses = [];

$currentCpu = 0;

while (($queueItem = array_shift($this->queue))) {
foreach ($this->queue as $index => $queueItem) {
$workerProcess = new Process(
function (Process $worker) use ($queueItem) {
$worker->exportSocket()->send(igbinary_serialize($queueItem()));
function (Process $worker) use ($maxMessageLength, $queueItem) {
$serialized = $this->serialize($queueItem());

$serializedLength = strlen($serialized);
$currentMaxSerializedLength = $maxMessageLength->get();

if ($serializedLength > $currentMaxSerializedLength) {
$maxMessageLength->set($serializedLength);
}

$worker->exportSocket()->send($serialized);
},
// redirect_stdin_and_stdout
false,
// pipe_type
SOCK_STREAM,
SOCK_DGRAM,
// enable_coroutine
true,
);
Expand All @@ -99,15 +102,29 @@ function (Process $worker) use ($queueItem) {
$workerProcess->setBlocking(false);
$workerProcess->start();

$workerProcesses[] = $workerProcess;
$workerProcesses[$index] = $workerProcess;

$currentCpu = ($currentCpu + 1) % $this->cpus;
}

run(function () use (&$results, $workerProcesses) {
foreach ($workerProcesses as $workerProcess) {
$receivedData = $workerProcess->exportSocket()->recv();
$unserialized = igbinary_unserialize($receivedData);
run(function () use ($maxMessageLength, &$results, $workerProcesses) {
foreach ($workerProcesses as $index => $workerProcess) {
$status = $workerProcess->wait();

if (0 !== $status['code']) {
throw new RuntimeException('Worker process exited with an error');
}

$socket = $workerProcess->exportSocket();

if ($socket->isClosed()) {
throw new RuntimeException('Coroutine socket is closed');
}

$maxMessageLengthValue = $maxMessageLength->get();

$receivedData = $socket->recv($maxMessageLengthValue);
$unserialized = $this->unserialize($receivedData);

$results[] = $unserialized;
}
Expand All @@ -124,6 +141,24 @@ public function flush() : void
$this->queue = [];
}

private function serialize(mixed $data) : string
{
if ($this->hasIgbinary) {
return igbinary_serialize($data);
}

return serialize($data);
}

private function unserialize(string $serialized) : mixed
{
if ($this->hasIgbinary) {
return igbinary_unserialize($serialized);
}

return unserialize($serialized);
}

/**
* Return the string representation of the object.
*
Expand All @@ -133,6 +168,6 @@ public function flush() : void
*/
public function __toString() : string
{
return 'Swoole\\Process';
return 'Swoole';
}
}
16 changes: 16 additions & 0 deletions src/Classifiers/LogisticRegression.php
Original file line number Diff line number Diff line change
Expand Up @@ -491,4 +491,20 @@ public function __toString() : string
{
return 'Logistic Regression (' . Params::stringify($this->params()) . ')';
}

/**
* Without this method, causes errors with Swoole backend + Igbinary
* serialization.
*
* Can be removed if it's no longer the case.
*
* @internal
* @param array<string,mixed> $data
*/
public function __unserialize(array $data) : void
{
foreach ($data as $propertyName => $propertyValue) {
$this->{$propertyName} = $propertyValue;
}
}
}
87 changes: 0 additions & 87 deletions src/Serializers/Igbinary.php

This file was deleted.

2 changes: 1 addition & 1 deletion tests/Backends/SwooleTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
class SwooleTest extends TestCase
{
/**
* @var \Rubix\ML\Backends\Swoole\Process
* @var \Rubix\ML\Backends\Swoole
*/
protected $backend;

Expand Down
Loading

0 comments on commit cfd31fb

Please sign in to comment.