Skip to content

Commit

Permalink
🚧 integrate Internal\DummyPromise
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Apr 22, 2021
1 parent 679e4da commit 04e15a6
Show file tree
Hide file tree
Showing 9 changed files with 405 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/Adapter/Amp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class EventLoop implements \M6Web\Tornado\EventLoop
/**
* {@inheritdoc}
*/
public function wait(Promise $promise)
public function wait(Promise $promise): mixed
{
try {
$result = \Amp\Promise\wait(
Expand Down
2 changes: 1 addition & 1 deletion src/Adapter/ReactPhp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function __construct(\React\EventLoop\LoopInterface $reactEventLoop)
/**
* {@inheritdoc}
*/
public function wait(Promise $promise)
public function wait(Promise $promise): mixed
{
$value = null;
$isRejected = false;
Expand Down
155 changes: 125 additions & 30 deletions src/Adapter/Swoole/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,113 @@

namespace M6Web\Tornado\Adapter\Swoole;

use JetBrains\PhpStorm\Pure;
use M6Web\Tornado\Adapter\Swoole\Internal\YieldPromise;
use Generator;
use M6Web\Tornado\Adapter\Swoole\Internal\DummyPromise;
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;
use Swoole\Coroutine;
use Swoole\Event;
use RuntimeException;
use Throwable;
use function extension_loaded;

class EventLoop implements \M6Web\Tornado\EventLoop
{
private $cids;

public function __construct()
{
if (!extension_loaded('swoole')) {
throw new RuntimeException(
'EventLoop must running only with swoole extension.'
);
}

$this->cids = [];
}

private function shiftCoroutine(): mixed
{
if(count($this->cids) === 0) {
return null;
}

$cid = array_shift($this->cids);
if(Coroutine::exists($cid)) {
Coroutine::resume($cid);
} else {
$this->shiftCoroutine();
}

return $cid;
}

private function pushCoroutine(): mixed
{
$cid = Coroutine::getCid();
$this->cids[] = $cid;
Coroutine::yield();
return $cid;
}

private function createPromise(): DummyPromise
{
return new DummyPromise(function () {
$this->shiftCoroutine();
});
}

private function resolve($value)
{
if($value instanceof DummyPromise && !$value->isPending()) {
if($value->getException() !== null) {
throw $value->getException();
}

$value = $this->resolve($value->getValue());
}

if(is_array($value)) {
foreach ($value as $k => $v) {
$value[$k] = $this->resolve($v);
}
}

return $value;
}

/**
* {@inheritdoc}
*/
public function wait(Promise $promise)
public function wait(Promise $promise): mixed
{
$value = null;
$this->async((static function() use($promise, &$value): \Generator {
$value = yield $promise;
Event::exit();
})());
Event::wait();
if(!count($this->cids)) {
throw new \Error('Impossible to resolve the promise, no more task to execute..');
}

return $value;
$promise = DummyPromise::wrap($promise);
while ($promise->isPending()) {
$this->shiftCoroutine();
}

return $this->resolve($promise);
}

/**
* {@inheritdoc}
*/
public function async(\Generator $generator): Promise
public function async(Generator $generator): Promise
{
$generatorPromise = new YieldPromise();
$generatorPromise = $this->createPromise();
Coroutine::create(function() use($generator, $generatorPromise) {
while($generator->valid()) {
$promise = YieldPromise::wrap($generator->current());
$promise->yield();
$generator->send($promise->value());
try {
$promise = $generator->current();
$this->pushCoroutine();
$generator->send($this->resolve($promise));
} catch (Throwable $exception) {
$generatorPromise->reject($exception);
return;
}
}

$generatorPromise->resolve($generator->getReturn());
Expand All @@ -66,14 +127,14 @@ public function promiseAll(Promise ...$promises): Promise
return $this->promiseFulfilled([]);
}

$globalPromise = new YieldPromise();
$globalPromise = $this->createPromise();
$allResults = array_fill(0, $nbPromises, false);

// To ensure that the last resolved promise resolves the global promise immediately
$waitOnePromise = function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): \Generator {
$waitOnePromise = static function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): Generator {
try {
$allResults[$index] = yield $promise;
} catch (\Throwable $throwable) {
} catch (Throwable $throwable) {
// Prevent to reject the globalPromise twice
if ($nbPromises > 0) {
$nbPromises = -1;
Expand Down Expand Up @@ -101,23 +162,54 @@ public function promiseAll(Promise ...$promises): Promise
*/
public function promiseForeach($traversable, callable $function): Promise
{
$promises = [];
foreach ($traversable as $key => $value) {
$promises[] = $this->async($function($value, $key));
}

return $this->promiseAll(...$promises);
}

/**
* {@inheritdoc}
*/
public function promiseRace(Promise ...$promises): Promise
{
if (empty($promises)) {
return $this->promiseFulfilled(null);
}

$globalPromise = $this->createPromise();
$isFirstPromise = true;

$wrapPromise = function (Promise $promise) use ($globalPromise, &$isFirstPromise): \Generator {
try {
$result = yield $promise;
if ($isFirstPromise) {
$isFirstPromise = false;
$globalPromise->resolve($result);
}
} catch (\Throwable $throwable) {
if ($isFirstPromise) {
$isFirstPromise = false;
$globalPromise->reject($throwable);
}
}
};

foreach ($promises as $promise) {
$this->async($wrapPromise($promise));
}

return $globalPromise;
}

/**
* {@inheritdoc}
*/
public function promiseFulfilled($value): Promise
{
$promise = new YieldPromise();
$promise = $this->createPromise();
$promise->resolve($value);

return $promise;
Expand All @@ -126,9 +218,9 @@ public function promiseFulfilled($value): Promise
/**
* {@inheritdoc}
*/
public function promiseRejected(\Throwable $throwable): Promise
public function promiseRejected(Throwable $throwable): Promise
{
$promise = new YieldPromise();
$promise = $this->createPromise();
$promise->reject($throwable);

return $promise;
Expand All @@ -139,11 +231,12 @@ public function promiseRejected(\Throwable $throwable): Promise
*/
public function idle(): Promise
{
$promise = new YieldPromise();
$promise = new DummyPromise();
Coroutine::create(function() use ($promise) {
Coroutine::defer(function () use ($promise) {
$promise->resolve(null);
});
$this->pushCoroutine();
$promise->resolve(null);
//Coroutine::defer(function () use ($promise) {
//});
});

return $promise;
Expand All @@ -154,9 +247,11 @@ public function idle(): Promise
*/
public function delay(int $milliseconds): Promise
{
$promise = new YieldPromise();
$promise = new DummyPromise();
Coroutine::create(function() use($milliseconds, $promise) {
Coroutine::sleep($milliseconds / 1000);
$this->pushCoroutine();
//Coroutine::sleep($milliseconds / 1000);
usleep($milliseconds * 1000);
$promise->resolve(null);
});

Expand All @@ -166,9 +261,9 @@ public function delay(int $milliseconds): Promise
/**
* {@inheritdoc}
*/
#[Pure] public function deferred(): Deferred
public function deferred(): Deferred
{
return new YieldPromise();
return $this->createPromise();
}

/**
Expand Down
85 changes: 85 additions & 0 deletions src/Adapter/Swoole/Internal/DummyPromise.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

namespace M6Web\Tornado\Adapter\Swoole\Internal;

use M6Web\Tornado\Promise;
use M6Web\Tornado\Deferred;
use Throwable;

final class DummyPromise implements Promise, Deferred
{
private bool $isPending;
private mixed $value;
private ?Throwable $exception;
private $callback;

public function __construct(?callable $callback = null)
{
$this->isPending = true;
$this->exception = null;
$this->callback = $callback ?? static function() {};
}

public static function wrap(Promise $promise): self
{
assert($promise instanceof self, new \Error('Input promise was not created by this adapter.'));

return $promise;
}

public function getPromise(): Promise
{
return $this;
}

public function resolve($value): void
{
assert(true === $this->isPending, new \Error('Promise is already resolved.'));

$this->isPending = false;
$this->value = $value;
($this->callback)();
}

public function reject(Throwable $throwable): void
{
assert(true === $this->isPending, new \Error('Promise is already resolved.'));

$this->isPending = false;
$this->exception = $throwable;
($this->callback)();
}

public function isPending(): bool
{
if($this->isPending) {
return $this->isPending;
}

if($this->exception === null) {
if ($this->value instanceof self) {
return $this->value->isPending();
}

if(is_array($this->value)) {
foreach ($this->value as $value) {
if($value instanceof self && $value->isPending()) {
return $value->isPending();
}
}
}
}

return $this->isPending;
}

public function getValue(): mixed
{
return $this->value;
}

public function getException(): ?Throwable
{
return $this->exception;
}
}
2 changes: 1 addition & 1 deletion src/Adapter/Swoole/PromiseEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function __construct()
/**
* {@inheritdoc}
*/
public function wait(Promise $promise)
public function wait(Promise $promise): mixed
{
$value = null;
$isRejected = false;
Expand Down
Loading

0 comments on commit 04e15a6

Please sign in to comment.