diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f7973f1..3ab629b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 1.39.0 [unreleased] +### Features +1. [#616](https://github.com/influxdata/influxdb-client-python/pull/616): Add `find_tasks_iter` function that allow iterate through all pages of tasks. + ## 1.38.0 [2023-10-02] ### Bug Fixes diff --git a/examples/task_example.py b/examples/task_example.py index 55595ba9..242dcf4d 100644 --- a/examples/task_example.py +++ b/examples/task_example.py @@ -25,3 +25,9 @@ task_request = TaskCreateRequest(flux=flux, org=org, description="Task Description", status="active") task = tasks_api.create_task(task_create_request=task_request) print(task) + + tasks = tasks_api.find_tasks_iter() + + # print all tasks id + for task in tasks: + print(task.id) diff --git a/influxdb_client/client/tasks_api.py b/influxdb_client/client/tasks_api.py index dd85683b..9edb2ec9 100644 --- a/influxdb_client/client/tasks_api.py +++ b/influxdb_client/client/tasks_api.py @@ -11,6 +11,38 @@ AddResourceMemberRequestBody, RunManually, Run, LogEvent +class _Page: + def __init__(self, values, has_next, next_after): + self.has_next = has_next + self.values = values + self.next_after = next_after + + @staticmethod + def empty(): + return _Page([], False, None) + + @staticmethod + def initial(after): + return _Page([], True, after) + + +class _PageIterator: + def __init__(self, page: _Page, get_next_page): + self.page = page + self.get_next_page = get_next_page + + def __iter__(self): + return self + + def __next__(self): + if not self.page.values: + if self.page.has_next: + self.page = self.get_next_page(self.page) + if not self.page.values: + raise StopIteration + return self.page.values.pop(0) + + class TasksApi(object): """Implementation for '/api/v2/tasks' endpoint.""" @@ -25,7 +57,7 @@ def find_task_by_id(self, task_id) -> Task: return task def find_tasks(self, **kwargs): - """List all tasks. + """List all tasks up to set limit (max 500). :key str name: only returns tasks with the specified name :key str after: returns tasks after specified ID @@ -37,6 +69,23 @@ def find_tasks(self, **kwargs): """ return self._service.get_tasks(**kwargs).tasks + def find_tasks_iter(self, **kwargs): + """Iterate over all tasks with pagination. + + :key str name: only returns tasks with the specified name + :key str after: returns tasks after specified ID + :key str user: filter tasks to a specific user ID + :key str org: filter tasks to a specific organization name + :key str org_id: filter tasks to a specific organization ID + :key int limit: the number of tasks in one page + :return: Tasks iterator + """ + + def get_next_page(page: _Page): + return self._find_tasks_next_page(page, **kwargs) + + return iter(_PageIterator(_Page.initial(kwargs.get('after')), get_next_page)) + def create_task(self, task: Task = None, task_create_request: TaskCreateRequest = None) -> Task: """Create a new task.""" if task_create_request is not None: @@ -210,3 +259,16 @@ def get_logs(self, task_id: str) -> List['LogEvent']: def find_tasks_by_user(self, task_user_id): """List all tasks by user.""" return self.find_tasks(user=task_user_id) + + def _find_tasks_next_page(self, page: _Page, **kwargs): + if not page.has_next: + return _Page.empty() + + args = {**kwargs, 'after': page.next_after} if page.next_after is not None else kwargs + tasks_response = self._service.get_tasks(**args) + + tasks = tasks_response.tasks + has_next = tasks_response.links.next is not None + last_id = tasks[-1].id if tasks else None + + return _Page(tasks, has_next, last_id) diff --git a/tests/test_TasksApi.py b/tests/test_TasksApi.py index 2bea7659..dc936dfd 100644 --- a/tests/test_TasksApi.py +++ b/tests/test_TasksApi.py @@ -184,6 +184,37 @@ def test_find_task_by_user_id(self): print(tasks) self.assertEqual(len(tasks), 1) + def test_find_tasks_iter(self): + task_name = self.generate_name("it task") + num_of_tasks = 10 + + for _ in range(num_of_tasks): + self.tasks_api.create_task_cron(task_name, TASK_FLUX, "0 2 * * *", self.organization.id) + + def count_unique_ids(tasks): + return len(set(map(lambda task: task.id, tasks))) + + # get tasks in 3-4 batches + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks // 3) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get tasks in one equaly size batch + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get tasks in one batch + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= num_of_tasks + 1) + self.assertEqual(count_unique_ids(tasks), num_of_tasks) + + # get no tasks + tasks = self.tasks_api.find_tasks_iter(name= task_name + "blah") + self.assertEqual(count_unique_ids(tasks), 0) + + # skip some tasks + *_, split_task = self.tasks_api.find_tasks(name= task_name, limit= num_of_tasks // 3) + tasks = self.tasks_api.find_tasks_iter(name= task_name, limit= 3, after= split_task.id) + self.assertEqual(count_unique_ids(tasks), num_of_tasks - num_of_tasks // 3) + def test_delete_task(self): task = self.tasks_api.create_task_cron(self.generate_name("it_task"), TASK_FLUX, "0 2 * * *", self.organization.id)