From 04e15a6b62bfbd4f27370165181945bf122d139a Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Thu, 22 Apr 2021 02:27:40 +0200 Subject: [PATCH] :construction: integrate Internal\DummyPromise --- src/Adapter/Amp/EventLoop.php | 2 +- src/Adapter/ReactPhp/EventLoop.php | 2 +- src/Adapter/Swoole/EventLoop.php | 155 ++++++++++++--- src/Adapter/Swoole/Internal/DummyPromise.php | 85 +++++++++ src/Adapter/Swoole/PromiseEventLoop.php | 2 +- src/Adapter/Swoole/YieldEventLoop.php | 189 +++++++++++++++++++ src/Adapter/Tornado/EventLoop.php | 2 +- src/Adapter/Tornado/SynchronousEventLoop.php | 2 +- src/EventLoop.php | 4 +- 9 files changed, 405 insertions(+), 38 deletions(-) create mode 100644 src/Adapter/Swoole/Internal/DummyPromise.php create mode 100644 src/Adapter/Swoole/YieldEventLoop.php diff --git a/src/Adapter/Amp/EventLoop.php b/src/Adapter/Amp/EventLoop.php index 5668fa3..c20bc7f 100644 --- a/src/Adapter/Amp/EventLoop.php +++ b/src/Adapter/Amp/EventLoop.php @@ -11,7 +11,7 @@ class EventLoop implements \M6Web\Tornado\EventLoop /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { try { $result = \Amp\Promise\wait( diff --git a/src/Adapter/ReactPhp/EventLoop.php b/src/Adapter/ReactPhp/EventLoop.php index 5aed1d1..934fac5 100644 --- a/src/Adapter/ReactPhp/EventLoop.php +++ b/src/Adapter/ReactPhp/EventLoop.php @@ -23,7 +23,7 @@ public function __construct(\React\EventLoop\LoopInterface $reactEventLoop) /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { $value = null; $isRejected = false; diff --git a/src/Adapter/Swoole/EventLoop.php b/src/Adapter/Swoole/EventLoop.php index 1e0554b..f583a63 100644 --- a/src/Adapter/Swoole/EventLoop.php +++ b/src/Adapter/Swoole/EventLoop.php @@ -2,17 +2,19 @@ namespace M6Web\Tornado\Adapter\Swoole; -use JetBrains\PhpStorm\Pure; -use M6Web\Tornado\Adapter\Swoole\Internal\YieldPromise; +use Generator; +use M6Web\Tornado\Adapter\Swoole\Internal\DummyPromise; use M6Web\Tornado\Deferred; use M6Web\Tornado\Promise; use Swoole\Coroutine; -use Swoole\Event; use RuntimeException; +use Throwable; use function extension_loaded; class EventLoop implements \M6Web\Tornado\EventLoop { + private $cids; + public function __construct() { if (!extension_loaded('swoole')) { @@ -20,34 +22,93 @@ public function __construct() 'EventLoop must running only with swoole extension.' ); } + + $this->cids = []; + } + + private function shiftCoroutine(): mixed + { + if(count($this->cids) === 0) { + return null; + } + + $cid = array_shift($this->cids); + if(Coroutine::exists($cid)) { + Coroutine::resume($cid); + } else { + $this->shiftCoroutine(); + } + + return $cid; + } + + private function pushCoroutine(): mixed + { + $cid = Coroutine::getCid(); + $this->cids[] = $cid; + Coroutine::yield(); + return $cid; + } + + private function createPromise(): DummyPromise + { + return new DummyPromise(function () { + $this->shiftCoroutine(); + }); + } + + private function resolve($value) + { + if($value instanceof DummyPromise && !$value->isPending()) { + if($value->getException() !== null) { + throw $value->getException(); + } + + $value = $this->resolve($value->getValue()); + } + + if(is_array($value)) { + foreach ($value as $k => $v) { + $value[$k] = $this->resolve($v); + } + } + + return $value; } /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { - $value = null; - $this->async((static function() use($promise, &$value): \Generator { - $value = yield $promise; - Event::exit(); - })()); - Event::wait(); + if(!count($this->cids)) { + throw new \Error('Impossible to resolve the promise, no more task to execute..'); + } - return $value; + $promise = DummyPromise::wrap($promise); + while ($promise->isPending()) { + $this->shiftCoroutine(); + } + + return $this->resolve($promise); } /** * {@inheritdoc} */ - public function async(\Generator $generator): Promise + public function async(Generator $generator): Promise { - $generatorPromise = new YieldPromise(); + $generatorPromise = $this->createPromise(); Coroutine::create(function() use($generator, $generatorPromise) { while($generator->valid()) { - $promise = YieldPromise::wrap($generator->current()); - $promise->yield(); - $generator->send($promise->value()); + try { + $promise = $generator->current(); + $this->pushCoroutine(); + $generator->send($this->resolve($promise)); + } catch (Throwable $exception) { + $generatorPromise->reject($exception); + return; + } } $generatorPromise->resolve($generator->getReturn()); @@ -66,14 +127,14 @@ public function promiseAll(Promise ...$promises): Promise return $this->promiseFulfilled([]); } - $globalPromise = new YieldPromise(); + $globalPromise = $this->createPromise(); $allResults = array_fill(0, $nbPromises, false); // To ensure that the last resolved promise resolves the global promise immediately - $waitOnePromise = function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): \Generator { + $waitOnePromise = static function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): Generator { try { $allResults[$index] = yield $promise; - } catch (\Throwable $throwable) { + } catch (Throwable $throwable) { // Prevent to reject the globalPromise twice if ($nbPromises > 0) { $nbPromises = -1; @@ -101,7 +162,12 @@ public function promiseAll(Promise ...$promises): Promise */ public function promiseForeach($traversable, callable $function): Promise { + $promises = []; + foreach ($traversable as $key => $value) { + $promises[] = $this->async($function($value, $key)); + } + return $this->promiseAll(...$promises); } /** @@ -109,7 +175,33 @@ public function promiseForeach($traversable, callable $function): Promise */ public function promiseRace(Promise ...$promises): Promise { + if (empty($promises)) { + return $this->promiseFulfilled(null); + } + + $globalPromise = $this->createPromise(); + $isFirstPromise = true; + $wrapPromise = function (Promise $promise) use ($globalPromise, &$isFirstPromise): \Generator { + try { + $result = yield $promise; + if ($isFirstPromise) { + $isFirstPromise = false; + $globalPromise->resolve($result); + } + } catch (\Throwable $throwable) { + if ($isFirstPromise) { + $isFirstPromise = false; + $globalPromise->reject($throwable); + } + } + }; + + foreach ($promises as $promise) { + $this->async($wrapPromise($promise)); + } + + return $globalPromise; } /** @@ -117,7 +209,7 @@ public function promiseRace(Promise ...$promises): Promise */ public function promiseFulfilled($value): Promise { - $promise = new YieldPromise(); + $promise = $this->createPromise(); $promise->resolve($value); return $promise; @@ -126,9 +218,9 @@ public function promiseFulfilled($value): Promise /** * {@inheritdoc} */ - public function promiseRejected(\Throwable $throwable): Promise + public function promiseRejected(Throwable $throwable): Promise { - $promise = new YieldPromise(); + $promise = $this->createPromise(); $promise->reject($throwable); return $promise; @@ -139,11 +231,12 @@ public function promiseRejected(\Throwable $throwable): Promise */ public function idle(): Promise { - $promise = new YieldPromise(); + $promise = new DummyPromise(); Coroutine::create(function() use ($promise) { - Coroutine::defer(function () use ($promise) { - $promise->resolve(null); - }); + $this->pushCoroutine(); + $promise->resolve(null); + //Coroutine::defer(function () use ($promise) { + //}); }); return $promise; @@ -154,9 +247,11 @@ public function idle(): Promise */ public function delay(int $milliseconds): Promise { - $promise = new YieldPromise(); + $promise = new DummyPromise(); Coroutine::create(function() use($milliseconds, $promise) { - Coroutine::sleep($milliseconds / 1000); + $this->pushCoroutine(); + //Coroutine::sleep($milliseconds / 1000); + usleep($milliseconds * 1000); $promise->resolve(null); }); @@ -166,9 +261,9 @@ public function delay(int $milliseconds): Promise /** * {@inheritdoc} */ - #[Pure] public function deferred(): Deferred + public function deferred(): Deferred { - return new YieldPromise(); + return $this->createPromise(); } /** diff --git a/src/Adapter/Swoole/Internal/DummyPromise.php b/src/Adapter/Swoole/Internal/DummyPromise.php new file mode 100644 index 0000000..132972a --- /dev/null +++ b/src/Adapter/Swoole/Internal/DummyPromise.php @@ -0,0 +1,85 @@ +isPending = true; + $this->exception = null; + $this->callback = $callback ?? static function() {}; + } + + public static function wrap(Promise $promise): self + { + assert($promise instanceof self, new \Error('Input promise was not created by this adapter.')); + + return $promise; + } + + public function getPromise(): Promise + { + return $this; + } + + public function resolve($value): void + { + assert(true === $this->isPending, new \Error('Promise is already resolved.')); + + $this->isPending = false; + $this->value = $value; + ($this->callback)(); + } + + public function reject(Throwable $throwable): void + { + assert(true === $this->isPending, new \Error('Promise is already resolved.')); + + $this->isPending = false; + $this->exception = $throwable; + ($this->callback)(); + } + + public function isPending(): bool + { + if($this->isPending) { + return $this->isPending; + } + + if($this->exception === null) { + if ($this->value instanceof self) { + return $this->value->isPending(); + } + + if(is_array($this->value)) { + foreach ($this->value as $value) { + if($value instanceof self && $value->isPending()) { + return $value->isPending(); + } + } + } + } + + return $this->isPending; + } + + public function getValue(): mixed + { + return $this->value; + } + + public function getException(): ?Throwable + { + return $this->exception; + } +} diff --git a/src/Adapter/Swoole/PromiseEventLoop.php b/src/Adapter/Swoole/PromiseEventLoop.php index 8d29cdf..fd77d57 100644 --- a/src/Adapter/Swoole/PromiseEventLoop.php +++ b/src/Adapter/Swoole/PromiseEventLoop.php @@ -35,7 +35,7 @@ public function __construct() /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { $value = null; $isRejected = false; diff --git a/src/Adapter/Swoole/YieldEventLoop.php b/src/Adapter/Swoole/YieldEventLoop.php new file mode 100644 index 0000000..ffbe9fe --- /dev/null +++ b/src/Adapter/Swoole/YieldEventLoop.php @@ -0,0 +1,189 @@ +async((static function() use($promise, &$value): \Generator { + $value = yield $promise; + Event::exit(); + })()); + Event::wait(); + + return $value; + } + + /** + * {@inheritdoc} + */ + public function async(\Generator $generator): Promise + { + $generatorPromise = new YieldPromise(); + Coroutine::create(function() use($generator, $generatorPromise) { + while($generator->valid()) { + $promise = YieldPromise::wrap($generator->current()); + $promise->yield(); + $generator->send($promise->value()); + } + + $generatorPromise->resolve($generator->getReturn()); + }); + + return $generatorPromise; + } + + /** + * {@inheritdoc} + */ + public function promiseAll(Promise ...$promises): Promise + { + $nbPromises = count($promises); + if ($nbPromises === 0) { + return $this->promiseFulfilled([]); + } + + $globalPromise = new YieldPromise(); + $allResults = array_fill(0, $nbPromises, false); + + // To ensure that the last resolved promise resolves the global promise immediately + $waitOnePromise = function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): \Generator { + try { + $allResults[$index] = yield $promise; + } catch (\Throwable $throwable) { + // Prevent to reject the globalPromise twice + if ($nbPromises > 0) { + $nbPromises = -1; + $globalPromise->reject($throwable); + + return; + } + } + + // Last resolved promise resolved globalPromise + if (--$nbPromises === 0) { + $globalPromise->resolve($allResults); + } + }; + + foreach ($promises as $index => $promise) { + $this->async($waitOnePromise($index, $promise)); + } + + return $globalPromise; + } + + /** + * {@inheritdoc} + */ + public function promiseForeach($traversable, callable $function): Promise + { + + } + + /** + * {@inheritdoc} + */ + public function promiseRace(Promise ...$promises): Promise + { + + } + + /** + * {@inheritdoc} + */ + public function promiseFulfilled($value): Promise + { + $promise = new YieldPromise(); + $promise->resolve($value); + + return $promise; + } + + /** + * {@inheritdoc} + */ + public function promiseRejected(\Throwable $throwable): Promise + { + $promise = new YieldPromise(); + $promise->reject($throwable); + + return $promise; + } + + /** + * {@inheritdoc} + */ + public function idle(): Promise + { + $promise = new YieldPromise(); + Coroutine::create(function() use ($promise) { + Coroutine::defer(function () use ($promise) { + $promise->resolve(null); + }); + }); + + return $promise; + } + + /** + * {@inheritdoc} + */ + public function delay(int $milliseconds): Promise + { + $promise = new YieldPromise(); + Coroutine::create(function() use($milliseconds, $promise) { + Coroutine::sleep($milliseconds / 1000); + $promise->resolve(null); + }); + + return $promise; + } + + /** + * {@inheritdoc} + */ + #[Pure] public function deferred(): Deferred + { + return new YieldPromise(); + } + + /** + * {@inheritdoc} + */ + public function readable($stream): Promise + { + + } + + /** + * {@inheritdoc} + */ + public function writable($stream): Promise + { + + } +} diff --git a/src/Adapter/Tornado/EventLoop.php b/src/Adapter/Tornado/EventLoop.php index 546c5d3..753a4a3 100644 --- a/src/Adapter/Tornado/EventLoop.php +++ b/src/Adapter/Tornado/EventLoop.php @@ -26,7 +26,7 @@ public function __construct() /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { $promiseIsPending = true; $finalAction = function () {throw new \Error('Impossible to resolve the promise, no more task to execute..'); }; diff --git a/src/Adapter/Tornado/SynchronousEventLoop.php b/src/Adapter/Tornado/SynchronousEventLoop.php index 21cd4f8..91a7852 100644 --- a/src/Adapter/Tornado/SynchronousEventLoop.php +++ b/src/Adapter/Tornado/SynchronousEventLoop.php @@ -13,7 +13,7 @@ class SynchronousEventLoop implements \M6Web\Tornado\EventLoop /** * {@inheritdoc} */ - public function wait(Promise $promise) + public function wait(Promise $promise): mixed { // If there are some uncaught exceptions, throw the first one. if ($throwable = reset($this->asyncThrowables)) { diff --git a/src/EventLoop.php b/src/EventLoop.php index d93d742..0f31af1 100644 --- a/src/EventLoop.php +++ b/src/EventLoop.php @@ -7,10 +7,8 @@ interface EventLoop /** * Waits the resolution of a promise, and returns its value. * You should use this function once for your global result. - * - * @return mixed */ - public function wait(Promise $promise); + public function wait(Promise $promise): mixed; /** * Registers a generator in the event loop to execute it asynchronously.