Skip to content

Commit

Permalink
added more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DinisCruz committed Oct 7, 2024
1 parent 24fd91b commit 658dab5
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "modules/OSBot-Utils"]
path = modules/OSBot-Utils
url = [email protected]:owasp-sbot/OSBot-Utils.git
1 change: 1 addition & 0 deletions modules/OSBot-Utils
Submodule OSBot-Utils added at 710984
29 changes: 26 additions & 3 deletions osbot_prefect/server/Prefect__Cloud_API.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from osbot_utils.utils.Dev import pprint

from osbot_utils.utils.Lists import list_index_by
from osbot_utils.utils.Misc import list_set
from osbot_utils.base_classes.Type_Safe import Type_Safe
Expand All @@ -17,14 +19,35 @@ def flow__delete(self, flow_id):
response = self.prefect_rest_api.delete(target='flows', target_id=flow_id)
return response.get('status') == 'ok'

def flow_run__create(self, flow_id, flow_run_definition):
return self.prefect_rest_api.create(target='flow_run', data=flow_run_definition)#.get('data') or {}
def flow_run(self, flow_id):
return self.prefect_rest_api.read(target='flow_runs', target_id=flow_id).get('data') or {}

def flow_run__create(self, flow_run_definition):
return self.prefect_rest_api.create(target='flow_runs', data=flow_run_definition).get('data') or {}

def flow_run__set_state(self, flow_run_id, state):
kwargs = dict(target = 'flow_runs' ,
target_id = flow_run_id ,
target_action = 'set_state' ,
target_data = { 'state': state })

response = self.prefect_rest_api.update_action(**kwargs)
return response.get('status') == 'ok'


def flow_run__delete(self, flow_run_id):
response = self.prefect_rest_api.delete(target='flow_runs', target_id=flow_run_id)
return response.get('status') == 'ok'

def flow_run__update(self, flow_run_id, flow_run_definition):
response = self.prefect_rest_api.update(target='flow_runs', target_id=flow_run_id, target_data=flow_run_definition)
return response.get('status') == 'ok'

def flows(self, limit=5):
return self.prefect_rest_api.filter(target='flows', limit=limit).get('data') or []

def flows_ids(self, limit=5): # todo: see if there is a way to get these IDs directly via a GraphQL query
flows = self.flows(limit=limit)
return list_set(list_index_by(values=flows, index_by='id'))
return [flow.id for flow in flows]


26 changes: 21 additions & 5 deletions osbot_prefect/server/Prefect__Rest_API.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from osbot_utils.utils.Http import url_join_safe
from osbot_utils.utils.Env import get_env
from osbot_utils.base_classes.Type_Safe import Type_Safe
from osbot_utils.utils.Objects import dict_to_obj
from osbot_utils.utils.Status import status_ok, status_error

ENV_NAME__PREFECT_CLOUD__API_KEY = 'PREFECT_CLOUD__API_KEY'
Expand Down Expand Up @@ -39,24 +40,28 @@ def requests__for_method(self, method, path, data=None):
response = method(endpoint, headers=headers, params=data)
elif method == requests.head: # For HEAD requests, no payload or parameters are needed
response = method(endpoint, headers=headers)
elif method == requests.post: # For POST and other requests, pass data as JSON in the request body
elif method == requests.post: # For POST requests, pass data as JSON in the request body
response = method(endpoint, headers=headers, json=data)
elif method == requests.patch: # For PATCH requests, pass data as JSON in the request body
response = method(endpoint, headers=headers, json=data)
else:
return status_error("Unsupported request method") # Return an error if the method is not supported

status_code = response.status_code # Handle the response and return an appropriate result
status_code = response.status_code # Handle the response and return an appropriate result
content_type = response.headers.get('Content-Type', '')
if 200 <= status_code < 300:
if method == requests.head: # For HEAD requests, return the headers as the response data
return status_ok(data=response.headers)
if content_type == 'application/json': # For successful JSON responses, return the JSON data
return status_ok(data=response.json())
json_data = response.json()
json_as_obj = dict_to_obj(json_data)
return status_ok(data=json_as_obj)
return status_ok(data=response.text) # For other successful requests, return the JSON data

return status_error(message=f"{method.__name__.upper()} request to {path}, failed with status {status_code}", error=response.text) # For failed requests, return an error message with status and response text


def requests__delete(self, path, params=None): # Wrapper for executing GET requests
def requests__delete(self, path, params=None): # Wrapper for executing DELETE requests
return self.requests__for_method(requests.delete, path, data=params)

def requests__get(self, path, params=None): # Wrapper for executing GET requests
Expand All @@ -68,6 +73,9 @@ def requests__post(self, path, data):
def requests__head(self, path): # Wrapper for executing HEAD requests
return self.requests__for_method(requests.head, path)

def requests__update(self, path, data): # Wrapper for executing PATCH requests
return self.requests__for_method(requests.patch, path, data=data)

# request helpers

def create(self, target, data):
Expand All @@ -86,4 +94,12 @@ def filter(self, target, limit=5): # todo: add support for fetching all
path = f'/{target}/filter'
data = { "sort" : "CREATED_DESC",
"limit": limit }
return self.requests__post(path, data)
return self.requests__post(path, data)

def update(self, target, target_id, target_data):
path = f'/{target}/{target_id}'
return self.requests__update(path, target_data)

def update_action(self, target, target_id, target_action, target_data):
path = f'/{target}/{target_id}/{target_action}'
return self.requests__post(path, target_data)
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 64 additions & 20 deletions tests/integration/server/test_Prefect__Cloud_API.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from unittest import TestCase
from unittest import TestCase

