Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implemet batch aggregation on the client #318

Merged
merged 8 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions panoptes_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@
from panoptes_client.subject_workflow_status import SubjectWorkflowStatus
from panoptes_client.caesar import Caesar
from panoptes_client.inaturalist import Inaturalist
from panoptes_client.batch_aggregation import BatchAggregation
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that if we want users to only interact with aggregations via the Workflow, we probably do not need this line.

44 changes: 44 additions & 0 deletions panoptes_client/batch_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from panoptes_client.panoptes import Panoptes, PanoptesAPIException


class BatchAggregation(object):

def get_aggregations(self, workflow_id):
return Panoptes.client().get(f'/aggregations?workflow_id={workflow_id}')

def get_aggregation(self, agg_id):
return Panoptes.client().get(f'/aggregations/{agg_id}')

def create_aggregation(self, payload):
return Panoptes.client().post(f'/aggregations', json=payload)

def delete_aggregation(self, agg_id, etag):
return Panoptes.client().delete(f'/aggregations/{agg_id}', etag=etag)

def fetch_and_delete_aggregation(self, agg_id):
try:
single_agg = self.get_aggregation(agg_id)
self.delete_aggregation(agg_id, single_agg[1])
except PanoptesAPIException as err:
raise err

def run_aggregation(self, payload, delete_if_exists):
try:
# if exists and delete_if_exists, delete the first
response = self.get_aggregations(payload["aggregations"]["links"]["workflow"])
if response and response[0]:
has_agg = len(response[0]['aggregations']) > 0
if delete_if_exists and has_agg:
agg_id = response[0]['aggregations'][0]['id']
# delete
self.fetch_and_delete_aggregation(agg_id)
# create
return self.create_aggregation(payload)[0]
elif not has_agg:
return self.create_aggregation(payload)[0]
else:
return response[0]
else:
self.create_aggregation(payload)[0]
except PanoptesAPIException as err:
raise err
72 changes: 72 additions & 0 deletions panoptes_client/tests/test_batch_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unittest
import sys
from panoptes_client.batch_aggregation import BatchAggregation

if sys.version_info <= (3, 0):
from mock import patch
else:
from unittest.mock import patch

class TestBatchAggregation(unittest.TestCase):

def setUp(self):
super().setUp()
self.mock_client_patch = patch('panoptes_client.panoptes.Panoptes.client')
self.mock_client = self.mock_client_patch.start()

self.addCleanup(self.mock_client_patch.stop)

batch_agg_get_aggregations_patch = patch.object(BatchAggregation, 'get_aggregations')
fetch_and_delete_aggregation_patch = patch.object(BatchAggregation, 'fetch_and_delete_aggregation')
create_aggregation_patch = patch.object(BatchAggregation, 'create_aggregation')

self.batch_agg_get_aggregations_mock = batch_agg_get_aggregations_patch.start()
self.fetch_and_delete_aggregation_mock = fetch_and_delete_aggregation_patch.start()
self.create_aggregation_mock = create_aggregation_patch.start()

self.addCleanup(batch_agg_get_aggregations_patch.stop)
self.addCleanup(fetch_and_delete_aggregation_patch.stop)
self.addCleanup(create_aggregation_patch.stop)

self.payload = {
"aggregations": {
"links": {
"user": 1,
"workflow": 1,
}
}
}

self.agg_mock_value = [{
'aggregations': [{
'id': '1',
'href': '/aggregations/1',
'created_at': '2024-08-13T10:26:32.560Z',
'updated_at': '2024-08-13T10:26:32.576Z',
'uuid': None,
'task_id': 'task_id',
'status': 'pending',
'links': {'project': '1', 'workflow': '1', 'user': '1'}
}]
}, 'etag']

def test_run_aggregation_with_delete_if_exists(self):
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value

BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=True)
self.fetch_and_delete_aggregation_mock.assert_called_with(self.agg_mock_value[0]['aggregations'][0]['id'])
self.create_aggregation_mock.assert_called_with(self.payload)

def test_run_aggregation_with_previously_created_case(self):
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value

BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=False)
self.fetch_and_delete_aggregation_mock.assert_not_called()
self.create_aggregation_mock.assert_not_called()

def test_run_aggregation_with_no_previously_created_case(self):
self.batch_agg_get_aggregations_mock.return_value = [{'aggregations': []}, 'etag']

BatchAggregation().run_aggregation(payload=self.payload, delete_if_exists=False)
self.fetch_and_delete_aggregation_mock.assert_not_called()
self.create_aggregation_mock.assert_called_with(self.payload)
60 changes: 60 additions & 0 deletions panoptes_client/tests/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from panoptes_client.panoptes import PanoptesAPIException
from panoptes_client.workflow import Workflow
from panoptes_client.caesar import Caesar
from panoptes_client.batch_aggregation import BatchAggregation

if sys.version_info <= (3, 0):
from mock import patch
Expand All @@ -24,6 +25,27 @@ def setUp(self):
self.addCleanup(caesar_get_patch.stop)
self.addCleanup(caesar_put_patch.stop)

