Skip to content

Commit

Permalink
[PT-4557] Query jobs (#630)
Browse files Browse the repository at this point in the history
* Draft implementation

* Job querying without sorting

* Add sorting support

* Clean up
  • Loading branch information
javidq authored Jun 12, 2024
1 parent aba4c27 commit 591d442
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 14 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ You can check your current version with the following command:
```

For more information, see [UP42 Python package description](https://pypi.org/project/up42-py/).
## 1.0.4a16

**Jun 11, 2024**

- Added job querying to `processing.py`

## 1.0.4a15

**Jun 11, 2024**
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "up42-py"
version = "1.0.4a15"
version = "1.0.4a16"
description = "Python SDK for UP42, the geospatial marketplace and developer platform."
authors = ["UP42 GmbH <[email protected]>"]
license = "https://github.com/up42/up42-py/blob/master/LICENSE"
Expand Down
70 changes: 67 additions & 3 deletions tests/test_processing.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import dataclasses
import datetime
import random
import urllib.parse
import uuid
from typing import Any, List, Optional
from unittest import mock

import pystac
Expand All @@ -11,7 +13,7 @@

from tests import helpers
from tests.fixtures import fixtures_globals as constants
from up42 import processing
from up42 import processing, utils

PROCESS_ID = "process-id"
VALIDATION_URL = f"{constants.API_HOST}/v2/processing/processes/{PROCESS_ID}/validation"
Expand All @@ -34,7 +36,8 @@
)

JOB_ID = str(uuid.uuid4())
GET_JOB_URL = f"{constants.API_HOST}/v2/processing/jobs/{JOB_ID}"
JOBS_URL = f"{constants.API_HOST}/v2/processing/jobs"
JOB_URL = f"{JOBS_URL}/{JOB_ID}"
CREDITS = 1
ACCOUNT_ID = str(uuid.uuid4())
DEFINITION = {
Expand Down Expand Up @@ -280,5 +283,66 @@ def test_should_provide_inputs(self, requests_mock: req_mock.Mocker):

class TestJob:
def test_should_get_job(self, requests_mock: req_mock.Mocker):
requests_mock.get(url=GET_JOB_URL, json=JOB_METADATA)
requests_mock.get(url=JOB_URL, json=JOB_METADATA)
assert processing.Job.get(JOB_ID) == JOB

@pytest.mark.parametrize("process_id", [None, [PROCESS_ID]])
@pytest.mark.parametrize("workspace_id", [None, constants.WORKSPACE_ID])
@pytest.mark.parametrize("status", [None, [processing.JobStatus.CAPTURED]])
@pytest.mark.parametrize("min_duration", [None, 1])
@pytest.mark.parametrize("max_duration", [None, 10])
@pytest.mark.parametrize("sort_by", [None, processing.JobSorting.process_id.desc])
def test_should_get_all_jobs(
    self,
    requests_mock: req_mock.Mocker,
    process_id: Optional[List[str]],
    workspace_id: Optional[str],
    status: Optional[List[processing.JobStatus]],
    min_duration: Optional[int],
    max_duration: Optional[int],
    sort_by: Optional[utils.SortingField],
):
    """Every combination of filters yields the jobs of both mocked pages."""
    # Only the filters that were actually supplied are expected in the query string.
    expected_params: dict[str, Any] = {
        name: value
        for name, value in {
            "processID": process_id,
            "workspaceID": workspace_id,
            "status": [entry.value for entry in status] if status else None,
            "minDuration": min_duration,
            "maxDuration": max_duration,
            "sort": str(sort_by) if sort_by else None,
        }.items()
        if value
    }
    query = urllib.parse.urlencode(expected_params)
    first_page_url = f"{JOBS_URL}?{query}" if query else JOBS_URL
    next_page_url = f"{JOBS_URL}/next"

    # First page holds three jobs and points at the second one via a "next" link.
    first_page = {
        "jobs": [JOB_METADATA] * 3,
        "links": [{"rel": "next", "href": next_page_url}],
    }
    # Second page holds two jobs and has no further links, which ends the iteration.
    last_page = {
        "jobs": [JOB_METADATA] * 2,
        "links": [],
    }
    requests_mock.get(url=first_page_url, json=first_page)
    requests_mock.get(url=next_page_url, json=last_page)

    jobs = list(
        processing.Job.all(
            process_id=process_id,
            workspace_id=workspace_id,
            status=status,
            min_duration=min_duration,
            max_duration=max_duration,
            sort_by=sort_by,
        )
    )
    assert jobs == [JOB] * 5
13 changes: 13 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,16 @@ def test_read_json_fails_if_path_not_found(str_or_path):
assert utils.read_json(path_or_dict=str_or_path)

assert str(str_or_path) in str(ex.value)


class TestSortingField:
    """Checks the direction helpers and the string form of ``utils.SortingField``."""

    @pytest.mark.parametrize("ascending", [True, False])
    def test_should_provide_directions(self, ascending: bool):
        # The helpers must give the same result whatever direction the source field has.
        field = utils.SortingField(name="name", ascending=ascending)
        expected_asc = utils.SortingField(name="name", ascending=True)
        expected_desc = utils.SortingField(name="name", ascending=False)
        assert field.asc == expected_asc
        assert field.desc == expected_desc

    def test_should_stringify(self):
        field = utils.SortingField(name="name")
        expectations = (
            (field.asc, "name,asc"),
            (field.desc, "name,desc"),
        )
        for candidate, expected in expectations:
            assert str(candidate) == expected
68 changes: 58 additions & 10 deletions up42/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import dataclasses
import datetime
import enum
from typing import ClassVar, List, Optional, TypedDict, Union
from typing import ClassVar, Iterator, List, Optional, TypedDict, Union

import pystac
import requests

from up42 import base, host
from up42 import base, host, utils


@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -42,6 +42,17 @@ class JobMetadata(TypedDict):
updated: str


class JobSorting:
    """Sorting fields usable as the ``sort_by`` argument when querying jobs."""

    process_id = utils.SortingField(name="processID")
    status = utils.SortingField(name="status")
    created = utils.SortingField(name="created")
    credits = utils.SortingField(name="creditConsumption.credits")


def _to_datetime(value: Optional[str]):
    """Convert an ISO formatted timestamp string into a ``datetime``.

    Trailing ``Z`` characters are stripped before the string is handed to
    ``datetime.fromisoformat``. Falsy input (``None`` or an empty string) is
    returned unchanged.
    """
    if not value:
        return value
    timestamp = value.rstrip("Z")
    return datetime.datetime.fromisoformat(timestamp)


@dataclasses.dataclass
class Job:
session = base.Session()
Expand All @@ -56,10 +67,6 @@ class Job:
started: Optional[datetime.datetime] = None
finished: Optional[datetime.datetime] = None

@staticmethod
def __to_datetime(value: Optional[str]):
return value and datetime.datetime.fromisoformat(value.rstrip("Z"))

@staticmethod
def from_metadata(metadata: JobMetadata) -> "Job":
return Job(
Expand All @@ -69,10 +76,10 @@ def from_metadata(metadata: JobMetadata) -> "Job":
workspace_id=metadata["workspaceID"],
definition=metadata["definition"],
status=JobStatus(metadata["status"]),
created=Job.__to_datetime(metadata["created"]),
started=Job.__to_datetime(metadata["started"]),
finished=Job.__to_datetime(metadata["finished"]),
updated=Job.__to_datetime(metadata["updated"]),
created=_to_datetime(metadata["created"]),
started=_to_datetime(metadata["started"]),
finished=_to_datetime(metadata["finished"]),
updated=_to_datetime(metadata["updated"]),
)

@classmethod
Expand All @@ -81,6 +88,47 @@ def get(cls, job_id: str) -> "Job":
metadata = cls.session.get(url).json()
return cls.from_metadata(metadata)

@classmethod
def all(
    cls,
    process_id: Optional[List[str]] = None,
    workspace_id: Optional[str] = None,
    status: Optional[List[JobStatus]] = None,
    min_duration: Optional[int] = None,
    max_duration: Optional[int] = None,
    sort_by: Optional[utils.SortingField] = None,
    *,
    # intended for performance tuning and testing only
    page_size: Optional[int] = None,
) -> Iterator["Job"]:
    """Lazily iterate over the jobs matching the given filters.

    The first page is requested from ``/v2/processing/jobs`` with the filters
    as query parameters. Each following page is fetched from the ``href`` of
    the first link whose ``rel`` is ``"next"``, and only after all jobs of
    the current page have been yielded. Iteration stops when a page has no
    such link.

    Args:
        process_id: Sent as the ``processID`` query parameter.
        workspace_id: Sent as the ``workspaceID`` query parameter.
        status: Job statuses; their ``value``s are sent as ``status``.
        min_duration: Sent as the ``minDuration`` query parameter.
        max_duration: Sent as the ``maxDuration`` query parameter.
        sort_by: Sorting field; its string form is sent as ``sort``.
        page_size: Sent as the ``limit`` query parameter.

    Yields:
        A ``Job`` built from each metadata entry of every page.
    """
    filters = {
        "workspaceID": workspace_id,
        "processID": process_id,
        "status": [entry.value for entry in status] if status else None,
        "minDuration": min_duration,
        "maxDuration": max_duration,
        "limit": page_size,
        "sort": sort_by,
    }
    # Falsy filters (None, 0, empty list) are left out of the request entirely.
    # NOTE(review): list filters are serialised with str(), i.e. as the Python
    # list repr -- confirm this is the format the API expects.
    query_params = {}
    for name, raw_value in filters.items():
        if raw_value:
            query_params[name] = str(raw_value)

    first_response = cls.session.get(host.endpoint("/v2/processing/jobs"), params=query_params)
    page = first_response.json()
    while page:
        for metadata in page["jobs"]:
            yield Job.from_metadata(metadata)

        # Look for the first "next" link; without one the loop ends.
        next_page_url = None
        for link in page["links"]:
            if link["rel"] == "next":
                next_page_url = link["href"]
                break
        page = next_page_url and cls.session.get(next_page_url).json()


CostType = Union[int, float, "Cost"]

Expand Down
19 changes: 19 additions & 0 deletions up42/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
import dataclasses
import datetime
import functools
import importlib.metadata
Expand Down Expand Up @@ -424,3 +425,21 @@ def stac_client(auth: requests.auth.AuthBase):
url=host.endpoint("/v2/assets/stac/"),
request_modifier=request_modifier,
)


@dataclasses.dataclass(frozen=True)
class SortingField:
    """Immutable sort criterion: a field name together with a direction.

    The string form is ``"<name>,asc"`` or ``"<name>,desc"``.
    """

    name: str
    ascending: bool = True

    @property
    def asc(self):
        """A field with the same name sorted in ascending order."""
        return SortingField(self.name, ascending=True)

    @property
    def desc(self):
        """A field with the same name sorted in descending order."""
        return SortingField(self.name, ascending=False)

    def __str__(self):
        if self.ascending:
            direction = "asc"
        else:
            direction = "desc"
        return f"{self.name},{direction}"

0 comments on commit 591d442

Please sign in to comment.