Skip to content

Commit

Permalink
Merge pull request #25 from M6Web/feature/throw-from-unwatched-promise
Browse files Browse the repository at this point in the history
Throwing exceptions from background asynchronous functions
  • Loading branch information
b-viguier authored Oct 25, 2018
2 parents ce918e2 + 2cbff81 commit 0f06d20
Show file tree
Hide file tree
Showing 15 changed files with 302 additions and 122 deletions.
59 changes: 39 additions & 20 deletions src/Adapter/Amp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public function wait(Promise $promise)
switch ($error->getMessage()) {
case 'Loop stopped without resolving the promise':
throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error);
case 'Loop exceptionally stopped without resolving the promise':
throw $error->getPrevious() ?? $error;
default:
throw $error;
}
Expand All @@ -33,31 +35,48 @@ public function wait(Promise $promise)
*/
public function async(\Generator $generator): Promise
{
$wrapper = function (\Generator $generator): \Generator {
while ($generator->valid()) {
$blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise();

// Forwards promise value/exception to underlying generator
$blockingPromiseValue = null;
$blockingPromiseException = null;
try {
$blockingPromiseValue = yield $blockingPromise;
} catch (\Throwable $throwable) {
$blockingPromiseException = $throwable;
}
if ($blockingPromiseException) {
$generator->throw($blockingPromiseException);
} else {
$generator->send($blockingPromiseValue);
$wrapper = function (\Generator $generator, callable $fnSuccess, callable $fnFailure): \Generator {
try {
while ($generator->valid()) {
$blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise();

// Forwards promise value/exception to underlying generator
$blockingPromiseValue = null;
$blockingPromiseException = null;
try {
$blockingPromiseValue = yield $blockingPromise;
} catch (\Throwable $throwable) {
$blockingPromiseException = $throwable;
}
if ($blockingPromiseException) {
$generator->throw($blockingPromiseException);
} else {
$generator->send($blockingPromiseValue);
}
}
} catch (\Throwable $throwable) {
$fnFailure($throwable);
}

return $generator->getReturn();
$fnSuccess($generator->getReturn());
};

return new Internal\PromiseWrapper(
new \Amp\Coroutine($wrapper($generator))
);
$deferred = new Internal\Deferred();
new \Amp\Coroutine($wrapper(
$generator,
[$deferred, 'resolve'],
function (\Throwable $throwable) use ($deferred) {
if ($deferred->getPromiseWrapper()->hasBeenYielded()) {
$deferred->reject($throwable);
} else {
\Amp\Loop::defer(function () use ($throwable) {
throw $throwable;
});
}
}
));

return $deferred->getPromise();
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/Adapter/Amp/Internal/Deferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public function getPromise(): Promise
return $this->promise;
}

public function getPromiseWrapper(): PromiseWrapper
{
return $this->promise;
}

/**
* {@inheritdoc}
*/
Expand Down
11 changes: 10 additions & 1 deletion src/Adapter/Amp/Internal/PromiseWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class PromiseWrapper implements Promise
*/
private $ampPromise;

private $hasBeenYielded = false;

public function __construct(\Amp\Promise $ampPromise)
{
$this->ampPromise = $ampPromise;
Expand All @@ -38,8 +40,15 @@ public static function fromGenerator(\Generator $generator): self
if (!$promise instanceof self) {
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
}
$promise = self::downcast($promise);
$promise->hasBeenYielded = true;

return self::downcast($promise);
return $promise;
}

public function hasBeenYielded(): bool
{
return $this->hasBeenYielded;
}

/**
Expand Down
34 changes: 23 additions & 11 deletions src/Adapter/ReactPhp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,37 +58,49 @@ function ($result) use (&$value, &$isRejected, &$promiseSettled) {
*/
public function async(\Generator $generator): Promise
{
$fnWrapGenerator = function (\Generator $generator, Deferred $deferred) use (&$fnWrapGenerator) {
$fnWrapGenerator = function (\Generator $generator, callable $fnSuccess, callable $fnFailure) use (&$fnWrapGenerator) {
try {
if (!$generator->valid()) {
return $deferred->resolve($generator->getReturn());
return $fnSuccess($generator->getReturn());
}
Internal\PromiseWrapper::fromGenerator($generator)
->getReactPromise()->then(
function ($result) use ($generator, $deferred, $fnWrapGenerator) {
function ($result) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) {
try {
$generator->send($result);
$fnWrapGenerator($generator, $deferred);
$fnWrapGenerator($generator, $fnSuccess, $fnFailure);
} catch (\Throwable $throwable) {
$deferred->reject($throwable);
$fnFailure($throwable);
}
},
function ($reason) use ($generator, $deferred, $fnWrapGenerator) {
function ($reason) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) {
try {
$generator->throw($reason);
$fnWrapGenerator($generator, $deferred);
$fnWrapGenerator($generator, $fnSuccess, $fnFailure);
} catch (\Throwable $throwable) {
$deferred->reject($throwable);
$fnFailure($throwable);
}
}
);
} catch (\Throwable $throwable) {
$deferred->reject($throwable);
$fnFailure($throwable);
}
};

$deferred = $this->deferred();
$fnWrapGenerator($generator, $deferred);
$deferred = new Internal\Deferred();
$fnWrapGenerator(
$generator,
[$deferred, 'resolve'],
function (\Throwable $throwable) use ($deferred) {
if ($deferred->getPromiseWrapper()->hasBeenYielded()) {
$deferred->reject($throwable);
} else {
$this->reactEventLoop->futureTick(function () use ($throwable) {
throw $throwable;
});
}
}
);

return $deferred->getPromise();
}
Expand Down
5 changes: 5 additions & 0 deletions src/Adapter/ReactPhp/Internal/Deferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public function getPromise(): Promise
return $this->promise;
}

public function getPromiseWrapper(): PromiseWrapper
{
return $this->promise;
}

/**
* {@inheritdoc}
*/
Expand Down
12 changes: 11 additions & 1 deletion src/Adapter/ReactPhp/Internal/PromiseWrapper.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ class PromiseWrapper implements Promise
*/
private $reactPromise;

private $hasBeenYielded = false;

public function __construct(\React\Promise\PromiseInterface $reactPromise)
{
$this->reactPromise = $reactPromise;
Expand All @@ -39,7 +41,15 @@ public static function fromGenerator(\Generator $generator): self
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
}

return self::downcast($promise);
$promise = self::downcast($promise);
$promise->hasBeenYielded = true;

return $promise;
}

public function hasBeenYielded(): bool
{
return $this->hasBeenYielded;
}

/**
Expand Down
69 changes: 41 additions & 28 deletions src/Adapter/Tornado/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ class EventLoop implements \M6Web\Tornado\EventLoop
* @var Internal\StreamEventLoop
*/
private $streamLoop;

/**
* @var Internal\Task[]
*/
private $tasks = [];

public function __construct()
Expand Down Expand Up @@ -41,40 +45,55 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) {
return count($this->tasks) !== 0;
};

$fnThrowIfNotNull = function (?\Throwable $throwable) {
if ($throwable !== null) {
throw $throwable;
}
};

$globalException = null;
// Returns a callback to propagate a value to a generator via $function
$fnSafeGeneratorCallback = function (Internal\Task $task, string $function) use (&$globalException) {
return function ($value) use ($task, $function, &$globalException) {
try {
$task->getGenerator()->$function($value);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
if ($task->getPromise()->hasBeenYielded()) {
$task->getPromise()->reject($exception);
} else {
$globalException = $exception;
}
}
};
};

do {
// Copy tasks list to safely allow tasks addition by tasks themselves
$allTasks = $this->tasks;
$this->tasks = [];
foreach ($allTasks as $task) {
try {
if (!$task->generator->valid()) {
$task->promise->resolve($task->generator->getReturn());
if (!$task->getGenerator()->valid()) {
$task->getPromise()->resolve($task->getGenerator()->getReturn());
// This task is finished
continue;
}

$blockingPromise = Internal\PendingPromise::fromGenerator($task->generator);
$blockingPromise = Internal\PendingPromise::fromGenerator($task->getGenerator());
$blockingPromise->addCallbacks(
function ($value) use ($task) {
try {
$task->generator->send($value);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
$task->promise->reject($exception);
}
},
function (\Throwable $throwable) use ($task) {
try {
$task->generator->throw($throwable);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
$task->promise->reject($exception);
}
}
$fnSafeGeneratorCallback($task, 'send'),
$fnSafeGeneratorCallback($task, 'throw')
);
} catch (\Throwable $exception) {
$task->promise->reject($exception);
if ($task->getPromise()->hasBeenYielded()) {
$task->getPromise()->reject($exception);
} else {
throw $exception;
}
}

$fnThrowIfNotNull($globalException);
}
} while ($promiseIsPending && $somethingToDo());

Expand All @@ -86,15 +105,9 @@ function (\Throwable $throwable) use ($task) {
*/
public function async(\Generator $generator): Promise
{
$task = new class() {
public $generator;
public $promise;
};
$task->generator = $generator;
$task->promise = new Internal\PendingPromise();
$this->tasks[] = $task;
$this->tasks[] = ($task = new Internal\Task($generator));

return $task->promise;
return $task->getPromise();
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/Adapter/Tornado/Internal/PendingPromise.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class PendingPromise implements Promise
private $throwable;
private $callbacks = [];
private $isSettled = false;
private $hasBeenYielded = false;

public static function downcast(Promise $promise): self
{
Expand All @@ -29,7 +30,15 @@ public static function fromGenerator(\Generator $generator): self
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
}

return self::downcast($promise);
$promise = self::downcast($promise);
$promise->hasBeenYielded = true;

return $promise;
}

public function hasBeenYielded(): bool
{
return $this->hasBeenYielded;
}

public function resolve($value): self
Expand Down
32 changes: 18 additions & 14 deletions src/Adapter/Tornado/Internal/StreamEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,27 @@ private function internalLoop(EventLoop $eventLoop): \Generator

$read = $this->readStreams;
$write = $this->writeStreams;
stream_select($read, $write, $except, 0);
$nbStreams = @\stream_select($read, $write, $except, 0);

foreach ($read as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->readStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}
if ($nbStreams !== false) {
foreach ($read as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->readStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}

foreach ($write as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->writeStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
foreach ($write as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->writeStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}
}

yield $eventLoop->idle();
}
}

Expand Down
Loading

0 comments on commit 0f06d20

Please sign in to comment.