Skip to content

Commit

Permalink
Fixed unwatched generators issue with Synchronous Tornado adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Benoit Viguier committed Oct 24, 2018
1 parent 9a3c81a commit 2cbff81
Showing 1 changed file with 37 additions and 39 deletions.
76 changes: 37 additions & 39 deletions src/Adapter/Tornado/SynchronousEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,33 @@

class SynchronousEventLoop implements \M6Web\Tornado\EventLoop
{
/**
* @var \Throwable[]
*/
private $asyncThrowables = [];

/**
* {@inheritdoc}
*/
public function wait(Promise $promise)
{
// Is it a fulfilled promise
if (property_exists($promise, 'value')) {
return $promise->value;
// If there are some uncaught exceptions, throw the first one.
if ($throwable = reset($this->asyncThrowables)) {
throw $throwable;
}

// Is it a rejected promise?
if (property_exists($promise, 'exception')) {
throw $promise->exception;
}
$promise = Internal\PendingPromise::downcast($promise);
$result = null;
$promise->addCallbacks(
function ($value) use (&$result) {
$result = $value;
},
function (\Throwable $throwable) {
throw $throwable;
}
);

throw new \LogicException('Cannot wait a promise not created from the same EventLoop');
return $result;
}

/**
Expand All @@ -32,29 +43,26 @@ public function async(\Generator $generator): Promise
{
try {
while ($generator->valid()) {
$blockingPromise = $generator->current();

if (!$blockingPromise instanceof Promise) {
throw new \Error('Asynchronous function is yielding a ['.gettype($blockingPromise).'] instead of a Promise.');
}

// Resolves blocking promise and forwards result to the generator
$blockingPromiseValue = null;
$blockingPromiseException = null;
try {
$blockingPromiseValue = $this->wait($blockingPromise);
} catch (\Throwable $exception) {
$blockingPromiseException = $exception;
}
if ($blockingPromiseException) {
$generator->throw($blockingPromiseException);
} else {
$generator->send($blockingPromiseValue);
}
Internal\PendingPromise::fromGenerator($generator)->addCallbacks(
function ($value) use ($generator) {
$generator->send($value);
},
function (\Throwable $throwable) use ($generator) {
// Since this exception is caught, remove it from the list
$index = array_search($throwable, $this->asyncThrowables, true);
if ($index !== false) {
unset($this->asyncThrowables[$index]);
}
$generator->throw($throwable);
}
);
}

return $this->promiseFulfilled($generator->getReturn());
} catch (\Throwable $exception) {
// Will have to check that this exception is caught later…
$this->asyncThrowables[] = $exception;

return $this->promiseRejected($exception);
}
}
Expand Down Expand Up @@ -101,25 +109,15 @@ public function promiseRace(Promise ...$promises): Promise
*/
public function promiseFulfilled($value): Promise
{
$promise = new class() implements Promise {
public $value;
};
$promise->value = $value;

return $promise;
return (new Internal\PendingPromise())->resolve($value);
}

/**
* {@inheritdoc}
*/
public function promiseRejected(\Throwable $throwable): Promise
{
$promise = new class() implements Promise {
public $exception;
};
$promise->exception = $throwable;

return $promise;
return (new Internal\PendingPromise())->reject($throwable);
}

/**
Expand Down

0 comments on commit 2cbff81

Please sign in to comment.