Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update io classes #2

Merged
merged 6 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,4 @@ jobs:
test-python:
uses: NERC-CEH/dri-cicd/.github/workflows/test-python.yml@main
with:
optional_dependencies: "[lint,test,all]"

build-test-deploy-docker:
needs: [test-python]
uses: NERC-CEH/dri-cicd/.github/workflows/build-test-deploy-docker.yml@main
with:
package_name: driutils
secrets:
AWS_REGION: ${{ secrets.AWS_REGION }}
AWS_ROLE_ARN: ${{ secrets.AWS_ROLE_ARN }}
optional_dependencies: "[lint,test,all]"
25 changes: 0 additions & 25 deletions Dockerfile

This file was deleted.

47 changes: 43 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,54 @@ dependencies = [
]
```

## Readers
## I/O Modules

### AWS S3

#### Reading
The `S3Reader` class reads files from an S3 object store using a `boto3` client. It only needs an S3 client to initialize; provisioning credentials is left to the user when they create that client.

```python
from driutils.io.aws import S3Reader
import boto3

client = boto3.client("s3")
reader = S3Reader(client)

# Request a file from AWS S3

object_bytes = reader.read(bucket_name="my-bucket", s3_key="Path/to/file")
```

#### Writing
The `S3Writer` operates in the same way as the reader but supplies a `write` method instead of `read`. The `body` argument is expected by AWS to be a `bytes` object, which is left to the user to provide.

```python
from driutils.io.aws import S3Writer
import boto3

client = boto3.client("s3")
body = b"I'm a byte encoded document"
writer = S3Writer(client)

# Submit a file to AWS S3

writer.write(bucket_name="my-bucket", key="Path/to/file", body=body)
```

#### Reading/Writing Combo Class
The `S3ReaderWriter` behaves the same as the prior classes but supplies both the `read` and `write` methods in one class.

### DuckDB

#### Readers

### DuckDB Reader
The DuckDB classes use the duckdb python interface to read files from local documents or S3 object storage - this comes with the capacity to use custom s3 endpoints.

To read a local file:
```python

from driutils.read import DuckDBFileReader
from driutils.io.duckdb import DuckDBFileReader

reader = DuckDBFileReader()
query = "SELECT * FROM READ_PARQUET('myfile.parquet');"
Expand Down Expand Up @@ -122,7 +161,7 @@ To read from an S3 storage location there is a more configuration available and

