Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Describe task queue #656

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 217 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,46 @@ async def get_worker_task_reachability(
)
)

async def describe_task_queue(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we considered a top level scaling/tuning API instead that takes task queue types as a filter? That might be more discoverable for somebody perusing the CLI/API command surface, who knows they want to do scaling but doesn't think of that when they see task queue.
It would also allow us to add deploy group as a filter once that ships.
And finally, it could allow us to embed other information within the API call such as related namespace-level information, for example if the namespace has rate limits that should also be taken into account.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional context. Versioning team originally shipped APIs within task queue, and Max asked us to make them top-level and simply refer to task queue from them. (Later on, we decided to switch to deploy group, but the top-level API concept remains.)
This feels like it could be analogous.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we considered a top level scaling/tuning API instead that takes task queue types as a filter?

I am not sure. This is the Python review for the SDK API that already exists in Go and calls the already existing server API. I think existential questions concerning the API may need to be asked in a place with broader reach.

self,
task_queue: str,
task_queue_types: Sequence[TaskQueueType] = [],
report_pollers: bool = False,
report_stats: bool = False,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> TaskQueueDescription:
"""Describe task queue.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This comment is redundant. Can you give a flavor for the type of information I should expect as a response and why I would use each one?

At this point I assumed I would get some info if I check neither and was wondering what it was. Wasn't until I looked at the code that I realized it wouldn't work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I will clarify that this returns poller and stats information and that currently you need to provide at least one of the booleans.


.. note::
This is only for unversioned workers. Worker versioning is not yet
supported for describing task queue.

Args:
task_queue: Name of the task queue. Sticky queues are not supported.
task_queue_types: Task queue types to report info about. If not
present or empty, all types are considered.
report_pollers: Include list of pollers for requested task queue types.
report_stats: Include task queue stats for requested task queue types.
Comment on lines +1130 to +1131

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these toggles here because it could degrade server performance to check them? Is there any kind of throughput limitation people should be aware of? (Otherwise, why are they necessary?)

Copy link
Member Author

@cretz cretz Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how the API was designed server side, but I would guess that they made these opt-in for performance reasons, yes. This is consistent with the Go SDK API that exists today already.

rpc_metadata: Headers used on each RPC call. Keys here override
client-level RPC metadata keys.
rpc_timeout: Optional RPC deadline to set for each RPC call.
"""
if not report_pollers and not report_stats:
raise ValueError(
"At least one of report_pollers or report_stats must be True"
)
Comment on lines +1136 to +1139

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Oh I see, so one is required. I assumed not since they were both optional, and there was a default set of info.
If you want, you could avoid this friction, and runtime errors, by having a required enum field for either/both.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An enum is a bit rough here because there are more coming (e.g. "report reachability"). We designed this as simple booleans to match Go SDK.

return await self._impl.describe_task_queue(
DescribeTaskQueueInput(
task_queue,
task_queue_types,
report_pollers,
report_stats,
rpc_metadata,
rpc_timeout,
)
)


class ClientConfig(TypedDict, total=False):
"""TypedDict of config originally passed to :py:meth:`Client`."""
Expand Down Expand Up @@ -4814,6 +4854,18 @@ class GetWorkerTaskReachabilityInput:
rpc_timeout: Optional[timedelta]


@dataclass
class DescribeTaskQueueInput:
"""Input for :py:meth:`OutboundInterceptor.describe_task_queue`."""

task_queue: str
task_queue_types: Sequence[TaskQueueType]
report_pollers: bool
report_stats: bool
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]


@dataclass
class Interceptor:
"""Interceptor for clients.
Expand Down Expand Up @@ -4983,6 +5035,14 @@ async def get_worker_task_reachability(
"""Called for every :py:meth:`Client.get_worker_task_reachability` call."""
return await self.next.get_worker_task_reachability(input)

### Other

async def describe_task_queue(
self, input: DescribeTaskQueueInput
) -> TaskQueueDescription:
"""Called for every :py:meth:`Client.describe_task_queue` call."""
return await self.next.describe_task_queue(input)


class _ClientImpl(OutboundInterceptor):
def __init__(self, client: Client) -> None:
Expand Down Expand Up @@ -5726,6 +5786,27 @@ async def get_worker_task_reachability(
)
return WorkerTaskReachability._from_proto(resp)

### Other calls

async def describe_task_queue(
self, input: DescribeTaskQueueInput
) -> TaskQueueDescription:
req = temporalio.api.workflowservice.v1.DescribeTaskQueueRequest(
namespace=self._client.namespace,
task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue),
api_mode=temporalio.api.enums.v1.DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
task_queue_types=[
temporalio.api.enums.v1.TaskQueueType.ValueType(t)
for t in input.task_queue_types
],
report_pollers=input.report_pollers,
report_stats=input.report_stats,
)
resp = await self._client.workflow_service.describe_task_queue(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
return TaskQueueDescription._from_proto(resp)


def _history_from_json(
history: Union[str, Dict[str, Any]],
Expand Down Expand Up @@ -6114,6 +6195,142 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType:
)


class TaskQueueType(IntEnum):
"""Type of task queue.

