diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml index 4eab4c6..7a935aa 100644 --- a/.github/workflows/pipeline.yml +++ b/.github/workflows/pipeline.yml @@ -11,13 +11,4 @@ jobs: test-python: uses: NERC-CEH/dri-cicd/.github/workflows/test-python.yml@main with: - optional_dependencies: "[lint,test,all]" - - build-test-deploy-docker: - needs: [test-python] - uses: NERC-CEH/dri-cicd/.github/workflows/build-test-deploy-docker.yml@main - with: - package_name: driutils - secrets: - AWS_REGION: ${{ secrets.AWS_REGION }} - AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }} + optional_dependencies: "[lint,test,all]" \ No newline at end of file diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index bc8676e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,25 +0,0 @@ -# Build virtualenv -FROM python:3.12-slim as build -WORKDIR /app -COPY pyproject.toml README.md /app/ -COPY src /app/src -COPY .git /app/.git -RUN pip install --upgrade pip pdm -# Installs the codebase in editable mode into .venv -RUN pdm install - -# Build production containerdocker -# Only the ./.venv ./src ./tests are present in the production image -FROM python:3.12-slim as prod -WORKDIR /app -RUN groupadd -g 999 python && \ - useradd -m -r -u 999 -g python python -RUN chown python:python /app -COPY --chown=python:python --from=build /app/.venv /app/.venv -COPY --chown=python:python --from=build /app/src /app/src -COPY --chown=python:python tests/ /app/tests - -USER python -ENV PATH="/app/.venv/bin:$PATH" -ENV VIRTUAL_ENV="/app/.venv" -CMD ["python", "-m", "driutils"] \ No newline at end of file diff --git a/README.md b/README.md index d16effe..bb34f46 100644 --- a/README.md +++ b/README.md @@ -81,15 +81,54 @@ dependencies = [ ] ``` -## Readers +## I/O Modules + +### AWS S3 + +#### Reading +The `S3Reader` class reads files from an S3 object store using the `boto3` client. It simply requires a client to work with to initialize. 
Provisioning of credentials is left to the user and their s3_client + +```python +from driutils.io.aws import S3Reader +import boto3 + +client = boto3.client("s3") +reader = S3Reader(client) + +# Request a file from AWS S3 + +object_bytes = reader.read(bucket="my-bucket", key="Path/to/file") +``` + +#### Writing +The `S3Writer` operates in the same way as the reader but supplies a `write` method instead of `read`. The `body` argument is expected by AWS to be a `bytes` object, which is left to the user to provide. + +```python +from driutils.io.aws import S3Writer +import boto3 + +client = boto3.client("s3") +body = b"I'm a byte encoded document" +writer = S3Writer(client) + +# Submit a file to AWS S3 + +writer.write(bucket="my-bucket", key="Path/to/file", body=body) +``` + +#### Reading/Writing Combo Class +The `S3ReaderWriter` behaves the same as the prior classes but supplies both commands in one class + +### DuckDB + +#### Readers -### DuckDB Reader The DuckDB classes use the duckdb python interface to read files from local documents or S3 object storage - this comes with the capacity to use custom s3 endpoints. 
To read a local file: ```python -from driutils.read import DuckDBFileReader +from driutils.io.duckdb import DuckDBFileReader reader = DuckDBFileReader() query = "SELECT * FROM READ_PARQUET('myfile.parquet');" @@ -122,7 +161,7 @@ To read from an S3 storage location there is a more configuration available and The reader is instantiated like this: ```python -from driutils.read import import DuckDBS3Reader +from driutils.io.duckdb import DuckDBS3Reader # Automatic authentication from your environment auto_auth_reader = DuckDBS3Reader("auto") diff --git a/pyproject.toml b/pyproject.toml index 54c16a1..f4b49fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ dependencies = [ "mypy_boto3_s3", "moto", "polars", + "tenacity", ] name = "dri-utils" dynamic = ["version"] diff --git a/src/driutils/io/aws.py b/src/driutils/io/aws.py new file mode 100644 index 0000000..20f7ba1 --- /dev/null +++ b/src/driutils/io/aws.py @@ -0,0 +1,84 @@ +import logging + +from botocore.client import BaseClient +from botocore.exceptions import ClientError +from mypy_boto3_s3.client import S3Client + +from driutils.io.interfaces import ReaderInterface, WriterInterface + +logger = logging.getLogger(__name__) + + +class S3Base: + """Base class to reuse initializer""" + + _connection: S3Client + """The S3 client used to perform the work""" + + def __init__(self, s3_client: S3Client) -> None: + """Initializes + + Args: + s3_client: The S3 client used to do work + """ + + if not isinstance(s3_client, BaseClient): + raise TypeError(f"'s3_client' must be a BaseClient, not '{type(s3_client)}'") + + self._connection = s3_client + + +class S3Reader(S3Base, ReaderInterface): + """Class for handling file reads using the AWS S3 client""" + + def read(self, bucket_name: str, s3_key: str) -> bytes: + """ + Retrieves an object from an S3 bucket. + + If any step fails, it logs an error and re-raises the exception. + + Args: + bucket_name: The name of the S3 bucket. 
+ s3_key: The key (path) of the object within the bucket. + + Returns: + bytes: raw bytes of the S3 object + + Raises: + Exception: If there's any error in retrieving or parsing the object. + """ + try: + data = self._connection.get_object(Bucket=bucket_name, Key=s3_key) + return data["Body"].read() + except (RuntimeError, ClientError) as e: + logger.error(f"Failed to get {s3_key} from {bucket_name}") + logger.exception(e) + raise e + + +class S3Writer(S3Base, WriterInterface): + """Writes to an S3 bucket""" + + def write(self, bucket_name: str, key: str, body: bytes) -> None: + """Uploads an object to an S3 bucket. + + This function attempts to upload a byte object to a specified S3 bucket + using the provided S3 client. If the upload fails, it logs an error + message and re-raises the exception. + + Args: + bucket_name: The name of the S3 bucket. + key: The key (path) of the object within the bucket. + body: data to write to s3 object + + Raises: + TypeError: If body is not bytes + """ + if not isinstance(body, bytes): + raise TypeError(f"'body' must be 'bytes', not '{type(body)}'") + + self._connection.put_object(Bucket=bucket_name, Key=key, Body=body) + + +class S3ReaderWriter(S3Reader, S3Writer): + """Class to handle reading and writing in S3""" diff --git a/src/driutils/io/read.py b/src/driutils/io/duckdb.py similarity index 77% rename from src/driutils/io/read.py rename to src/driutils/io/duckdb.py index 84f515b..c5980dc 100644 --- a/src/driutils/io/read.py +++ b/src/driutils/io/duckdb.py @@ -1,40 +1,17 @@ -from abc import ABC, abstractmethod -from typing import Any, List, Optional, Self +import logging +from typing import List, Optional import duckdb from duckdb import DuckDBPyConnection +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed +from driutils.io.interfaces import ContextClass, ReaderInterface from driutils.utils import remove_protocol_from_url +logger = logging.getLogger(__name__) -class ReaderInterface(ABC): - 
"""Abstract implementation for a IO reader""" - _connection: Any - """Reference to the connection object""" - - def __enter__(self) -> Self: - """Creates a connection when used in a context block""" - return self - - def __exit__(self, *args) -> None: - """Closes the connection when exiting the context""" - self.close() - - def __del__(self): - """Closes the connection when deleted""" - self.close() - - def close(self) -> None: - """Closes the connection""" - self._connection.close() - - @abstractmethod - def read(self, *args, **kwargs) -> Any: - """Reads data from a source""" - - -class DuckDBReader(ReaderInterface): +class DuckDBReader(ContextClass, ReaderInterface): """Abstract implementation of a DuckDB Reader""" _connection: DuckDBPyConnection @@ -43,6 +20,12 @@ class DuckDBReader(ReaderInterface): def __init__(self) -> None: self._connection = duckdb.connect() + @retry( + retry=retry_if_exception_type(duckdb.InvalidInputException), + wait=wait_fixed(2), + stop=stop_after_attempt(3), + reraise=True, + ) def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection: """Requests to read a file @@ -51,7 +34,17 @@ def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection: params: The parameters to supplement the query. 
""" - return self._connection.execute(query, params) + try: + return self._connection.execute(query, params) + except duckdb.HTTPException: + logger.error(f"Failed to find data from web query: {query}") + raise + except duckdb.IOException: + logger.error(f"Failed to read file from query: {query}") + raise + except duckdb.InvalidInputException: + logger.error(f"Corrupt data found from query: {query}") + raise class DuckDBS3Reader(DuckDBReader): @@ -79,14 +72,11 @@ def __init__(self, auth_type: str, endpoint_url: Optional[str] = None, use_ssl: self._connection.install_extension("httpfs") self._connection.load_extension("httpfs") - self._connection.execute(""" - SET force_download = true; - SET http_keep_alive = false; - """) + self._connection.execute("SET force_download = true;") self._authenticate(auth_type, endpoint_url, use_ssl) - def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: Optional[bool] = None) -> None: + def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: bool = True) -> None: """Handles authentication selection Args: diff --git a/src/driutils/io/interfaces.py b/src/driutils/io/interfaces.py new file mode 100644 index 0000000..5bcd2d4 --- /dev/null +++ b/src/driutils/io/interfaces.py @@ -0,0 +1,41 @@ +"""Core classes used by writers""" + +from abc import ABC, abstractmethod +from typing import Any, Self + + +class ContextClass: + _connection: Any + """Reference to the connection object""" + + def __enter__(self) -> Self: + """Creates a connection when used in a context block""" + return self + + def __exit__(self, *args) -> None: + """Closes the connection when exiting the context""" + self.close() + + def __del__(self): + """Closes the connection when deleted""" + self.close() + + def close(self) -> None: + """Closes the connection""" + self._connection.close() + + +class ReaderInterface(ABC): + """Abstract implementation for a IO reader""" + + @abstractmethod + def read(self, *args, 
**kwargs) -> Any: + """Reads data from a source""" + + +class WriterInterface(ABC): + """Interface for defining parquet writing objects""" + + @abstractmethod + def write(self, *args, **kwargs) -> None: + """Abstract method for read operations""" diff --git a/src/driutils/io/write.py b/src/driutils/io/write.py deleted file mode 100644 index ae1c2f4..0000000 --- a/src/driutils/io/write.py +++ /dev/null @@ -1,55 +0,0 @@ -"""Module for handling data writing logic""" - -from abc import ABC, abstractmethod - -from botocore.client import BaseClient -from mypy_boto3_s3.client import S3Client - - -class WriterInterface(ABC): - """Interface for defining parquet writing objects""" - - @abstractmethod - def write(self, *args, **kwargs) -> None: - """Abstract method for read operations""" - - -class S3Writer(WriterInterface): - """Writes to an S3 bucket""" - - s3_client: S3Client - """Handle to the the s3 client used to read data""" - - def __init__(self, s3_client: S3Client): - """Initializes the class - - Args: - s3_client: The s3 client used to retrieve data from - Raises: - TypeError - """ - - if not isinstance(s3_client, BaseClient): - raise TypeError(f"`s3_client` must be a `S3Client` not `{type(s3_client)}`") - - self.s3_client = s3_client - - def write(self, bucket_name: str, key: str, body: bytes) -> None: - """Uploads an object to an S3 bucket. - - This function attempts to upload a byte object to a specified S3 bucket - using the provided S3 client. If the upload fails, it logs an error - message and re-raises the exception. - - Args: - bucket_name: The name of the S3 bucket. - key: The key (path) of the object within the bucket. 
- body: data to write to s3 object - - Raises: - RuntimeError, ClientError - """ - if not isinstance(body, bytes): - raise TypeError(f"'body' must be 'bytes', not '{type(body)}") - - self.s3_client.put_object(Bucket=bucket_name, Key=key, Body=body) diff --git a/tests/io/test_aws.py b/tests/io/test_aws.py new file mode 100644 index 0000000..7ffc2fe --- /dev/null +++ b/tests/io/test_aws.py @@ -0,0 +1,87 @@ +import unittest +from unittest.mock import MagicMock, Mock +from driutils.io.aws import S3Writer, S3Reader +import boto3 +from botocore.client import BaseClient +from mypy_boto3_s3.client import S3Client +from parameterized import parameterized +from botocore.exceptions import ClientError + +class TestS3Writer(unittest.TestCase): + + @classmethod + def setUpClass(cls) -> None: + + cls.s3_client: S3Client = boto3.client("s3") #type: ignore + + def test_s3_client_type(self): + """Returns an object if s3_client is of type `boto3.client.s3`, otherwise + raises an error""" + + # Happy path + writer = S3Writer(self.s3_client) + + self.assertIsInstance(writer._connection, BaseClient) + + # Bad path + + with self.assertRaises(TypeError): + S3Writer("not an s3 client") #type: ignore + + + @parameterized.expand([1, "body", 1.123, {"key": b"bytes"}]) + def test_error_raises_if_write_without_bytes(self, body): + """Tests that a type error is raised if the wrong type body used""" + + writer = S3Writer(self.s3_client) + writer._connection = MagicMock() + with self.assertRaises(TypeError): + writer.write("bucket", "key", body) + + writer._connection.put_object.assert_not_called() + + def test_write_called(self): + """Tests that the writer can be executed""" + + body = b"Test data" + + writer = S3Writer(self.s3_client) + writer._connection = MagicMock() + writer.write("bucket", "key", body) + + writer._connection.put_object.assert_called_once_with(Bucket="bucket", Key="key", Body=body) + +class TestS3Reader(unittest.TestCase): + """Test suite for the S3 client reader""" + + 
@classmethod + def setUpClass(cls) -> None: + + cls.s3_client: S3Client = boto3.client("s3") #type: ignore + cls.bucket = "my-bucket" + cls.key = "my-key" + def test_error_caught_if_read_fails(self) -> None: + """Tests that a ClientError is raised if read fails""" + + reader = S3Reader(self.s3_client) + fake_error = ClientError(operation_name='InvalidKeyPair.Duplicate', error_response={ + 'Error': { + 'Code': 'Duplicate', + 'Message': 'This is a custom message' + } + }) + reader._connection.get_object = MagicMock(side_effect=fake_error) + + with self.assertRaises((RuntimeError, ClientError)): + reader.read(self.bucket, self.key) + + def test_get_request_made(self) -> None: + """Test that the get request is made to s3 client""" + + reader = S3Reader(self.s3_client) + reader._connection = MagicMock() + + + reader.read(self.bucket, self.key) + + reader._connection.get_object.assert_called_once_with(Bucket=self.bucket, Key=self.key) diff --git a/tests/io/test_readers.py b/tests/io/test_duckdb.py similarity index 69% rename from tests/io/test_readers.py rename to tests/io/test_duckdb.py index ce7801b..47ea725 100644 --- a/tests/io/test_readers.py +++ b/tests/io/test_duckdb.py @@ -1,6 +1,7 @@ import unittest from unittest.mock import patch, MagicMock -from driutils.io.read import DuckDBFileReader, DuckDBS3Reader +from driutils.io.duckdb import DuckDBFileReader, DuckDBS3Reader +import duckdb from duckdb import DuckDBPyConnection from parameterized import parameterized @@ -12,7 +13,7 @@ def test_initialization(self): self.assertIsInstance(reader._connection, DuckDBPyConnection) - @patch("driutils.io.read.DuckDBFileReader.close") + @patch("driutils.io.duckdb.DuckDBFileReader.close") def test_context_manager_is_functional(self, mock): """Should be able to use context manager to auto-close file connection""" @@ -21,7 +22,7 @@ def test_context_manager_is_functional(self, mock): mock.assert_called_once() - @patch("driutils.io.read.DuckDBFileReader.close") + 
@patch("driutils.io.duckdb.DuckDBFileReader.close") def test_connection_closed_on_delete(self, mock): """Tests that duckdb connection is closed when object is deleted""" @@ -53,6 +54,14 @@ def test_read_executes_query(self): reader._connection.execute.assert_called_once_with(query, params) + def test_read_missing_file_raises_error(self): + """Test that a missing file raises an IOException""" + reader = DuckDBFileReader() + query = f"SELECT * FROM read_parquet('notafile.parquet')" + + with self.assertRaises(duckdb.IOException): + reader.read(query) + class TestDuckDBS3Reader(unittest.TestCase): @parameterized.expand(["a", 1, "cutom_endpoint"]) @@ -63,7 +72,7 @@ def test_value_error_if_invalid_auth_option(self, value): DuckDBS3Reader(value) @parameterized.expand(["auto", "AUTO", "aUtO"]) - @patch("driutils.io.read.DuckDBS3Reader._authenticate") + @patch("driutils.io.duckdb.DuckDBS3Reader._authenticate") def test_upper_or_lowercase_option_accepted(self, value, mock): """Tests that the auth options can be provided in any case""" DuckDBS3Reader(value) @@ -99,4 +108,28 @@ def test_error_if_custom_endpoint_not_provided(self): endpoint_url_not_given""" with self.assertRaises(ValueError): - DuckDBS3Reader("custom_endpoint") \ No newline at end of file + DuckDBS3Reader("custom_endpoint") + + def test_read_parquet_by_query_with_invalid_key_error(self): + """ Test that an invalid key raises error + """ + reader = DuckDBS3Reader("auto") + bucket = "fake-bucket" + key = "non_existent_key.parquet" + query = f"SELECT * FROM read_parquet('s3://{bucket}/{key}')" + + with self.assertRaises(duckdb.HTTPException): + reader.read(query) + + def test_read_parquet_retry(self): + """ Test that the retry decorator works as expected + """ + reader = DuckDBS3Reader("auto") + query = f"SELECT * FROM read_parquet('README.md')" + + with self.assertRaises(duckdb.InvalidInputException): + reader.read(query) + + stats = reader.read.statistics + self.assertEqual(stats['attempt_number'], 3) # Should 
have tried 3 times + self.assertEqual(stats['idle_for'], 4) # Should have waited 2 seconds between each try diff --git a/tests/io/test_writers.py b/tests/io/test_writers.py deleted file mode 100644 index fa1ae6b..0000000 --- a/tests/io/test_writers.py +++ /dev/null @@ -1,52 +0,0 @@ -import unittest -from unittest.mock import MagicMock -from driutils.io.write import S3Writer -import boto3 -from botocore.client import BaseClient -from mypy_boto3_s3.client import S3Client -from parameterized import parameterized - -class TestS3Writer(unittest.TestCase): - - @classmethod - def setUpClass(cls) -> None: - - cls.s3_client: S3Client = boto3.client("s3", endpoint_url="http://localhost:4566") #type: ignore - - def test_s3_client_type(self): - """Returns an object if s3_client is of type `boto3.client.s3`, otherwise - raises an error""" - - # Happy path - writer = S3Writer(self.s3_client) - - self.assertIsInstance(writer.s3_client, BaseClient) - - # Bad path - - with self.assertRaises(TypeError): - S3Writer("not an s3 client") #type: ignore - - - @parameterized.expand([1, "body", 1.123, {"key": b"bytes"}]) - def test_error_raises_if_write_without_bytes(self, body): - """Tests that a type error is raised if the wrong type body used""" - - writer = S3Writer(self.s3_client) - writer.s3_client = MagicMock() - with self.assertRaises(TypeError): - writer.write("bucket", "key", body) - - writer.s3_client.put_object.assert_not_called() - - def test_write_called(self): - """Tests that the writer can be executed""" - - body = b"Test data" - - writer = S3Writer(self.s3_client) - writer.s3_client = MagicMock() - writer.write("bucket", "key", body) - - writer.s3_client.put_object.assert_called_once_with(Bucket="bucket", Key="key", Body=body) - \ No newline at end of file