Skip to content

Commit

Permalink
feat: start validation models
Browse files Browse the repository at this point in the history
  • Loading branch information
Midnighter committed Jul 31, 2024
1 parent bdeba8e commit 11cbf55
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 26 deletions.
144 changes: 137 additions & 7 deletions src/data_orchestration/metanetx/asset.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,166 @@
"""Provide MetaNetX assets."""

from dagster import graph_asset
from polars import DataFrame
from pandas import DataFrame

from .op import mnx_table


@graph_asset(
config={
"mnx_table": {"ops": {"fetch_table": {"config": {"table": "comp_depr.tsv"}}}},
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "comp_depr.tsv"}},
"etl_table": {"config": {"model": "MetaNetXDeprecation"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_comp_depr() -> DataFrame:
"""Define the comp_depr table asset."""
"""Define the deprecated compartment identifiers asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {"ops": {"fetch_table": {"config": {"table": "comp_prop.tsv"}}}},
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "comp_prop.tsv"}},
"etl_table": {"config": {"model": "MetaNetXCompartmentProperty"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_comp_prop() -> DataFrame:
"""Define the comp_prop table asset."""
"""Define the compartment properties asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {"ops": {"fetch_table": {"config": {"table": "comp_xref.tsv"}}}},
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "comp_xref.tsv"}},
"etl_table": {"config": {"model": "MetaNetXCrossReference"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_comp_xref() -> DataFrame:
"""Define the comp_xref table asset."""
"""Define the compartment cross-references asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "chem_depr.tsv"}},
"etl_table": {"config": {"model": "MetaNetXDeprecation"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_chem_depr() -> DataFrame:
"""Define the deprecated chemical identifiers asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "chem_isom.tsv"}},
"etl_table": {"config": {"model": "MetaNetXStereochemistry"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_chem_isom() -> DataFrame:
"""Define the stereochemistry asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "chem_prop.tsv"}},
"etl_table": {"config": {"model": "MetaNetXChemicalProperty"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_chem_prop() -> DataFrame:
"""Define the chemical properties asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "chem_xref.tsv"}},
"etl_table": {"config": {"model": "MetaNetXCrossReference"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_chem_xref() -> DataFrame:
"""Define the chemical cross-references asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "reac_depr.tsv"}},
"etl_table": {"config": {"model": "MetaNetXDeprecation"}},
},
},
},
tags={"layer": "bronze"},
)
def mnx_reac_depr() -> DataFrame:
"""Define the deprecated reaction identifiers asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "reac_prop.tsv"}},
"etl_table": {"config": {"model": "MetaNetXReactionProperty"}},
}
},
},
tags={"layer": "bronze"},
)
def mnx_reac_prop() -> DataFrame:
"""Define the reaction properties asset."""
return mnx_table()


@graph_asset(
config={
"mnx_table": {
"ops": {
"fetch_table": {"config": {"table": "reac_xref.tsv"}},
"etl_table": {"config": {"model": "MetaNetXCrossReference"}},
}
},
},
tags={"layer": "bronze"},
)
def mnx_reac_xref() -> DataFrame:
"""Define the reaction cross-references asset."""
return mnx_table()
58 changes: 40 additions & 18 deletions src/data_orchestration/metanetx/op.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json

import pandas as pd
import pandera as pa
import polars as pl
from dagster import Out, Output, graph, op
from dagster import OpExecutionContext, Out, Output, graph, op
from upath import UPath
from zstandard import ZstdDecompressor

from data_orchestration.helpers import polars_metadata
from data_orchestration.helpers import ValidationModelConfig, pandas_metadata

from . import types
from .config import MetaNetXTableConfig
from .resource import MetaNetXResource

Expand All @@ -21,26 +26,43 @@ def fetch_table(
)


@op(out=Out(io_manager_key="polars_io_manager"))
def load_table(path: UPath) -> Output[pl.DataFrame]:
"""Load a MetaNetX table."""
@op(out=Out(io_manager_key="pandas_io_manager"))
def etl_table(
context: OpExecutionContext,
config: ValidationModelConfig,
path: UPath,
) -> Output[pd.DataFrame]:
"""ETL a MetaNetX table."""
with (
path.open(mode="rb") as handle,
ZstdDecompressor().stream_reader(handle, closefd=False) as decompressor,
):
try:
result = pl.read_csv(
decompressor,
has_header=False,
separator="\t",
comment_prefix="#",
)
except pl.exceptions.NoDataError:
result = pl.DataFrame()
return Output(value=result, metadata=polars_metadata(result))
table = pl.read_csv(
decompressor,
has_header=False,
separator="\t",
comment_prefix="#",
).to_pandas()
# TODO: Set column names from config.

model = getattr(types, config.model)
try:
result = model.validate(table, lazy=True)
except pa.errors.SchemaErrors as exc:
context.log.error( # noqa: TRY400
"Schema errors:\n\n%s",
json.dumps(exc.message, indent=2),
)
context.log.error( # noqa: TRY400
"Offending rows:\n\n%s",
exc.data,
)
result = table.loc[~table.index.isin(exc.data.index), :].copy()

return Output(value=result, metadata=pandas_metadata(result))


@graph
def mnx_table() -> pl.DataFrame:
"""Define the comp_depr table asset."""
return load_table(fetch_table())
def mnx_table() -> pd.DataFrame:
"""Define the graph of operations for loading a MetaNetX table."""
return etl_table(fetch_table())
2 changes: 1 addition & 1 deletion src/data_orchestration/metanetx/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MetaNetXResource(ConfigurableResource):
str,
Field(
default=".",
description="The base path where to store table files.",
description="The base path where to store downloaded files.",
),
]

Expand Down
Loading

0 comments on commit 11cbf55

Please sign in to comment.