See :py:class:`temporalio.api.enums.v1.TaskQueueType`.
"""

WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)
ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)
NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS)


@dataclass
class TaskQueueDescription:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this struct will look like once the per-buildId info is added?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't know, so we are trying not to guess. We are trying to build this as if versioning does not exist from a user POV (because it doesn't).

"""Description of a task queue."""

types: Mapping[TaskQueueType, TaskQueueTypeInfo]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer the following.

Suggested change
types: Mapping[TaskQueueType, TaskQueueTypeInfo]
unversioned_types: Mapping[TaskQueueType, TaskQueueTypeInfo]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning is as follows.

The following diagram shows different "levels of task queue". ["Task Queue" is a overloaded term and it's been used to refer to any of the boxes in this diagram! In the server code at least we've refactored things and gave each box a different name.]

Now, from users perspective, Task Queue almost alway is the most top-level, black box. So DescribeTaskQueue, without any other specification, should describe that box.

Unversioned is just a slice of the whole Task Queue. A top-level field such as types is reasonable only if it maps to all slices (i.e. unversioned + build IDs). As server does not return all slices (and we rely on user specify the slices as of now), we cannot have a such top-level field. Maybe we can add it in the future.

As long as only unversioned stats are desired, it's totally fine, IMO, to put it in the top level proto but it should be qualified as such so user does not interpret it as something that holds the stats for all slices.

image

cc @cretz @Sushisource @dnr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree

Copy link
Member Author

@cretz cretz Oct 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think people not using versioning should not have to know anything about versioning or have the term versioning in their code anywhere. Can we pretend these stats were made available before versioning was a thing (which is kinda the case since versioning is not a thing yet)?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should call things what they really are. We cannot pretend versioning is not there because it is. Even if it is pre-release, we know it's going to be public-preview and GA eventually.

If we call this only types it'll be dangerous or confusing for people who use versioning.

In your proposal as well you do have a note that talks about versioning and unversioned, so it's not like those terms will be completely absent from developers mind.

Lastly, I'm not sure if not having the term "unversioned" in the code anywhere for users not using versioning should be a goal TBH. I agree that they should not have to use term "version" or "build ID" in their code though.

Copy link
Member Author

@cretz cretz Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should call things what they really are

I don't expect we're going to change Worker to UnversionedWorker either. We accept that the default is unversioned and we don't infect the default with the word unversioned anywhere. I do not think we should inconsistently start doing that.

In your proposal as well you do have a note that talks about versioning and unversioned, so it's not like those terms will be completely absent from developers mind.

I don't propose removing documentation about versioning, just API terms for those not using versioning.

If we call this only types it'll be dangerous or confusing for people who use versioning.

I think we can document this away. Same as anywhere else where unversioned default differs for people using versioning.

Versioning does not meaningfully exist right now, I don't think we should code as if it does. We're only punishing people with this terminology because we waited to implement backlog stats until some versioning parts were put into the API.

"""
Task queue type information, keyed by task queue type.

