Merge pull request #62 from mtiemann-os-climate/main-docstrings
Add/fix docstrings to all files.
ModeSevenIndustrialSolutions authored May 2, 2024
2 parents e8603d1 + 2d5c1cc commit f4ff0fe
Showing 13 changed files with 335 additions and 40 deletions.
12 changes: 7 additions & 5 deletions README.md
@@ -2,9 +2,11 @@

Python tools to assist with standardized data ingestion workflows

## Installation, Usage, and Release Management

### Install from PyPI

```
```shell
pip install osc-ingest-tools
```

@@ -100,14 +102,14 @@ checks.
Enabling automatic formatting via [pre-commit](https://pre-commit.com/) is
recommended:

```
```shell
pip install black isort pre-commit
pre-commit install
```

To ensure compliance with static check tools, developers may wish to run:

```
```shell
pip install black isort
# auto-sort imports
isort .
@@ -117,7 +119,7 @@ black .

Code can then be tested using tox.

```
```shell
# run static checks and tests
tox
# run only tests
@@ -139,7 +141,7 @@ To release a new version of this library, authorized developers should:

E.g.,

```
```shell
git commit -sm "Release v0.3.4"
git tag v0.3.4
git push --follow-tags
2 changes: 2 additions & 0 deletions osc_ingest_trino/__init__.py
@@ -1,3 +1,5 @@
"""Functions to simplify use of S3-based data, Pandas dataframes, and Trino SQL tables."""

from .boto3_utils import attach_s3_bucket, upload_directory_to_s3
from .dotenv_utils import load_credentials_dotenv
from .sqlcols import (
10 changes: 10 additions & 0 deletions osc_ingest_trino/boto3_utils.py
@@ -1,3 +1,5 @@
"""AWS S3 interoperability functions."""

import os
from pathlib import Path
from typing import Union
@@ -12,6 +14,13 @@


def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str, verbose: bool = False) -> None:
"""Upload files to an S3 bucket.
path -- the directory containing all files to be uploaded.
bucket -- the S3 bucket.
prefix -- the prefix prepended to each filename before uploading.
verbose -- if True, print each file uploaded (with its prefix).
"""
path = str(path)
prefix = str(prefix)
for subdir, dirs, files in os.walk(path):
@@ -25,6 +34,7 @@ def upload_directory_to_s3(path: Union[Path, str], bucket: Bucket, prefix: str,


def attach_s3_bucket(env_var_prefix: str) -> Bucket:
"""Return the S3 Bucket resource asscoiated with env_var_prefix (typically from `credentials.env`)."""
s3 = boto3.resource(
service_name="s3",
endpoint_url=os.environ[f"{env_var_prefix}_ENDPOINT"],
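To make the new docstrings concrete, here is a minimal usage sketch (not part of this commit): it assumes a `credentials.env` defining `S3_DEV_ENDPOINT` and companion `S3_DEV_*` variables, plus an illustrative local directory and key prefix.

```python
# Illustrative sketch only: "S3_DEV", the directory, and the key prefix are
# assumed values, not taken from this repository.
from osc_ingest_trino import attach_s3_bucket, upload_directory_to_s3

bucket = attach_s3_bucket("S3_DEV")  # reads S3_DEV_ENDPOINT and related env vars
upload_directory_to_s3(
    "data/exports",       # directory containing all files to be uploaded
    bucket,               # the S3 Bucket resource returned above
    "ingest/2024-05-02",  # prefix prepended to each filename
    verbose=True,         # print each file as it is uploaded
)
```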
9 changes: 6 additions & 3 deletions osc_ingest_trino/dotenv_utils.py
@@ -1,3 +1,5 @@
"""Functions to read credentials files and inject secrets into the environment."""

import os
import pathlib

@@ -9,9 +11,10 @@


def load_credentials_dotenv() -> None:
# Load some standard environment variables from a dot-env file, if it exists.
# If no such file can be found, does not fail, and so allows these environment vars to
# be populated in some other way
"""Load some standard environment variables from a dot-env file, if it exists.
If no such file can be found, do not raise, allowing these environment vars to be populated in some other way.
"""
dotenv_dir = os.environ.get("CREDENTIAL_DOTENV_DIR", os.environ.get("PWD", "/opt/app-root/src"))
dotenv_path = pathlib.Path(dotenv_dir) / "credentials.env"
if os.path.exists(dotenv_path):
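A short sketch of the intended call order (illustrative, assuming the same hypothetical `S3_DEV` prefix as above): load secrets first, then attach resources that read them.

```python
from osc_ingest_trino import attach_s3_bucket, load_credentials_dotenv

# Quietly does nothing if credentials.env is absent; the variables may
# instead be injected by the runtime environment.
load_credentials_dotenv()
bucket = attach_s3_bucket("S3_DEV")  # "S3_DEV" is an assumed prefix
```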
5 changes: 5 additions & 0 deletions osc_ingest_trino/sqlcols.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas column names to SQL column names."""

import re
from typing import List, Union, cast

@@ -16,6 +18,7 @@

# 63 seems to be a common max column name length
def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str], str]:
"""Convert name to a SQL-compliant table or column name, abbreviating some common words."""
if isinstance(name, list):
return [cast(str, sql_compliant_name(e, maxlen=maxlen)) for e in name]
w = str(name).casefold().rstrip().lstrip()
@@ -40,6 +43,7 @@ def sql_compliant_name(name: Union[List[str], str], maxlen=63) -> Union[List[str


def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: int = 63) -> pd.DataFrame:
"""Ensure that all column names for df are SQL-compliant."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
icols = df.columns.to_list()
@@ -51,6 +55,7 @@ def enforce_sql_column_names(df: pd.DataFrame, inplace: bool = False, maxlen: in


def enforce_partition_column_order(df: pd.DataFrame, pcols: List[str], inplace: bool = False) -> pd.DataFrame:
"""Reorder columns names of df to match the order given by pcols."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(pcols, list):
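A sketch of the three helpers documented above (assuming all are re-exported at the package top level, as the `__init__.py` hunk suggests; the input names and normalized outputs are illustrative).

```python
import pandas as pd

from osc_ingest_trino import (
    enforce_partition_column_order,
    enforce_sql_column_names,
    sql_compliant_name,
)

# A lowercased, underscored, SQL-safe rendering of the input name.
print(sql_compliant_name("Total Assets (USD)"))

df = pd.DataFrame({"Fiscal Year": [2023], "Total Assets (USD)": [1.0e9]})
df = enforce_sql_column_names(df)  # returns a new frame unless inplace=True
df = enforce_partition_column_order(df, pcols=df.columns.to_list()[-1:])
print(df.columns.to_list())
```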
10 changes: 10 additions & 0 deletions osc_ingest_trino/sqltypes.py
@@ -1,3 +1,5 @@
"""Functions to translate Pandas dataframes to SQL equivalents."""

from typing import Dict

import pandas as pd
@@ -25,6 +27,7 @@


def pandas_type_to_sql(pt: str, typemap: Dict[str, str] = {}):
"""Return the SQL type corresponding to the pandas type `pt` (using special mappings, if any, from `typemap`)."""
if not isinstance(typemap, dict):
raise ValueError("typemap must be a dict")
# user defined typemap overrides _p2smap
@@ -40,6 +43,13 @@ def create_table_schema_pairs(
colmap: Dict[str, str] = {},
indent: int = 4,
) -> str:
"""Create SQL column, type pairs that can appear in a CREATE TABLE operation.
df -- the dataframe to be rendered as a SQL table
typemap -- mappings from dtypes to SQL types above and beyond our defaults
colmap -- mappings of df column names to SQL column names if not using defaults
indent -- how many spaces of indent to make our SQL declarations pretty
"""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(colmap, dict):
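The two helpers compose into a CREATE TABLE statement roughly as follows (a sketch; the catalog, schema, and table names are assumptions, and the exact type strings depend on the default type map).

```python
import pandas as pd

from osc_ingest_trino import create_table_schema_pairs  # assuming top-level re-export

df = pd.DataFrame({"name": ["a", "b"], "value": [1.0, 2.0]})
pairs = create_table_schema_pairs(df)  # one "    column type" line per column
sql = f"create table if not exists hive.demo.example (\n{pairs}\n)"
print(sql)
```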
39 changes: 39 additions & 0 deletions osc_ingest_trino/trino_utils.py
@@ -1,3 +1,5 @@
"""Trino interoperability functions."""

import math
import os
import uuid
@@ -32,6 +34,13 @@ def attach_trino_engine(
schema: Optional[str] = None,
verbose: Optional[bool] = False,
) -> Engine:
"""Return a SQLAlchemy engine object representing a Trino instance.
env_var_prefix -- a prefix for all environment variables related to the Trino instance.
catalog -- the Trino catalog.
schema -- the Trino schema.
verbose -- if True, print the full string used to connect.
"""
sqlstring = "trino://{user}@{host}:{port}".format(
user=os.environ[f"{env_var_prefix}_USER"],
host=os.environ[f"{env_var_prefix}_HOST"],
@@ -57,6 +66,12 @@
def _do_sql(
sql: Union[sqlalchemy.sql.elements.TextClause, str], engine: Engine, verbose: bool = False
) -> Optional[Sequence[Row[Any]]]:
"""Execute SQL query, returning the query result.
sql -- the SQL query.
engine -- the SQLAlchemy engine representing the Trino database.
verbose -- if True, print the values returned from executing the string.
"""
if type(sql) is not sqlalchemy.sql.elements.TextClause:
sql = text(str(sql))
if verbose:
@@ -86,6 +101,22 @@ def fast_pandas_ingest_via_hive( # noqa: C901
colmap: Dict[str, str] = {},
verbose: bool = False,
) -> None:
"""Efficiently export a dataframe into a Trino database.
df -- the dataframe to export.
engine -- the SQLAlchemy engine representing the Trino database.
catalog -- the Trino catalog.
schema -- the Trino schema.
table -- the name of the table created in the schema.
hive_bucket -- the backing store of the Hive metastore.
hive_catalog -- the Hive metastore catalog (where schemas are created).
hive_schema -- the Hive metastore schema (where tables will be created).
partition_columns -- if not empty, defines the partition columns of the table created.
overwrite -- if True, an existing table will be overwritten.
typemap -- used to format types that cannot otherwise be properly inferred.
colmap -- used to format column names that cannot otherwise be properly inferred.
verbose -- if True, print the queries being executed and the results of those queries.
"""
uh8 = uuid.uuid4().hex[:8]
hive_table = f"ingest_temp_{uh8}"

@@ -157,6 +188,8 @@ def fast_pandas_ingest_via_hive( # noqa: C901


class TrinoBatchInsert(object):
"""A class used to bundle together basic Trino parameters."""

def __init__(
self,
catalog: Optional[str] = None,
@@ -165,6 +198,7 @@ def __init__(
optimize: bool = False,
verbose: bool = False,
):
"""Initialize TrinoBatchInsert objects."""
self.catalog = catalog
self.schema = schema
self.batch_size = batch_size
@@ -175,6 +209,7 @@ def __init__(
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
# https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_iter: List[Tuple]) -> None:
"""Implement `callable` interface for row-by-row insertion."""
fqname = self._full_table_name(sqltbl)
batch: List[str] = []
self.ninserts = 0
@@ -201,6 +236,7 @@ def __call__(self, sqltbl: Table, dbcxn: Connection, columns: List[str], data_it
print(f"execute optimize: {x}")

def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
"""Implement actual row-by-row insertion of BATCH data into table FQNAME using DBCXN database connection."""
if self.verbose:
print(f"inserting {len(batch)} records")
TrinoBatchInsert._print_batch(batch)
@@ -218,6 +254,7 @@ def _do_insert(self, dbcxn: Connection, fqname: str, batch: List[str]) -> None:
print(f"batch insert result: {x}")

def _full_table_name(self, sqltbl: Table) -> str:
"""Return fully qualified table name for SQLTBL table within this TrinoBatchInsert object."""
# start with table name
name: str = f"{sqltbl.name}"
# prepend schema - allow override from this class
@@ -231,6 +268,7 @@

@staticmethod
def _sqlform(x: Any) -> str:
"""Format the value of x so it can appear in a SQL Values context."""
if x is None:
return "NULL"
if isinstance(x, str):
@@ -254,6 +292,7 @@ def _sqlform(x: Any) -> str:

@staticmethod
def _print_batch(batch: List[str]) -> None:
"""For batch, a list of SQL query lines, print up to the first 5 such."""
if len(batch) > 5:
print("\n".join(f" {e}" for e in batch[:3]))
print(" ...")
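Because `TrinoBatchInsert` documents itself against the pandas `to_sql` `method=` hooks linked in the comments above, the intended wiring looks roughly like this (the `TRINO` prefix and table names are assumptions).

```python
import pandas as pd

from osc_ingest_trino import TrinoBatchInsert, attach_trino_engine

# Reads TRINO_USER, TRINO_HOST, TRINO_PORT (and friends) from the environment.
engine = attach_trino_engine("TRINO", catalog="hive", schema="demo")

df = pd.DataFrame({"name": ["a", "b"], "value": [1.0, 2.0]})
df.to_sql(
    "example",
    con=engine,
    index=False,
    if_exists="append",
    method=TrinoBatchInsert(batch_size=1000, verbose=True),  # batched inserts
)
```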
2 changes: 2 additions & 0 deletions osc_ingest_trino/unmanaged/__init__.py
@@ -1,3 +1,5 @@
"""Functions create and clean up unmanaged Hive tables."""

from .unmanaged_hive_ingest import (
drop_unmanaged_data,
drop_unmanaged_table,
8 changes: 8 additions & 0 deletions osc_ingest_trino/unmanaged/unmanaged_hive_ingest.py
@@ -1,3 +1,5 @@
"""Functions to create, ingest, and drop unmanaged Hive tables."""

import shutil
import uuid

@@ -17,6 +19,7 @@


def _remove_trailing_slash(s):
"""Remove trailing slash from s."""
s = str(s)
if len(s) == 0:
return s
@@ -26,10 +29,12 @@ def _remove_trailing_slash(s):


def _prefix(pfx, schema, table):
"""Translate pfx, schema, and table names into S3 bucket name."""
return _remove_trailing_slash(pfx).format(schema=schema, table=table)


def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default_prefix, verbose=False):
"""Drop catalog.schema.table from Hive metastore and also delete its S3 backing store."""
sql = text(f"drop table if exists {catalog}.{schema}.{table}")
with engine.begin() as cxn:
qres = cxn.execute(sql)
@@ -40,6 +45,7 @@ def drop_unmanaged_table(catalog, schema, table, engine, bucket, prefix=_default


def drop_unmanaged_data(schema, table, bucket, prefix=_default_prefix, verbose=False):
"""Delete data that may have been orphaned when its table was dropped in Hive metastore."""
dres = bucket.objects.filter(Prefix=f"{_prefix(prefix, schema, table)}/").delete()
if verbose:
print(dres)
@@ -49,6 +55,7 @@
def ingest_unmanaged_parquet(
df, schema, table, bucket, partition_columns=[], append=True, workdir="/tmp", prefix=_default_prefix, verbose=False
):
"""Ingest data from df into Hive metastore table with backing store bucket."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(partition_columns, list):
@@ -83,6 +90,7 @@ def ingest_unmanaged_parquet(
def unmanaged_parquet_tabledef(
df, catalog, schema, table, bucket, partition_columns=[], typemap={}, colmap={}, verbose=False
):
"""Return a SQL string that would create a table suitable for ingesting df into Hive metastore backed by bucket."""
if not isinstance(df, pd.DataFrame):
raise ValueError("df must be a pandas DataFrame")
if not isinstance(partition_columns, list):
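Finally, a sketch of the unmanaged-table flow these docstrings describe, reusing the hypothetical `engine` and `bucket` from the sketches above (the import path assumes these functions are re-exported by `osc_ingest_trino.unmanaged`, as its `__init__.py` hunk suggests).

```python
import pandas as pd
from sqlalchemy import text

from osc_ingest_trino.unmanaged import (
    drop_unmanaged_table,
    ingest_unmanaged_parquet,
    unmanaged_parquet_tabledef,
)

df = pd.DataFrame({"year": [2023, 2024], "value": [1.0, 2.0]})

# Declare the table in the Hive metastore, backed by the S3 bucket...
tabledef = unmanaged_parquet_tabledef(df, "hive", "demo", "example", bucket)
with engine.begin() as cxn:
    cxn.execute(text(tabledef))

# ...write the parquet data to the backing store...
ingest_unmanaged_parquet(df, "demo", "example", bucket, append=False)

# ...and later drop both the table and any orphaned data.
drop_unmanaged_table("hive", "demo", "example", engine, bucket, verbose=True)
```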