Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ add swoole adapter #43

Draft
wants to merge 23 commits into
base: master
Choose a base branch
from
Draft

Conversation

matyo91
Copy link
Contributor

@matyo91 matyo91 commented Apr 19, 2021

Allow add Swoole Adapter

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 19, 2021

Hi !

This is an unfinished PR but some tests pass.
I choose to contribute because I liked the video about coroutine : https://www.youtube.com/watch?v=7TvIIt4c8uY

And there is some hype about Swoole (https://github.com/laravel/octane) and this was not implemented yet.

I'm stuck about eventloop->wait() impementation as I didn't found a smart way to wait for a single Swoole Promise without being into Corouting::create() wrapper by using Channel() way.

I did try with code above without success.

Coroutine::suspend($uid);
Event::wait();
Coroutine::resume($uid);

The code is inspired from this repository and https://github.com/streamcommon/promise. At the end, there should be a way to remove usleep for waiting resolve state and having pure callback.

May we should consider an implementation of Swoole promises with https://www.swoole.co.uk/docs/modules/swoole-timer-tick.

Travis should embark ext-swoole extension.

Copy link
Contributor

@b-viguier b-viguier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @matyo91 👋 😄
Thanks a lot for your contribution! We never had time to look at Swoole, this is a good opportunity 😉
I didn't look the whole PR yet, nevertheless I noticed some important points in the wait function, and maybe it will impact other functions, so I propose to go step by step… and in the meantime I will be able to read Swoole documentations about other functions 😅

Feel free to ask if something is not clear.
About CI, we intend to move it on Github actions, but let's see what we could do to run Swoole on TravisCI 👍

src/Adapter/Swoole/EventLoop.php Show resolved Hide resolved
@matyo91 matyo91 marked this pull request as draft April 19, 2021 15:01
Copy link
Contributor

@b-viguier b-viguier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @matyo91 ,
I spent some time to read the documentation, and now it's clearer to me 😅
The main point is that they do not expose any concept of Promise, and it makes things harder. You PR propose a complete Promise implementation (A+ style), but I think it's unnecessary in this case, too generic for the problem to solve. Furthermore, if we write so much code, it means that we are re-implementing all the logic in Tornado instead of using Swoole extension… I think we miss the point.

Here some ideas I had after reading the documentation. I still didn't test anything, it's purely theoretical, so if you are welcome to give any feedback 👍

The central concepts of Tornado are Generators and Promises: generators yield some promises and must be resumed when promise is resolved. Then, we should wrap each Tornado generator (asynchronous function) in a Swoole Coroutine and as soon as the generator is blocked by a yielded promise, we use Swoole channel to pause the coroutine while waiting the promise resolution and let Swoole do its EventLoop job. When The coroutine receive a value through the channel, we resume the generator with this value. (⚠️ We have to deal with Exceptions, maybe we should send a DTO in channels to now if we are sending a regular value or an exception to throw 🤔 )

Next task is to find how to send value in those channels and it will depend of each Promise. For example, for the idle, we create a promise with it's associated channel, a launch a Coroutine calling defer and then we send null to the channel. (⚠️ Is it possible in swoole to push 1 message in channel and to pop it as many times that we have "listeners"? If not, we have to invent something, kind of dispatcher behind the channel and the listeners).

There are a lot of possible edge cases (exceptions, background coroutines…) but I think we should focus on these core concepts first. If we have async idle and delay working correctly, remaining tasks should come smoothly 🤞

ℹ️ Currently, I'm on vacations, but as soon as I have some time I will try to make some tests on my own 😄

src/Adapter/Swoole/Internal/SwoolePromise.php Outdated Show resolved Hide resolved
@matyo91
Copy link
Contributor Author

matyo91 commented Apr 21, 2021

The main point is that they do not expose any concept of Promise, and it makes things harder. You PR propose a complete Promise implementation (A+ style), but I think it's unnecessary in this case, too generic for the problem to solve. Furthermore, if we write so much code, it means that we are re-implementing all the logic in Tornado instead of using Swoole extension… I think we miss the point.

I agree, this is for now an unoptimized usage of Swoole. A+ promise was easier implementation but more generic. All code should be reduced to Tornado logic as soon as all tests pass !

Here some ideas I had after reading the documentation. I still didn't test anything, it's purely theoretical, so if you are welcome to give any feedback 👍

For now, I do not have more feedback than documentation, playing with Swoole php examples and discover how it work by practice.

For the rest, thanks for more advices and see how we go with this new approach. I will look closer to fulfill testIdle test then.

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 21, 2021

Usage with Coroutine Yield and Coroutine Resume is interesting too.

A good implementation usage can be found in Barrier

@b-viguier
Copy link
Contributor

ℹ️ Here an attempt, but it doesn't work 🤷‍♂️

PHP Warning:  Swoole\Coroutine\Scheduler::start(): eventLoop has already been created. unable to start Swoole\Coroutine\Scheduler in @swoole-src/library/core/Coroutine/functions.php on line 24
Code
<?php

class Promise
{
    private $cids = [];
    private $isSettled = false;
    private $value;

    public function yield(): void
    {
        if($this->isSettled) {
            return;
        }

        $this->cids[\Swoole\Coroutine::getCid()] = true;
        \Swoole\Coroutine::yield();
    }

    public function resolve($value): void
    {
        echo "resolving with $value\n";
        assert(false === $this->isSettled);
        $this->isSettled = true;
        $this->value = $value;
        foreach ($this->cids as $cid => $dummy) {
            \Swoole\Coroutine::resume($cid);
        }
        $this->cids = [];
    }

    public function value()
    {
        assert($this->isSettled);
        return $this->value;
    }
};

function async(\Generator $generator): Promise {
    $generatorPromise = new Promise();
    \Co\run(function() use($generator, $generatorPromise) {
        while($generator->valid()) {
            echo "Promise found\n";
            $promise = $generator->current();
            $promise->yield();
            $generator->send($promise->value());
        }

        $generatorPromise->resolve($generator->getReturn());
    });

    return $generatorPromise;
}

function delay(int $ms) {
    $promise = new Promise();
    \Co\run(function() use($ms, $promise) {
        \Co\System::sleep($ms / 1000 /* ms -> s */);
        $promise->resolve(null);
    });

    return $promise;
}

function wait(Promise $promise) {

    $value = null;
    async((function() use($promise, &$value) {
        echo "wait\n";
        $value = yield $promise;
        Swoole\Event::exit();
    })());
    \Swoole\Event::wait();

    return $value;
}

function generator(string $name, int $delay): \Generator {
    echo "[$name] Waiting $delay ms…\n";
    yield delay($delay);
    echo "[$name] Done!\n";

    return "[$name] Finished\n";
}

$result = wait(async(generator('A', 2000)));
echo "Result: $result\n";

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 21, 2021

I won't recommend Co\run as this this involve Swoole\Coroutine\Scheduler and there are no stop() method, but only run().

May try with Coroutine::create only (Co\go is the alias function)

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 21, 2021

This seem's working php examples/00--swooleSimpleWait.php
Also 'ABCABA' work too php examples/00--swooleABCABA.php

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 21, 2021

I integrated your stuff into a new EventLoop and will see how we can go so far.

@matyo91 matyo91 force-pushed the swoole branch 2 times, most recently from b75a49c to 3bc952a Compare April 21, 2021 22:22
@matyo91
Copy link
Contributor Author

matyo91 commented Apr 22, 2021

Current status lot of ✅

bin/phpunit tests/Adapter/Swoole/EventLoopTest.php

Tests: 40, Assertions: 49, Errors: 4, Failures: 4.

Copy link
Contributor

@b-viguier b-viguier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have a lot of questions today 😅
I see that you are spending a lot of time on this PR, but currently I'm not sure you are on the good path 😕
I'll try to retake my (not working) POC by taking care of your interesting remark, because I need to know more about Swoole to help you correctly…

src/Adapter/Swoole/Internal/DummyPromise.php Outdated Show resolved Hide resolved
/**
* {@inheritdoc}
*/
public function promiseAll(Promise ...$promises): Promise
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use a [WaitGroup°(https://www.swoole.co.uk/docs/modules/swoole-coroutine-waitgroup) ? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An implementation can be found in Swoole\Internal\SwoolePromise->all with git history.
This seem's to work well with shiftCoroutine for now.
As soon as all test pass, we could try to move on WaitGoup.

});
}