.. note::
This is only for unversioned workers. Worker versioning is not yet
supported for task queue description.
"""

@staticmethod
def _from_proto(
resp: temporalio.api.workflowservice.v1.DescribeTaskQueueResponse,
) -> TaskQueueDescription:
return TaskQueueDescription(
types={
TaskQueueType(type): TaskQueueTypeInfo._from_proto(info)
for type, info in resp.versions_info[""].types_info.items()
}
)


@dataclass
class TaskQueueTypeInfo:
"""Information for a specific task queue type."""

pollers: Sequence[TaskQueuePollerInfo]
"""
Information about recent pollers, or empty if not requested or none
recently.
"""

stats: Optional[TaskQueueStats]
"""Task queue stats, or none if not requested."""

@staticmethod
def _from_proto(
info: temporalio.api.taskqueue.v1.TaskQueueTypeInfo,
) -> TaskQueueTypeInfo:
return TaskQueueTypeInfo(
pollers=[
TaskQueuePollerInfo._from_proto(poller_info)
for poller_info in info.pollers
],
stats=TaskQueueStats._from_proto(info.stats)
if info.HasField("stats")
else None,
)


@dataclass
class TaskQueuePollerInfo:
"""Information for a specific task queue poller."""

last_access_time: Optional[datetime]
"""Time of the last poll if any."""

identity: str
"""Identity of the worker/client who is polling this task queue."""

rate_per_second: Optional[float]
"""Polling rate."""

@staticmethod
def _from_proto(
info: temporalio.api.taskqueue.v1.PollerInfo,
) -> TaskQueuePollerInfo:
return TaskQueuePollerInfo(
last_access_time=info.last_access_time.ToDatetime().replace(
tzinfo=timezone.utc
)
if info.HasField("last_access_time")
else None,
identity=info.identity,
rate_per_second=info.rate_per_second if info.rate_per_second != 0 else None,
)


@dataclass
class TaskQueueStats:
"""Statistics for a specific task queue type."""

approximate_backlog_count: int
"""
The approximate number of tasks backlogged in this task queue. May count
expired tasks but eventually converges to the right value.
"""

approximate_backlog_age: timedelta
"""
Approximate age of the oldest task in the backlog based on the create
timestamp of the task at the head of the queue.
"""

backlog_increase_rate: float
""":py:attr:`tasks_add_rate` - :py:attr:`tasks_dispatch_rate`"""

tasks_add_rate: float
"""
Approximate tasks per second added to the task queue based on activity
within a fixed window. This includes both backlogged and sync-matched tasks.
"""

tasks_dispatch_rate: float
"""
Approximate tasks per second dispatched to workers based on activity within
a fixed window. This includes both backlogged and sync-matched tasks.
"""

@staticmethod
def _from_proto(
stats: temporalio.api.taskqueue.v1.TaskQueueStats,
) -> TaskQueueStats:
return TaskQueueStats(
approximate_backlog_count=stats.approximate_backlog_count,
approximate_backlog_age=stats.approximate_backlog_age.ToTimedelta(),
backlog_increase_rate=stats.tasks_add_rate - stats.tasks_dispatch_rate,
tasks_add_rate=stats.tasks_add_rate,
tasks_dispatch_rate=stats.tasks_dispatch_rate,
)


class CloudOperationsClient:
"""Client for accessing Temporal Cloud Operations API.

Expand Down
56 changes: 56 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
SignalWorkflowInput,
StartWorkflowInput,
StartWorkflowUpdateInput,
TaskQueueType,
TaskReachabilityType,
TerminateWorkflowInput,
WorkflowContinuedAsNewError,
Expand Down Expand Up @@ -1323,3 +1324,58 @@ async def test_cloud_client_simple():
GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"])
)
assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace


@workflow.defn
class TaskQueueDescribeWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return f"Hello, {name}!"


async def test_describe_task_queue(client: Client):
task_queue = f"tq-{uuid.uuid4()}"
# Simple describe when nothing present
desc = await client.describe_task_queue(
task_queue, report_pollers=True, report_stats=True
)
# Confirm activity and workflow have no pollers
assert not desc.types[TaskQueueType.ACTIVITY].pollers
assert not desc.types[TaskQueueType.WORKFLOW].pollers

# Confirm no add rate
stats = desc.types[TaskQueueType.ACTIVITY].stats
assert stats and stats.tasks_add_rate == 0.0
stats = desc.types[TaskQueueType.WORKFLOW].stats
assert stats and stats.tasks_add_rate == 0.0

# Run some workflows
async with new_worker(
client, TaskQueueDescribeWorkflow, task_queue=task_queue
) as worker:
for i in range(10):
await client.execute_workflow(
TaskQueueDescribeWorkflow.run,
f"user{i}",
id=f"tq-{uuid.uuid4()}",
task_queue=task_queue,
)

# Describe again (while poller still running)
desc = await client.describe_task_queue(
task_queue, report_pollers=True, report_stats=True
)

# Confirm activity still has no pollers, but workflow has this one
assert not desc.types[TaskQueueType.ACTIVITY].pollers
assert len(desc.types[TaskQueueType.WORKFLOW].pollers) == 1
assert (
desc.types[TaskQueueType.WORKFLOW].pollers[0].identity
== client.service_client.config.identity
)

# Confirm activity still has no stats, but workflow does
stats = desc.types[TaskQueueType.ACTIVITY].stats
assert stats and stats.tasks_add_rate == 0.0
stats = desc.types[TaskQueueType.WORKFLOW].stats
assert stats and stats.tasks_add_rate != 0.0
Loading