Gevent sample #84
# Swallow InvalidStateError in case Python future was cancelled
try:
    python_fut.set_exception(exc)
except:
Since this might end up in production code, I suggest catching the specific exception that you indicate in the comment.
Technically we support 3.7+, and this class wasn't added until 3.8; we try to write our code for all versions. Adding this makes the 3.7 CI angry (even though technically this sample only works with 3.8 because gevent only works with 3.8).
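One possible way to reconcile the two comments above is to look the exception class up at import time, so that the specific exception is caught where it exists and 3.7 still imports cleanly. This is only a sketch, not what the PR does: the helper name `set_exception_if_pending` is hypothetical, and falling back to `Exception` on 3.7 is an assumption about what would be acceptable there.

```python
import concurrent.futures

# concurrent.futures.InvalidStateError was added in Python 3.8. On 3.7 the
# attribute is missing, so fall back to Exception (assumption: a broad
# fallback is acceptable on the older version).
_InvalidStateError = getattr(concurrent.futures, "InvalidStateError", Exception)


def set_exception_if_pending(python_fut, exc):
    """Set exc on python_fut; return False if the future was already done.

    Hypothetical helper name, used only for this illustration.
    """
    try:
        python_fut.set_exception(exc)
    except _InvalidStateError:
        # The future was cancelled (or otherwise completed) first.
        return False
    return True
```

On 3.8+ this narrows the bare `except:` to the one error named in the code comment; on 3.7 it behaves like `except Exception:`.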
Tried running this on Mac and the worker hangs on
This does not work on Linux; moving to draft while still investigating...
Fixed, please review
Confirmed it works on Mac now too.
Just one suggestion on the documentation. Otherwise LGTM.
# Gevent's returned futures do not map well to Python futures, so we
# must translate. We can't just use set_result/set_exception because
# done callbacks are not always called in gevent's case and it doesn't
# seem to support cancel, so we instead wrap the caller function.
[non-blocking/optional] I think it might be clearer to explicitly state "Gevent's returned futures do not implement all of the things expected by concurrent.futures.Future" rather than saying they "do not map well". I wasn't sure what the latter meant, so I wound up reading gevent code to figure out what this meant.
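For readers who have not seen the full diff, the "wrap the caller function" idea from the code comment can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `wrap_for_future` and `submit_via` are hypothetical names, and `spawn` stands in for whatever actually schedules the work (in the PR, the gevent thread pool's own submit).

```python
import concurrent.futures
import functools


def wrap_for_future(fn, python_fut):
    """Return a callable that runs fn and reports the outcome into python_fut."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Returns False if python_fut was cancelled before the work started,
        # in which case fn is never called.
        if not python_fut.set_running_or_notify_cancel():
            return
        try:
            result = fn(*args, **kwargs)
        except BaseException as exc:
            python_fut.set_exception(exc)
        else:
            python_fut.set_result(result)

    return wrapper


def submit_via(spawn, fn, *args, **kwargs):
    """Schedule fn through spawn and hand back a standard Python future.

    spawn is a hypothetical stand-in: any callable taking (fn, *args, **kwargs).
    Whatever object spawn itself returns is ignored here.
    """
    python_fut = concurrent.futures.Future()
    spawn(wrap_for_future(fn, python_fut), *args, **kwargs)
    return python_fut
```

Because the wrapper completes the Python future itself, the caller never depends on done callbacks or cancel support from the scheduler's own future type.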
P = ParamSpec("P")


class GeventExecutor(threadpool.ThreadPoolExecutor):
Hi @cretz, I believe gevent.threadpool.ThreadPoolExecutor is the unpatched thread pool, so really this PR is just using native threads.
cf. https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor :
A version of concurrent.futures.ThreadPoolExecutor that always uses native threads, even when threading is monkey-patched.
I verified by running this code locally with the patched concurrent.futures.ThreadPoolExecutor and it throws this error, as seen before in our explorations:
[~/temporalio_gevent/src] ∴ ~/.local/bin/poetry run python -m gevent_async.worker
INFO:root:Worker started, ctrl+c to exit
INFO:temporalio.workflow:Running workflow with parameter Temporal ({'attempt': 1, 'namespace': 'default', 'run_id': '543fef67-e25d-4bd8-926a-cca7bc78fd91', 'task_queue': 'gevent_async-task-queue', 'workflow_id': 'gevent_async-workflow-id', 'workflow_type': 'GreetingWorkflow'})
ERROR:temporalio.worker._workflow:Failed handling activation on workflow with run ID 543fef67-e25d-4bd8-926a-cca7bc78fd91
Traceback (most recent call last):
File "/home/discord/.cache/pypoetry/virtualenvs/temporalio-gevent-EEP-UPYz-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 208, in _handle_activation
completion = await asyncio.wait_for(
File "/usr/lib/python3.10/asyncio/tasks.py", line 405, in wait_for
loop = events.get_running_loop()
RuntimeError: no running event loop
Exactly. We are intentionally using native threads so that gevent doesn't try to mess with them, which lets you have asyncio running in a native thread and gevent doing other things elsewhere. The whole crux of the original issue is that you couldn't even use Temporal and gevent in the same process because gevent tries to take over, but this allows them to share the process space.
If we need further investigation on using greenlets maybe from activities or something, we can look into that separately.
@cretz please correct me if my understanding is wrong: if this is intentionally using native threads, it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent". this means that all the code executed in activities needs to be threadsafe.
our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.
it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent"
Not a new thread. This is calling gevent.threadpool.ThreadPoolExecutor.submit, which reuses existing threads (I believe). But yes, they aren't the threads shared by gevent.
this means that all the code executed in activities needs to be threadsafe.
Not async def activities, but yes, regular def activities in the Temporal Python SDK are executed via the executor passed, and most people use concurrent.futures.ThreadPoolExecutor, which does require thread safety, yes.
our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.
Yes, this is admittedly not true "gevent activities" but rather just a demonstration that Temporal can live in the same process as a gevent-encumbered Python process.
We could go one further and look into having an executor specific for activities that bridged the native Python thread to greenlet gap. I did not go that far, but we can try. It'd probably just be a concurrent.futures.Executor implementation backed by gevent.pool.Pool.
The examples shown here never worked for us because (a) our app wasn't threadsafe and (b) we didn't want to be spawning N separate native threads. Here is a solution I put together that should only have 2 threads: one aux thread to handle asyncio things and one gevent main thread that processes activities: https://gist.github.com/alecgorge/63e52b764b10adc46d61b1773f9a2085 It doesn't handle a mixture of async and sync (gevent) activities, but that is fine for our purposes.
Great! Yeah, the crux of it is that asyncio/workflow threads and gevent threads don't mix. Running the entire asyncio event loop in a single thread separate from gevent is the best way (however you can accomplish that, like y'all did).
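To make the two-thread shape discussed above concrete, here is a standard-library-only sketch: the asyncio loop runs in one auxiliary thread, and synchronous work submitted from it is executed on the main thread. It is not the linked gist and does not use gevent at all. `MainThreadExecutor`, `run_one`, `call_sync` and `demo` are hypothetical names, and in a monkey-patched process the thread and hand-off primitives used here would need gevent-aware replacements, which this sketch does not attempt.

```python
import asyncio
import concurrent.futures
import queue
import threading


class MainThreadExecutor(concurrent.futures.Executor):
    """Hypothetical executor: submit() only enqueues; run_one() does the work
    on whichever thread calls it (the main thread in this sketch)."""

    def __init__(self):
        self._work = queue.Queue()

    def submit(self, fn, *args, **kwargs):
        fut = concurrent.futures.Future()
        self._work.put((fut, fn, args, kwargs))
        return fut

    def run_one(self, timeout=None):
        fut, fn, args, kwargs = self._work.get(timeout=timeout)
        if not fut.set_running_or_notify_cancel():
            return  # cancelled before it started
        try:
            result = fn(*args, **kwargs)
        except BaseException as exc:
            fut.set_exception(exc)
        else:
            fut.set_result(result)


async def call_sync(executor, fn, *args):
    # Runs on the asyncio thread and awaits work done on the main thread.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, fn, *args)


def demo():
    executor = MainThreadExecutor()
    loop = asyncio.new_event_loop()
    aux = threading.Thread(target=loop.run_forever, daemon=True)
    aux.start()

    pending = asyncio.run_coroutine_threadsafe(
        call_sync(executor, threading.get_ident), loop
    )
    executor.run_one(timeout=5)  # main thread executes the queued call
    ran_on = pending.result(timeout=5)

    loop.call_soon_threadsafe(loop.stop)
    aux.join(timeout=5)
    loop.close()
    return ran_on, threading.get_ident(), aux.ident
```

Calling `demo()` returns the thread id the work ran on, the main thread id, and the auxiliary thread id; the first two should match, showing that the synchronous call never ran on the asyncio thread.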
What was changed
Create a gevent sample showing how to use Temporal client, worker, workflows, and activities in a gevent-patched environment