Skip to content

Commit

Permalink
Merged from dev
Browse files Browse the repository at this point in the history
  • Loading branch information
Beatriz Saldana committed Dec 10, 2024
2 parents 17b6f51 + 17485f6 commit a2721dc
Show file tree
Hide file tree
Showing 11 changed files with 2,265 additions and 1,988 deletions.
2 changes: 2 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ This package uses [Great Expectations](https://greatexpectations.io/) to validat
- You can prevent Great Expectations from running for a dataset by setting `gx_enabled: false` in the configuration for the dataset.
1. Test data processing by running `adt test_config.yaml --upload` and ensure that HTML reports with all expectations are generated and uploaded to the proper folder in Synapse.

**Note:** If you are adding a new expectation and want to allow for "fuzzy validation" (e.g. you expect X% of the values in a column to match the expectation, while the remaining Y% are allowed not to match), you will need to use the `mostly` [parameter](https://docs.greatexpectations.io/docs/0.18/reference/learn/expectations/standard_arguments/#mostly). When an expectation passes overall but still has some values that do not match (as can happen when `mostly` is used), this package surfaces a "warning" so that users can see which expectations had failed values even though the overall validation passed.

#### Custom Expectations

This repository is currently home to three custom expectations that were created for use on `agora-data-tools` datasets:
Expand Down
3,936 changes: 2,063 additions & 1,873 deletions Pipfile.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions gx_suite_definitions/gene_info.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@
"validator.expect_column_values_to_be_of_type(\"median_expression\", \"str\")\n",
"with open(\"../src/agoradatatools/great_expectations/gx/json_schemas/gene_info/median_expression.json\", \"r\") as file:\n",
" median_expression_schema = json.load(file)\n",
"validator.expect_column_values_to_match_json_schema(\"median_expression\", json_schema=median_expression_schema)"
"validator.expect_column_values_to_match_json_schema(\"median_expression\", json_schema=median_expression_schema, mostly=0.95)"
]
},
{
Expand Down Expand Up @@ -271,7 +271,7 @@
"source": [
"# biodomains\n",
"validator.expect_column_values_to_be_of_type(\"biodomains\", \"list\")\n",
"validator.expect_column_values_to_have_list_members_of_type(column=\"biodomains\", member_type=\"str\")\n",
"validator.expect_column_values_to_have_list_members_of_type(column=\"biodomains\", member_type=\"str\", mostly=0.95)\n",
"validator.expect_column_values_to_have_list_members(column=\"biodomains\", list_members={\n",
" 'Apoptosis',\n",
" 'Vasculature',\n",
Expand Down Expand Up @@ -350,7 +350,7 @@
"outputs": [],
"source": [
"# uniprotkb_accessions\n",
"validator.expect_column_values_to_be_of_type(\"uniprotkb_accessions\", \"list\")\n",
"validator.expect_column_values_to_be_of_type(\"uniprotkb_accessions\", \"list\", mostly=0.95)\n",
"validator.expect_column_values_to_have_list_members_of_type(column=\"uniprotkb_accessions\", member_type=\"str\")"
]
},
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ install_requires =
pandas~=2.0.0
numpy~=1.21
setuptools~=70.0.0
synapseclient~=4.0.0
synapseclient==4.4.1
PyYAML~=6.0
pyarrow~=14.0.1
typer~=0.7.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@
"array",
"null"
]
}
},
"mostly": 0.95
},
"meta": {}
},
Expand Down Expand Up @@ -494,7 +495,8 @@
"expectation_type": "expect_column_values_to_have_list_members_of_type",
"kwargs": {
"column": "biodomains",
"member_type": "str"
"member_type": "str",
"mostly": 0.95
},
"meta": {}
},
Expand All @@ -503,25 +505,25 @@
"kwargs": {
"column": "biodomains",
"list_members": [
"Oxidative Stress",
"Myelination",
"Vasculature",
"Synapse",
"Metal Binding and Homeostasis",
"Immune Response",
"DNA Repair",
"Autophagy",
"RNA Spliceosome",
"APP Metabolism",
"Apoptosis",
"Endolysosome",
"Vasculature",
"DNA Repair",
"Immune Response",
"Proteostasis",
"Mitochondrial Metabolism",
"Cell Cycle",
"Epigenetic",
"Lipid Metabolism",
"Metal Binding and Homeostasis",
"RNA Spliceosome",
"Tau Homeostasis",
"Myelination",
"Epigenetic",
"Mitochondrial Metabolism",
"Proteostasis",
"Structural Stabilization",
"Cell Cycle"
"Apoptosis",
"Oxidative Stress",
"APP Metabolism",
"Structural Stabilization"
]
},
"meta": {}
Expand Down Expand Up @@ -629,6 +631,7 @@
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "uniprotkb_accessions",
"mostly": 0.95,
"type_": "list"
},
"meta": {}
Expand Down
90 changes: 59 additions & 31 deletions src/agoradatatools/gx.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,22 @@