private function resolve($value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which case $value can be something else than a DummyPromise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case when resolve is called for async only, but not for wait.
note that resolve is private, just factoring parts.

}
}

return $value;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand this while function… it's recursive, but also related to array… I don't see the point 😕

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve is involving in wait and async function.
I agree the recursive function shouldn't be specific to types.
I didn't find a smart way to get the resolution at the origin and propagate it's values.

Note that array is involved in promiseAll and return an array of promise. This may justify the array type check.

{
if($value instanceof DummyPromise && !$value->isPending()) {
if($value->getException() !== null) {
throw $value->getException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ If a promise is rejected with an exception, this exception must be forwarded to all generators with the Generator::throw method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the case in $generator->send($this->resolve($promise)); in async


$cid = array_shift($this->cids);
if(Coroutine::exists($cid)) {
Coroutine::resume($cid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we should resume a coroutine only if the DummyPromise (blocking related generator) is resolved… why to always resume each coroutine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually this is the case as all promises for Swoole\EventLoop are instanciated as below

return new DummyPromise(function () {
  $this->shiftCoroutine();
});

At resolution time of the DummyPromise, this will resume the first Swoole Coroutine in the queue.


while ($promise->isPending()) {
$this->shiftCoroutine();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, here your are recoding a scheduler/event-loop… you try to wake-up every coroutine until something is done, but it won't be efficient. I think we should let the Swoole EventLoop do this Job, Tornado just provide an interface over it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did actually end up with this logic with the usage for DummyPromise after some tries.
Your proposal logic stay in Swoole/YieldEventLoop, but I suspended the work for now.

Coroutine::create(function() use($milliseconds, $promise) {
$this->pushCoroutine();
//Coroutine::sleep($milliseconds / 1000);
usleep($milliseconds * 1000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you use Coroutine::sleep($milliseconds / 1000);? It's exactly what we want! 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests are not passing (eg testDelay with 42 ms) as Coroutine::sleep does not accept less than 0.001 and this throws an error.
May be we should add a condition for that case.

@b-viguier
Copy link
Contributor

Hi @matyo91 ,
I followed your recommendation to not use Co\run and it works like a charm! Here my code, and I added a promiseAll function using WaitGroup. I feel this is the way to go!

I just faced an issue: if I let some "unfinished" coroutines while stopping the EventLoop, Swoole complains about a DeadLock. It's logical since these coroutines can't be finished, but it's the expected behaviour… I don't see any trivial way to "delete" a coroutine that we don't want to finish, do you now a way? Maybe through exit_condition option?

Code
<?php

class Promise
{
    private $cids = [];
    private $isSettled = false;
    private $value;

    public function yield(): void
    {
        if($this->isSettled) {
            return;
        }

        $this->cids[\Swoole\Coroutine::getCid()] = true;
        \Swoole\Coroutine::yield();
    }

    public function resolve($value): void
    {
        assert(false === $this->isSettled);
        $this->isSettled = true;
        $this->value = $value;
        foreach ($this->cids as $cid => $dummy) {
            \Swoole\Coroutine::resume($cid);
        }
        $this->cids = [];
    }

    public function value()
    {
        assert($this->isSettled);
        return $this->value;
    }
};

function async(\Generator $generator): Promise {
    $generatorPromise = new Promise();
    Swoole\Coroutine::create(function() use($generator, $generatorPromise) {
        while($generator->valid()) {
            $promise = $generator->current();
            $promise->yield();
            $generator->send($promise->value());
        }

        $generatorPromise->resolve($generator->getReturn());
    });

    return $generatorPromise;
}

function delay(int $ms) {
    $promise = new Promise();
    Swoole\Coroutine::create(function() use($ms, $promise) {
        \Co\System::sleep($ms / 1000 /* ms -> s */);
        $promise->resolve(null);
    });

    return $promise;
}

function wait(Promise $promise) {

    $value = null;
    async((function() use($promise, &$value) {
        $value = yield $promise;
        Swoole\Event::exit();
    })());
    \Swoole\Event::wait();

    return $value;
}

function promiseAll(Promise ...$promises): Promise
{
    $wg = new \Swoole\Coroutine\WaitGroup();
    $result = [];
    foreach ($promises as $index => $promise) {
        async((function() use($wg, &$result, $index, $promise){
            $wg->add();
            $result[$index] = yield $promise;
            $wg->done();
        })());
    }

    $promise = new Promise();
    Swoole\Coroutine::create(function() use($wg, $promise, &$result) {
        $wg->wait();
        $promise->resolve($result);
    });

    return $promise;
}

function myAsyncFunction(string $name, int $delay): \Generator {
    echo "[$name] Waiting $delay ms…\n";
    yield delay($delay);
    echo "[$name] Done!\n";

    return "[$name] Finished\n";
}

$result = wait(
    promiseAll(
        async(myAsyncFunction('B', 2000)),
        async(myAsyncFunction('C', 1000)),
        async(myAsyncFunction('A', 1500)),
    )
);
echo "Result: \n";
var_dump($result);

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 22, 2021

I followed your recommendation to not use Co\run and it works like a charm! Here my code, and I added a promiseAll function using WaitGroup. I feel this is the way to go!
🚀

I just faced an issue: if I let some "unfinished" coroutines while stopping the EventLoop, Swoole complains about a DeadLock. It's logical since these coroutines can't be finished, but it's the expected behaviour… I don't see any trivial way to "delete" a coroutine that we don't want to finish, do you now a way? Maybe through exit_condition option?

I got same errors too. For that part, i didn't have clue about the documentation, if there are any options about it.
In EventLoop we can manually make the clean in __destruct by collecting all Coroutine::getCid and play with Coroutine::exists and Coroutine::suspend

I propose we integrate your work in Adapter\Swoole\YieldEventLoop with Adapter\Swoole\Internal\YieldPromise.
And we could compare at the end with Adapter\Swoole\EventLoop and Adapter\Swoole\Internal\DummyPromise.

@matyo91
Copy link
Contributor Author

matyo91 commented Apr 22, 2021

The reason why I ended with EventLoop->shiftCoroutine promise is the fact that I can have a fine granular control on the execution order of Swoole coroutine.

If I take your code and change a little to reproduce EventLoopTest->testIdle.
At the end, the output show ABCBAA when using yield delay($delay) in the generator.
If I switch for yield idle(); in the generator, then I get AAABBC to the output.

I didn't figure out how to get the correct test passing result that should be ABCABA according to the test.
Swoole seem's to get his own way to order the promise resolution.

<?php


class Promise
{
    private $cids = [];
    private $isSettled = false;
    private $value;

    public function yield(): void
    {
        if($this->isSettled) {
            return;
        }

        $this->cids[\Swoole\Coroutine::getCid()] = true;
        \Swoole\Coroutine::yield();
    }

    public function resolve($value): void
    {
        assert(false === $this->isSettled);
        $this->isSettled = true;
        $this->value = $value;
        foreach ($this->cids as $cid => $dummy) {
            \Swoole\Coroutine::resume($cid);
        }
        $this->cids = [];
    }

    public function value()
    {
        assert($this->isSettled);
        return $this->value;
    }
};

function async(\Generator $generator): Promise {
    $generatorPromise = new Promise();
    Swoole\Coroutine::create(function() use($generator, $generatorPromise) {
        while($generator->valid()) {
            $promise = $generator->current();
            $promise->yield();
            $generator->send($promise->value());
        }

        $generatorPromise->resolve($generator->getReturn());
    });

    return $generatorPromise;
}

function idle() {
    $promise = new Promise();
    Swoole\Coroutine::create(function() use ($promise) {
        Swoole\Coroutine::defer(function() use ($promise){
            $promise->resolve(null);
        });
    });

    return $promise;
}

function delay(int $ms) {
    $promise = new Promise();
    Swoole\Coroutine::create(function() use($ms, $promise) {
        \Co\System::sleep($ms / 1000 /* ms -> s */);
        $promise->resolve(null);
    });

    return $promise;
}

function wait(Promise $promise) {

    $value = null;
    async((function() use($promise, &$value) {
        $value = yield $promise;
        Swoole\Event::exit();
    })());
    \Swoole\Event::wait();

    return $value;
}

function promiseAll(Promise ...$promises): Promise
{
    $wg = new \Swoole\Coroutine\WaitGroup();
    $result = [];
    foreach ($promises as $index => $promise) {
        async((function() use($wg, &$result, $index, $promise){
            $wg->add();
            $result[$index] = yield $promise;
            $wg->done();
        })());
    }

    $promise = new Promise();
    Swoole\Coroutine::create(function() use($wg, $promise, &$result) {
        $wg->wait();
        $promise->resolve($result);
    });

    return $promise;
}

$outputBuffer = '';
$delay = 1; //ms
$myAsyncFunction = function(string $name, int $count) use (&$outputBuffer, $delay): \Generator {
    //echo "[$name] Waiting $delay ms…\n";
    while ($count--) {
        yield delay($delay);
        //yield idle();
        $outputBuffer .= $name;
    }
    //echo "[$name] Done!\n";

    //return "[$name] Finished\n";
};

$result = wait(
    promiseAll(
        async($myAsyncFunction('A', 3)),
        async($myAsyncFunction('B', 2)),
        async($myAsyncFunction('C', 1)),
    )
);
echo "Result: \n";
var_dump($outputBuffer);

@matyo91 matyo91 force-pushed the swoole branch 8 times, most recently from b3ab296 to 320e268 Compare April 24, 2021 03:19
@matyo91
Copy link
Contributor Author

matyo91 commented Apr 24, 2021

At least this display ABCABA

<?php

use Swoole\Coroutine;
use Swoole\Event;

Coroutine::create(function() use (&$outputBuffer) {
    $outputBuffer .= "A";
    Coroutine::sleep(0.001);
    $outputBuffer .= "A";
    Coroutine::sleep(0.001);
    $outputBuffer .= "A";
    Coroutine::sleep(0.001);
});
Coroutine::create(function() use (&$outputBuffer) {
    $outputBuffer .= "B";
    Coroutine::sleep(0.001);
    $outputBuffer .= "B";
    Coroutine::sleep(0.001);
});
Coroutine::create(function() use (&$outputBuffer) {
    $outputBuffer .= "C";
    Coroutine::sleep(0.001);
});
Event::wait();
print_r($outputBuffer);

and

use Swoole\Coroutine;
use Swoole\Event;

Coroutine::create(function() use (&$outputBuffer) {
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "A";
        });
    });
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "B";
        });
    });
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "C";
        });
    });
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "A";
        });
    });
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "B";
        });
    });
    Coroutine::create(function() use (&$outputBuffer) {
        Coroutine::defer(function () use (&$outputBuffer) {
            $outputBuffer .= "A";
        });
    });
});
Event::wait();
print_r($outputBuffer);

@matyo91 matyo91 force-pushed the swoole branch 2 times, most recently from d1ad39c to 16908da Compare April 24, 2021 15:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants