Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: task pagination #616

Merged
merged 8 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.39.0 [unreleased]

### Features
1. [#616](https://github.com/influxdata/influxdb-client-python/pull/616): Add `find_tasks_iter` function that allow iterate through all pages of tasks.

## 1.38.0 [2023-10-02]

### Bug Fixes
Expand Down
6 changes: 6 additions & 0 deletions examples/task_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,9 @@
task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active")
task = tasks_api.create_task(task_create_request=task_request)
print(task)

tasks = tasks_api.find_tasks_iter()

# print all tasks id
for task in tasks:
print(task.id)
64 changes: 63 additions & 1 deletion influxdb_client/client/tasks_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,38 @@
AddResourceMemberRequestBody, RunManually, Run, LogEvent


class _Page:
def __init__(self, values, has_next, next_after):
self.has_next = has_next
self.values = values
self.next_after = next_after

@staticmethod
def empty():
return _Page([], False, None)

@staticmethod
def initial(after):
return _Page([], True, after)


class _PageIterator:
def __init__(self, page: _Page, get_next_page):
self.page = page
self.get_next_page = get_next_page

def __iter__(self):
return self

def __next__(self):
if not self.page.values:
if self.page.has_next:
self.page = self.get_next_page(self.page)
if not self.page.values:
raise StopIteration
return self.page.values.pop(0)


class TasksApi(object):
"""Implementation for '/api/v2/tasks' endpoint."""

Expand All @@ -25,7 +57,7 @@ def find_task_by_id(self, task_id) -> Task:
return task

def find_tasks(self, **kwargs):
"""List all tasks.
"""List all tasks up to set limit (max 500).

:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
Expand All @@ -37,6 +69,23 @@ def find_tasks(self, **kwargs):
"""
return self._service.get_tasks(**kwargs).tasks

def find_tasks_iter(self, **kwargs):
"""Iterate over all tasks with pagination.

:key str name: only returns tasks with the specified name
:key str after: returns tasks after specified ID
:key str user: filter tasks to a specific user ID
:key str org: filter tasks to a specific organization name
:key str org_id: filter tasks to a specific organization ID
:key int limit: the number of tasks in one page
:return: Tasks iterator
"""

def get_next_page(page: _Page):
return self._find_tasks_next_page(page, **kwargs)

return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page))

def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task:
"""Create a new task."""
if task_create_request is not None:
Expand Down Expand Up @@ -210,3 +259,16 @@ def get_logs(self, task_id: str) -> List['LogEvent']:
def find_tasks_by_user(self, task_user_id):
"""List all tasks by user."""
return self.find_tasks(user=task_user_id)

def _find_tasks_next_page(self, page: _Page, **kwargs):
if not page.has_next:
return _Page.empty()

args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs
tasks_response = self._service.get_tasks(**args)

tasks = tasks_response.tasks
has_next = tasks_response.links.next is not None
last_id = tasks[-1].id if tasks else None

return _Page(tasks, has_next, last_id)
31 changes: 31 additions & 0 deletions tests/test_TasksApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,37 @@ def test_find_task_by_user_id(self):
print(tasks)
self.assertEqual(len(tasks), 1)

def test_find_tasks_iter(self):
task_name = self.generate_name("it task")
num_of_tasks = 10

for _ in range(num_of_tasks):
self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id)

def count_unique_ids(tasks):
return len(set(map(lambda task: task.id, tasks)))

# get tasks in 3-4 batches
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one equaly size batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get tasks in one batch
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1)
self.assertEqual(count_unique_ids(tasks), num_of_tasks)

# get no tasks
tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah")
self.assertEqual(count_unique_ids(tasks), 0)

# skip some tasks
*_, split_task = self.tasks_api.find_tasks(name= task_name, limit= num_of_tasks // 3)
tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= 3, after= split_task.id)
self.assertEqual(count_unique_ids(tasks), num_of_tasks - num_of_tasks // 3)

def test_delete_task(self):
task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *",
self.organization.id)
Expand Down
Loading