Skip to content

Commit

Permalink
feat: task pagination (#616)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sciator authored Nov 29, 2023
1 parent 2f9e5ed commit 0c60b5d
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.39.0 [unreleased]

### Features
1. [#616](https://github.com/influxdata/influxdb-client-python/pull/616): Add `find_tasks_iter` function that allow iterate through all pages of tasks.

## 1.38.0 [2023-10-02]

### Bug Fixes
Expand Down
6 changes: 6 additions & 0 deletions examples/task_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@
task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active")
task = tasks_api.create_task(task_create_request=task_request)
print(task)

tasks = tasks_api.find_tasks_iter()

# print all tasks id
for task in tasks:
print(task.id)
64 changes: 63 additions & 1 deletion influxdb_client/client/tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@
AddResourceMemberRequestBody, RunManually, Run, LogEvent


class _Page:
def __init__(self, values, has_next, next_after):
self.has_next = has_next
self.values = values
self.next_after = next_after

@staticmethod
def empty():
return _Page([], False, None)

@staticmethod
def initial(after):
return _Page([], True, after)


class _PageIterator:
def __init__(self, page: _Page, get_next_page):
self.page = page
self.get_next_page = get_next_page

def __iter__(self):
return self

def __next__(self):
if not self.page.values:
if self.page.has_next:
self.page = self.get_next_page(self.page)
if not self.page.values:
raise StopIteration
return self.page.values.pop(0)


class TasksApi(object):
"""Implementation for '/api/v2/tasks' endpoint."""

Expand All @@ -25,7 +57,7 @@ def find_task_by_id(self, task_id) -> Task:
return task

def find_tasks(self, **kwargs):
"""List all tasks.
"""List all tasks up to set limit (max 500).
:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
Expand All @@ -37,6 +69,23 @@ def find_tasks(self, **kwargs):
"""
return self._service.get_tasks(**kwargs).tasks

def find_tasks_iter(self, **kwargs):
"""Iterate over all tasks with pagination.
:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
:key str user: filter tasks to a specific user ID
:key str org: filter tasks to a specific organization name
:key str org_id: filter tasks to a specific organization ID
:key int limit: the number of tasks in one page
:return: Tasks iterator
"""

def get_next_page(page: _Page):
return self._find_tasks_next_page(page, **kwargs)

return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))

def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
"""Create a new task."""
if task_create_request is not None:
Expand Down Expand Up @@ -210,3 +259,16 @@ def get_logs(self, task_id: str) -> List['LogEvent']:
def find_tasks_by_user(self, task_user_id):
"""List all tasks by user."""
return self.find_tasks(user=task_user_id)

def _find_tasks_next_page(self, page: _Page, **kwargs):
if not page.has_next:
return _Page.empty()

args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
tasks_response = self._service.get_tasks(**args)

tasks = tasks_response.tasks
has_next = tasks_response.links.next is not None
last_id = tasks[-1].id if tasks else None

return _Page(tasks, has_next, last_id)
31 changes: 31 additions & 0 deletions tests/test_TasksApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,37 @@ def test_find_task_by_user_id(self):
print(tasks)
self.assertEqual(len(tasks), 1)

def test_find_tasks_iter(self):
task_name = self.generate_name("it task")
num_of_tasks = 10

for _ in range(num_of_tasks):
self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)

def count_unique_ids(tasks):
return len(set(map(lambda task: task.id, tasks)))

# get tasks in 3-4 batches
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one equaly size batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get no tasks
tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah")
self.assertEqual(count_unique_ids(tasks), 0)

# skip some tasks
*_, split_task = self.tasks_api.find_tasks(name= task_name, limit= num_of_tasks // 3)
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= 3, after= split_task.id)
self.assertEqual(count_unique_ids(tasks), num_of_tasks - num_of_tasks // 3)

def test_delete_task(self):
task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *",
self.organization.id)
Expand Down

0 comments on commit 0c60b5d

Please sign in to comment.