Skip to content

Commit

Permalink
Update io classes (#2)
Browse files Browse the repository at this point in the history
* Added retry logic for invalid parquet files

* Restructured

Tested S3Reader

Added S3ReaderWriter

* Updated readme

* Mocks

* Removed docker build

* Comment corrections
  • Loading branch information
lewis-chambers authored Sep 27, 2024
1 parent d99f456 commit 720d2a0
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 186 deletions.
11 changes: 1 addition & 10 deletions .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,4 @@ jobs:
test-python:
uses: NERC-CEH/dri-cicd/.github/workflows/test-python.yml@main
with:
optional_dependencies: "[lint,test,all]"

build-test-deploy-docker:
needs: [test-python]
uses: NERC-CEH/dri-cicd/.github/workflows/build-test-deploy-docker.yml@main
with:
package_name: driutils
secrets:
AWS_REGION: ${{ secrets.AWS_REGION }}
AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }}
optional_dependencies: "[lint,test,all]"
25 changes: 0 additions & 25 deletions Dockerfile

This file was deleted.

47 changes: 43 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,54 @@ dependencies = [
]
```

## Readers
## I/O Modules

### AWS S3

#### Reading
The `S3Reader` class reads files from an S3 object store using the `boto3` client. It simply requires a client to work with to initialize. Provisioning of credentials is left to the user and their s3_client

```python
from driutils.io.aws import S3Reader
import boto3

client = boto3.client("s3")
reader = S3Reader(client)

# Request a file from AWS S3

object_bytes = reader.read(bucket="my-bucket", key="Path/to/file")
```

#### Writing
The `S3Writer` operates in the same way as the reader but supplies a `write` method instead of `read`. The `body` argument is expected by AWS to be a `bytes` object, which is left to the user to provide.

```python
from driutils.io.aws import S3Writer
import boto3

client = boto3.client("s3")
body = b"I'm a byte encoded document"
writer = S3Writer(client)

# Submit a file to AWS S3

object_bytes = reader.read(bucket="my-bucket", key="Path/to/file", body=body)
```

#### Reading/Writing Combo Class
The `S3ReaderWriter` behaves the same as the prior classes but supplies both commands in one class

### DuckDB

#### Readers

### DuckDB Reader
The DuckDB classes use the duckdb python interface to read files from local documents or S3 object storage - this comes with the capacity to use custom s3 endpoints.

To read a local file:
```python

from driutils.read import DuckDBFileReader
from driutils.io.duckdb import DuckDBFileReader

reader = DuckDBFileReader()
query = "SELECT * FROM READ_PARQUET('myfile.parquet');"
Expand Down Expand Up @@ -122,7 +161,7 @@ To read from an S3 storage location there is a more configuration available and

