From 5e57616e98414a97e85b52e060d638a74c745adc Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 30 Sep 2024 10:11:57 -0500 Subject: [PATCH 1/2] Describe task queue --- temporalio/client.py | 204 +++++++++++++++++++++++++++++++++++++++++++ tests/test_client.py | 56 ++++++++++++ 2 files changed, 260 insertions(+) diff --git a/temporalio/client.py b/temporalio/client.py index 7e5992f7..5b4a944e 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1108,6 +1108,45 @@ async def get_worker_task_reachability( ) ) + async def describe_task_queue( + self, + task_queue: str, + task_queue_types: Sequence[TaskQueueType] = [], + report_pollers: bool = False, + report_stats: bool = False, + rpc_metadata: Mapping[str, str] = {}, + rpc_timeout: Optional[timedelta] = None, + ) -> TaskQueueDescription: + """ + Describe task queue. + + .. note:: + This is only for unversioned workers. Worker versioning is not yet + supported for describing task queue. + + Args: + task_queue: Name of the task queue. Sticky queues are not supported. + task_queue_types: Task queue types to report info about. If not + present or empty, all types are considered. + report_pollers: Include list of pollers for requested task queue types. + report_stats: Include task queue stats for requested task queue types. + """ + + if not report_pollers and not report_stats: + raise ValueError( + "At least one of report_pollers or report_stats must be True" + ) + return await self._impl.describe_task_queue( + DescribeTaskQueueInput( + task_queue, + task_queue_types, + report_pollers, + report_stats, + rpc_metadata, + rpc_timeout, + ) + ) + class ClientConfig(TypedDict, total=False): """TypedDict of config originally passed to :py:meth:`Client`.""" @@ -4814,6 +4853,18 @@ class GetWorkerTaskReachabilityInput: rpc_timeout: Optional[timedelta] +@dataclass +class DescribeTaskQueueInput: + """Input for :py:meth:`OutboundInterceptor.describe_task_queue`.""" + + task_queue: str + task_queue_types: Sequence[TaskQueueType] + report_pollers: bool + report_stats: bool + rpc_metadata: Mapping[str, str] + rpc_timeout: Optional[timedelta] + + @dataclass class Interceptor: """Interceptor for clients. @@ -4983,6 +5034,13 @@ async def get_worker_task_reachability( """Called for every :py:meth:`Client.get_worker_task_reachability` call.""" return await self.next.get_worker_task_reachability(input) + ### Other + + async def describe_task_queue( + self, input: DescribeTaskQueueInput + ) -> TaskQueueDescription: + return await self.next.describe_task_queue(input) + class _ClientImpl(OutboundInterceptor): def __init__(self, client: Client) -> None: @@ -5726,6 +5784,27 @@ async def get_worker_task_reachability( ) return WorkerTaskReachability._from_proto(resp) + ### Other calls + + async def describe_task_queue( + self, input: DescribeTaskQueueInput + ) -> TaskQueueDescription: + req = temporalio.api.workflowservice.v1.DescribeTaskQueueRequest( + namespace=self._client.namespace, + task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue), + api_mode=temporalio.api.enums.v1.DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED, + task_queue_types=[ + temporalio.api.enums.v1.TaskQueueType.ValueType(t) + for t in input.task_queue_types + ], + report_pollers=input.report_pollers, + report_stats=input.report_stats, + ) + resp = await self._client.workflow_service.describe_task_queue( + req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout + ) + return TaskQueueDescription._from_proto(resp) + def _history_from_json( history: Union[str, Dict[str, Any]], @@ -6114,6 +6193,131 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType: ) +class TaskQueueType(IntEnum): + WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) + ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY) + NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS) + + +@dataclass +class TaskQueueDescription: + types: Mapping[TaskQueueType, TaskQueueTypeInfo] + """ + Task queue type information, keyed by task queue type. + + .. note:: + This is only for unversioned workers. Worker versioning is not yet + supported for task queue description. + """ + + @staticmethod + def _from_proto( + resp: temporalio.api.workflowservice.v1.DescribeTaskQueueResponse, + ) -> TaskQueueDescription: + return TaskQueueDescription( + types={ + TaskQueueType(type): TaskQueueTypeInfo._from_proto(info) + for type, info in resp.versions_info[""].types_info.items() + } + ) + + +@dataclass +class TaskQueueTypeInfo: + pollers: Sequence[TaskQueuePollerInfo] + """ + Information about recent pollers, or empty if not requested or none + recently. + """ + + stats: Optional[TaskQueueStats] + """ + Task queue stats, or none if not requested. + """ + + @staticmethod + def _from_proto( + info: temporalio.api.taskqueue.v1.TaskQueueTypeInfo, + ) -> TaskQueueTypeInfo: + return TaskQueueTypeInfo( + pollers=[ + TaskQueuePollerInfo._from_proto(poller_info) + for poller_info in info.pollers + ], + stats=TaskQueueStats._from_proto(info.stats) + if info.HasField("stats") + else None, + ) + + +@dataclass +class TaskQueuePollerInfo: + last_access_time: Optional[datetime] + # Time of the last poll if any. + + identity: str + # Identity of the worker/client who is polling this task queue. + + rate_per_second: Optional[float] + # Polling rate. + + @staticmethod + def _from_proto( + info: temporalio.api.taskqueue.v1.PollerInfo, + ) -> TaskQueuePollerInfo: + return TaskQueuePollerInfo( + last_access_time=info.last_access_time.ToDatetime().replace( + tzinfo=timezone.utc + ) + if info.HasField("last_access_time") + else None, + identity=info.identity, + rate_per_second=info.rate_per_second if info.rate_per_second != 0 else None, + ) + + +@dataclass +class TaskQueueStats: + approximate_backlog_count: int + """ + The approximate number of tasks backlogged in this task queue. May count + expired tasks but eventually converges to the right value. + """ + + approximate_backlog_age: timedelta + """ + Approximate age of the oldest task in the backlog based on the create + timestamp of the task at the head of the queue. + """ + + backlog_increase_rate: float + """:py:attr:`tasks_add_rate` - :py:attr:`tasks_dispatch_rate`""" + + tasks_add_rate: float + """ + Approximate tasks per second added to the task queue based on activity + within a fixed window. This includes both backlogged and sync-matched tasks. + """ + + tasks_dispatch_rate: float + """ + Approximate tasks per second dispatched to workers based on activity within + a fixed window. This includes both backlogged and sync-matched tasks. + """ + + @staticmethod + def _from_proto( + stats: temporalio.api.taskqueue.v1.TaskQueueStats, + ) -> TaskQueueStats: + return TaskQueueStats( + approximate_backlog_count=stats.approximate_backlog_count, + approximate_backlog_age=stats.approximate_backlog_age.ToTimedelta(), + backlog_increase_rate=stats.tasks_add_rate - stats.tasks_dispatch_rate, + tasks_add_rate=stats.tasks_add_rate, + tasks_dispatch_rate=stats.tasks_dispatch_rate, + ) + + class CloudOperationsClient: """Client for accessing Temporal Cloud Operations API. diff --git a/tests/test_client.py b/tests/test_client.py index 03ce68ff..98245a02 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -65,6 +65,7 @@ SignalWorkflowInput, StartWorkflowInput, StartWorkflowUpdateInput, + TaskQueueType, TaskReachabilityType, TerminateWorkflowInput, WorkflowContinuedAsNewError, @@ -1323,3 +1324,58 @@ async def test_cloud_client_simple(): GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"]) ) assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace + + +@workflow.defn +class TaskQueueDescribeWorkflow: + @workflow.run + async def run(self, name: str) -> str: + return f"Hello, {name}!" + + +async def test_describe_task_queue(client: Client): + task_queue = f"tq-{uuid.uuid4()}" + # Simple describe when nothing present + desc = await client.describe_task_queue( + task_queue, report_pollers=True, report_stats=True + ) + # Confirm activity and workflow have no pollers + assert not desc.types[TaskQueueType.ACTIVITY].pollers + assert not desc.types[TaskQueueType.WORKFLOW].pollers + + # Confirm no add rate + stats = desc.types[TaskQueueType.ACTIVITY].stats + assert stats and stats.tasks_add_rate == 0.0 + stats = desc.types[TaskQueueType.WORKFLOW].stats + assert stats and stats.tasks_add_rate == 0.0 + + # Run some workflows + async with new_worker( + client, TaskQueueDescribeWorkflow, task_queue=task_queue + ) as worker: + for i in range(10): + await client.execute_workflow( + TaskQueueDescribeWorkflow.run, + f"user{i}", + id=f"tq-{uuid.uuid4()}", + task_queue=task_queue, + ) + + # Describe again (while poller still running) + desc = await client.describe_task_queue( + task_queue, report_pollers=True, report_stats=True + ) + + # Confirm activity still has no pollers, but workflow has this one + assert not desc.types[TaskQueueType.ACTIVITY].pollers + assert len(desc.types[TaskQueueType.WORKFLOW].pollers) == 1 + assert ( + desc.types[TaskQueueType.WORKFLOW].pollers[0].identity + == client.service_client.config.identity + ) + + # Confirm activity still has no stats, but workflow does + stats = desc.types[TaskQueueType.ACTIVITY].stats + assert stats and stats.tasks_add_rate == 0.0 + stats = desc.types[TaskQueueType.WORKFLOW].stats + assert stats and stats.tasks_add_rate != 0.0 From a9970790d4a8340a6ba6fd50b3e32ce21199e09e Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Mon, 30 Sep 2024 10:19:52 -0500 Subject: [PATCH 2/2] Minor updates --- temporalio/client.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index 5b4a944e..d54f7b8d 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -1117,8 +1117,7 @@ async def describe_task_queue( rpc_metadata: Mapping[str, str] = {}, rpc_timeout: Optional[timedelta] = None, ) -> TaskQueueDescription: - """ - Describe task queue. + """Describe task queue. .. note:: This is only for unversioned workers. Worker versioning is not yet @@ -1130,8 +1129,10 @@ async def describe_task_queue( present or empty, all types are considered. report_pollers: Include list of pollers for requested task queue types. report_stats: Include task queue stats for requested task queue types. + rpc_metadata: Headers used on each RPC call. Keys here override + client-level RPC metadata keys. + rpc_timeout: Optional RPC deadline to set for each RPC call. """ - if not report_pollers and not report_stats: raise ValueError( "At least one of report_pollers or report_stats must be True" @@ -5039,6 +5040,7 @@ async def get_worker_task_reachability( async def describe_task_queue( self, input: DescribeTaskQueueInput ) -> TaskQueueDescription: + """Called for every :py:meth:`Client.describe_task_queue` call.""" return await self.next.describe_task_queue(input) @@ -6194,6 +6196,11 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType: class TaskQueueType(IntEnum): + """Type of task queue. + + See :py:class:`temporalio.api.enums.v1.TaskQueueType`. + """ + WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW) ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY) NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS) @@ -6201,6 +6208,8 @@ class TaskQueueType(IntEnum): @dataclass class TaskQueueDescription: + """Description of a task queue.""" + types: Mapping[TaskQueueType, TaskQueueTypeInfo] """ Task queue type information, keyed by task queue type. @@ -6224,6 +6233,8 @@ def _from_proto( @dataclass class TaskQueueTypeInfo: + """Information for a specific task queue type.""" + pollers: Sequence[TaskQueuePollerInfo] """ Information about recent pollers, or empty if not requested or none @@ -6231,9 +6242,7 @@ class TaskQueueTypeInfo: """ stats: Optional[TaskQueueStats] - """ - Task queue stats, or none if not requested. - """ + """Task queue stats, or none if not requested.""" @staticmethod def _from_proto( @@ -6252,14 +6261,16 @@ def _from_proto( @dataclass class TaskQueuePollerInfo: + """Information for a specific task queue poller.""" + last_access_time: Optional[datetime] - # Time of the last poll if any. + """Time of the last poll if any.""" identity: str - # Identity of the worker/client who is polling this task queue. + """Identity of the worker/client who is polling this task queue.""" rate_per_second: Optional[float] - # Polling rate. + """Polling rate.""" @staticmethod def _from_proto( @@ -6278,6 +6289,8 @@ def _from_proto( @dataclass class TaskQueueStats: + """Statistics for a specific task queue type.""" + approximate_backlog_count: int """ The approximate number of tasks backlogged in this task queue. May count