Skip to content

Commit

Permalink
Fixed unwatched generators issue with Tornado adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Benoit Viguier committed Oct 24, 2018
1 parent 4d1d819 commit 9a3c81a
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 39 deletions.
49 changes: 32 additions & 17 deletions src/Adapter/Tornado/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) {
return count($this->tasks) !== 0;
};

$fnThrowIfNotNull = function (?\Throwable $throwable) {
if ($throwable !== null) {
throw $throwable;
}
};

$globalException = null;
// Returns a callback to propagate a value to a generator via $function
$fnSafeGeneratorCallback = function (Internal\Task $task, string $function) use (&$globalException) {
return function ($value) use ($task, $function, &$globalException) {
try {
$task->getGenerator()->$function($value);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
if ($task->getPromise()->hasBeenYielded()) {
$task->getPromise()->reject($exception);
} else {
$globalException = $exception;
}
}
};
};

do {
// Copy tasks list to safely allow tasks addition by tasks themselves
$allTasks = $this->tasks;
Expand All @@ -59,26 +82,18 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) {

$blockingPromise = Internal\PendingPromise::fromGenerator($task->getGenerator());
$blockingPromise->addCallbacks(
function ($value) use ($task) {
try {
$task->getGenerator()->send($value);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
$task->getPromise()->reject($exception);
}
},
function (\Throwable $throwable) use ($task) {
try {
$task->getGenerator()->throw($throwable);
$this->tasks[] = $task;
} catch (\Throwable $exception) {
$task->getPromise()->reject($exception);
}
}
$fnSafeGeneratorCallback($task, 'send'),
$fnSafeGeneratorCallback($task, 'throw')
);
} catch (\Throwable $exception) {
$task->getPromise()->reject($exception);
if ($task->getPromise()->hasBeenYielded()) {
$task->getPromise()->reject($exception);
} else {
throw $exception;
}
}

$fnThrowIfNotNull($globalException);
}
} while ($promiseIsPending && $somethingToDo());

Expand Down
11 changes: 10 additions & 1 deletion src/Adapter/Tornado/Internal/PendingPromise.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class PendingPromise implements Promise
private $throwable;
private $callbacks = [];
private $isSettled = false;
private $hasBeenYielded = false;

public static function downcast(Promise $promise): self
{
Expand All @@ -29,7 +30,15 @@ public static function fromGenerator(\Generator $generator): self
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
}

return self::downcast($promise);
$promise = self::downcast($promise);
$promise->hasBeenYielded = true;

return $promise;
}

public function hasBeenYielded(): bool
{
return $this->hasBeenYielded;
}

public function resolve($value): self
Expand Down
32 changes: 18 additions & 14 deletions src/Adapter/Tornado/Internal/StreamEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,27 @@ private function internalLoop(EventLoop $eventLoop): \Generator

$read = $this->readStreams;
$write = $this->writeStreams;
stream_select($read, $write, $except, 0);
$nbStreams = @\stream_select($read, $write, $except, 0);

foreach ($read as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->readStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}
if ($nbStreams !== false) {
foreach ($read as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->readStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}

foreach ($write as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->writeStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
foreach ($write as $stream) {
$streamId = (int) $stream;
$pendingPromise = $this->pendingPromises[$streamId];
unset($this->writeStreams[$streamId]);
unset($this->pendingPromises[$streamId]);
$pendingPromise->resolve($stream);
}
}

yield $eventLoop->idle();
}
}

Expand Down
6 changes: 6 additions & 0 deletions tests/Adapter/Amp/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,10 @@ protected function createEventLoop(): EventLoop
{
return new Amp\EventLoop();
}

public function testStreamShouldReadFromWritable($expectedSequence = '')
{
// Because Amp resolve promises in a slightly different order.
parent::testStreamShouldReadFromWritable('W0R0W12345R12R34W6R56R');
}
}
6 changes: 0 additions & 6 deletions tests/Adapter/ReactPhp/EventLoopTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,4 @@ protected function createEventLoop(): EventLoop
{
return new ReactPhp\EventLoop(new StreamSelectLoop());
}

public function testStreamShouldReadFromWritable($expectedSequence = '')
{
// Because ReactPhp resolve promise in a slightly different order.
parent::testStreamShouldReadFromWritable('W0R0W12345R12W6R34R56R');
}
}
2 changes: 1 addition & 1 deletion tests/EventLoopTest/StreamsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private function createStreamPair()
return $sockets;
}

public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12R34W6R56R')
public function testStreamShouldReadFromWritable($expectedSequence = 'W0R0W12345R12W6R34R56R')
{
$tokens = ['0', '12345', '6'];
[$streamIn, $streamOut] = $this->createStreamPair();
Expand Down

0 comments on commit 9a3c81a

Please sign in to comment.