From 3b5570edfc66fab853daf2517971798cd8b89f51 Mon Sep 17 00:00:00 2001 From: Mathieu Ledru Date: Tue, 20 Apr 2021 05:16:10 +0200 Subject: [PATCH] :hammer: refactor promise and clean --- src/Adapter/ReactPhp/EventLoop.php | 1 + src/Adapter/Swoole/EventLoop.php | 44 +----- src/Adapter/Swoole/Internal/SwoolePromise.php | 149 +++++------------- 3 files changed, 50 insertions(+), 144 deletions(-) diff --git a/src/Adapter/ReactPhp/EventLoop.php b/src/Adapter/ReactPhp/EventLoop.php index db4e189..5aed1d1 100644 --- a/src/Adapter/ReactPhp/EventLoop.php +++ b/src/Adapter/ReactPhp/EventLoop.php @@ -69,6 +69,7 @@ public function async(\Generator $generator): Promise try { if (!$generator->valid()) { $deferred->resolve($generator->getReturn()); + return; } $promise = $generator->current(); if (!$promise instanceof Internal\PromiseWrapper) { diff --git a/src/Adapter/Swoole/EventLoop.php b/src/Adapter/Swoole/EventLoop.php index 05efb73..9132e76 100644 --- a/src/Adapter/Swoole/EventLoop.php +++ b/src/Adapter/Swoole/EventLoop.php @@ -8,8 +8,6 @@ use M6Web\Tornado\Deferred; use M6Web\Tornado\Promise; use Swoole\Coroutine; -use Swoole\Coroutine\Channel; -use Swoole\Coroutine\WaitGroup; use Swoole\Event; class EventLoop implements \M6Web\Tornado\EventLoop @@ -35,43 +33,22 @@ public function wait(Promise $promise) $isRejected = false; $promiseSettled = false; - //Coroutine::create(function() use ($promise, &$value, &$isRejected, &$promiseSettled) { - $ticks = 1; - //$wg = new WaitGroup(1); - //$channel = new Channel($ticks); Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getSwoolePromise()->then( - function ($result) use (/* $wg , $channel, */&$value, &$promiseSettled) { + function ($result) use (&$value, &$promiseSettled) { $promiseSettled = true; $value = $result; - //$channel->push(true); - //$wg->done(); Event::exit(); }, - function ($result) use (/* $wg , $channel, */&$value, &$isRejected, &$promiseSettled) { + function ($result) use (&$value, &$isRejected, &$promiseSettled) { $promiseSettled = true; $value = $result; $isRejected = true; - //$channel->push(true); - //$wg->done(); Event::exit(); } ); if (!$promiseSettled) { Event::wait(); } - //while ($ticks--) { - //$channel->pop(); - //} - //$channel->close(); - //$wg->wait(); - //}); - //while (!$promiseSettled) { - // @codeCoverageIgnoreStart - //usleep(SwoolePromise::PROMISE_WAIT); - // @codeCoverageIgnoreEnd - //} - //Event::wait(); - //swoole_event_wait(); if (!$promiseSettled) { throw new \Error('Impossible to resolve the promise, no more task to execute.'); @@ -96,6 +73,7 @@ public function async(\Generator $generator): Promise try { if (!$generator->valid()) { $deferred->resolve($generator->getReturn()); + return; } $promise = $generator->current(); if (!$promise instanceof Internal\PromiseWrapper) { @@ -225,22 +203,12 @@ public function promiseRejected(\Throwable $throwable): Promise */ public function idle(): Promise { - /*return Internal\PromiseWrapper::createUnhandled(new SwoolePromise(function($resolve) { + return Internal\PromiseWrapper::createUnhandled(new SwoolePromise(function($resolve) { Coroutine::defer(function () use ($resolve) { - //Coroutine::sleep(1.0); - //usleep(1000000); + //Coroutine::sleep(0.001); $resolve(null); }); - }), $this->unhandledFailingPromises);*/ - return Internal\PromiseWrapper::createUnhandled(SwoolePromise::resolve(null), $this->unhandledFailingPromises, function() { - return new SwoolePromise(function($resolve) { - Coroutine::defer(function () use ($resolve) { - //Coroutine::sleep(1.0); - //usleep(1); - $resolve(null); - }); - }); - }); + }), $this->unhandledFailingPromises); } /** diff --git a/src/Adapter/Swoole/Internal/SwoolePromise.php b/src/Adapter/Swoole/Internal/SwoolePromise.php index bbeb2b1..a38ac15 100644 --- a/src/Adapter/Swoole/Internal/SwoolePromise.php +++ b/src/Adapter/Swoole/Internal/SwoolePromise.php @@ -17,40 +17,34 @@ */ final class SwoolePromise implements Promise { - const PROMISE_WAIT = 100; - - const STATE_PENDING = 1; - const STATE_FULFILLED = 0; - const STATE_REJECTED = -1; - - /** @var int */ - protected $state = self::STATE_PENDING; - - /** @var mixed */ private $result; + private $onResolve; - private $resolvedCallback; + const STATUS_RESOLVE = 1; + const STATUS_ERROR = -1; /** - * PromiseCo constructor. + * Promise constructor. * * @param callable $executor */ public function __construct(callable $executor) { - $this->resolvedCallback = static function() {}; - if (!extension_loaded('swoole')) { throw new RuntimeException( 'SwoolePromise MUST running only in CLI mode with swoole extension.' ); } - // @codeCoverageIgnoreEnd + + $this->onResolve = function($status, $value) { + $this->result = [$status, $value]; + }; + $resolve = function ($value) { - $this->setResult($value, self::STATE_FULFILLED); + ($this->onResolve)(self::STATUS_RESOLVE, $value); }; - $reject = function ($value) { - $this->setResult($value, self::STATE_REJECTED); + $reject = function ($error) { + ($this->onResolve)(self::STATUS_ERROR, $error); }; Coroutine::create(function (callable $executor, callable $resolve, callable $reject) { try { @@ -109,37 +103,6 @@ final public function catch(callable $onRejected): SwoolePromise return $this->then(null, $onRejected); } - /** - * Change promise state - * - * @param integer $state - * @return void - */ - final protected function setState(int $state): void - { - $this->state = $state; - } - - /** - * Promise is pending - * - * @return boolean - */ - final protected function isPending(): bool - { - return $this->state == self::STATE_PENDING; - } - - /** - * Promise is fulfilled - * - * @return boolean - */ - final protected function isFulfilled(): bool - { - return $this->state == self::STATE_FULFILLED; - } - /** * {@inheritDoc} * @@ -149,21 +112,37 @@ final protected function isFulfilled(): bool */ public function then(?callable $onFulfilled = null, ?callable $onRejected = null): SwoolePromise { + if($this->result === null) { + return self::create(function (callable $resolve, callable $reject) use ($onFulfilled, $onRejected) { + $this->onResolve = function($status, $value) use ($resolve, $reject, $onFulfilled, $onRejected) { + if($status === self::STATUS_RESOLVE) { + if($onFulfilled !== null) { + $onFulfilled($value); + } + $resolve($value); + } else if($status === self::STATUS_ERROR) { + if($onRejected !== null) { + $onRejected($value); + } + $reject($value); + } + }; + }); + } + return self::create(function (callable $resolve, callable $reject) use ($onFulfilled, $onRejected) { - $this->resolvedCallback = function() use ($resolve, $reject, $onFulfilled, $onRejected) { - $callable = $this->isFulfilled() ? $onFulfilled : $onRejected; - if (!is_callable($callable)) { - $resolve($this->result); - return; + if($this->result[0] === self::STATUS_RESOLVE) { + $value = $this->result[1]; + if($onFulfilled !== null) { + $onFulfilled($value); } - try { - $resolve($callable($this->result)); - } catch (\Throwable $error) { - $reject($error); + $resolve($value); + } else if($this->result[0] === self::STATUS_ERROR) { + $error = $this->result[1]; + if($onRejected !== null) { + $onRejected($error); } - }; - if(!$this->isPending()) { - ($this->resolvedCallback)(); + $reject($error); } }); } @@ -180,26 +159,23 @@ public static function all(iterable $promises): SwoolePromise $ticks = count($promises); $firstError = null; - //$channel = new Channel($ticks); $result = []; $key = 0; foreach ($promises as $promise) { if (!$promise instanceof SwoolePromise) { - //$channel->close(); throw new RuntimeException( 'Supported only SwoolePromise instance' ); } - $promise->then(function ($value) use ($key, &$result, &$ticks, $resolve/* $channel*/) { + $promise->then(function ($value) use ($key, &$result, &$ticks, $resolve) { $result[$key] = $value; $ticks--; - //$channel->push(true); if($ticks === 0) { + ksort($result); $resolve($result); } return $value; - }, function ($error) use (/*$channel, */&$firstError, &$ticks, $reject) { - //$channel->push(true); + }, function ($error) use (&$firstError, &$ticks, $reject) { $ticks--; if ($firstError === null) { $firstError = $error; @@ -208,45 +184,6 @@ public static function all(iterable $promises): SwoolePromise }); $key++; } - //while ($ticks--) { - // $channel->pop(); - //} - //$channel->close(); - - /*if ($firstError !== null) { - $reject($firstError); - return; - } - - $resolve($result);*/ }); } - - /** - * Set resolved result - * - * @param mixed $value - * @return void - */ - private function setResult($value, $state): void - { - if ($value instanceof self) { - $resolved = false; - $callable = function ($value) use (&$resolved) { - $this->setResult($value); - $resolved = true; - }; - $value->then($callable, $callable); - // resolve async locking error (code to remove) - //while (!$resolved) { - // @codeCoverageIgnoreStart - //usleep(self::PROMISE_WAIT); - // @codeCoverageIgnoreEnd - //} - } else if ($this->isPending()) { - $this->result = $value; - $this->setState($state); - ($this->resolvedCallback)(); - } - } }