Skip to content

Commit

Permalink
🔨 refactor promise and clean
Browse files Browse the repository at this point in the history
  • Loading branch information
matyo91 committed Apr 21, 2021
1 parent 39ee2e6 commit 7b2c723
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 144 deletions.
1 change: 1 addition & 0 deletions src/Adapter/ReactPhp/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public function async(\Generator $generator): Promise
try {
if (!$generator->valid()) {
$deferred->resolve($generator->getReturn());
return;
}
$promise = $generator->current();
if (!$promise instanceof Internal\PromiseWrapper) {
Expand Down
44 changes: 6 additions & 38 deletions src/Adapter/Swoole/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
use M6Web\Tornado\Deferred;
use M6Web\Tornado\Promise;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
use Swoole\Coroutine\WaitGroup;
use Swoole\Event;

class EventLoop implements \M6Web\Tornado\EventLoop
Expand All @@ -35,43 +33,22 @@ public function wait(Promise $promise)
$isRejected = false;
$promiseSettled = false;

//Coroutine::create(function() use ($promise, &$value, &$isRejected, &$promiseSettled) {
$ticks = 1;
//$wg = new WaitGroup(1);
//$channel = new Channel($ticks);
Internal\PromiseWrapper::toHandledPromise($promise, $this->unhandledFailingPromises)->getSwoolePromise()->then(
function ($result) use (/* $wg , $channel, */&$value, &$promiseSettled) {
function ($result) use (&$value, &$promiseSettled) {
$promiseSettled = true;
$value = $result;
//$channel->push(true);
//$wg->done();
Event::exit();
},
function ($result) use (/* $wg , $channel, */&$value, &$isRejected, &$promiseSettled) {
function ($result) use (&$value, &$isRejected, &$promiseSettled) {
$promiseSettled = true;
$value = $result;
$isRejected = true;
//$channel->push(true);
//$wg->done();
Event::exit();
}
);
if (!$promiseSettled) {
Event::wait();
}
//while ($ticks--) {
//$channel->pop();
//}
//$channel->close();
//$wg->wait();
//});
//while (!$promiseSettled) {
// @codeCoverageIgnoreStart
//usleep(SwoolePromise::PROMISE_WAIT);
// @codeCoverageIgnoreEnd
//}
//Event::wait();
//swoole_event_wait();

if (!$promiseSettled) {
throw new \Error('Impossible to resolve the promise, no more task to execute.');
Expand All @@ -96,6 +73,7 @@ public function async(\Generator $generator): Promise
try {
if (!$generator->valid()) {
$deferred->resolve($generator->getReturn());
return;
}
$promise = $generator->current();
if (!$promise instanceof Internal\PromiseWrapper) {
Expand Down Expand Up @@ -225,22 +203,12 @@ public function promiseRejected(\Throwable $throwable): Promise
*/
public function idle(): Promise
{
/*return Internal\PromiseWrapper::createUnhandled(new SwoolePromise(function($resolve) {
return Internal\PromiseWrapper::createUnhandled(new SwoolePromise(function($resolve) {
Coroutine::defer(function () use ($resolve) {
//Coroutine::sleep(1.0);
//usleep(1000000);
//Coroutine::sleep(0.001);
$resolve(null);
});
}), $this->unhandledFailingPromises);*/
return Internal\PromiseWrapper::createUnhandled(SwoolePromise::resolve(null), $this->unhandledFailingPromises, function() {
return new SwoolePromise(function($resolve) {
Coroutine::defer(function () use ($resolve) {
//Coroutine::sleep(1.0);
//usleep(1);
$resolve(null);
});
});
});
}), $this->unhandledFailingPromises);
}

/**
Expand Down
149 changes: 43 additions & 106 deletions src/Adapter/Swoole/Internal/SwoolePromise.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,34 @@
*/
final class SwoolePromise implements Promise
{
const PROMISE_WAIT = 100;

const STATE_PENDING = 1;
const STATE_FULFILLED = 0;
const STATE_REJECTED = -1;

/** @var int */
protected $state = self::STATE_PENDING;

/** @var mixed */
private $result;
private $onResolve;

private $resolvedCallback;
const STATUS_RESOLVE = 1;
const STATUS_ERROR = -1;

/**
* PromiseCo constructor.
* Promise constructor.
*
* @param callable $executor
*/
public function __construct(callable $executor)
{
$this->resolvedCallback = static function() {};

if (!extension_loaded('swoole')) {
throw new RuntimeException(
'SwoolePromise MUST running only in CLI mode with swoole extension.'
);
}
// @codeCoverageIgnoreEnd

$this->onResolve = function($status, $value) {
$this->result = [$status, $value];
};

$resolve = function ($value) {
$this->setResult($value, self::STATE_FULFILLED);
($this->onResolve)(self::STATUS_RESOLVE, $value);
};
$reject = function ($value) {
$this->setResult($value, self::STATE_REJECTED);
$reject = function ($error) {
($this->onResolve)(self::STATUS_ERROR, $error);
};
Coroutine::create(function (callable $executor, callable $resolve, callable $reject) {
try {
Expand Down Expand Up @@ -109,37 +103,6 @@ final public function catch(callable $onRejected): SwoolePromise
return $this->then(null, $onRejected);
}

/**
* Change promise state
*
* @param integer $state
* @return void
*/
final protected function setState(int $state): void
{
$this->state = $state;
}

/**
* Promise is pending
*
* @return boolean
*/
final protected function isPending(): bool
{
return $this->state == self::STATE_PENDING;
}

/**
* Promise is fulfilled
*
* @return boolean
*/
final protected function isFulfilled(): bool
{
return $this->state == self::STATE_FULFILLED;
}

/**
* {@inheritDoc}
*
Expand All @@ -149,21 +112,37 @@ final protected function isFulfilled(): bool
*/
public function then(?callable $onFulfilled = null, ?callable $onRejected = null): SwoolePromise
{
if($this->result === null) {
return self::create(function (callable $resolve, callable $reject) use ($onFulfilled, $onRejected) {
$this->onResolve = function($status, $value) use ($resolve, $reject, $onFulfilled, $onRejected) {
if($status === self::STATUS_RESOLVE) {
if($onFulfilled !== null) {
$onFulfilled($value);
}
$resolve($value);
} else if($status === self::STATUS_ERROR) {
if($onRejected !== null) {
$onRejected($value);
}
$reject($value);
}
};
});
}

return self::create(function (callable $resolve, callable $reject) use ($onFulfilled, $onRejected) {
$this->resolvedCallback = function() use ($resolve, $reject, $onFulfilled, $onRejected) {
$callable = $this->isFulfilled() ? $onFulfilled : $onRejected;
if (!is_callable($callable)) {
$resolve($this->result);
return;
if($this->result[0] === self::STATUS_RESOLVE) {
$value = $this->result[1];
if($onFulfilled !== null) {
$onFulfilled($value);
}
try {
$resolve($callable($this->result));
} catch (\Throwable $error) {
$reject($error);
$resolve($value);
} else if($this->result[0] === self::STATUS_ERROR) {
$error = $this->result[1];
if($onRejected !== null) {
$onRejected($error);
}
};
if(!$this->isPending()) {
($this->resolvedCallback)();
$reject($error);
}
});
}
Expand All @@ -180,26 +159,23 @@ public static function all(iterable $promises): SwoolePromise
$ticks = count($promises);

$firstError = null;
//$channel = new Channel($ticks);
$result = [];
$key = 0;
foreach ($promises as $promise) {
if (!$promise instanceof SwoolePromise) {
//$channel->close();
throw new RuntimeException(
'Supported only SwoolePromise instance'
);
}
$promise->then(function ($value) use ($key, &$result, &$ticks, $resolve/* $channel*/) {
$promise->then(function ($value) use ($key, &$result, &$ticks, $resolve) {
$result[$key] = $value;
$ticks--;
//$channel->push(true);
if($ticks === 0) {
ksort($result);
$resolve($result);
}
return $value;
}, function ($error) use (/*$channel, */&$firstError, &$ticks, $reject) {
//$channel->push(true);
}, function ($error) use (&$firstError, &$ticks, $reject) {
$ticks--;
if ($firstError === null) {
$firstError = $error;
Expand All @@ -208,45 +184,6 @@ public static function all(iterable $promises): SwoolePromise
});
$key++;
}
//while ($ticks--) {
// $channel->pop();
//}
//$channel->close();

/*if ($firstError !== null) {
$reject($firstError);
return;
}
$resolve($result);*/
});
}

/**
* Set resolved result
*
* @param mixed $value
* @return void
*/
private function setResult($value, $state): void
{
if ($value instanceof self) {
$resolved = false;
$callable = function ($value) use (&$resolved) {
$this->setResult($value);
$resolved = true;
};
$value->then($callable, $callable);
// resolve async locking error (code to remove)
//while (!$resolved) {
// @codeCoverageIgnoreStart
//usleep(self::PROMISE_WAIT);
// @codeCoverageIgnoreEnd
//}
} else if ($this->isPending()) {
$this->result = $value;
$this->setState($state);
($this->resolvedCallback)();
}
}
}

0 comments on commit 7b2c723

Please sign in to comment.