The reader is instantiated like this:
```python
from driutils.read import import DuckDBS3Reader
from driutils.duckdb import DuckDBS3Reader

# Automatic authentication from your environment
auto_auth_reader = DuckDBS3Reader("auto")
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"mypy_boto3_s3",
"moto",
"polars",
"tenacity",
]
name = "dri-utils"
dynamic = ["version"]
Expand Down
84 changes: 84 additions & 0 deletions src/driutils/io/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import logging

from botocore.client import BaseClient
from botocore.exceptions import ClientError
from mypy_boto3_s3.client import S3Client

from driutils.io.interfaces import ReaderInterface, WriterInterface

logger = logging.getLogger(__name__)


class S3Base:
"""Base class to reuse initializer"""

_connection: S3Client
"""The S3 client used to perform the work"""

def __init__(self, s3_client: S3Client) -> None:
"""Initializes
Args:
s3_client: The S3 client used to do work
"""

if not isinstance(s3_client, BaseClient):
raise TypeError(f"'s3_client must be a BaseClient, not '{type(s3_client)}'")

self._connection = s3_client


class S3Reader(S3Base, ReaderInterface):
"""Class for handling file reads using the AWS S3 client"""

def read(self, bucket_name: str, s3_key: str) -> bytes:
"""
Retrieves an object from an S3 bucket.
If any step fails, it logs an error and re-raises the exception.
Args:
bucket_name: The name of the S3 bucket.
s3_key: The key (path) of the object within the bucket.
Returns:
bytes: raw bytes of the S3 object
Raises:
Exception: If there's any error in retrieving or parsing the object.
"""
try:
data = self._connection.get_object(Bucket=bucket_name, Key=s3_key)
return data["Body"].read()
except (RuntimeError, ClientError) as e:
logger.error(f"Failed to get {s3_key} from {bucket_name}")
logger.exception(e)
raise e


class S3Writer(S3Base, WriterInterface):
"""Writes to an S3 bucket"""

def write(self, bucket_name: str, key: str, body: bytes) -> None:
"""Uploads an object to an S3 bucket.
This function attempts to upload a byte object to a specified S3 bucket
using the provided S3 client. If the upload fails, it logs an error
message and re-raises the exception.
Args:
bucket_name: The name of the S3 bucket.
key: The key (path) of the object within the bucket.
body: data to write to s3 object
Raises:
TypeError: If body is not bytes
"""
if not isinstance(body, bytes):
raise TypeError(f"'body' must be 'bytes', not '{type(body)}")

self._connection.put_object(Bucket=bucket_name, Key=key, Body=body)


class S3ReaderWriter(S3Reader, S3Writer):
"""Class to handle reading and writing in S3"""
60 changes: 25 additions & 35 deletions src/driutils/io/read.py → src/driutils/io/duckdb.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Self
import logging
from typing import List, Optional

import duckdb
from duckdb import DuckDBPyConnection
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from driutils.io.interfaces import ContextClass, ReaderInterface
from driutils.utils import remove_protocol_from_url

logger = logging.getLogger(__name__)

class ReaderInterface(ABC):
"""Abstract implementation for a IO reader"""

_connection: Any
"""Reference to the connection object"""

def __enter__(self) -> Self:
"""Creates a connection when used in a context block"""
return self

def __exit__(self, *args) -> None:
"""Closes the connection when exiting the context"""
self.close()

def __del__(self):
"""Closes the connection when deleted"""
self.close()

def close(self) -> None:
"""Closes the connection"""
self._connection.close()

@abstractmethod
def read(self, *args, **kwargs) -> Any:
"""Reads data from a source"""


class DuckDBReader(ReaderInterface):
class DuckDBReader(ContextClass, ReaderInterface):
"""Abstract implementation of a DuckDB Reader"""

_connection: DuckDBPyConnection
Expand All @@ -43,6 +20,12 @@ class DuckDBReader(ReaderInterface):
def __init__(self) -> None:
self._connection = duckdb.connect()

@retry(
retry=retry_if_exception_type(duckdb.InvalidInputException),
wait=wait_fixed(2),
stop=stop_after_attempt(3),
reraise=True,
)
def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection:
"""Requests to read a file
Expand All @@ -51,7 +34,17 @@ def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection:
params: The parameters to supplement the query.
"""

return self._connection.execute(query, params)
try:
return self._connection.execute(query, params)
except duckdb.HTTPException:
logger.error(f"Failed to find data from web query: {query}")
raise
except duckdb.IOException:
logger.error(f"Failed to read file from query: {query}")
raise
except duckdb.InvalidInputException:
logger.error(f"Corrupt data found from query: {query}")
raise


class DuckDBS3Reader(DuckDBReader):
Expand Down Expand Up @@ -79,14 +72,11 @@ def __init__(self, auth_type: str, endpoint_url: Optional[str] = None, use_ssl:

self._connection.install_extension("httpfs")
self._connection.load_extension("httpfs")
self._connection.execute("""
SET force_download = true;
SET http_keep_alive = false;
""")
self._connection.execute("SET force_download = true;")

self._authenticate(auth_type, endpoint_url, use_ssl)

def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: Optional[bool] = None) -> None:
def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: bool = True) -> None:
"""Handles authentication selection
Args:
Expand Down
41 changes: 41 additions & 0 deletions src/driutils/io/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Core classes used by writers"""

from abc import ABC, abstractmethod
from typing import Any, Self


class ContextClass:
_connection: Any
"""Reference to the connection object"""

def __enter__(self) -> Self:
"""Creates a connection when used in a context block"""
return self

def __exit__(self, *args) -> None:
"""Closes the connection when exiting the context"""
self.close()

def __del__(self):
"""Closes the connection when deleted"""
self.close()

def close(self) -> None:
"""Closes the connection"""
self._connection.close()


class ReaderInterface(ABC):
"""Abstract implementation for a IO reader"""

@abstractmethod
def read(self, *args, **kwargs) -> Any:
"""Reads data from a source"""


class WriterInterface(ABC):
"""Interface for defining parquet writing objects"""

@abstractmethod
def write(self, *args, **kwargs) -> None:
"""Abstract method for read operations"""
55 changes: 0 additions & 55 deletions src/driutils/io/write.py

This file was deleted.

Loading

0 comments on commit 720d2a0

Please sign in to comment.