From a69a706307bc1c0fb9ea484ab2225721fdefdc60 Mon Sep 17 00:00:00 2001
From: "Douglas Cerna (Soy Douglas)"
Date: Wed, 22 Nov 2023 23:44:50 +0100
Subject: [PATCH] Replace TmpDir helper with pytest fixtures

---
 tests/test_aips_models.py     |  49 +--
 tests/test_create_dips_job.py | 578 +++++++++++++++++-----------------
 tests/tests_helpers.py        |  23 --
 3 files changed, 315 insertions(+), 335 deletions(-)
 delete mode 100644 tests/tests_helpers.py

diff --git a/tests/test_aips_models.py b/tests/test_aips_models.py
index 3b6f802..93a26e4 100644
--- a/tests/test_aips_models.py
+++ b/tests/test_aips_models.py
@@ -1,27 +1,30 @@
 #!/usr/bin/env python
 import os
-import unittest
+
+import pytest
 
 from aips import models
-from tests.tests_helpers import TmpDir
-
-THIS_DIR = os.path.abspath(os.path.dirname(__file__))
-TMP_DIR = os.path.join(THIS_DIR, ".tmp-aips-models")
-DATABASE_FILE = os.path.join(TMP_DIR, "aips.db")
-
-
-class TestAipsModels(unittest.TestCase):
-    def test_init_success(self):
-        """Test that the database, table and session are created."""
-        assert not os.path.isfile(DATABASE_FILE)
-        assert not hasattr(models, "Session")
-        with TmpDir(TMP_DIR):
-            session = models.init(DATABASE_FILE)
-        assert os.path.isfile(DATABASE_FILE)
-        assert "aip" in models.Base.metadata.tables
-        assert hasattr(session, "add")
-        assert callable(session.add)
-
-    def test_init_fail(self):
-        """Test that the database can't be created in a wrong path."""
-        self.assertRaises(IOError, models.init, "/this/should/be/a/wrong/path/to.db")
+
+
+def test_init_success(tmp_path):
+    """Test that the database, table and session are created."""
+    tmp_dir = tmp_path / "dir"
+    tmp_dir.mkdir()
+
+    DATABASE_FILE = (tmp_dir / "aips.db").as_posix()
+
+    assert not os.path.isfile(DATABASE_FILE)
+    assert not hasattr(models, "Session")
+
+    session = models.init(DATABASE_FILE)
+
+    assert os.path.isfile(DATABASE_FILE)
+    assert "aip" in models.Base.metadata.tables
+    assert hasattr(session, "add")
+    assert callable(session.add)
+
+
+def test_init_fail():
+    """Test that the database can't be created in a wrong path."""
+    with pytest.raises(IOError):
+        models.init("/this/should/be/a/wrong/path/to.db")
diff --git a/tests/test_create_dips_job.py b/tests/test_create_dips_job.py
index 6b257c6..200cb54 100644
--- a/tests/test_create_dips_job.py
+++ b/tests/test_create_dips_job.py
@@ -1,14 +1,13 @@
 #!/usr/bin/env python
 import os
-import unittest
 from pathlib import Path
 from unittest import mock
 
+import pytest
 import requests
 from sqlalchemy import exc
 
 from aips import create_dips_job
-from tests.tests_helpers import TmpDir
 
 SS_URL = "http://192.168.168.192:8000"
@@ -17,11 +16,6 @@
 LOCATION_UUID = "e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b"
 ORIGIN_PIPELINE_UUID = "ad174753-6776-47e2-9a12-ac37837e5128"
 
-THIS_DIR = os.path.abspath(os.path.dirname(__file__))
-TMP_DIR = os.path.join(THIS_DIR, ".tmp-create-dips-job")
-OUTPUT_DIR = os.path.join(TMP_DIR, "output")
-DATABASE_FILE = os.path.join(TMP_DIR, "aips.db")
-
 AIP_FIXTURE_PATH = Path(__file__).parent.parent / "fixtures" / "aip.tar"
 AIP_CONTENT = b""
 with open(AIP_FIXTURE_PATH, "rb") as f:
@@ -49,299 +43,305 @@
 }
 
 
-class TestCreateDipsJob(unittest.TestCase):
-    def setUp(self):
-        self.args = {
-            "ss_url": SS_URL,
-            "ss_user": SS_USER_NAME,
-            "ss_api_key": SS_API_KEY,
-            "location_uuid": LOCATION_UUID,
-            "origin_pipeline_uuid": ORIGIN_PIPELINE_UUID,
-            "tmp_dir": TMP_DIR,
-            "output_dir": OUTPUT_DIR,
-            "database_file": DATABASE_FILE,
-            "delete_local_copy": True,
-            "upload_type": None,
-            "pipeline_uuid": "",
-            "cp_location_uuid": "",
"cp_location_uuid": "", - "ds_location_uuid": "", - "shared_directory": "", - "atom_url": "", - "atom_email": "", - "atom_password": "", - "atom_slug": "", - "rsync_target": "", - } +@pytest.fixture +def args(tmp_path): + tmp_dir = tmp_path / "dir" + tmp_dir.mkdir() + output_dir = tmp_path / "output" + output_dir.mkdir() + database_file = tmp_dir / "aips.db" - def test_filter_aips(self): - """ - Test that AIPs without 'uuid' or 'current_location' - or in a different location are filtered. - """ - aips = [ - # Okay - { - "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/", - "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/", - "uuid": "0fef53b0-0573-4398-aa4f-ebf04fe711cf", - }, - # Wrong location - { - "current_location": "/api/v2/location/5c1c87e0-7d11-4f39-8dda-182b3a45031f/", - "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/", - "uuid": "7636f290-0b02-4323-b4bc-bd1ed191aaea", - }, - # Wrong pipeline - { - "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/", - "origin_pipeline": "/api/v2/pipeline/88050c7f-36a3-4900-9294-5a0411d69303/", - "uuid": "b9cd796c-2231-42e6-9cd1-0236d22958fa", + return { + "ss_url": SS_URL, + "ss_user": SS_USER_NAME, + "ss_api_key": SS_API_KEY, + "location_uuid": LOCATION_UUID, + "origin_pipeline_uuid": ORIGIN_PIPELINE_UUID, + "tmp_dir": tmp_dir.as_posix(), + "output_dir": output_dir.as_posix(), + "database_file": database_file.as_posix(), + "delete_local_copy": True, + "upload_type": None, + "pipeline_uuid": "", + "cp_location_uuid": "", + "ds_location_uuid": "", + "shared_directory": "", + "atom_url": "", + "atom_email": "", + "atom_password": "", + "atom_slug": "", + "rsync_target": "", + } + + +def test_filter_aips(): + """ + Test that AIPs without 'uuid' or 'current_location' + or in a different location are filtered. 
+    """
+    aips = [
+        # Okay
+        {
+            "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/",
+            "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/",
+            "uuid": "0fef53b0-0573-4398-aa4f-ebf04fe711cf",
+        },
+        # Wrong location
+        {
+            "current_location": "/api/v2/location/5c1c87e0-7d11-4f39-8dda-182b3a45031f/",
+            "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/",
+            "uuid": "7636f290-0b02-4323-b4bc-bd1ed191aaea",
+        },
+        # Wrong pipeline
+        {
+            "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/",
+            "origin_pipeline": "/api/v2/pipeline/88050c7f-36a3-4900-9294-5a0411d69303/",
+            "uuid": "b9cd796c-2231-42e6-9cd1-0236d22958fa",
+        },
+        # Missing UUID
+        {
+            "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/",
+            "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/",
+        },
+        # Missing location
+        {
+            "origin_pipeline": "/api/v2/pipeline/ad174753-6776-47e2-9a12-ac37837e5128/",
+            "uuid": "6bbd3dee-b52f-476f-8136-bb3f0d025096",
+        },
+        # Missing pipeline
+        {
+            "current_location": "/api/v2/location/e9a08ce2-4e8e-4e01-bdea-09d8d8deff8b/",
+            "uuid": "e6409b38-20e9-4739-bb4a-892f2fb300d3",
+        },
+    ]
+    filtered_aips = create_dips_job.filter_aips(
+        aips, LOCATION_UUID, ORIGIN_PIPELINE_UUID
+    )
+    assert filtered_aips == ["0fef53b0-0573-4398-aa4f-ebf04fe711cf"]
+
+
+def test_main_fail_db(args):
+    """Test a fail when a database can't be created."""
+    args["database_file"] = "/this/should/be/a/wrong/path/to.db"
+    ret = create_dips_job.main(**args)
+    assert ret == 1
+
+
+@mock.patch(
+    "requests.request",
+    side_effect=[mock.Mock(status_code=401, headers={}, spec=requests.Response)],
+)
+def test_main_fail_request(_request, args):
+    """Test a fail when an SS connection can't be established."""
+    args["ss_api_key"] = "bad_api_key"
+    ret = create_dips_job.main(**args)
+    assert ret == 2
+
+
+@mock.patch(
+    "requests.request",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": requests.structures.CaseInsensitiveDict(
+                    {"Content-Type": "application/json"}
+                ),
+                "json.return_value": AIPS_JSON,
+            },
+            spec=requests.Response
+        )
+    ],
+)
+@mock.patch(
+    "requests.get",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": {},
+                "iter_content.return_value": iter([AIP_CONTENT]),
+            },
+            spec=requests.Response
+        ),
+    ],
+)
+def test_main_success(_get, _request, args):
+    """Test a success where one DIP is created."""
+    ret = create_dips_job.main(**args)
+    assert ret is None
+    dip_path = os.path.join(
+        args["output_dir"], "test_B-3ea465ac-ea0a-4a9c-a057-507e794de332"
+    )
+    assert os.path.isdir(dip_path)
+
+
+@mock.patch(
+    "requests.request",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": requests.structures.CaseInsensitiveDict(
+                    {"Content-Type": "application/json"}
+                ),
+                "json.return_value": AIPS_JSON,
+            },
+            spec=requests.Response
+        )
+    ],
+)
+@mock.patch(
+    "requests.get",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": {},
+                "iter_content.return_value": iter([AIP_CONTENT]),
"iter_content.return_value": iter([AIP_CONTENT]), }, - ] - filtered_aips = create_dips_job.filter_aips( - aips, LOCATION_UUID, ORIGIN_PIPELINE_UUID + spec=requests.Response + ), + ], +) +def test_main_success_no_dip_creation(_get, _request, args): + """Test a success where one AIP was already processed.""" + effect = exc.IntegrityError({}, [], "") + session_add_patch = mock.patch("sqlalchemy.orm.Session.add", side_effect=effect) + with session_add_patch: + ret = create_dips_job.main(**args) + assert ret is None + dip_path = os.path.join( + args["output_dir"], "test_B-3ea465ac-ea0a-4a9c-a057-507e794de332" ) - assert filtered_aips == ["0fef53b0-0573-4398-aa4f-ebf04fe711cf"] + assert not os.path.isdir(dip_path) - def test_main_fail_db(self): - """Test a fail when a database can't be created.""" - self.args["database_file"] = "/this/should/be/a/wrong/path/to.db" - ret = create_dips_job.main(**self.args) - assert ret == 1 - @mock.patch( - "requests.request", - side_effect=[mock.Mock(status_code=401, headers={}, spec=requests.Response)], - ) - def test_main_fail_request(self, _request): - """Test a fail when an SS connection can't be established.""" - with TmpDir(TMP_DIR): - self.args["ss_api_key"] = "bad_api_key" - ret = create_dips_job.main(**self.args) - assert ret == 2 - - @mock.patch( - "requests.request", - side_effect=[ - mock.Mock( - **{ - "status_code": 200, - "headers": requests.structures.CaseInsensitiveDict( - {"Content-Type": "application/json"} - ), - "json.return_value": AIPS_JSON, - }, - spec=requests.Response - ) - ], - ) - @mock.patch( - "requests.get", - side_effect=[ - mock.Mock( - **{ - "status_code": 200, - "headers": {}, - "iter_content.return_value": iter([AIP_CONTENT]), - }, - spec=requests.Response - ), - ], - ) - def test_main_success(self, _get, _request): - """Test a success where one DIP is created.""" - with TmpDir(TMP_DIR), TmpDir(OUTPUT_DIR): - # breakpoint() - ret = create_dips_job.main(**self.args) - assert ret is None - dip_path = os.path.join( - OUTPUT_DIR, "test_B-3ea465ac-ea0a-4a9c-a057-507e794de332" - ) - assert os.path.isdir(dip_path) +@mock.patch("aips.create_dips_job.atom_upload.main") +@mock.patch("aips.create_dips_job.create_dip.main", return_value=1) +@mock.patch( + "requests.request", + side_effect=[ + mock.Mock( + **{ + "status_code": 200, + "headers": requests.structures.CaseInsensitiveDict( + {"Content-Type": "application/json"} + ), + "json.return_value": AIPS_JSON, + }, + spec=requests.Response + ) + ], +) +@mock.patch( + "requests.get", + side_effect=[ + mock.Mock( + **{ + "status_code": 200, + "headers": {}, + "iter_content.return_value": iter([AIP_CONTENT]), + }, + spec=requests.Response + ), + ], +) +def test_main_dip_creation_failed(_get, _request, create_dip, atom_upload, args): + """Test that a fail on DIP creation doesn't trigger an upload.""" + args["upload_type"] = "atom-upload" + create_dips_job.main(**args) + assert not atom_upload.called - @mock.patch( - "requests.request", - side_effect=[ - mock.Mock( - **{ - "status_code": 200, - "headers": requests.structures.CaseInsensitiveDict( - {"Content-Type": "application/json"} - ), - "json.return_value": AIPS_JSON, - }, - spec=requests.Response - ) - ], - ) - @mock.patch( - "requests.get", - side_effect=[ - mock.Mock( - **{ - "status_code": 200, - "headers": {}, - "iter_content.return_value": iter([AIP_CONTENT]), - }, - spec=requests.Response - ), - ], - ) - def test_main_success_no_dip_creation(self, _get, _request): - """Test a success where one AIP was already processed.""" - effect = 
-        session_add_patch = mock.patch("sqlalchemy.orm.Session.add", side_effect=effect)
-        with TmpDir(TMP_DIR), TmpDir(OUTPUT_DIR), session_add_patch:
-            ret = create_dips_job.main(**self.args)
-            assert ret is None
-            dip_path = os.path.join(
-                OUTPUT_DIR, "test_B-3ea465ac-ea0a-4a9c-a057-507e794de332"
-            )
-            assert not os.path.isdir(dip_path)

-    @mock.patch("aips.create_dips_job.atom_upload.main")
-    @mock.patch("aips.create_dips_job.create_dip.main", return_value=1)
-    @mock.patch(
-        "requests.request",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": requests.structures.CaseInsensitiveDict(
-                        {"Content-Type": "application/json"}
-                    ),
-                    "json.return_value": AIPS_JSON,
-                },
-                spec=requests.Response
-            )
-        ],
-    )
-    @mock.patch(
-        "requests.get",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": {},
-                    "iter_content.return_value": iter([AIP_CONTENT]),
-                },
-                spec=requests.Response
-            ),
-        ],
+@mock.patch("aips.create_dips_job.atom_upload.main", return_value=None)
+@mock.patch("aips.create_dips_job.create_dip.main", return_value="fake/path")
+@mock.patch(
+    "requests.request",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": requests.structures.CaseInsensitiveDict(
+                    {"Content-Type": "application/json"}
+                ),
+                "json.return_value": AIPS_JSON,
+            },
+            spec=requests.Response
+        )
+    ],
+)
+@mock.patch(
+    "requests.get",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": {},
+                "iter_content.return_value": iter([AIP_CONTENT]),
+            },
+            spec=requests.Response
+        ),
+    ],
+)
+def test_main_success_atom_upload_call(_get, _request, create_dip, atom_upload, args):
+    """Test that an upload to AtoM is performed."""
+    args.update(
+        {
+            "upload_type": "atom-upload",
+            "atom_url": "",
+            "atom_email": "",
+            "atom_password": "",
+            "atom_slug": "",
+            "rsync_target": "",
+            "delete_local_copy": True,
+        }
     )
-    def test_main_dip_creation_failed(self, _get, _request, create_dip, atom_upload):
-        """Test that a fail on DIP creation doesn't trigger an upload."""
-        with TmpDir(TMP_DIR), TmpDir(OUTPUT_DIR):
-            self.args["upload_type"] = "atom-upload"
-            create_dips_job.main(**self.args)
-            assert not atom_upload.called
+    create_dips_job.main(**args)
+    assert atom_upload.called

-    @mock.patch("aips.create_dips_job.atom_upload.main", return_value=None)
-    @mock.patch("aips.create_dips_job.create_dip.main", return_value="fake/path")
-    @mock.patch(
-        "requests.request",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": requests.structures.CaseInsensitiveDict(
-                        {"Content-Type": "application/json"}
-                    ),
-                    "json.return_value": AIPS_JSON,
-                },
-                spec=requests.Response
-            )
-        ],
-    )
-    @mock.patch(
-        "requests.get",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": {},
-                    "iter_content.return_value": iter([AIP_CONTENT]),
-                },
-                spec=requests.Response
-            ),
-        ],
-    )
-    def test_main_success_atom_upload_call(
-        self, _get, _request, create_dip, atom_upload
-    ):
-        """Test that an upload to AtoM is performed."""
-        with TmpDir(TMP_DIR), TmpDir(OUTPUT_DIR):
-            self.args.update(
-                {
-                    "upload_type": "atom-upload",
-                    "atom_url": "",
-                    "atom_email": "",
-                    "atom_password": "",
-                    "atom_slug": "",
-                    "rsync_target": "",
-                    "delete_local_copy": True,
-                }
-            )
-            create_dips_job.main(**self.args)
-            assert atom_upload.called
+@mock.patch("aips.create_dips_job.storage_service_upload.main", return_value=None)
+@mock.patch("aips.create_dips_job.create_dip.main", return_value="fake/path")
+@mock.patch(
+    "requests.request",

-    @mock.patch("aips.create_dips_job.storage_service_upload.main", return_value=None)
-    @mock.patch("aips.create_dips_job.create_dip.main", return_value="fake/path")
-    @mock.patch(
-        "requests.request",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": requests.structures.CaseInsensitiveDict(
-                        {"Content-Type": "application/json"}
-                    ),
-                    "json.return_value": AIPS_JSON,
-                },
-                spec=requests.Response
-            )
-        ],
-    )
-    @mock.patch(
-        "requests.get",
-        side_effect=[
-            mock.Mock(
-                **{
-                    "status_code": 200,
-                    "headers": {},
-                    "iter_content.return_value": iter([AIP_CONTENT]),
-                },
-                spec=requests.Response
-            ),
-        ],
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": requests.structures.CaseInsensitiveDict(
+                    {"Content-Type": "application/json"}
+                ),
+                "json.return_value": AIPS_JSON,
+            },
+            spec=requests.Response
+        )
+    ],
+)
+@mock.patch(
+    "requests.get",
+    side_effect=[
+        mock.Mock(
+            **{
+                "status_code": 200,
+                "headers": {},
+                "iter_content.return_value": iter([AIP_CONTENT]),
+            },
+            spec=requests.Response
+        ),
+    ],
+)
+def test_main_success_ss_upload_call(_get, _request, create_dip, ss_upload, args):
+    """Test that an upload to the Storage Service is performed."""
+    args.update(
+        {
+            "upload_type": "ss-upload",
+            "pipeline_uuid": "",
+            "cp_location_uuid": "",
+            "ds_location_uuid": "",
+            "shared_directory": "",
+            "delete_local_copy": True,
+        }
     )
-    def test_main_success_ss_upload_call(self, _get, _request, create_dip, ss_upload):
-        """Test that an upload to AtoM is performed."""
-        with TmpDir(TMP_DIR), TmpDir(OUTPUT_DIR):
-            self.args.update(
-                {
-                    "upload_type": "ss-upload",
-                    "pipeline_uuid": "",
-                    "cp_location_uuid": "",
-                    "ds_location_uuid": "",
-                    "shared_directory": "",
-                    "delete_local_copy": True,
-                }
-            )
-            create_dips_job.main(**self.args)
-            assert ss_upload.called
+    create_dips_job.main(**args)
+    assert ss_upload.called
diff --git a/tests/tests_helpers.py b/tests/tests_helpers.py
deleted file mode 100644
index e8e06df..0000000
--- a/tests/tests_helpers.py
+++ /dev/null
@@ -1,23 +0,0 @@
-import os
-import shutil
-
-
-class TmpDir:
-    """Context manager to clear and create a temporary directory and destroy it
-    after usage.
-    """
-
-    def __init__(self, tmp_dir_path):
-        self.tmp_dir_path = tmp_dir_path
-
-    def __enter__(self):
-        if os.path.isdir(self.tmp_dir_path):
-            shutil.rmtree(self.tmp_dir_path)
-        os.makedirs(self.tmp_dir_path)
-        return self.tmp_dir_path
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        if os.path.isdir(self.tmp_dir_path):
-            shutil.rmtree(self.tmp_dir_path)
-        if exc_type:
-            return None
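
Note for reviewers (not part of the patch): a minimal sketch of the pytest tmp_path pattern the new fixtures rely on. The fixture and test names below are illustrative only and do not appear in this change.

    import pytest


    @pytest.fixture
    def database_file(tmp_path):
        # tmp_path is a per-test pathlib.Path provided by pytest's built-in
        # fixture; pytest creates it for each test and prunes old runs, which
        # is what makes the hand-rolled TmpDir context manager unnecessary.
        return (tmp_path / "aips.db").as_posix()


    def test_database_file_lives_under_tmp_path(database_file, tmp_path):
        # Illustrative stand-in for models.init(): just create the file.
        open(database_file, "w").close()
        assert (tmp_path / "aips.db").is_file()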