-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
implemet batch aggregation on the client #318
Changes from 3 commits
eae281b
5a90961
5f596f0
51ac389
46b396f
c795df1
731f4aa
cd657af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
from panoptes_client.panoptes import Panoptes, PanoptesAPIException | ||
|
||
|
||
class BatchAggregation(object): | ||
|
||
def get_aggregations(self, workflow_id): | ||
return Panoptes.client().get(f'/aggregations?workflow_id={workflow_id}') | ||
|
||
def get_aggregation(self, agg_id): | ||
return Panoptes.client().get(f'/aggregations/{agg_id}') | ||
|
||
def create_aggregation(self, payload): | ||
return Panoptes.client().post(f'/aggregations', json=payload) | ||
|
||
def delete_aggregation(self, agg_id, etag): | ||
return Panoptes.client().delete(f'/aggregations/{agg_id}', etag=etag) | ||
|
||
def fetch_and_delete_aggregation(self, agg_id): | ||
try: | ||
single_agg = self.get_aggregation(agg_id) | ||
self.delete_aggregation(agg_id, single_agg[1]) | ||
except PanoptesAPIException as err: | ||
raise err | ||
|
||
def run_aggregation(self, payload, delete_if_exists): | ||
try: | ||
# if exists and delete_if_exists, delete the first | ||
response = self.get_aggregations(payload["aggregations"]["links"]["workflow"]) | ||
if response and response[0]: | ||
has_agg = len(response[0]['aggregations']) > 0 | ||
if delete_if_exists and has_agg: | ||
agg_id = response[0]['aggregations'][0]['id'] | ||
# delete | ||
self.fetch_and_delete_aggregation(agg_id) | ||
# create | ||
return self.create_aggregation(payload)[0] | ||
elif not has_agg: | ||
return self.create_aggregation(payload)[0] | ||
else: | ||
return response[0] | ||
else: | ||
self.create_aggregation(payload)[0] | ||
except PanoptesAPIException as err: | ||
raise err |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import unittest | ||
import sys | ||
from panoptes_client.batch_aggregation import BatchAggregation | ||
|
||
if sys.version_info <= (3, 0): | ||
from mock import patch | ||
else: | ||
from unittest.mock import patch | ||
|
||
class TestBatchAggregation(unittest.TestCase): | ||
|
||
def setUp(self): | ||
super().setUp() | ||
self.mock_client_patch = patch('panoptes_client.panoptes.Panoptes.client') | ||
self.mock_client = self.mock_client_patch.start() | ||
|
||
self.addCleanup(self.mock_client_patch.stop) | ||
|
||
batch_agg_get_aggregations_patch = patch.object(BatchAggregation, 'get_aggregations') | ||
fetch_and_delete_aggregation_patch = patch.object(BatchAggregation, 'fetch_and_delete_aggregation') | ||
create_aggregation_patch = patch.object(BatchAggregation, 'create_aggregation') | ||
|
||
self.batch_agg_get_aggregations_mock = batch_agg_get_aggregations_patch.start() | ||
self.fetch_and_delete_aggregation_mock = fetch_and_delete_aggregation_patch.start() | ||
self.create_aggregation_mock = create_aggregation_patch.start() | ||
|
||
self.addCleanup(batch_agg_get_aggregations_patch.stop) | ||
self.addCleanup(fetch_and_delete_aggregation_patch.stop) | ||
self.addCleanup(create_aggregation_patch.stop) | ||
|
||
self.payload = { | ||
"aggregations": { | ||
"links": { | ||
"user": 1, | ||
"workflow": 1, | ||
} | ||
} | ||
} | ||
|
||
self.agg_mock_value = [{ | ||
'aggregations': [{ | ||
'id': '1', | ||
'href': '/aggregations/1', | ||
'created_at': '2024-08-13T10:26:32.560Z', | ||
'updated_at': '2024-08-13T10:26:32.576Z', | ||
'uuid': None, | ||
'task_id': 'task_id', | ||
'status': 'pending', | ||
'links': {'project': '1', 'workflow': '1', 'user': '1'} | ||
}] | ||
}, 'etag'] | ||
|
||
def test_run_aggregation_with_delete_if_exists(self): | ||
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value | ||
|
||
BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=True) | ||
self.fetch_and_delete_aggregation_mock.assert_called_with(self.agg_mock_value[0]['aggregations'][0]['id']) | ||
self.create_aggregation_mock.assert_called_with(self.payload) | ||
|
||
def test_run_aggregation_with_previously_created_case(self): | ||
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value | ||
|
||
BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=False) | ||
self.fetch_and_delete_aggregation_mock.assert_not_called() | ||
self.create_aggregation_mock.assert_not_called() | ||
|
||
def test_run_aggregation_with_no_previously_created_case(self): | ||
self.batch_agg_get_aggregations_mock.return_value = [{'aggregations': []}, 'etag'] | ||
|
||
BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=False) | ||
self.fetch_and_delete_aggregation_mock.assert_not_called() | ||
self.create_aggregation_mock.assert_called_with(self.payload) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
from panoptes_client.panoptes import PanoptesAPIException | ||
from panoptes_client.workflow import Workflow | ||
from panoptes_client.caesar import Caesar | ||
from panoptes_client.batch_aggregation import BatchAggregation | ||
|
||
if sys.version_info <= (3, 0): | ||
from mock import patch | ||
|
@@ -24,6 +25,27 @@ def setUp(self): | |
self.addCleanup(caesar_get_patch.stop) | ||
self.addCleanup(caesar_put_patch.stop) | ||
|
||
batch_agg_run_aggregation_patch = patch.object(BatchAggregation, 'run_aggregation') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like the batch agg patches are so far away from the actual related tests, its easy to forget about them existing. I wonder if its worth having a separate class within the same file for these changes and rename the ones geared towards caesar to separate. i.e.
Another way to mitigate (other than splitting tests into classes ) is to patch on the test. eg
|
||
batch_agg_get_aggregations_patch = patch.object(BatchAggregation, 'get_aggregations') | ||
self.batch_agg_run_aggregation_mock = batch_agg_run_aggregation_patch.start() | ||
self.batch_agg_get_aggregations_mock = batch_agg_get_aggregations_patch.start() | ||
|
||
self.addCleanup(batch_agg_run_aggregation_patch.stop) | ||
self.addCleanup(batch_agg_get_aggregations_patch.stop) | ||
|
||
self.agg_mock_value = [{ | ||
'aggregations': [{ | ||
'id': '1', | ||
'href': '/aggregations/1', | ||
'created_at': '2024-08-13T10:26:32.560Z', | ||
'updated_at': '2024-08-13T10:26:32.576Z', | ||
'uuid': None, | ||
'task_id': 'task_id', | ||
'status': 'pending', | ||
'links': {'project': '1', 'workflow': '1', 'user': '1'} | ||
}] | ||
}, 'etag'] | ||
|
||
def test_save_to_caesar_update(self): | ||
workflow = Workflow(1) | ||
workflow.save_to_caesar() | ||
|
@@ -208,3 +230,41 @@ def test_add_caesar_rule_effect_invalid_effect(self): | |
|
||
self.caesar_post_mock.assert_not_called() | ||
self.assertEqual('Invalid action for rule type', str(invalid_effect_err.exception)) | ||
|
||
def test_get_batch_aggregations_without_delete_param(self): | ||
workflow = Workflow(1) | ||
workflow.run_batch_aggregation(user=1) | ||
payload = { | ||
"aggregations": { | ||
"links": { | ||
"user": 1, | ||
"workflow": workflow.id, | ||
} | ||
} | ||
} | ||
self.batch_agg_run_aggregation_mock.assert_called_with(payload, False) | ||
|
||
def test_get_batch_aggregations_with_invalid_params(self): | ||
with self.assertRaises(TypeError): | ||
workflow = Workflow(1) | ||
workflow.run_batch_aggregation(user=None) | ||
|
||
self.batch_agg_run_aggregation_mock.assert_not_called() | ||
|
||
def test_check_batch_aggregation_run_status(self): | ||
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value | ||
|
||
workflow = Workflow(1) | ||
status = workflow.check_batch_aggregation_run_status() | ||
|
||
self.batch_agg_get_aggregations_mock.assert_called_once() | ||
self.assertEqual(status, 'pending') | ||
|
||
def test_get_batch_aggregation_links(self): | ||
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value | ||
|
||
workflow = Workflow(1) | ||
links = workflow.get_batch_aggregation_links() | ||
|
||
self.batch_agg_get_aggregations_mock.assert_called_once() | ||
self.assertEqual(links, None) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ | |
from panoptes_client.utils import batchable | ||
|
||
from panoptes_client.caesar import Caesar | ||
from panoptes_client.batch_aggregation import BatchAggregation | ||
from panoptes_client.user import User | ||
|
||
|
||
class Workflow(PanoptesObject, Exportable): | ||
|
@@ -530,6 +532,55 @@ def configure_for_alice(self): | |
self.add_alice_reducers() | ||
self.add_alice_rules_and_effects() | ||
|
||
def get_batch_aggregations(self): | ||
""" | ||
This method will fetch existing aggregations if any. It will return a dict with the existing aggregations. | ||
""" | ||
|
||
return BatchAggregation().get_aggregations(self.id)[0] | ||
|
||
def run_batch_aggregation(self, user=None, delete_if_exists=False): | ||
""" | ||
This method will start a new batch aggregation run, Will return a dict with the created aggregation if successful. | ||
|
||
- **user** can be either a :py:class:`.User` or an ID. | ||
- **delete_if_exists** parameter is optional if true, deletes any previous instance | ||
- | ||
Examples:: | ||
|
||
Workflow(1234).run_batch_aggregation(1234) | ||
Workflow(1234).run_batch_aggregation(user=1234, delete_if_exists=True) | ||
""" | ||
|
||
if(isinstance(user, User)): | ||
_user_id = user.id | ||
elif (isinstance(user, (int, str,))): | ||
_user_id = user | ||
else: | ||
raise TypeError | ||
|
||
payload = { | ||
"aggregations": { | ||
"links": { | ||
"user": _user_id, | ||
"workflow": self.id, | ||
} | ||
} | ||
} | ||
return BatchAggregation().run_aggregation(payload, delete_if_exists) | ||
|
||
def check_batch_aggregation_run_status(self): | ||
""" | ||
This method will fetch existing aggregation status if any. | ||
""" | ||
return self.get_batch_aggregations()['aggregations'][0]['status'] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there should be a check if there are no aggregations. i.e. Same with |
||
|
||
def get_batch_aggregation_links(self): | ||
""" | ||
This method will fetch existing aggregation links if any. | ||
""" | ||
return self.get_batch_aggregations()['aggregations'][0]['uuid'] | ||
|
||
@property | ||
def versions(self): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noting that if we want users to only interact with aggregations via the Workflow, we probably do not need this line.