diff --git a/src/Adapter/Amp/EventLoop.php b/src/Adapter/Amp/EventLoop.php index 9ba61df..386ee0f 100644 --- a/src/Adapter/Amp/EventLoop.php +++ b/src/Adapter/Amp/EventLoop.php @@ -22,6 +22,8 @@ public function wait(Promise $promise) switch ($error->getMessage()) { case 'Loop stopped without resolving the promise': throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error); + case 'Loop exceptionally stopped without resolving the promise': + throw $error->getPrevious() ?? $error; default: throw $error; } @@ -33,31 +35,48 @@ public function wait(Promise $promise) */ public function async(\Generator $generator): Promise { - $wrapper = function (\Generator $generator): \Generator { - while ($generator->valid()) { - $blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise(); - - // Forwards promise value/exception to underlying generator - $blockingPromiseValue = null; - $blockingPromiseException = null; - try { - $blockingPromiseValue = yield $blockingPromise; - } catch (\Throwable $throwable) { - $blockingPromiseException = $throwable; - } - if ($blockingPromiseException) { - $generator->throw($blockingPromiseException); - } else { - $generator->send($blockingPromiseValue); + $wrapper = function (\Generator $generator, callable $fnSuccess, callable $fnFailure): \Generator { + try { + while ($generator->valid()) { + $blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise(); + + // Forwards promise value/exception to underlying generator + $blockingPromiseValue = null; + $blockingPromiseException = null; + try { + $blockingPromiseValue = yield $blockingPromise; + } catch (\Throwable $throwable) { + $blockingPromiseException = $throwable; + } + if ($blockingPromiseException) { + $generator->throw($blockingPromiseException); + } else { + $generator->send($blockingPromiseValue); + } } + } catch (\Throwable $throwable) { + $fnFailure($throwable); } - return $generator->getReturn(); + $fnSuccess($generator->getReturn()); }; - return new Internal\PromiseWrapper( - new \Amp\Coroutine($wrapper($generator)) - ); + $deferred = new Internal\Deferred(); + new \Amp\Coroutine($wrapper( + $generator, + [$deferred, 'resolve'], + function (\Throwable $throwable) use ($deferred) { + if ($deferred->getPromiseWrapper()->hasBeenYielded()) { + $deferred->reject($throwable); + } else { + \Amp\Loop::defer(function () use ($throwable) { + throw $throwable; + }); + } + } + )); + + return $deferred->getPromise(); } /** diff --git a/src/Adapter/Amp/Internal/Deferred.php b/src/Adapter/Amp/Internal/Deferred.php index 162d1e8..e74f5cc 100644 --- a/src/Adapter/Amp/Internal/Deferred.php +++ b/src/Adapter/Amp/Internal/Deferred.php @@ -34,6 +34,11 @@ public function getPromise(): Promise return $this->promise; } + public function getPromiseWrapper(): PromiseWrapper + { + return $this->promise; + } + /** * {@inheritdoc} */ diff --git a/src/Adapter/Amp/Internal/PromiseWrapper.php b/src/Adapter/Amp/Internal/PromiseWrapper.php index 5d99b2b..1e2d3c9 100644 --- a/src/Adapter/Amp/Internal/PromiseWrapper.php +++ b/src/Adapter/Amp/Internal/PromiseWrapper.php @@ -15,6 +15,8 @@ class PromiseWrapper implements Promise */ private $ampPromise; + private $hasBeenYielded = false; + public function __construct(\Amp\Promise $ampPromise) { $this->ampPromise = $ampPromise; @@ -38,8 +40,15 @@ public static function fromGenerator(\Generator $generator): self if (!$promise instanceof self) { throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.'); } + $promise = self::downcast($promise); + $promise->hasBeenYielded = true; - return self::downcast($promise); + return $promise; + } + + public function hasBeenYielded(): bool + { + return $this->hasBeenYielded; } /** diff --git a/src/Adapter/ReactPhp/EventLoop.php b/src/Adapter/ReactPhp/EventLoop.php index 03bc12e..99de987 100644 --- a/src/Adapter/ReactPhp/EventLoop.php +++ b/src/Adapter/ReactPhp/EventLoop.php @@ -58,37 +58,49 @@ function ($result) use (&$value, &$isRejected, &$promiseSettled) { */ public function async(\Generator $generator): Promise { - $fnWrapGenerator = function (\Generator $generator, Deferred $deferred) use (&$fnWrapGenerator) { + $fnWrapGenerator = function (\Generator $generator, callable $fnSuccess, callable $fnFailure) use (&$fnWrapGenerator) { try { if (!$generator->valid()) { - return $deferred->resolve($generator->getReturn()); + return $fnSuccess($generator->getReturn()); } Internal\PromiseWrapper::fromGenerator($generator) ->getReactPromise()->then( - function ($result) use ($generator, $deferred, $fnWrapGenerator) { + function ($result) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) { try { $generator->send($result); - $fnWrapGenerator($generator, $deferred); + $fnWrapGenerator($generator, $fnSuccess, $fnFailure); } catch (\Throwable $throwable) { - $deferred->reject($throwable); + $fnFailure($throwable); } }, - function ($reason) use ($generator, $deferred, $fnWrapGenerator) { + function ($reason) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) { try { $generator->throw($reason); - $fnWrapGenerator($generator, $deferred); + $fnWrapGenerator($generator, $fnSuccess, $fnFailure); } catch (\Throwable $throwable) { - $deferred->reject($throwable); + $fnFailure($throwable); } } ); } catch (\Throwable $throwable) { - $deferred->reject($throwable); + $fnFailure($throwable); } }; - $deferred = $this->deferred(); - $fnWrapGenerator($generator, $deferred); + $deferred = new Internal\Deferred(); + $fnWrapGenerator( + $generator, + [$deferred, 'resolve'], + function (\Throwable $throwable) use ($deferred) { + if ($deferred->getPromiseWrapper()->hasBeenYielded()) { + $deferred->reject($throwable); + } else { + $this->reactEventLoop->futureTick(function () use ($throwable) { + throw $throwable; + }); + } + } + ); return $deferred->getPromise(); } diff --git a/src/Adapter/ReactPhp/Internal/Deferred.php b/src/Adapter/ReactPhp/Internal/Deferred.php index d55f329..1e98cfb 100644 --- a/src/Adapter/ReactPhp/Internal/Deferred.php +++ b/src/Adapter/ReactPhp/Internal/Deferred.php @@ -34,6 +34,11 @@ public function getPromise(): Promise return $this->promise; } + public function getPromiseWrapper(): PromiseWrapper + { + return $this->promise; + } + /** * {@inheritdoc} */ diff --git a/src/Adapter/ReactPhp/Internal/PromiseWrapper.php b/src/Adapter/ReactPhp/Internal/PromiseWrapper.php index ec3d6ef..edc382c 100644 --- a/src/Adapter/ReactPhp/Internal/PromiseWrapper.php +++ b/src/Adapter/ReactPhp/Internal/PromiseWrapper.php @@ -15,6 +15,8 @@ class PromiseWrapper implements Promise */ private $reactPromise; + private $hasBeenYielded = false; + public function __construct(\React\Promise\PromiseInterface $reactPromise) { $this->reactPromise = $reactPromise; @@ -39,7 +41,15 @@ public static function fromGenerator(\Generator $generator): self throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.'); } - return self::downcast($promise); + $promise = self::downcast($promise); + $promise->hasBeenYielded = true; + + return $promise; + } + + public function hasBeenYielded(): bool + { + return $this->hasBeenYielded; } /** diff --git a/src/Adapter/Tornado/EventLoop.php b/src/Adapter/Tornado/EventLoop.php index 205bbb2..6ab005b 100644 --- a/src/Adapter/Tornado/EventLoop.php +++ b/src/Adapter/Tornado/EventLoop.php @@ -11,6 +11,10 @@ class EventLoop implements \M6Web\Tornado\EventLoop * @var Internal\StreamEventLoop */ private $streamLoop; + + /** + * @var Internal\Task[] + */ private $tasks = []; public function __construct() @@ -41,40 +45,55 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) { return count($this->tasks) !== 0; }; + $fnThrowIfNotNull = function (?\Throwable $throwable) { + if ($throwable !== null) { + throw $throwable; + } + }; + + $globalException = null; + // Returns a callback to propagate a value to a generator via $function + $fnSafeGeneratorCallback = function (Internal\Task $task, string $function) use (&$globalException) { + return function ($value) use ($task, $function, &$globalException) { + try { + $task->getGenerator()->$function($value); + $this->tasks[] = $task; + } catch (\Throwable $exception) { + if ($task->getPromise()->hasBeenYielded()) { + $task->getPromise()->reject($exception); + } else { + $globalException = $exception; + } + } + }; + }; + do { // Copy tasks list to safely allow tasks addition by tasks themselves $allTasks = $this->tasks; $this->tasks = []; foreach ($allTasks as $task) { try { - if (!$task->generator->valid()) { - $task->promise->resolve($task->generator->getReturn()); + if (!$task->getGenerator()->valid()) { + $task->getPromise()->resolve($task->getGenerator()->getReturn()); // This task is finished continue; } - $blockingPromise = Internal\PendingPromise::fromGenerator($task->generator); + $blockingPromise = Internal\PendingPromise::fromGenerator($task->getGenerator()); $blockingPromise->addCallbacks( - function ($value) use ($task) { - try { - $task->generator->send($value); - $this->tasks[] = $task; - } catch (\Throwable $exception) { - $task->promise->reject($exception); - } - }, - function (\Throwable $throwable) use ($task) { - try { - $task->generator->throw($throwable); - $this->tasks[] = $task; - } catch (\Throwable $exception) { - $task->promise->reject($exception); - } - } + $fnSafeGeneratorCallback($task, 'send'), + $fnSafeGeneratorCallback($task, 'throw') ); } catch (\Throwable $exception) { - $task->promise->reject($exception); + if ($task->getPromise()->hasBeenYielded()) { + $task->getPromise()->reject($exception); + } else { + throw $exception; + } } + + $fnThrowIfNotNull($globalException); } } while ($promiseIsPending && $somethingToDo()); @@ -86,15 +105,9 @@ function (\Throwable $throwable) use ($task) { */ public function async(\Generator $generator): Promise { - $task = new class() { - public $generator; - public $promise; - }; - $task->generator = $generator; - $task->promise = new Internal\PendingPromise(); - $this->tasks[] = $task; + $this->tasks[] = ($task = new Internal\Task($generator)); - return $task->promise; + return $task->getPromise(); } /** diff --git a/src/Adapter/Tornado/Internal/PendingPromise.php b/src/Adapter/Tornado/Internal/PendingPromise.php index 3455aab..da6b6d1 100644 --- a/src/Adapter/Tornado/Internal/PendingPromise.php +++ b/src/Adapter/Tornado/Internal/PendingPromise.php @@ -14,6 +14,7 @@ class PendingPromise implements Promise private $throwable; private $callbacks = []; private $isSettled = false; + private $hasBeenYielded = false; public static function downcast(Promise $promise): self { @@ -29,7 +30,15 @@ public static function fromGenerator(\Generator $generator): self throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.'); } - return self::downcast($promise); + $promise = self::downcast($promise); + $promise->hasBeenYielded = true; + + return $promise; + } + + public function hasBeenYielded(): bool + { + return $this->hasBeenYielded; } public function resolve($value): self diff --git a/src/Adapter/Tornado/Internal/StreamEventLoop.php b/src/Adapter/Tornado/Internal/StreamEventLoop.php index c2a3d78..8015726 100644 --- a/src/Adapter/Tornado/Internal/StreamEventLoop.php +++ b/src/Adapter/Tornado/Internal/StreamEventLoop.php @@ -40,23 +40,27 @@ private function internalLoop(EventLoop $eventLoop): \Generator $read = $this->readStreams; $write = $this->writeStreams; - stream_select($read, $write, $except, 0); + $nbStreams = @\stream_select($read, $write, $except, 0); - foreach ($read as $stream) { - $streamId = (int) $stream; - $pendingPromise = $this->pendingPromises[$streamId]; - unset($this->readStreams[$streamId]); - unset($this->pendingPromises[$streamId]); - $pendingPromise->resolve($stream); - } + if ($nbStreams !== false) { + foreach ($read as $stream) { + $streamId = (int) $stream; + $pendingPromise = $this->pendingPromises[$streamId]; + unset($this->readStreams[$streamId]); + unset($this->pendingPromises[$streamId]); + $pendingPromise->resolve($stream); + } - foreach ($write as $stream) { - $streamId = (int) $stream; - $pendingPromise = $this->pendingPromises[$streamId]; - unset($this->writeStreams[$streamId]); - unset($this->pendingPromises[$streamId]); - $pendingPromise->resolve($stream); + foreach ($write as $stream) { + $streamId = (int) $stream; + $pendingPromise = $this->pendingPromises[$streamId]; + unset($this->writeStreams[$streamId]); + unset($this->pendingPromises[$streamId]); + $pendingPromise->resolve($stream); + } } + + yield $eventLoop->idle(); } } diff --git a/src/Adapter/Tornado/Internal/Task.php b/src/Adapter/Tornado/Internal/Task.php new file mode 100644 index 0000000..8dce504 --- /dev/null +++ b/src/Adapter/Tornado/Internal/Task.php @@ -0,0 +1,29 @@ +generator = $generator; + $this->promise = new PendingPromise(); + } + + public function getPromise(): PendingPromise + { + return $this->promise; + } + + public function getGenerator(): \Generator + { + return $this->generator; + } +} diff --git a/src/Adapter/Tornado/SynchronousEventLoop.php b/src/Adapter/Tornado/SynchronousEventLoop.php index f0d0581..1861464 100644 --- a/src/Adapter/Tornado/SynchronousEventLoop.php +++ b/src/Adapter/Tornado/SynchronousEventLoop.php @@ -7,22 +7,33 @@ class SynchronousEventLoop implements \M6Web\Tornado\EventLoop { + /** + * @var \Throwable[] + */ + private $asyncThrowables = []; + /** * {@inheritdoc} */ public function wait(Promise $promise) { - // Is it a fulfilled promise - if (property_exists($promise, 'value')) { - return $promise->value; + // If there are some uncaught exceptions, throw the first one. + if ($throwable = reset($this->asyncThrowables)) { + throw $throwable; } - // Is it a rejected promise? - if (property_exists($promise, 'exception')) { - throw $promise->exception; - } + $promise = Internal\PendingPromise::downcast($promise); + $result = null; + $promise->addCallbacks( + function ($value) use (&$result) { + $result = $value; + }, + function (\Throwable $throwable) { + throw $throwable; + } + ); - throw new \LogicException('Cannot wait a promise not created from the same EventLoop'); + return $result; } /** @@ -32,29 +43,26 @@ public function async(\Generator $generator): Promise { try { while ($generator->valid()) { - $blockingPromise = $generator->current(); - - if (!$blockingPromise instanceof Promise) { - throw new \Error('Asynchronous function is yielding a ['.gettype($blockingPromise).'] instead of a Promise.'); - } - - // Resolves blocking promise and forwards result to the generator - $blockingPromiseValue = null; - $blockingPromiseException = null; - try { - $blockingPromiseValue = $this->wait($blockingPromise); - } catch (\Throwable $exception) { - $blockingPromiseException = $exception; - } - if ($blockingPromiseException) { - $generator->throw($blockingPromiseException); - } else { - $generator->send($blockingPromiseValue); - } + Internal\PendingPromise::fromGenerator($generator)->addCallbacks( + function ($value) use ($generator) { + $generator->send($value); + }, + function (\Throwable $throwable) use ($generator) { + // Since this exception is caught, remove it from the list + $index = array_search($throwable, $this->asyncThrowables, true); + if ($index !== false) { + unset($this->asyncThrowables[$index]); + } + $generator->throw($throwable); + } + ); } return $this->promiseFulfilled($generator->getReturn()); } catch (\Throwable $exception) { + // Will have to check that this exception is caught later… + $this->asyncThrowables[] = $exception; + return $this->promiseRejected($exception); } } @@ -101,12 +109,7 @@ public function promiseRace(Promise ...$promises): Promise */ public function promiseFulfilled($value): Promise { - $promise = new class() implements Promise { - public $value; - }; - $promise->value = $value; - - return $promise; + return (new Internal\PendingPromise())->resolve($value); } /** @@ -114,12 +117,7 @@ public function promiseFulfilled($value): Promise */ public function promiseRejected(\Throwable $throwable): Promise { - $promise = new class() implements Promise { - public $exception; - }; - $promise->exception = $throwable; - - return $promise; + return (new Internal\PendingPromise())->reject($throwable); } /** diff --git a/tests/Adapter/Amp/EventLoopTest.php b/tests/Adapter/Amp/EventLoopTest.php index 1a3f8b6..8ab2817 100644 --- a/tests/Adapter/Amp/EventLoopTest.php +++ b/tests/Adapter/Amp/EventLoopTest.php @@ -11,4 +11,10 @@ protected function createEventLoop(): EventLoop { return new Amp\EventLoop(); } + + public function testStreamShouldReadFromWritable($expectedSequence = '') + { + // Because Amp resolve promises in a slightly different order. + parent::testStreamShouldReadFromWritable('W0R0W12345R12R34W6R56R'); + } } diff --git a/tests/Adapter/ReactPhp/EventLoopTest.php b/tests/Adapter/ReactPhp/EventLoopTest.php index e2df65d..0c75ce0 100644 --- a/tests/Adapter/ReactPhp/EventLoopTest.php +++ b/tests/Adapter/ReactPhp/EventLoopTest.php @@ -12,10 +12,4 @@ protected function createEventLoop(): EventLoop { return new ReactPhp\EventLoop(new StreamSelectLoop()); } - - public function testStreamShouldReadFromWritable($expectedSequence = '') - { - // Because ReactPhp resolve promise in a slightly different order. - parent::testStreamShouldReadFromWritable('W0R0W12345R12W6R34R56R'); - } } diff --git a/tests/EventLoopTest/AsyncTest.php b/tests/EventLoopTest/AsyncTest.php index ccf69da..df30955 100644 --- a/tests/EventLoopTest/AsyncTest.php +++ b/tests/EventLoopTest/AsyncTest.php @@ -120,6 +120,34 @@ public function testSubGenerators() $this->assertSame([1, [2, 3], 4], $eventLoop->wait($promise)); } + public function testSubGeneratorThrowing() + { + $eventLoop = $this->createEventLoop(); + $throwingGenerator = function (\Throwable $throwable) use ($eventLoop): \Generator { + yield $eventLoop->idle(); + throw $throwable; + }; + $tryCatchGenerator = function (Promise $promise) use ($eventLoop): \Generator { + try { + yield $promise; + + return 'Not an error message'; + } catch (\Throwable $throwable) { + yield $eventLoop->idle(); + + return $throwable->getMessage(); + } + }; + + $promise = $eventLoop->async($tryCatchGenerator( + $eventLoop->async($throwingGenerator( + new \Exception('Error Message') + )) + )); + + $this->assertSame('Error Message', $eventLoop->wait($promise)); + } + public function testYieldingForTheSameFulfilledPromise() { $eventLoop = $this->createEventLoop(); @@ -183,4 +211,43 @@ public function testEventLoopFirstTick() $this->assertSame(1, $count); } + + public function testEventLoopShouldThrowInCaseOfUncaughtExceptionInBackgroundGenerator() + { + $eventLoop = $this->createEventLoop(); + + $failingGenerator = function () use ($eventLoop) { + yield $eventLoop->idle(); + throw new \Exception('This is a failure'); + }; + + $waitingGenerator = function () use ($eventLoop) { + yield $eventLoop->idle(); + yield $eventLoop->idle(); + yield $eventLoop->idle(); + }; + + $ignoredBackgroundPromise = $eventLoop->async($failingGenerator()); + $promiseSuccess = $eventLoop->async($waitingGenerator()); + + $this->expectException(\Exception::class); + $this->expectExceptionMessage('This is a failure'); + $eventLoop->wait($promiseSuccess); + } + + public function testEventLoopShouldNotThrowInCaseOfExplicitlyRejectedPromise() + { + $eventLoop = $this->createEventLoop(); + + $generatorWaitALittle = function () use ($eventLoop) { + yield $eventLoop->idle(); + yield $eventLoop->idle(); + }; + + $unwatchedRejectedPromise = $eventLoop->promiseRejected(new \Exception('Rejected Promise')); + $unwatchedDeferred = $eventLoop->deferred(); + $unwatchedDeferred->reject(new \Exception('Rejected Deferred')); + + $this->assertSame(null, $eventLoop->wait($eventLoop->async($generatorWaitALittle()))); + } } diff --git a/tests/EventLoopTest/StreamsTest.php b/tests/EventLoopTest/StreamsTest.php index 31ec422..b7711a9 100644 --- a/tests/EventLoopTest/StreamsTest.php +++ b/tests/EventLoopTest/StreamsTest.php @@ -31,7 +31,7 @@ private function createStreamPair() return $sockets; } - public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12R34W6R56R') + public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12W6R34R56R') { $tokens = ['0', '12345', '6']; [$streamIn, $streamOut] = $this->createStreamPair();