Skip to content

Commit

Permalink
🔨 yield event loop test
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Apr 24, 2021
1 parent d7cee78 commit 0dead13
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 80 deletions.
42 changes: 21 additions & 21 deletions src/Adapter/Swoole/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class EventLoop implements \M6Web\Tornado\EventLoop
{
private $cids;
private $oldCids;
private $pendingThrowPromises;
private $unhandledFailingPromises;

public function __construct()
{
Expand All @@ -28,7 +28,7 @@ public function __construct()

$this->cids = [];
$this->oldCids = [];
$this->pendingThrowPromises = [];
$this->unhandledFailingPromises = [];
}

public function __destruct()
Expand Down Expand Up @@ -82,7 +82,7 @@ private function pushCoroutine(): void

private function createPromise(): DummyPromise
{
return new DummyPromise(function (DummyPromise $promise) {
return new DummyPromise(function () {
$this->shiftCoroutine();
});
}
Expand All @@ -91,9 +91,9 @@ private function getValue($value)
{
if ($value instanceof DummyPromise && !$this->isPending($value)) {
if ($value->getException() !== null) {
foreach ($this->pendingThrowPromises as $index => $promise) {
foreach ($this->unhandledFailingPromises as $index => $promise) {
if ($promise === $value) {
unset($this->pendingThrowPromises[$index]);
unset($this->unhandledFailingPromises[$index]);
}
}
throw $value->getException();
Expand All @@ -108,7 +108,7 @@ private function getValue($value)
}
}

foreach ($this->pendingThrowPromises as $p) {
foreach ($this->unhandledFailingPromises as $p) {
$this->getValue($p);
}

Expand Down Expand Up @@ -179,17 +179,17 @@ public function async(Generator $generator): Promise
$generator->throw($exception);
$fnWrapGenerator($generator, $deferred);
} catch (\Throwable $throwable) {
$this->pendingThrowPromises[] = $deferred;
$this->unhandledFailingPromises[] = $deferred;
$deferred->reject($throwable);
}
}
});
};

$deferred = $this->createPromise();
$deferred = $this->deferred();
$fnWrapGenerator($generator, $deferred);

return $deferred;
return $deferred->getPromise();
}