from osbot_utils.utils.Dev import pprint
from osbot_utils.utils.Lists import list_in_list
from osbot_utils.utils.Misc import list_set, random_id, is_guid, random_text
from osbot_utils.utils.Env import load_dotenv, get_env

from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API
from osbot_prefect.server.Prefect__Rest_API import ENV_NAME__PREFECT_TARGET_SERVER
from osbot_utils.utils.Misc import list_set, random_id, is_guid, random_text
from osbot_utils.utils.Env import load_dotenv, get_env
from osbot_prefect.server.Prefect__Cloud_API import Prefect__Cloud_API
from osbot_utils.utils.Objects import dict_to_obj, obj_data, obj_to_dict


class test_Prefect__Cloud_API(TestCase):
Expand All @@ -15,7 +14,7 @@ class test_Prefect__Cloud_API(TestCase):
def setUpClass(cls) -> None:
load_dotenv()
cls.prefect_cloud_api = Prefect__Cloud_API()
cls.flow_id = cls.prefect_cloud_api.flow__create({'name': random_text('pytest-class-flow')}).get('id')
cls.flow_id = cls.prefect_cloud_api.flow__create({'name': random_text('pytest-class-flow')}).id

@classmethod
def tearDownClass(cls):
Expand All @@ -24,20 +23,17 @@ def tearDownClass(cls):
def test__setUpClass(self):
assert is_guid(self.flow_id)


def test_confirm_local_docker(self):
assert get_env(ENV_NAME__PREFECT_TARGET_SERVER) == 'http://localhost:4200/api'

def test_flow_create(self):
with self.prefect_cloud_api as _:
flow_definition = { "name": random_id(prefix="pytest-method-flow"),
"tags": [ "created-by-pytest" ,
"local-prefect-server"]}

flow_data_1 = _.flow__create(flow_definition)
flow_data_2 = _.flow__create(flow_definition)
flow_id = flow_data_1.get('id' )
flow_name = flow_data_1.get('name')
flow_tags = flow_data_1.get('tags')
flow_id = flow_data_1.id
flow_name = flow_data_1.name
flow_tags = flow_data_1.tags
flow_data_3 = _.flow(flow_id)

delete_response_1 = _.flow__delete(flow_id)
Expand All @@ -53,20 +49,68 @@ def test_flow_create(self):
assert delete_response_2 is False


def test_flow_run__create(self):
flow_run_definition = { "name": "my-flow-run",
"flow_id": self.flow_id }
#pprint(flow_run_definition)
def test_flow_run__create__update(self):
flow_name = random_id(prefix="flow-name")
tags_1 = ["aaa", "bbb", "ccc"]
tags_2 = ["ddd", "eee", "fff"]
flow_run_definition = { "name" : flow_name ,
"flow_id": self.flow_id ,
"state" : { "type": "SCHEDULED" },
"tags" : tags_1 }
flow_run_update = { 'tags' : tags_2 }

with self.prefect_cloud_api as _:
flow_run_1 = _.flow_run__create(flow_run_definition)
flow_run_id = flow_run_1.id
flow_run_2 = _.flow_run(flow_run_id)

assert dict_to_obj(flow_run_1).flow_id == self.flow_id
assert is_guid(flow_run_id) is True
assert flow_run_1.name == flow_name
assert flow_run_1.tags == tags_1
assert flow_run_1.flow_id == self.flow_id
assert flow_run_1.run_count == 0
assert flow_run_1.state.type == 'SCHEDULED'
assert flow_run_1.state.name == 'Scheduled'
assert flow_run_1.parent_task_run_id is None
assert flow_run_1.state.state_details.flow_run_id == flow_run_id
assert flow_run_1.empirical_policy.max_retries == 0


delattr(flow_run_1, 'estimated_start_time_delta')
delattr(flow_run_2, 'estimated_start_time_delta')
assert flow_run_1 == flow_run_2

assert _.flow_run__update(flow_run_1.id, flow_run_update) is True

flow_run_3 = _.flow_run(flow_run_id)
assert flow_run_3.tags == tags_2
assert _.flow_run__delete(flow_run_id) is True

def test_flow_run__set_state(self):
flow_run_state_1 = { "type": "RUNNING" }
#flow_run_state_2 = { "type": "COMPLETED" }

flow_name = random_id(prefix="flow-name")
flow_run_definition = { "name": flow_name,
"flow_id": self.flow_id}
with self.prefect_cloud_api as _:
flow_run = _.flow_run__create(flow_run_definition)

assert _.flow_run(flow_run.id).state.type == "PENDING"
assert _.flow_run__set_state(flow_run.id, flow_run_state_1) is True
assert _.flow_run(flow_run.id).state.type == "RUNNING"
assert _.flow_run__delete(flow_run.id) is True

def test_flows(self):
flows = self.prefect_cloud_api.flows()
for flow in flows:
assert list_in_list(["created", "id", "tags", "updated"], list_set(flow)) is True
assert is_guid(flow.id)

def test_flow(self):
with self.prefect_cloud_api as _:
flows_ids = _.flows_ids()
if flows_ids:
flow_id = flows_ids.pop()
flow = self.prefect_cloud_api.flow(flow_id=flow_id)
assert list_in_list(["created", "id", "tags", "updated"], list_set(flow)) is True
assert flow.id == flow_id

0 comments on commit 658dab5

Please sign in to comment.