Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dual versions for parallel #332

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"require": {
"php": ">=7.4",
"ext-json": "*",
"amphp/parallel": "^1.3",
"amphp/parallel": "^1.3|^2.2",
"andrewdalpino/okbloomer": "^1.0",
"psr/log": "^1.1|^2.0|^3.0",
"rubix/tensor": "^3.0",
Expand Down
31 changes: 31 additions & 0 deletions src/Backends/Amp/CallableTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Rubix\ML\Backends\Amp;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

/**
* @template-covariant TResult
* @template TReceive
* @template TSend
* @implements Task<TResult, TReceive, TSend>
*/
class CallableTask implements Task
{
/**
* @var callable
*/
private $callable;

public function __construct(callable $callable)
{
$this->callable = $callable;
}

public function run(Channel $channel, Cancellation $cancellation) : mixed
{
return ($this->callable)();
}
}
155 changes: 155 additions & 0 deletions src/Backends/Amp2.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<?php

namespace Rubix\ML\Backends;

use Amp\Future;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\Task as AmpTask;
use Rubix\ML\Backends\Amp\CallableTask;
use Rubix\ML\Backends\Tasks\Task;
use Rubix\ML\Exceptions\InvalidArgumentException;
use Rubix\ML\Helpers\CPU;
use function Amp\async;

/**
* Amp
*
* Amp Parallel is a multiprocessing subsystem that requires no extensions. It uses a
* non-blocking concurrency framework that implements coroutines using PHP generator
* functions under the hood.
*
* @category Machine Learning
* @package Rubix/ML
* @author Andrew DalPino
*/
class Amp2 implements Backend
{
/**
* The worker pool.
*
* @var ContextWorkerPool
*/
protected ContextWorkerPool $pool;

/**
* The queue of coroutines to be processed in parallel.
*
* @var Future<mixed>[]
*/
protected array $queue = [
//
];

/**
* The memorized results of the last parallel computation.
*
* @var mixed[]
*/
protected array $results = [
//
];

/**
* @param int|null $workers
* @throws InvalidArgumentException
*/
public function __construct(?int $workers = null)
{
if (isset($workers) and $workers < 1) {
throw new InvalidArgumentException('Number of workers'
. " must be greater than 0, $workers given.");
}

$workers = $workers ?? CPU::cores();

$this->pool = new ContextWorkerPool($workers);
}

/**
* Return the number of background worker processes.
*
* @return int
*/
public function workers() : int
{
return $this->pool->getLimit();
}

/**
* Queue up a deferred task for backend processing.
*
* @param Task $task
* @param callable(mixed,mixed):void $after
* @param mixed $context
* @internal
*/
public function enqueue(Task $task, ?callable $after = null, $context = null) : void
{
$task = new CallableTask($task);

$coroutine = $this->coroutine($task, $after, $context);

$this->queue[] = $coroutine;
}

/**
* The future for a particular task and callback.
*
* @template TResult
* @template TReceive
* @template TSend
* @param AmpTask<TResult, TReceive, TSend> $task
* @param callable(mixed,mixed):void $after
* @param mixed $context
* @return Future<mixed>
* @internal
*/
public function coroutine(AmpTask $task, ?callable $after = null, $context = null) : Future
{
return async(function () use ($context, $after, $task) {
$result = $this->pool->submit($task)->await();

if ($after) {
$after($result, $context);
}

return $result;
});
}

/**
* Process the queue and return the results.
*
* @return mixed[]
* @internal
*/
public function process() : array
{
$this->results = Future\await($this->queue);

$this->queue = [];

return $this->results;
}

/**
* Flush the queue and clear the memorized results.
*
* @internal
*/
public function flush() : void
{
$this->queue = $this->results = [];
}

/**
* Return the string representation of the object.
*
* @return string
* @internal
*/
public function __toString() : string
{
return "Amp (workers: {$this->workers()})";
}
}
23 changes: 4 additions & 19 deletions tests/Backends/AmpTest.php → tests/Backends/AbstractAmp.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Rubix\ML\Tests\Backends;

use Rubix\ML\Backends\Amp;
use Rubix\ML\Backends\Amp2;
use Rubix\ML\Backends\Backend;
use Rubix\ML\Backends\Tasks\Task;
use PHPUnit\Framework\TestCase;
Expand All @@ -11,10 +12,10 @@
* @group Backends
* @covers \Rubix\ML\Backends\Amp
*/
class AmpTest extends TestCase
abstract class AbstractAmp extends TestCase
{
/**
* @var Amp
* @var Amp|Amp2
*/
protected $backend;

Expand All @@ -27,23 +28,6 @@ public static function foo(int $i) : array
return [$i * 2, microtime(true)];
}

/**
* @before
*/
protected function setUp() : void
{
$this->backend = new Amp(4);
}

/**
* @test
*/
public function build() : void
{
$this->assertInstanceOf(Amp::class, $this->backend);
$this->assertInstanceOf(Backend::class, $this->backend);
}

/**
* @test
*/
Expand All @@ -64,5 +48,6 @@ public function enqueueProcess() : void
$results = $this->backend->process();

$this->assertCount(10, $results);
array_map([$this, 'assertIsArray'], $results);
}
}
32 changes: 32 additions & 0 deletions tests/Backends/Amp1Test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Backends;

use Rubix\ML\Backends\Amp;
use Rubix\ML\Backends\Backend;
use Rubix\ML\Tests\Backends\AbstractAmp;

/**
* @group Backends
* @covers \Rubix\ML\Backends\Amp
* @requires function \Amp\call
*/
class Amp1Test extends AbstractAmp
{
/**
* @before
*/
protected function setUp() : void
{
$this->backend = new Amp(4);
}

/**
* @test
*/
public function build() : void
{
$this->assertInstanceOf(Amp::class, $this->backend);
$this->assertInstanceOf(Backend::class, $this->backend);
}
}
32 changes: 32 additions & 0 deletions tests/Backends/Amp2Test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Backends;

use Rubix\ML\Backends\Amp2;
use Rubix\ML\Backends\Backend;
use Rubix\ML\Tests\Backends\AbstractAmp;

/**
* @group Backends
* @covers \Rubix\ML\Backends\Amp2
* @requires function \Amp\async
*/
class Amp2Test extends AbstractAmp
{
/**
* @before
*/
protected function setUp() : void
{
$this->backend = new Amp2(4);
}

/**
* @test
*/
public function build() : void
{
$this->assertInstanceOf(Amp2::class, $this->backend);
$this->assertInstanceOf(Backend::class, $this->backend);
}
}
Loading