Investigate issues with gevent.monkey.patch_all() #59

Closed
cretz opened this issue Jun 29, 2022 · 10 comments · Fixed by #403
Labels
enhancement New feature or request

Comments

@cretz
Member

cretz commented Jun 29, 2022

Is your feature request related to a problem? Please describe.

Users on an older version of gevent (1.5.0) reported that calling gevent.monkey.patch_all() breaks asyncio inside workflows.

Describe the solution you'd like

  • Replicate the error on that gevent version (a simple workflow is probably fine)
  • Upgrade gevent to see if it still occurs
    • If not, essentially bisect to find offending version minimum and maybe document in README
    • If so, find workaround and document in README

cretz added the enhancement (New feature or request) label on Jun 29, 2022

@alecgorge

Does this mean gevent with Temporal is meant to be a supported configuration? Do you need to use synchronous Activities to use gevent code?

Is there any documentation on the usage together? I tried looking for some earlier and couldn’t find it.

@cretz
Member Author

cretz commented Jun 29, 2022

Does this mean gevent with Temporal is meant to be a supported configuration?

No, the SDK does not use gevent in any way nor depend on it in any way. This is just a case where a user had an old gevent library in their existing code and it clashed with Temporal due to some bug in gevent's global side-effecting code that may have since been fixed. This issue is simply me acknowledging it so I can test and if necessary add a warning to the README for users of older gevent libraries.

Do you need to use synchronous Activities to use gevent code? Is there any documentation on the usage together? I tried looking for some earlier and couldn’t find it.

You may have to and there is no Temporal documentation here (nor is there with any Python async library integration beyond builtin asyncio). Synchronous activities are just normal functions that can do anything you'd do in a normal Python call, so yes you do have to use those for non-async def functions. I suppose you may technically be able to use something like https://github.com/gfmio/asyncio-gevent but I have not tested and in general would not recommend altering the built-in asyncio event loop implementation (or doing any of gevent's global "magic"/"patching" if you can avoid it).

@alecgorge

Thanks for the context. I am planning on testing our gevent setup with Temporal. I will report back the results, along with the gevent version, once I have done so.

@alecgorge

As a follow-up, we use gevent 1.3.7 and ran into a number of issues. We haven't done a full investigation but here are some of the things we did:

  • Only used synchronous activities
  • Set activity_executor to an instance of a gevent monkeypatched concurrent.futures.ThreadPoolExecutor
  • Avoided usage of any WorkflowHandle.result() calls (including execute_workflow) and prevented any gevent greenlets from executing

When things worked, it was awesome! Unfortunately, in general we experienced unpredictable behavior where our code would mysteriously hang indefinitely (usually around monkeypatched IO calls). We haven't had time to dig in and identify exactly where the incompatibility is. We have a general sense of uneasiness about using gevent monkeypatched code with asyncio code.

If I have time, I'm going to try an experiment where I run Temporal in a process forked off before gevent monkeypatching and using a cross-process executor. I don't expect it to work, but it is a fun exercise!

@reallistic

This is still failing on latest gevent (21.12.0) and temporalio (0.1b2).

Minimal code to reproduce:

from gevent.monkey import patch_thread; patch_thread()

import asyncio
from datetime import timedelta

from temporalio import activity
from temporalio import workflow
from temporalio.client import Client
from temporalio.service import RPCError, RPCStatusCode
from temporalio.worker import Worker

@activity.defn
async def get_name(prefix: str) -> str:
    return f"{prefix} angela"


@workflow.defn
class SimpleWorkflow:
    @workflow.run
    async def run(self) -> str:
        return await workflow.execute_activity(
            get_name,
            "Dear",
            start_to_close_timeout=timedelta(seconds=10),
        )


async def start_temporal():
    temporal_client = await Client.connect("localhost:7233")
    try:
        await temporal_client.start_workflow(
            SimpleWorkflow.run,
            id="simple_workflow",
            task_queue="test-queue",
        )
    except RPCError as e:
        if e.status is not RPCStatusCode.ALREADY_EXISTS:
            raise

    worker = Worker(
        temporal_client,
        task_queue="test-queue",
        workflows=[SimpleWorkflow],
        activities=[get_name],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(start_temporal())

Error

Failed handling activation on workflow with run ID f5df1f62-b1b3-4a84-8af8-0837c77e7208
Traceback (most recent call last):
  File "/Users/michase/workspace/ntflx/fastapi_temporal_poc/.venv/lib/python3.9/site-packages/temporalio/worker/workflow.py", line 186, in _handle_activation
    completion = await asyncio.wait_for(
  File "/Users/michase/.newt-cache/pyenv/versions/cpython-3.9.14/lib/python3.9/asyncio/tasks.py", line 435, in wait_for
    loop = events.get_running_loop()
RuntimeError: no running event loop
Failed completing activation on workflow with run ID f5df1f62-b1b3-4a84-8af8-0837c77e7208
Traceback (most recent call last):
  File "/Users/michase/workspace/ntflx/fastapi_temporal_poc/.venv/lib/python3.9/site-packages/temporalio/worker/workflow.py", line 234, in _handle_activation
    await self._bridge_worker().complete_workflow_activation(completion)
  File "/Users/michase/workspace/ntflx/fastapi_temporal_poc/.venv/lib/python3.9/site-packages/temporalio/bridge/worker.py", line 98, in complete_workflow_activation
    await self._ref.complete_workflow_activation(comp.SerializeToString())
RuntimeError: no running event loop

Explanation

NOTE that only threading needs to be patched

I believe there is something funky with how gevent patches thread locals and how thread locals are used to store the current event loop.

The SDK code that calls asyncio._set_running_loop(None) overwrites the loop on the main thread instead of on the thread used when the workflow is spawned in a thread pool executor. This happens on the main thread because the worker thread pool executor spawns greenlets instead of true threads, and the greenlets run on the main thread. Under the hood, the pure-Python implementation uses a thread local to manage the running loop, and gevent should be patching that. However, CPython manages this using a C-specific implementation that gevent cannot patch.
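The effect can be reproduced without gevent at all. The following is a minimal sketch, assuming only the standard library: clear_running_loop is a hypothetical stand-in for code that believes it owns a fresh thread, but is actually called on the thread that is running the loop (which is what happens when a "thread" is really a greenlet).

```python
import asyncio


def clear_running_loop() -> None:
    # Hypothetical stand-in for code that assumes it runs on its own thread.
    # asyncio._set_running_loop is a private API, used here only to mirror
    # the call discussed in this thread.
    asyncio._set_running_loop(None)


async def main() -> str:
    loop = asyncio.get_running_loop()
    clear_running_loop()  # executes on the same OS thread as the loop
    try:
        asyncio.get_running_loop()
        outcome = "loop still visible"
    except RuntimeError as err:
        outcome = str(err)
    finally:
        # Restore the running loop so asyncio.run() can shut down cleanly.
        asyncio._set_running_loop(loop)
    return outcome


outcome = asyncio.run(main())
print(outcome)
```

The RuntimeError raised here is the same kind of failure shown in the traceback above, which suggests the worker's own coroutines lose their loop as soon as the "threaded" code clears it.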

Similar to @alecgorge I tried to pass in a gevent.threadpool.ThreadPoolExecutor to the worker but the futures it returns are not asyncio compliant and raise ValueError for calls such as asyncio.ensure_future (which is invoked under wait_for). There may be a way to patch the gevent ThreadPoolExecutor in a simple subclass but I have not looked into that.

Potential fix

Use set_event_loop and get_event_loop instead. Those implementations do still use the underlying C calls for get_running_loop. However, with this I can push the worker into a real thread. The base event loop policy does not create a new loop in a non-main thread, so it should prevent _get_running_loop from returning a value and the event loop policy will be used for managing the loop instead. That policy maintains a thread local that can be patched.

I will add an example with this soon.

@reallistic

After a deep dive into the asyncio code I found that a running loop always sets the C-specific thread local and there is no (easy) way around that.

That said, I was able to get the gevent ThreadPoolExecutor working with the following:

    worker = Worker(
        temporal_client,
        task_queue="test-queue",
        workflows=[SimpleWorkflow],
        activities=[get_name],
        workflow_task_executor=CFGThreadPool(4)
    )
    await worker.run()

import asyncio
import concurrent.futures
import functools

import gevent.threadpool


class CFGThreadPool(gevent.threadpool.ThreadPoolExecutor):
    def _set_result_and_wake(self, fut, loop, result, exc_info):
        if fut.cancelled():
            return

        # call_soon_threadsafe wakes up the loop under the hood
        if exc_info is not None:
            loop.call_soon_threadsafe(fut.set_exception, exc_info)
        else:
            loop.call_soon_threadsafe(fut.set_result, result)

    def submit(self, func, *args, **kwargs):
        # Capture the caller's loop so the result is delivered back to it
        loop = asyncio.get_running_loop()
        # Return a standard concurrent future instead of gevent's own type
        fut = concurrent.futures.Future()

        @functools.wraps(func)
        def wrapper(*w_args, **w_kwargs):
            result = None
            exc_info = None

            try:
                result = func(*w_args, **w_kwargs)
            except Exception as ex:
                exc_info = ex

            self._set_result_and_wake(fut, loop, result, exc_info)
            return result

        super().submit(wrapper, *args, **kwargs)
        return fut
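
For readers without gevent installed, the same bridging idea can be sketched with the standard library only. This is an illustration under stated assumptions, not the SDK's or gevent's code: submit_in_thread is a hypothetical helper that runs a function on a plain thread, resolves a concurrent.futures.Future through call_soon_threadsafe, and lets the caller await it via asyncio.wrap_future.

```python
import asyncio
import concurrent.futures
import threading


def submit_in_thread(func, *args) -> concurrent.futures.Future:
    # Hypothetical helper: must be called from inside a running loop.
    loop = asyncio.get_running_loop()
    fut: concurrent.futures.Future = concurrent.futures.Future()

    def runner() -> None:
        try:
            result = func(*args)
        except Exception as exc:
            # Hand the outcome back on the loop's thread, which also wakes it.
            loop.call_soon_threadsafe(fut.set_exception, exc)
        else:
            loop.call_soon_threadsafe(fut.set_result, result)

    threading.Thread(target=runner, daemon=True).start()
    return fut


async def main() -> int:
    fut = submit_in_thread(lambda a, b: a + b, 2, 3)
    # wrap_future turns the concurrent future into an awaitable asyncio future.
    return await asyncio.wrap_future(fut)


total = asyncio.run(main())
print(total)
```

The key design point is the return type: a plain concurrent.futures.Future is something asyncio can wrap, whereas a library-specific future type may not be.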


@cretz
Member Author

cretz commented Oct 10, 2022

@reallistic - Thanks for this deep dive. I have this on my schedule to dig into and even write an integration test for (if isolatable, i.e. can "unpatch" afterwards).

Use set_event_loop and get_event_loop instead.

I think this is the right approach and we should do this if it in fact fixes it. Who knows who else is setting default event loops on threads somehow. I just set to None because I thought I was in control of the thread.

@reallistic

I just set to None because I thought I was in control of the thread.

This is 100% sane and reasonable. The issue is that gevent monkey patches pure-Python threading but not the C implementation.

Also, changing _(s|g)et_running_loop to (s|g)et_event_loop is a good idea to prevent changes in the private API from breaking things.

However, using those APIs does not help with gevent. This is because the main loop used to connect the client and run the Worker (not the _WorkflowInstanceImpl) has to be running via either asyncio.run or loop.run_until_complete. Both implementations call _set_running_loop, which sets the loop directly on the C thread local. The public get_event_loop call first calls _get_running_loop before consulting get_event_loop_policy().get_event_loop(). Thus, when gevent monkey patches threading, the main loop will always be returned, effectively ignoring the public set_event_loop.
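
The lookup order described above can be checked with a small standard-library sketch (no gevent involved; the variable names are illustrative only). While a loop is running on the current thread, get_event_loop returns that running loop even if set_event_loop was just given a different one.

```python
import asyncio


async def main() -> tuple:
    running = asyncio.get_running_loop()
    other = asyncio.new_event_loop()  # created but never run
    try:
        asyncio.set_event_loop(other)
        seen = asyncio.get_event_loop()
    finally:
        # Put things back and release the unused loop's resources.
        asyncio.set_event_loop(running)
        other.close()
    return (seen is running, seen is other)


sees_running, sees_other = asyncio.run(main())
print(sees_running, sees_other)
```

So if greenlets share the main thread with the running loop, swapping in a loop through the public setter would not change what get_event_loop hands back there.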

The only approach I can think of to allow gevent to work would be to use regular asyncio calls instead of a custom event loop. I recognize though that doing such an implementation would require a decent amount of work and you would lose the safeguards of preventing multi-threading etc.

@cretz
Member Author

cretz commented Oct 10, 2022

Also, changing _(s|g)et_running_loop to (s|g)et_event_loop is a good idea to prevent changes in the private API from breaking things.

Completely agree, but there was something w/ the default event loop policy on some platform I think was causing me problems. But I forget what and I didn't document it, so will definitely try to move back to all public API.

The only approach I can think of [...]

We really have to keep our custom event loop. It's the underpinning of what makes workflows work well.

Maybe we can have a similar monkey patch that overrides gevent for the life of the run_once and then puts it back after. I'll have to check gevent, but surely we're not the first that have wanted to do advanced asyncio loop management in a gevent-enabled process. I'll have to dig into what they really do. I only need a short synchronous window to own the loop on a thread run.

@cretz
Member Author

cretz commented Oct 11, 2023

I am happy to report I have succeeded in combining gevent and Temporal: temporalio/samples-python#84. @reallistic - I basically took your approach, but I created a separate executor for running the asyncio event loop in a native gevent thread (separate from the executor that is used to execute workflow/activity tasks).
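
As a rough illustration of the "event loop on its own thread" idea only, here is a standard-library sketch. It is not the code from the linked sample: LoopThread and which_thread are hypothetical names, and in a gevent-patched process threading.Thread would itself be patched, so a native thread would have to come from gevent's own thread pool instead.

```python
import asyncio
import threading


class LoopThread:
    """Hypothetical helper that owns an asyncio loop on a dedicated thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, name="asyncio-loop", daemon=True
        )
        self._thread.start()

    def run(self, coro):
        # Schedule the coroutine on the loop thread and block for its result.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()

    def close(self) -> None:
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def which_thread() -> str:
    await asyncio.sleep(0)
    return threading.current_thread().name


runner = LoopThread()
loop_thread_name = runner.run(which_thread())
runner.close()
print(loop_thread_name)
```

Keeping the loop on its own thread means its running-loop state lives in that thread's local storage, away from whatever else shares the main thread.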

I will have to update the README in this repo with the update and then I'll be able to close this out I think unless I'm missing something.

cretz added a commit to cretz/temporal-sdk-python that referenced this issue Oct 20, 2023
cretz added a commit that referenced this issue Oct 24, 2023