Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gevent sample #84

Merged
merged 3 commits into from
Oct 20, 2023
Merged

Gevent sample #84

merged 3 commits into from
Oct 20, 2023

Conversation

cretz
Copy link
Member

@cretz cretz commented Oct 11, 2023

What was changed

Create a gevent sample showing how to use Temporal client, worker, workflows, and activities in a gevent-patched environment

# Swallow InvalidStateError in case Python future was cancelled
try:
python_fut.set_exception(exc)
except:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this might end up in production code, I suggest catching the specific exception that you indicate in the comment.

Copy link
Member Author

@cretz cretz Oct 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically we support 3.7+ and this class didn't get added until 3.8 and we try to write our code for all versions. Adding this makes the 3.7 CI angry (even though technically this sample only works with 3.8 because gevent only works with 3.8).

gevent_async/starter.py Outdated Show resolved Hide resolved
@josh-berry
Copy link
Contributor

josh-berry commented Oct 11, 2023

Tried running this on Mac and the worker hangs on Client.connect("localhost:7233") (with the latest released CLI). Looking into why. (And yes, I did make sure the dev server is running.)

@cretz cretz marked this pull request as draft October 13, 2023 12:17
@cretz
Copy link
Member Author

cretz commented Oct 13, 2023

This does not work on Linux, move to draft while still investigating...

@cretz
Copy link
Member Author

cretz commented Oct 13, 2023

Fixed, please review

@cretz cretz requested a review from a team October 13, 2023 13:16
@cretz cretz marked this pull request as ready for review October 13, 2023 13:17
@josh-berry
Copy link
Contributor

Confirmed it works on Mac now too.

Copy link
Contributor

@josh-berry josh-berry left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one suggestion on the documentation. Otherwise LGTM.

Comment on lines +16 to +19
# Gevent's returned futures do not map well to Python futures, so we
# must translate. We can't just use set_result/set_exception because
# done callbacks are not always called in gevent's case and it doesn't
# seem to support cancel, so we instead wrap the caller function.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[non-blocking/optional] I think it might be clearer to explicitly state "Gevent's returned futures do not implement all of the things expected by concurrent.futures.Future" rather than saying they "do not map well". I wasn't sure what the latter meant, so I wound up reading gevent code to figure out what this meant.

@cretz cretz merged commit c587d20 into temporalio:main Oct 20, 2023
7 checks passed
@cretz cretz deleted the gevent branch October 20, 2023 12:29
P = ParamSpec("P")


class GeventExecutor(threadpool.ThreadPoolExecutor):
Copy link

@abeland abeland Oct 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @cretz, I believe gevent.threadpool.ThreadPoolExecutor is the unpatched thread pool, so really this PR is just using native threads.

cf. https://www.gevent.org/api/gevent.threadpool.html#gevent.threadpool.ThreadPoolExecutor :

A version of concurrent.futures.ThreadPoolExecutor that always uses native threads, even when threading is monkey-patched.

I verified by running this code locally with the patched concurrent.futures.ThreadPoolExecutor and it throws this error, as seen before in our explorations:

[~/temporalio_gevent/src] ∴ ~/.local/bin/poetry run python -m gevent_async.worker
INFO:root:Worker started, ctrl+c to exit
INFO:temporalio.workflow:Running workflow with parameter Temporal ({'attempt': 1, 'namespace': 'default', 'run_id': '543fef67-e25d-4bd8-926a-cca7bc78fd91', 'task_queue': 'gevent_async-task-queue', 'workflow_id': 'gevent_async-workflow-id', 'workflow_type': 'GreetingWorkflow'})
ERROR:temporalio.worker._workflow:Failed handling activation on workflow with run ID 543fef67-e25d-4bd8-926a-cca7bc78fd91
Traceback (most recent call last):
  File "/home/discord/.cache/pypoetry/virtualenvs/temporalio-gevent-EEP-UPYz-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 208, in _handle_activation
    completion = await asyncio.wait_for(
  File "/usr/lib/python3.10/asyncio/tasks.py", line 405, in wait_for
    loop = events.get_running_loop()
RuntimeError: no running event loop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly. We are intentionally using native threads so that gevent doesn't try to mess with them so you can have asyncio running in a native thread and gevent doing other things elsewhere. The whole crux of the original issue is that you couldn't even use Temporal and gevent in the same process because gevent tries to take over, but this allows them to share the process space.

If we need further investigation on using greenlets maybe from activities or something, we can look into that separately.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cretz please correct me if my understanding is wrong: if this is intentionally using native threads, it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent". this means that all the code executed in activities needs to be threadsafe.

our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.

Copy link
Member Author

@cretz cretz Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is creating a new thread for each activity it runs on line 39 despite the comment on line 38 saying that it will "[s]ubmit our wrapper to gevent"

Not a new thread, This is calling gevent.threadpool.ThreadPoolExecutor.submit which reuses existing threads (I believe). But yes, they aren't the threads shared by gevent.

this means that all the code executed in activities needs to be threadsafe.

Not async def activities, but yes, regular def activities in the Temporal Python SDK are executed via the executor passed and most people use concurrent.futures.ThreadPoolExecutor which does require thread safety yes.

our codebase isn't threadsafe (pretty common for gevent codebases i think) but we are still giving this a go to see if we can get basic execution working.

Yes, this is admittedly not true "gevent activities" but rather just a demonstration of showing that Temporal can live in the same process as a gevent-encumbered Python process.

We could go one further and look into having an executor specific for activities that bridged the native Python thread to greenlet gap. I did not go that far, but we can try. It'd probably just be a concurrent.futures.Executor implementation backed by gevent.pool.Pool.

@alecgorge
Copy link

The examples shown here never worked for us because (a) our app wasn't threadsafe and (b) we didn't want to be spawning N separate native threads.

Here is a solution I put together that should only have 2 threads: one aux thread to handle asyncio things and one gevent main thread that processes activities: https://gist.github.com/alecgorge/63e52b764b10adc46d61b1773f9a2085

It doesn't handle a mixture of async and sync (gevent) activities, but that is fine for our purposes.

@cretz
Copy link
Member Author

cretz commented Mar 20, 2024

Great! Yeah, that's the crux of it, is that asyncio/workflow threads and gevent threads don't mix. The entire asyncio event loop running in a single separate thread from gevent is the best way (however you can accomplish that, like y'all did).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants