diff --git a/gevent_async/executor.py b/gevent_async/executor.py index 16c8896e..2efec74e 100644 --- a/gevent_async/executor.py +++ b/gevent_async/executor.py @@ -1,5 +1,5 @@ import functools -from concurrent.futures import Future +from concurrent.futures import Future, InvalidStateError from typing import Callable, TypeVar from gevent import threadpool @@ -26,13 +26,13 @@ def wrapper(*w_args: P.args, **w_kwargs: P.kwargs) -> None: # Swallow InvalidStateError in case Python future was cancelled try: python_fut.set_result(result) - except: + except InvalidStateError: pass except Exception as exc: # Swallow InvalidStateError in case Python future was cancelled try: python_fut.set_exception(exc) - except: + except InvalidStateError: pass # Submit our wrapper to gevent diff --git a/gevent_async/starter.py b/gevent_async/starter.py index 54430036..43e8356b 100644 --- a/gevent_async/starter.py +++ b/gevent_async/starter.py @@ -30,7 +30,7 @@ async def async_main(): result = await client.execute_workflow( workflow.GreetingWorkflow.run, "Temporal", - id=f"gevent_async-workflow-id", + id="gevent_async-workflow-id", task_queue="gevent_async-task-queue", ) logging.info(f"Workflow result: {result}") diff --git a/gevent_async/worker.py b/gevent_async/worker.py index 8a119dc7..72f1e1bd 100644 --- a/gevent_async/worker.py +++ b/gevent_async/worker.py @@ -17,25 +17,26 @@ def main(): logging.basicConfig(level=logging.INFO) - # Create a new event loop so we can run_until_complete on it. We could - # just use asyncio.run like starter does, but this approach allows us to - # listen for a signal and stop the worker gracefully. - loop = asyncio.new_event_loop() - - # Make SIGINT trigger an event that will shutdown the worker - interrupt_event = asyncio.Event() - gevent.signal_handler(signal.SIGINT, loop.call_soon_threadsafe, interrupt_event.set) - - # Create single-worker gevent executor to run event loop, waiting for - # result. This executor cannot be used for anything else in Temporal, it is - # just a single thread for running asyncio. This means that inside of - # async_main we must create another executor specifically for executing - # activity and workflow tasks. + # Create single-worker gevent executor and run asyncio.run(async_main()) in + # it, waiting for result. This executor cannot be used for anything else in + # Temporal, it is just a single thread for running asyncio. This means that + # inside of async_main we must create another executor specifically for + # executing activity and workflow tasks. with GeventExecutor(max_workers=1) as executor: - executor.submit(loop.run_until_complete, async_main(interrupt_event)).result() + executor.submit(asyncio.run, async_main()).result() -async def async_main(interrupt_event: asyncio.Event): +async def async_main(): + # Create ctrl+c handler. We do this by telling gevent on SIGINT to set the + # asyncio event. But asyncio calls are not thread safe, so we have to invoke + # it via call_soon_threadsafe. + interrupt_event = asyncio.Event() + gevent.signal_handler( + signal.SIGINT, + asyncio.get_running_loop().call_soon_threadsafe, + interrupt_event.set, + ) + # Connect client client = await Client.connect("localhost:7233")