class GreatExpectationsRunner:
"""Class to run great expectations on a dataset and upload the HTML report to Synapse"""
"""Class to run great expectations on a dataset and upload the HTML report to Synapse
Attributes:
failures (bool): Whether or not the GX run had any failed expectations.
failure_message (Optional[str]): Summary message listing the failed expectations, or None if no expectations failed.
warnings (bool): Whether or not the GX run had any warnings.
warning_message (Optional[str]): Summary message listing the expectations that passed but still had failed values, or None if there were no warnings.
report_file (str): Synapse ID of the GX report file.
report_version (int): Version number of the GX report file.
report_link (str): URL of the specific version of the GX report file.
"""

failures: bool = False
failure_message: Optional[str] = None
warnings: bool = False
warning_message: Optional[str] = None
report_file: Optional[str] = None
report_version: Optional[int] = None
report_link: Optional[str] = None
Expand Down Expand Up @@ -67,7 +79,7 @@ def _get_data_context_location(self) -> str:
gx_directory = os.path.join(script_dir, "great_expectations")
return gx_directory

def _check_if_expectation_suite_exists(self) -> bool:
def check_if_expectation_suite_exists(self) -> bool:
"""Checks if the expectation suite exists in the great_expectations workspace"""
exists = (
self.expectation_suite_name in self.context.list_expectation_suite_names()
Expand All @@ -78,7 +90,7 @@ def _check_if_expectation_suite_exists(self) -> bool:
)
return exists