The reader is instantiated like this:
```python
from driutils.read import import DuckDBS3Reader
from driutils.io.duckdb import DuckDBS3Reader

# Automatic authentication from your environment
auto_auth_reader = DuckDBS3Reader("auto")
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"mypy_boto3_s3",
"moto",
"polars",
"tenacity",
]
name = "dri-utils"
dynamic = ["version"]
Expand Down
84 changes: 84 additions & 0 deletions src/driutils/io/aws.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import logging

from botocore.client import BaseClient
from botocore.exceptions import ClientError
from mypy_boto3_s3.client import S3Client

from driutils.io.interfaces import ReaderInterface, WriterInterface

logger = logging.getLogger(__name__)


class S3Base:
    """Base class that validates and stores the S3 client shared by the S3 readers and writers"""

    _connection: S3Client
    """The S3 client used to perform the work"""

    def __init__(self, s3_client: S3Client) -> None:
        """Initializes the instance with the client used for all S3 calls

        Args:
            s3_client: The S3 client used to do work

        Raises:
            TypeError: If s3_client is not a botocore BaseClient
        """

        # Fail early with a clear message instead of an AttributeError on first use
        if not isinstance(s3_client, BaseClient):
            raise TypeError(f"'s3_client' must be a BaseClient, not '{type(s3_client)}'")

        self._connection = s3_client


class S3Reader(S3Base, ReaderInterface):
    """Class for handling file reads using the AWS S3 client"""

    def read(self, bucket_name: str, s3_key: str) -> bytes:
        """Retrieves an object from an S3 bucket.

        If the request or reading the response body raises a RuntimeError or
        ClientError, the failure is logged with its traceback and the original
        exception is re-raised. Any other exception propagates without being logged.

        Args:
            bucket_name: The name of the S3 bucket.
            s3_key: The key (path) of the object within the bucket.

        Returns:
            The raw bytes of the S3 object.

        Raises:
            RuntimeError: Re-raised after logging.
            ClientError: Re-raised after logging.
        """
        try:
            response = self._connection.get_object(Bucket=bucket_name, Key=s3_key)
            return response["Body"].read()
        except (RuntimeError, ClientError):
            # One record carries both the context message and the traceback;
            # a bare raise keeps the original traceback untouched.
            logger.exception("Failed to get %s from %s", s3_key, bucket_name)
            raise


class S3Writer(S3Base, WriterInterface):
    """Writes to an S3 bucket"""

    def write(self, bucket_name: str, key: str, body: bytes) -> None:
        """Uploads an object to an S3 bucket.

        Validates that the payload is a bytes object and then uploads it with
        the stored S3 client. Exceptions raised by the client during the
        upload are not caught here and propagate to the caller.

        Args:
            bucket_name: The name of the S3 bucket.
            key: The key (path) of the object within the bucket.
            body: data to write to s3 object

        Raises:
            TypeError: If body is not bytes
        """
        if not isinstance(body, bytes):
            raise TypeError(f"'body' must be 'bytes', not '{type(body)}'")

        self._connection.put_object(Bucket=bucket_name, Key=key, Body=body)


class S3ReaderWriter(S3Reader, S3Writer):
    """Combined S3 handler that inherits reading from S3Reader and writing from S3Writer"""
60 changes: 25 additions & 35 deletions src/driutils/io/read.py → src/driutils/io/duckdb.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,17 @@
from abc import ABC, abstractmethod
from typing import Any, List, Optional, Self
import logging
from typing import List, Optional

import duckdb
from duckdb import DuckDBPyConnection
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from driutils.io.interfaces import ContextClass, ReaderInterface
from driutils.utils import remove_protocol_from_url

logger = logging.getLogger(__name__)

class ReaderInterface(ABC):
"""Abstract implementation for a IO reader"""

_connection: Any
"""Reference to the connection object"""

def __enter__(self) -> Self:
"""Creates a connection when used in a context block"""
return self

def __exit__(self, *args) -> None:
"""Closes the connection when exiting the context"""
self.close()

def __del__(self):
"""Closes the connection when deleted"""
self.close()

def close(self) -> None:
"""Closes the connection"""
self._connection.close()

@abstractmethod
def read(self, *args, **kwargs) -> Any:
"""Reads data from a source"""


class DuckDBReader(ReaderInterface):
class DuckDBReader(ContextClass, ReaderInterface):
"""Abstract implementation of a DuckDB Reader"""

_connection: DuckDBPyConnection
Expand All @@ -43,6 +20,12 @@ class DuckDBReader(ReaderInterface):
def __init__(self) -> None:
self._connection = duckdb.connect()

@retry(
retry=retry_if_exception_type(duckdb.InvalidInputException),
wait=wait_fixed(2),
stop=stop_after_attempt(3),
reraise=True,
)
def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection:
"""Requests to read a file

Expand All @@ -51,7 +34,17 @@ def read(self, query: str, params: Optional[List] = None) -> DuckDBPyConnection:
params: The parameters to supplement the query.
"""

return self._connection.execute(query, params)
try:
return self._connection.execute(query, params)
except duckdb.HTTPException:
logger.error(f"Failed to find data from web query: {query}")
raise
except duckdb.IOException:
logger.error(f"Failed to read file from query: {query}")
raise
except duckdb.InvalidInputException:
logger.error(f"Corrupt data found from query: {query}")
raise


class DuckDBS3Reader(DuckDBReader):
Expand Down Expand Up @@ -79,14 +72,11 @@ def __init__(self, auth_type: str, endpoint_url: Optional[str] = None, use_ssl:

self._connection.install_extension("httpfs")
self._connection.load_extension("httpfs")
self._connection.execute("""
SET force_download = true;
SET http_keep_alive = false;
""")
self._connection.execute("SET force_download = true;")

self._authenticate(auth_type, endpoint_url, use_ssl)

def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: Optional[bool] = None) -> None:
def _authenticate(self, method: str, endpoint_url: Optional[str] = None, use_ssl: bool = True) -> None:
"""Handles authentication selection

Args:
Expand Down
41 changes: 41 additions & 0 deletions src/driutils/io/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Core classes used by writers"""

from abc import ABC, abstractmethod
from typing import Any, Self


class ContextClass:
_connection: Any
"""Reference to the connection object"""

def __enter__(self) -> Self:
"""Creates a connection when used in a context block"""
return self

def __exit__(self, *args) -> None:
"""Closes the connection when exiting the context"""
self.close()

def __del__(self):
"""Closes the connection when deleted"""
self.close()

def close(self) -> None:
"""Closes the connection"""
self._connection.close()


class ReaderInterface(ABC):
    """Contract that every I/O reader must fulfil"""

    @abstractmethod
    def read(self, *args, **kwargs) -> Any:
        """Reads data from a source"""


class WriterInterface(ABC):
    """Abstract interface for an I/O writer"""

    @abstractmethod
    def write(self, *args, **kwargs) -> None:
        """Writes data to a destination"""
55 changes: 0 additions & 55 deletions src/driutils/io/write.py

This file was deleted.

Loading