Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: task pagination #616

Merged
merged 8 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions examples/task_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@
task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active")
task = tasks_api.create_task(task_create_request=task_request)
print(task)

tasks = tasks_api.find_tasks_iter()

# print all tasks id
for task in tasks:
print(task.id)
59 changes: 58 additions & 1 deletion influxdb_client/client/tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,23 @@
AddResourceMemberRequestBody, RunManually, Run, LogEvent


class _TasksIterator:
def __init__(self, values, next) -> None:
self.values = values
self.next = next
Sciator marked this conversation as resolved.
Show resolved Hide resolved

def __iter__(self):
return self if self.values else (_ for _ in ())

def __next__(self):
if not self.values:
if self.next:
self.values, self.next = self.next()
if not self.values:
raise StopIteration
return self.values.pop(0)
Sciator marked this conversation as resolved.
Show resolved Hide resolved


class TasksApi(object):
"""Implementation for '/api/v2/tasks' endpoint."""

Expand All @@ -25,7 +42,7 @@ def find_task_by_id(self, task_id) -> Task:
return task

def find_tasks(self, **kwargs):
"""List all tasks.
"""List all tasks up to set limit (max 500).

:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
Expand All @@ -37,6 +54,46 @@ def find_tasks(self, **kwargs):
"""
return self._service.get_tasks(**kwargs).tasks

def _find_tasks_paged(self, **kwargs):
"""List all tasks with ability to list next tasks after limit.

:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
:key str user: filter tasks to a specific user ID
:key str org: filter tasks to a specific organization name
:key str org_id: filter tasks to a specific organization ID
:key int limit: the number of tasks to return in one page
:return: Tasks, Next or None
"""
tasks_response = self._service.get_tasks(**kwargs)
tasks = tasks_response.tasks

has_next = tasks_response.links.next is not None
last_id = tasks[-1].id if tasks else None

def next():
Sciator marked this conversation as resolved.
Show resolved Hide resolved
if has_next and last_id is not None:
return self._find_tasks_paged(**{**kwargs, 'after': last_id})
else:
return [], None

return tasks, next
Sciator marked this conversation as resolved.
Show resolved Hide resolved

def find_tasks_iter(self, **kwargs):
"""Iterate over all tasks with pagination.

:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
:key str user: filter tasks to a specific user ID
:key str org: filter tasks to a specific organization name
:key str org_id: filter tasks to a specific organization ID
:key int limit: the number of tasks in one page
:return: Tasks iterator
"""
tasks, next = self._find_tasks_paged(**kwargs)
Sciator marked this conversation as resolved.
Show resolved Hide resolved

return iter(_TasksIterator(tasks, next))

def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
"""Create a new task."""
if task_create_request is not None:
Expand Down
26 changes: 26 additions & 0 deletions tests/test_TasksApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,32 @@ def test_find_task_by_user_id(self):
print(tasks)
self.assertEqual(len(tasks), 1)

def test_find_tasks_iter(self):
task_name = self.generate_name("it task")
num_of_tasks = 10

for _ in range(num_of_tasks):
self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)

def count_unique_ids(tasks):
return len(set(map(lambda task: task.id, tasks)))

# get tasks in 3-4 batches
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one equaly size batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get no tasks
tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah")
self.assertEqual(count_unique_ids(tasks), 0)

def test_delete_task(self):
task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *",
self.organization.id)
Expand Down
Loading