diff --git a/src/Adapter/Swoole/EventLoop.php b/src/Adapter/Swoole/EventLoop.php index d21078d..adbc9b1 100644 --- a/src/Adapter/Swoole/EventLoop.php +++ b/src/Adapter/Swoole/EventLoop.php @@ -18,7 +18,7 @@ class EventLoop implements \M6Web\Tornado\EventLoop { private $cids; private $oldCids; - private $pendingThrowPromises; + private $unhandledFailingPromises; public function __construct() { @@ -28,7 +28,7 @@ public function __construct() $this->cids = []; $this->oldCids = []; - $this->pendingThrowPromises = []; + $this->unhandledFailingPromises = []; } public function __destruct() @@ -82,7 +82,7 @@ private function pushCoroutine(): void private function createPromise(): DummyPromise { - return new DummyPromise(function (DummyPromise $promise) { + return new DummyPromise(function () { $this->shiftCoroutine(); }); } @@ -91,9 +91,9 @@ private function getValue($value) { if ($value instanceof DummyPromise && !$this->isPending($value)) { if ($value->getException() !== null) { - foreach ($this->pendingThrowPromises as $index => $promise) { + foreach ($this->unhandledFailingPromises as $index => $promise) { if ($promise === $value) { - unset($this->pendingThrowPromises[$index]); + unset($this->unhandledFailingPromises[$index]); } } throw $value->getException(); @@ -108,7 +108,7 @@ private function getValue($value) } } - foreach ($this->pendingThrowPromises as $p) { + foreach ($this->unhandledFailingPromises as $p) { $this->getValue($p); } @@ -179,17 +179,17 @@ public function async(Generator $generator): Promise $generator->throw($exception); $fnWrapGenerator($generator, $deferred); } catch (\Throwable $throwable) { - $this->pendingThrowPromises[] = $deferred; + $this->unhandledFailingPromises[] = $deferred; $deferred->reject($throwable); } } }); }; - $deferred = $this->createPromise(); + $deferred = $this->deferred(); $fnWrapGenerator($generator, $deferred); - return $deferred; + return $deferred->getPromise(); } /** @@ -202,7 +202,7 @@ public function promiseAll(Promise ...$promises): Promise return $this->promiseFulfilled([]); } - $deferred = $this->createPromise(); + $deferred = $this->deferred(); $result = [array_fill(0, $ticks, false)]; // To ensure that the last resolved promise resolves the global promise immediately @@ -229,7 +229,7 @@ public function promiseAll(Promise ...$promises): Promise $this->async($waitOnePromise($index, $promise)); } - return $deferred; + return $deferred->getPromise(); } /** @@ -254,7 +254,7 @@ public function promiseRace(Promise ...$promises): Promise return $this->promiseFulfilled(null); } - $deferred = $this->createPromise(); + $deferred = $this->deferred(); foreach ($promises as $promise) { DummyPromise::wrap($promise)->addCallback(function (DummyPromise $promise) use ($deferred) { @@ -268,7 +268,7 @@ public function promiseRace(Promise ...$promises): Promise }); } - return $deferred; + return $deferred->getPromise(); } /** @@ -298,15 +298,15 @@ public function promiseRejected(Throwable $throwable): Promise */ public function idle(): Promise { - $promise = $this->createPromise(); - Coroutine::create(function () use ($promise) { + $deferred = $this->deferred(); + Coroutine::create(function () use ($deferred) { $this->pushCoroutine(); // Coroutine::defer(function () use ($promise) { - $promise->resolve(null); + $deferred->resolve(null); // }); }); - return $promise; + return $deferred->getPromise(); } /** @@ -314,15 +314,15 @@ public function idle(): Promise */ public function delay(int $milliseconds): Promise { - $promise = $this->createPromise(); - Coroutine::create(function () use ($milliseconds, $promise) { + $deferred = $this->deferred(); + Coroutine::create(function () use ($milliseconds, $deferred) { $this->pushCoroutine(); // Coroutine::sleep($milliseconds / 1000); usleep($milliseconds * 1000); - $promise->resolve(null); + $deferred->resolve(null); }); - return $promise; + return $deferred->getPromise(); } /** diff --git a/src/Adapter/Swoole/YieldEventLoop.php b/src/Adapter/Swoole/YieldEventLoop.php index 99c6300..8b54c34 100644 --- a/src/Adapter/Swoole/YieldEventLoop.php +++ b/src/Adapter/Swoole/YieldEventLoop.php @@ -3,8 +3,7 @@ namespace M6Web\Tornado\Adapter\Swoole; use function extension_loaded; -use JetBrains\PhpStorm\Pure; -use M6Web\Tornado\Adapter\Swoole\Internal\YieldPromise; +use M6Web\Tornado\Adapter\Swoole\Internal\DummyPromise; use M6Web\Tornado\Deferred; use M6Web\Tornado\Promise; use RuntimeException; @@ -20,6 +19,29 @@ public function __construct() } } + private function isPending(DummyPromise $promise): bool + { + if ($promise->isPending()) { + return $promise->isPending(); + } + + if ($promise->getException() === null) { + if ($promise->getValue() instanceof DummyPromise) { + return $promise->getValue()->isPending(); + } + + if (is_array($promise->getValue())) { + foreach ($promise->getValue() as $value) { + if ($value instanceof DummyPromise && $value->isPending()) { + return $value->isPending(); + } + } + } + } + + return $promise->isPending(); + } + /** * {@inheritdoc} */ @@ -40,18 +62,30 @@ public function wait(Promise $promise) */ public function async(\Generator $generator): Promise { - $generatorPromise = new YieldPromise(); - Coroutine::create(function () use ($generator, $generatorPromise) { - while ($generator->valid()) { - $promise = YieldPromise::wrap($generator->current()); - $promise->yield(); - $generator->send($promise->value()); - } + $fnWrapGenerator = function (\Generator $generator, Deferred $deferred) use (&$fnWrapGenerator) { + Coroutine::create(function () use ($generator, $deferred, $fnWrapGenerator) { + if (!$generator->valid()) { + $deferred->resolve($generator->getReturn()); + return; + } - $generatorPromise->resolve($generator->getReturn()); - }); + $promise = DummyPromise::wrap($generator->current()); + if($this->isPending($promise)) { + $cid = Coroutine::getCid(); + $promise->addCallback(function () use ($cid) { + Coroutine::resume($cid); + }); + Coroutine::yield(); + } + $generator->send($promise->getValue()); + $fnWrapGenerator($generator, $deferred); + }); + }; - return $generatorPromise; + $deferred = $this->deferred(); + $fnWrapGenerator($generator, $deferred); + + return $deferred->getPromise(); } /** @@ -59,39 +93,23 @@ public function async(\Generator $generator): Promise */ public function promiseAll(Promise ...$promises): Promise { - $nbPromises = count($promises); - if ($nbPromises === 0) { - return $this->promiseFulfilled([]); - } - - $globalPromise = new YieldPromise(); - $allResults = array_fill(0, $nbPromises, false); - - // To ensure that the last resolved promise resolves the global promise immediately - $waitOnePromise = function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): \Generator { - try { - $allResults[$index] = yield $promise; - } catch (\Throwable $throwable) { - // Prevent to reject the globalPromise twice - if ($nbPromises > 0) { - $nbPromises = -1; - $globalPromise->reject($throwable); - - return; - } - } - - // Last resolved promise resolved globalPromise - if (--$nbPromises === 0) { - $globalPromise->resolve($allResults); - } - }; - + $wg = new Coroutine\WaitGroup(); + $result = []; foreach ($promises as $index => $promise) { - $this->async($waitOnePromise($index, $promise)); + $this->async((static function() use($wg, &$result, $index, $promise){ + $wg->add(); + $result[$index] = yield $promise; + $wg->done(); + })()); } - return $globalPromise; + $deferred = $this->deferred(); + Coroutine::create(function() use($wg, $deferred, &$result) { + $wg->wait(); + $deferred->resolve($result); + }); + + return $deferred->getPromise(); } /** @@ -113,7 +131,7 @@ public function promiseRace(Promise ...$promises): Promise */ public function promiseFulfilled($value): Promise { - $promise = new YieldPromise(); + $promise = new DummyPromise(); $promise->resolve($value); return $promise; @@ -124,7 +142,7 @@ public function promiseFulfilled($value): Promise */ public function promiseRejected(\Throwable $throwable): Promise { - $promise = new YieldPromise(); + $promise = new DummyPromise(); $promise->reject($throwable); return $promise; @@ -135,14 +153,15 @@ public function promiseRejected(\Throwable $throwable): Promise */ public function idle(): Promise { - $promise = new YieldPromise(); - Coroutine::create(function () use ($promise) { - Coroutine::defer(function () use ($promise) { - $promise->resolve(null); + $deferred = $this->deferred(); + Coroutine::create(function () use ($deferred) { + Coroutine::defer(function () use ($deferred) { + usleep(1000); + $deferred->resolve(null); }); }); - return $promise; + return $deferred->getPromise(); } /** @@ -150,29 +169,30 @@ public function idle(): Promise */ public function delay(int $milliseconds): Promise { - $promise = new YieldPromise(); - Coroutine::create(function () use ($milliseconds, $promise) { - Coroutine::sleep($milliseconds / 1000); - $promise->resolve(null); + $deferred = $this->deferred(); + Coroutine::create(function () use ($milliseconds, $deferred) { + //Coroutine::sleep($milliseconds / 1000); + usleep($milliseconds * 1000); + $deferred->resolve(null); }); - return $promise; + return $deferred->getPromise(); } /** * {@inheritdoc} */ - #[Pure] - public function deferred(): Deferred - { - return new YieldPromise(); - } + public function deferred(): Deferred + { + return new DummyPromise(); + } /** * {@inheritdoc} */ public function readable($stream): Promise { + return $this->promiseFulfilled($stream); } /** @@ -180,5 +200,6 @@ public function readable($stream): Promise */ public function writable($stream): Promise { + return $this->promiseFulfilled($stream); } } diff --git a/tests/Adapter/Swoole/YieldEventLoopTest.php b/tests/Adapter/Swoole/YieldEventLoopTest.php new file mode 100644 index 0000000..29842c1 --- /dev/null +++ b/tests/Adapter/Swoole/YieldEventLoopTest.php @@ -0,0 +1,14 @@ +