/**
Expand All @@ -202,7 +202,7 @@ public function promiseAll(Promise ...$promises): Promise
return $this->promiseFulfilled([]);
}

$deferred = $this->createPromise();
$deferred = $this->deferred();
$result = [array_fill(0, $ticks, false)];

// To ensure that the last resolved promise resolves the global promise immediately
Expand All @@ -229,7 +229,7 @@ public function promiseAll(Promise ...$promises): Promise
$this->async($waitOnePromise($index, $promise));
}

return $deferred;
return $deferred->getPromise();
}

/**
Expand All @@ -254,7 +254,7 @@ public function promiseRace(Promise ...$promises): Promise
return $this->promiseFulfilled(null);
}

$deferred = $this->createPromise();
$deferred = $this->deferred();

foreach ($promises as $promise) {
DummyPromise::wrap($promise)->addCallback(function (DummyPromise $promise) use ($deferred) {
Expand All @@ -268,7 +268,7 @@ public function promiseRace(Promise ...$promises): Promise
});
}

return $deferred;
return $deferred->getPromise();
}

/**
Expand Down Expand Up @@ -298,31 +298,31 @@ public function promiseRejected(Throwable $throwable): Promise
*/
public function idle(): Promise
{
$promise = $this->createPromise();
Coroutine::create(function () use ($promise) {
$deferred = $this->deferred();
Coroutine::create(function () use ($deferred) {
$this->pushCoroutine();
// Coroutine::defer(function () use ($promise) {
$promise->resolve(null);
$deferred->resolve(null);
// });
});

return $promise;
return $deferred->getPromise();
}

/**
* {@inheritdoc}
*/
public function delay(int $milliseconds): Promise
{
$promise = $this->createPromise();
Coroutine::create(function () use ($milliseconds, $promise) {
$deferred = $this->deferred();
Coroutine::create(function () use ($milliseconds, $deferred) {
$this->pushCoroutine();
// Coroutine::sleep($milliseconds / 1000);
usleep($milliseconds * 1000);
$promise->resolve(null);
$deferred->resolve(null);
});

return $promise;
return $deferred->getPromise();
}

/**
Expand Down
139 changes: 80 additions & 59 deletions src/Adapter/Swoole/YieldEventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
namespace M6Web\Tornado\Adapter\Swoole;

use function extension_loaded;
use JetBrains\PhpStorm\Pure;
use M6Web\Tornado\Adapter\Swoole\Internal\YieldPromise;
use M6Web\Tornado\Adapter\Swoole\Internal\DummyPromise;
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;
use RuntimeException;
Expand All @@ -20,6 +19,29 @@ public function __construct()
}
}

private function isPending(DummyPromise $promise): bool
{
if ($promise->isPending()) {
return $promise->isPending();
}

if ($promise->getException() === null) {
if ($promise->getValue() instanceof DummyPromise) {
return $promise->getValue()->isPending();
}

if (is_array($promise->getValue())) {
foreach ($promise->getValue() as $value) {
if ($value instanceof DummyPromise && $value->isPending()) {
return $value->isPending();
}
}
}
}

return $promise->isPending();
}

/**
* {@inheritdoc}
*/
Expand All @@ -40,58 +62,54 @@ public function wait(Promise $promise)
*/
public function async(\Generator $generator): Promise
{
$generatorPromise = new YieldPromise();
Coroutine::create(function () use ($generator, $generatorPromise) {
while ($generator->valid()) {
$promise = YieldPromise::wrap($generator->current());
$promise->yield();
$generator->send($promise->value());
}
$fnWrapGenerator = function (\Generator $generator, Deferred $deferred) use (&$fnWrapGenerator) {
Coroutine::create(function () use ($generator, $deferred, $fnWrapGenerator) {
if (!$generator->valid()) {
$deferred->resolve($generator->getReturn());
return;
}

$generatorPromise->resolve($generator->getReturn());
});
$promise = DummyPromise::wrap($generator->current());
if($this->isPending($promise)) {
$cid = Coroutine::getCid();
$promise->addCallback(function () use ($cid) {
Coroutine::resume($cid);
});
Coroutine::yield();
}
$generator->send($promise->getValue());
$fnWrapGenerator($generator, $deferred);
});
};

return $generatorPromise;
$deferred = $this->deferred();
$fnWrapGenerator($generator, $deferred);

return $deferred->getPromise();
}

/**
* {@inheritdoc}
*/
public function promiseAll(Promise ...$promises): Promise
{
$nbPromises = count($promises);
if ($nbPromises === 0) {
return $this->promiseFulfilled([]);
}

$globalPromise = new YieldPromise();
$allResults = array_fill(0, $nbPromises, false);

// To ensure that the last resolved promise resolves the global promise immediately
$waitOnePromise = function (int $index, Promise $promise) use ($globalPromise, &$nbPromises, &$allResults): \Generator {
try {
$allResults[$index] = yield $promise;
} catch (\Throwable $throwable) {
// Prevent to reject the globalPromise twice
if ($nbPromises > 0) {
$nbPromises = -1;
$globalPromise->reject($throwable);

return;
}
}

// Last resolved promise resolved globalPromise
if (--$nbPromises === 0) {
$globalPromise->resolve($allResults);
}
};

$wg = new Coroutine\WaitGroup();
$result = [];
foreach ($promises as $index => $promise) {
$this->async($waitOnePromise($index, $promise));
$this->async((static function() use($wg, &$result, $index, $promise){
$wg->add();
$result[$index] = yield $promise;
$wg->done();
})());
}

return $globalPromise;
$deferred = $this->deferred();
Coroutine::create(function() use($wg, $deferred, &$result) {
$wg->wait();
$deferred->resolve($result);
});

return $deferred->getPromise();
}

/**
Expand All @@ -113,7 +131,7 @@ public function promiseRace(Promise ...$promises): Promise
*/
public function promiseFulfilled($value): Promise
{
$promise = new YieldPromise();
$promise = new DummyPromise();
$promise->resolve($value);

return $promise;
Expand All @@ -124,7 +142,7 @@ public function promiseFulfilled($value): Promise
*/
public function promiseRejected(\Throwable $throwable): Promise
{
$promise = new YieldPromise();
$promise = new DummyPromise();
$promise->reject($throwable);

return $promise;
Expand All @@ -135,50 +153,53 @@ public function promiseRejected(\Throwable $throwable): Promise
*/
public function idle(): Promise
{
$promise = new YieldPromise();
Coroutine::create(function () use ($promise) {
Coroutine::defer(function () use ($promise) {
$promise->resolve(null);
$deferred = $this->deferred();
Coroutine::create(function () use ($deferred) {
Coroutine::defer(function () use ($deferred) {
usleep(1000);
$deferred->resolve(null);
});
});

return $promise;
return $deferred->getPromise();
}

/**
* {@inheritdoc}
*/
public function delay(int $milliseconds): Promise
{
$promise = new YieldPromise();
Coroutine::create(function () use ($milliseconds, $promise) {
Coroutine::sleep($milliseconds / 1000);
$promise->resolve(null);
$deferred = $this->deferred();
Coroutine::create(function () use ($milliseconds, $deferred) {
//Coroutine::sleep($milliseconds / 1000);
usleep($milliseconds * 1000);
$deferred->resolve(null);
});

return $promise;
return $deferred->getPromise();
}

/**
* {@inheritdoc}
*/
#[Pure]
public function deferred(): Deferred
{
return new YieldPromise();
}
public function deferred(): Deferred
{
return new DummyPromise();
}

/**
* {@inheritdoc}
*/
public function readable($stream): Promise
{
return $this->promiseFulfilled($stream);
}

/**
* {@inheritdoc}
*/
public function writable($stream): Promise
{
return $this->promiseFulfilled($stream);
}
}
14 changes: 14 additions & 0 deletions tests/Adapter/Swoole/YieldEventLoopTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace M6WebTest\Tornado\Adapter\Swoole;

use M6Web\Tornado\Adapter\Swoole;
use M6Web\Tornado\EventLoop;

class YieldEventLoopTest extends \M6WebTest\Tornado\EventLoopTest
{
protected function createEventLoop(): EventLoop
{
return new Swoole\YieldEventLoop();
}
}

0 comments on commit 0dead13

Please sign in to comment.