def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
def get_results_path(self, checkpoint_result: CheckpointResult) -> str:
"""Gets the path to the most recent HTML report for a checkpoint,
copies it to a Synapse-API friendly name, and returns the new path
Expand Down Expand Up @@ -106,7 +118,7 @@ def _get_results_path(self, checkpoint_result: CheckpointResult) -> str:
shutil.copy(original_results_path, new_results_path)
return new_results_path

def _upload_results_file_to_synapse(self, results_path: str) -> None:
def upload_results_file_to_synapse(self, results_path: str) -> None:
"""Uploads a results file to Synapse. Assigns class attributes associated
with the report file.
Expand Down Expand Up @@ -148,43 +160,61 @@ def convert_nested_columns_to_json(
df[column] = df[column].apply(json.dumps)
return df

def get_failed_expectations(self, checkpoint_result: CheckpointResult) -> str:
"""Gets the failed expectations from a CheckpointResult and returns them as a formatted string
def set_warnings_and_failures(self, checkpoint_result: CheckpointResult) -> None:
"""Sets class attributes for warnings and failures given a CheckpointResult
Args:
checkpoint_result (CheckpointResult): CheckpointResult object
Returns:
fail_message (str): String with information on which fields and expectations failed
"""
warning_dict = {self.expectation_suite_name: {}}
fail_dict = {self.expectation_suite_name: {}}
expectation_results = checkpoint_result.list_validation_results()[0]["results"]

for result in expectation_results:
if not result["success"]:
column = result["expectation_config"]["kwargs"]["column"]
failed_expectation = result["expectation_config"]["expectation_type"]
if not fail_dict[self.expectation_suite_name].get(column, None):
fail_dict[self.expectation_suite_name][column] = []
fail_dict[self.expectation_suite_name][column].append(
failed_expectation
column = result["expectation_config"]["kwargs"].get(
"column",
"/".join(result["expectation_config"]["kwargs"].get("column_list", [])),
)
expectation = result["expectation_config"]["expectation_type"]
if result["success"]:
if result["result"].get("partial_unexpected_list", None):
warning_dict[self.expectation_suite_name].setdefault(
column, []
).append(expectation)
else:
fail_dict[self.expectation_suite_name].setdefault(column, []).append(
expectation
)

self.warning_message, self.warnings = self._generate_message(
warning_dict, "warnings"
)
self.failure_message, self.failures = self._generate_message(
fail_dict, "failures"
)

def _generate_message(
self, result_dict: dict, message_type: str
) -> typing.Tuple[str, bool]:
"""Generate message and status for warnings or failures."""
messages = []
for _, fields_dict in fail_dict.items():
for field, failed_expectations in fields_dict.items():
for suite_name, fields_dict in result_dict.items():
for field, expectations in fields_dict.items():
messages.append(
f"{field} has failed expectations {', '.join(failed_expectations)}"
f"In the {suite_name} dataset, '{field}' has failed values for expectations {', '.join(expectations)}"
)

fail_message = ("Great Expectations data validation has failed: ") + "; ".join(
messages
message = (
(f"Great Expectations data validation has the following {message_type}: ")
+ "; ".join(messages)
if messages
else None
)

return fail_message
return message, bool(message)

def run(self) -> None:
"""Run great expectations on a dataset and upload the results to Synapse."""

if not self._check_if_expectation_suite_exists():
if not self.check_if_expectation_suite_exists():
return

logger.info(f"Running data validation on {self.expectation_suite_name}")
Expand All @@ -209,11 +239,9 @@ def run(self) -> None:
logger.info(
f"Data validation complete for {self.expectation_suite_name}. Uploading results to Synapse."
)
latest_reults_path = self._get_results_path(checkpoint_result)
latest_reults_path = self.get_results_path(checkpoint_result)

if self.upload_folder:
self._upload_results_file_to_synapse(latest_reults_path)
self.set_warnings_and_failures(checkpoint_result)

if not checkpoint_result.success:
self.failures = True
self.failure_message = self.get_failed_expectations(checkpoint_result)
if self.upload_folder:
self.upload_results_file_to_synapse(latest_reults_path)
2 changes: 2 additions & 0 deletions src/agoradatatools/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ def process_dataset(
),
gx_failures=gx_runner.failures,
gx_failure_message=gx_runner.failure_message,
gx_warnings=gx_runner.warnings,
gx_warning_message=gx_runner.warning_message,
)

if upload and not gx_runner.failures:
Expand Down
4 changes: 4 additions & 0 deletions src/agoradatatools/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class DatasetReport:
gx_report_link: URL of the specific version of the GX report file.
gx_failures: Whether or not the GX run had any failed expectations.
gx_failure_message: Summary message for the GX run if any expectations failed.
gx_warnings: Whether or not the GX run had any warnings.
gx_warning_message: Summary message for the GX run if any expectations had warnings.
adt_output_file: Synapse ID of the ADT output file.
adt_output_version: Version number of the ADT output file.
adt_output_link: URL of the specific version of the ADT output file.
Expand All @@ -39,6 +41,8 @@ class DatasetReport:
gx_report_link: Optional[str] = field(default=None)
gx_failures: Optional[bool] = field(default=False)
gx_failure_message: Optional[str] = field(default=None)
gx_warnings: Optional[bool] = field(default=False)
gx_warning_message: Optional[str] = field(default=None)
adt_output_file: Optional[str] = field(default=None)
adt_output_version: Optional[int] = field(default=None)
adt_output_link: Optional[str] = field(default=None)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_assets/gx/checkpoint_result_fail.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
}
},
{
"success": false,
"success": true,
"expectation_config": {
"expectation_type": "expect_column_value_lengths_to_equal",
"kwargs": {
Expand Down
16 changes: 6 additions & 10 deletions tests/test_assets/gx/checkpoint_result_pass.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,9 @@
},
"result": {
"element_count": 15991,
"unexpected_count": 1,
"unexpected_percent": 0.006253517603652055,
"partial_unexpected_list": [
"ENSG00"
],
"unexpected_count": 0,
"unexpected_percent": 0.0,
"partial_unexpected_list": [],
"missing_count": 0,
"missing_percent": 0.0,
"unexpected_percent_total": 0.006253517603652055,
Expand Down Expand Up @@ -115,11 +113,9 @@
},
"result": {
"element_count": 15991,
"unexpected_count": 1,
"unexpected_percent": 0.006253517603652055,
"partial_unexpected_list": [
"ENSG00"
],
"unexpected_count": 0,
"unexpected_percent": 0.0,
"partial_unexpected_list": [],
"missing_count": 0,
"missing_percent": 0.0,
"unexpected_percent_total": 0.006253517603652055,
Expand Down
Loading

0 comments on commit a2721dc

Please sign in to comment.