diff --git a/src/Adapter/Tornado/EventLoop.php b/src/Adapter/Tornado/EventLoop.php index 3e97345..6ab005b 100644 --- a/src/Adapter/Tornado/EventLoop.php +++ b/src/Adapter/Tornado/EventLoop.php @@ -45,6 +45,29 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) { return count($this->tasks) !== 0; }; + $fnThrowIfNotNull = function (?\Throwable $throwable) { + if ($throwable !== null) { + throw $throwable; + } + }; + + $globalException = null; + // Returns a callback to propagate a value to a generator via $function + $fnSafeGeneratorCallback = function (Internal\Task $task, string $function) use (&$globalException) { + return function ($value) use ($task, $function, &$globalException) { + try { + $task->getGenerator()->$function($value); + $this->tasks[] = $task; + } catch (\Throwable $exception) { + if ($task->getPromise()->hasBeenYielded()) { + $task->getPromise()->reject($exception); + } else { + $globalException = $exception; + } + } + }; + }; + do { // Copy tasks list to safely allow tasks addition by tasks themselves $allTasks = $this->tasks; @@ -59,26 +82,18 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) { $blockingPromise = Internal\PendingPromise::fromGenerator($task->getGenerator()); $blockingPromise->addCallbacks( - function ($value) use ($task) { - try { - $task->getGenerator()->send($value); - $this->tasks[] = $task; - } catch (\Throwable $exception) { - $task->getPromise()->reject($exception); - } - }, - function (\Throwable $throwable) use ($task) { - try { - $task->getGenerator()->throw($throwable); - $this->tasks[] = $task; - } catch (\Throwable $exception) { - $task->getPromise()->reject($exception); - } - } + $fnSafeGeneratorCallback($task, 'send'), + $fnSafeGeneratorCallback($task, 'throw') ); } catch (\Throwable $exception) { - $task->getPromise()->reject($exception); + if ($task->getPromise()->hasBeenYielded()) { + $task->getPromise()->reject($exception); + } else { + throw $exception; + } } + + $fnThrowIfNotNull($globalException); } } while ($promiseIsPending && $somethingToDo()); diff --git a/src/Adapter/Tornado/Internal/PendingPromise.php b/src/Adapter/Tornado/Internal/PendingPromise.php index 3455aab..da6b6d1 100644 --- a/src/Adapter/Tornado/Internal/PendingPromise.php +++ b/src/Adapter/Tornado/Internal/PendingPromise.php @@ -14,6 +14,7 @@ class PendingPromise implements Promise private $throwable; private $callbacks = []; private $isSettled = false; + private $hasBeenYielded = false; public static function downcast(Promise $promise): self { @@ -29,7 +30,15 @@ public static function fromGenerator(\Generator $generator): self throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.'); } - return self::downcast($promise); + $promise = self::downcast($promise); + $promise->hasBeenYielded = true; + + return $promise; + } + + public function hasBeenYielded(): bool + { + return $this->hasBeenYielded; } public function resolve($value): self diff --git a/src/Adapter/Tornado/Internal/StreamEventLoop.php b/src/Adapter/Tornado/Internal/StreamEventLoop.php index c2a3d78..8015726 100644 --- a/src/Adapter/Tornado/Internal/StreamEventLoop.php +++ b/src/Adapter/Tornado/Internal/StreamEventLoop.php @@ -40,23 +40,27 @@ private function internalLoop(EventLoop $eventLoop): \Generator $read = $this->readStreams; $write = $this->writeStreams; - stream_select($read, $write, $except, 0); + $nbStreams = @\stream_select($read, $write, $except, 0); - foreach ($read as $stream) { - $streamId = (int) $stream; - $pendingPromise = $this->pendingPromises[$streamId]; - unset($this->readStreams[$streamId]); - unset($this->pendingPromises[$streamId]); - $pendingPromise->resolve($stream); - } + if ($nbStreams !== false) { + foreach ($read as $stream) { + $streamId = (int) $stream; + $pendingPromise = $this->pendingPromises[$streamId]; + unset($this->readStreams[$streamId]); + unset($this->pendingPromises[$streamId]); + $pendingPromise->resolve($stream); + } - foreach ($write as $stream) { - $streamId = (int) $stream; - $pendingPromise = $this->pendingPromises[$streamId]; - unset($this->writeStreams[$streamId]); - unset($this->pendingPromises[$streamId]); - $pendingPromise->resolve($stream); + foreach ($write as $stream) { + $streamId = (int) $stream; + $pendingPromise = $this->pendingPromises[$streamId]; + unset($this->writeStreams[$streamId]); + unset($this->pendingPromises[$streamId]); + $pendingPromise->resolve($stream); + } } + + yield $eventLoop->idle(); } } diff --git a/tests/Adapter/Amp/EventLoopTest.php b/tests/Adapter/Amp/EventLoopTest.php index 1a3f8b6..8ab2817 100644 --- a/tests/Adapter/Amp/EventLoopTest.php +++ b/tests/Adapter/Amp/EventLoopTest.php @@ -11,4 +11,10 @@ protected function createEventLoop(): EventLoop { return new Amp\EventLoop(); } + + public function testStreamShouldReadFromWritable($expectedSequence = '') + { + // Because Amp resolve promises in a slightly different order. + parent::testStreamShouldReadFromWritable('W0R0W12345R12R34W6R56R'); + } } diff --git a/tests/Adapter/ReactPhp/EventLoopTest.php b/tests/Adapter/ReactPhp/EventLoopTest.php index e2df65d..0c75ce0 100644 --- a/tests/Adapter/ReactPhp/EventLoopTest.php +++ b/tests/Adapter/ReactPhp/EventLoopTest.php @@ -12,10 +12,4 @@ protected function createEventLoop(): EventLoop { return new ReactPhp\EventLoop(new StreamSelectLoop()); } - - public function testStreamShouldReadFromWritable($expectedSequence = '') - { - // Because ReactPhp resolve promise in a slightly different order. - parent::testStreamShouldReadFromWritable('W0R0W12345R12W6R34R56R'); - } } diff --git a/tests/EventLoopTest/StreamsTest.php b/tests/EventLoopTest/StreamsTest.php index 31ec422..b7711a9 100644 --- a/tests/EventLoopTest/StreamsTest.php +++ b/tests/EventLoopTest/StreamsTest.php @@ -31,7 +31,7 @@ private function createStreamPair() return $sockets; } - public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12R34W6R56R') + public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12W6R34R56R') { $tokens = ['0', '12345', '6']; [$streamIn, $streamOut] = $this->createStreamPair();