Skip to content

Commit

Permalink
Add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Dec 17, 2024
1 parent 122110a commit 9d6fa8f
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

import pytest

from cosmos.constants import _default_s3_conn
from cosmos.exceptions import CosmosValueError
from cosmos.io import (
_configure_remote_target_path,
_construct_dest_file_path,
upload_artifacts_to_aws_s3,
upload_artifacts_to_azure_wasb,
Expand Down Expand Up @@ -110,3 +112,51 @@ def test_upload_artifacts_to_cloud_storage_success(dummy_kwargs):

mock_configure.assert_called_once()
assert mock_copy.call_count == 2


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("cosmos.io.remote_target_path")
def test_configure_remote_target_path_no_conn_id(mock_remote_target_path):
"""Test when no remote_conn_id is provided, but conn_id is resolved from scheme."""
mock_remote_target_path.return_value = "s3://bucket/path/to/file"
mock_storage_path = MagicMock()
with patch("cosmos.io.urlparse") as mock_urlparse:
mock_urlparse.return_value.scheme = "s3"
with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage:
mock_object_storage.return_value = mock_storage_path
mock_storage_path.exists.return_value = True

result = _configure_remote_target_path()
assert result == (mock_object_storage.return_value, _default_s3_conn)


@pytest.mark.skipif(not AIRFLOW_IO_AVAILABLE, reason="Airflow did not have Object Storage until the 2.8 release")
@patch("cosmos.io.remote_target_path")
def test_configure_remote_target_path_conn_id_is_none(mock_remote_target_path):
"""Test when conn_id cannot be resolved and is None."""
mock_remote_target_path.return_value = "abcd://bucket/path/to/file"
mock_storage_path = MagicMock()
with patch("cosmos.io.urlparse") as mock_urlparse:
mock_urlparse.return_value.scheme = "abcd"
with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage:
mock_object_storage.return_value = mock_storage_path
mock_storage_path.exists.return_value = True

result = _configure_remote_target_path()
assert result == (None, None)


@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", False)
@patch("cosmos.io.remote_target_path")
def test_configure_remote_target_path_airflow_io_unavailable(mock_remote_target_path):
"""Test when AIRFLOW_IO_AVAILABLE is False."""
mock_remote_target_path.return_value = "s3://bucket/path/to/file"
mock_storage_path = MagicMock()
with patch("cosmos.io.urlparse") as mock_urlparse:
mock_urlparse.return_value.scheme = "s3"
with patch("airflow.io.path.ObjectStoragePath") as mock_object_storage:
mock_object_storage.return_value = mock_storage_path
mock_storage_path.exists.return_value = True
with pytest.raises(CosmosValueError) as exc_info:
_configure_remote_target_path()
assert "Object Storage feature is unavailable" in str(exc_info.value)

0 comments on commit 9d6fa8f

Please sign in to comment.