batch_agg_run_aggregation_patch = patch.object(BatchAggregation, 'run_aggregation')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the batch agg patches are so far away from the actual related tests, its easy to forget about them existing.

I wonder if its worth having a separate class within the same file for these changes and rename the ones geared towards caesar to separate.

i.e.

class TestWorkflowCaesarFeatures(unittest.TestCase):
    def setUp(self):
        caesar test setup stuff 

class TestWorkflowRunAggregation(unittest.TestCase):
     def setUp(self):
        BATCH AGG SETUP STUFF
      
     def test_run_aggregation_stuff(self):

Another way to mitigate (other than splitting tests into classes ) is to patch on the test. eg

@patch('run_aggregation', return_value=EXPECTED RETURN VALUE)
def test_run_aggregation(self, mock_run_agregation):
   WHATEVER TEST

batch_agg_get_aggregations_patch = patch.object(BatchAggregation, 'get_aggregations')
self.batch_agg_run_aggregation_mock = batch_agg_run_aggregation_patch.start()
self.batch_agg_get_aggregations_mock = batch_agg_get_aggregations_patch.start()

self.addCleanup(batch_agg_run_aggregation_patch.stop)
self.addCleanup(batch_agg_get_aggregations_patch.stop)

self.agg_mock_value = [{
'aggregations': [{
'id': '1',
'href': '/aggregations/1',
'created_at': '2024-08-13T10:26:32.560Z',
'updated_at': '2024-08-13T10:26:32.576Z',
'uuid': None,
'task_id': 'task_id',
'status': 'pending',
'links': {'project': '1', 'workflow': '1', 'user': '1'}
}]
}, 'etag']

def test_save_to_caesar_update(self):
workflow = Workflow(1)
workflow.save_to_caesar()
Expand Down Expand Up @@ -208,3 +230,41 @@ def test_add_caesar_rule_effect_invalid_effect(self):

self.caesar_post_mock.assert_not_called()
self.assertEqual('Invalid action for rule type', str(invalid_effect_err.exception))

def test_get_batch_aggregations_without_delete_param(self):
workflow = Workflow(1)
workflow.run_batch_aggregation(user=1)
payload = {
"aggregations": {
"links": {
"user": 1,
"workflow": workflow.id,
}
}
}
self.batch_agg_run_aggregation_mock.assert_called_with(payload, False)

def test_get_batch_aggregations_with_invalid_params(self):
with self.assertRaises(TypeError):
workflow = Workflow(1)
workflow.run_batch_aggregation(user=None)

self.batch_agg_run_aggregation_mock.assert_not_called()

def test_check_batch_aggregation_run_status(self):
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value

workflow = Workflow(1)
status = workflow.check_batch_aggregation_run_status()

self.batch_agg_get_aggregations_mock.assert_called_once()
self.assertEqual(status, 'pending')

def test_get_batch_aggregation_links(self):
self.batch_agg_get_aggregations_mock.return_value = self.agg_mock_value

workflow = Workflow(1)
links = workflow.get_batch_aggregation_links()

self.batch_agg_get_aggregations_mock.assert_called_once()
self.assertEqual(links, None)
51 changes: 51 additions & 0 deletions panoptes_client/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from panoptes_client.utils import batchable

from panoptes_client.caesar import Caesar
from panoptes_client.batch_aggregation import BatchAggregation
from panoptes_client.user import User


class Workflow(PanoptesObject, Exportable):
Expand Down Expand Up @@ -530,6 +532,55 @@ def configure_for_alice(self):
self.add_alice_reducers()
self.add_alice_rules_and_effects()

def get_batch_aggregations(self):
"""
This method will fetch existing aggregations if any. It will return a dict with the existing aggregations.
"""

return BatchAggregation().get_aggregations(self.id)[0]

def run_batch_aggregation(self, user=None, delete_if_exists=False):
"""
This method will start a new batch aggregation run, Will return a dict with the created aggregation if successful.

- **user** can be either a :py:class:`.User` or an ID.
- **delete_if_exists** parameter is optional if true, deletes any previous instance
-
Examples::

Workflow(1234).run_batch_aggregation(1234)
Workflow(1234).run_batch_aggregation(user=1234, delete_if_exists=True)
"""

if(isinstance(user, User)):
_user_id = user.id
elif (isinstance(user, (int, str,))):
_user_id = user
else:
raise TypeError

payload = {
"aggregations": {
"links": {
"user": _user_id,
"workflow": self.id,
}
}
}
return BatchAggregation().run_aggregation(payload, delete_if_exists)

def check_batch_aggregation_run_status(self):
"""
This method will fetch existing aggregation status if any.
"""
return self.get_batch_aggregations()['aggregations'][0]['status']
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a check if there are no aggregations. i.e.
when self.get_batch_aggregations()['aggregations'] length of resulting array is 0.

Same with get_batch_aggregation_links (method below on L578)


def get_batch_aggregation_links(self):
"""
This method will fetch existing aggregation links if any.
"""
return self.get_batch_aggregations()['aggregations'][0]['uuid']

@property
def versions(self):
"""
Expand Down
Loading