From 8ed9dcd80711b0ad94f4aba1b601446191af45fa Mon Sep 17 00:00:00 2001
From: Lauren Biggers
Date: Mon, 24 Apr 2023 17:29:54 -0500
Subject: [PATCH] [DC-2692] synthetic script enhancements

* uses cleaning rules to clean survey_conduct table data
* removes duplicated code to create cleaned survey_conduct table data
* prepares to potentially run all rules from RDR ingest to RT clean dataset
* still only runs a subset of rules marked as run_for_synthetic
---
 data_steward/cdr_cleaner/clean_cdr.py         |   3 +-
 .../clean_survey_conduct_recurring_surveys.py |   5 +-
 .../deid/survey_version_info.py               |   3 +-
 .../drop_orphaned_survey_conduct_ids.py       |   9 +-
 .../drop_unverified_survey_data.py            |  11 +-
 .../update_survey_source_concept_id.py        |   5 +-
 .../tools/create_combined_backup_dataset.py   |   4 +
 data_steward/tools/create_synthetic.py        | 101 ++----------------
 8 files changed, 33 insertions(+), 108 deletions(-)

diff --git a/data_steward/cdr_cleaner/clean_cdr.py b/data_steward/cdr_cleaner/clean_cdr.py
index 53d1192785..0d159c39ad 100644
--- a/data_steward/cdr_cleaner/clean_cdr.py
+++ b/data_steward/cdr_cleaner/clean_cdr.py
@@ -404,7 +404,8 @@
     DataStage.SYNTHETIC.value:
         RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES +
         REGISTERED_TIER_DEID_CLEANING_CLASSES +
-        REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES,
+        REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES +
+        REGISTERED_TIER_DEID_CLEAN_CLEANING_CLASSES,
 }
diff --git a/data_steward/cdr_cleaner/cleaning_rules/clean_survey_conduct_recurring_surveys.py b/data_steward/cdr_cleaner/cleaning_rules/clean_survey_conduct_recurring_surveys.py
index 20dc2fd170..f050bf70e0 100644
--- a/data_steward/cdr_cleaner/cleaning_rules/clean_survey_conduct_recurring_surveys.py
+++ b/data_steward/cdr_cleaner/cleaning_rules/clean_survey_conduct_recurring_surveys.py
@@ -74,7 +74,7 @@
 ON m.questionnaire_response_id = sc.survey_conduct_id
 WHERE m.questionnaire_response_id IS NOT NULL
 ) sub
-WHERE sub.survey_conduct_id = sc.survey_conduct_id 
+WHERE sub.survey_conduct_id = sc.survey_conduct_id
 AND sc.survey_conduct_id IN (SELECT survey_conduct_id FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`)
 """)
@@ -103,7 +103,8 @@ def __init__(self,
                          project_id=project_id,
                          dataset_id=dataset_id,
                          sandbox_dataset_id=sandbox_dataset_id,
-                         table_namer=table_namer)
+                         table_namer=table_namer,
+                         run_for_synthetic=True)
 
     def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
         """
diff --git a/data_steward/cdr_cleaner/cleaning_rules/deid/survey_version_info.py b/data_steward/cdr_cleaner/cleaning_rules/deid/survey_version_info.py
index 3b6b230ee7..38854577f7 100644
--- a/data_steward/cdr_cleaner/cleaning_rules/deid/survey_version_info.py
+++ b/data_steward/cdr_cleaner/cleaning_rules/deid/survey_version_info.py
@@ -69,7 +69,8 @@ def __init__(self,
                  dataset_id,
                  sandbox_dataset_id,
                  clean_survey_dataset_id=None,
-                 table_namer=None):
+                 table_namer=None,
+                 run_for_synthetic=True):
         """
         Initialize the class with proper info.
diff --git a/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_survey_conduct_ids.py b/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_survey_conduct_ids.py
index 299abf33dc..12e855a02b 100644
--- a/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_survey_conduct_ids.py
+++ b/data_steward/cdr_cleaner/cleaning_rules/drop_orphaned_survey_conduct_ids.py
@@ -27,10 +27,10 @@
 SANDBOX_QUERY = JINJA_ENV.from_string("""
 CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}` AS (
-    SELECT * 
+    SELECT *
     FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
     WHERE survey_conduct_id NOT IN (
-        SELECT DISTINCT questionnaire_response_id 
+        SELECT DISTINCT questionnaire_response_id
         FROM `{{project_id}}.{{dataset_id}}.observation`
         WHERE questionnaire_response_id IS NOT NULL
     )
@@ -40,7 +40,7 @@
 DELETE_QUERY = JINJA_ENV.from_string("""
 DELETE FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
 WHERE survey_conduct_id IN (
-    SELECT DISTINCT survey_conduct_id 
+    SELECT DISTINCT survey_conduct_id
     FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
 )
 """)
@@ -69,7 +69,8 @@ def __init__(self,
                          project_id=project_id,
                          dataset_id=dataset_id,
                          sandbox_dataset_id=sandbox_dataset_id,
-                         table_namer=table_namer)
+                         table_namer=table_namer,
+                         run_for_synthetic=True)
 
     def setup_rule(self, client):
         """
diff --git a/data_steward/cdr_cleaner/cleaning_rules/drop_unverified_survey_data.py b/data_steward/cdr_cleaner/cleaning_rules/drop_unverified_survey_data.py
index 1ab03dfb9e..c034fac681 100644
--- a/data_steward/cdr_cleaner/cleaning_rules/drop_unverified_survey_data.py
+++ b/data_steward/cdr_cleaner/cleaning_rules/drop_unverified_survey_data.py
@@ -37,7 +37,7 @@
     SELECT o.*
     FROM `{{project_id}}.{{dataset_id}}.observation` o
     LEFT JOIN `{{project_id}}.{{dataset_id}}.survey_conduct` sc
-    ON sc.survey_conduct_id = o.questionnaire_response_id 
+    ON sc.survey_conduct_id = o.questionnaire_response_id
     WHERE sc.survey_source_concept_id = 0 OR sc.survey_concept_id = 0
 )
 """)
@@ -53,7 +53,7 @@
 CLEAN_OBSERVATION = JINJA_ENV.from_string("""
 DELETE FROM `{{project_id}}.{{dataset_id}}.observation`
 WHERE questionnaire_response_id IN (
-    SELECT questionnaire_response_id 
+    SELECT questionnaire_response_id
     FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
 )
 """)
@@ -61,7 +61,7 @@
 CLEAN_SURVEY_CONDUCT = JINJA_ENV.from_string("""
 DELETE FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
 WHERE survey_conduct_id IN (
-    SELECT survey_conduct_id 
+    SELECT survey_conduct_id
     FROM `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}`
 )
 """)
@@ -72,7 +72,7 @@
     COUNT(CASE WHEN questionnaire_response_id IS NOT NULL THEN 1 ELSE 0 END) as invalid_obs
 FROM `{{project_id}}.{{dataset_id}}.survey_conduct` sc
 LEFT JOIN `{{project_id}}.{{dataset_id}}.observation` o
-ON sc.survey_conduct_id = o.questionnaire_response_id 
+ON sc.survey_conduct_id = o.questionnaire_response_id
 WHERE sc.survey_source_concept_id = 0 OR sc.survey_concept_id = 0
 """)
@@ -106,7 +106,8 @@ def __init__(self,
                              CleanSurveyConductRecurringSurveys,
                              UpdateSurveySourceConceptId
                          ],
-                         table_namer=table_namer)
+                         table_namer=table_namer,
+                         run_for_synthetic=True)
 
         self.counts_query = COUNTS_QUERY.render(project_id=self.project_id,
                                                 dataset_id=self.dataset_id)
diff --git a/data_steward/cdr_cleaner/cleaning_rules/update_survey_source_concept_id.py b/data_steward/cdr_cleaner/cleaning_rules/update_survey_source_concept_id.py
index cb464263df..46f9b8c3f1 100644
--- a/data_steward/cdr_cleaner/cleaning_rules/update_survey_source_concept_id.py
+++ b/data_steward/cdr_cleaner/cleaning_rules/update_survey_source_concept_id.py
@@ -36,7 +36,7 @@
 CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_dataset_id}}.{{sandbox_table_id}}` AS (
 SELECT * FROM `{{project_id}}.{{dataset_id}}.survey_conduct`
 WHERE survey_concept_id != survey_source_concept_id
-OR survey_concept_id NOT IN (SELECT concept_id FROM `{{project_id}}.{{dataset_id}}.concept` WHERE vocabulary_id IN ('PPI','AoU_Custom','AoU_General')) 
+OR survey_concept_id NOT IN (SELECT concept_id FROM `{{project_id}}.{{dataset_id}}.concept` WHERE vocabulary_id IN ('PPI','AoU_Custom','AoU_General'))
 )
 """)
@@ -80,7 +80,8 @@ def __init__(self,
                          dataset_id=dataset_id,
                          sandbox_dataset_id=sandbox_dataset_id,
                          depends_on=[CleanSurveyConductRecurringSurveys],
-                         table_namer=table_namer)
+                         table_namer=table_namer,
+                         run_for_synthetic=True)
 
     def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
         """
diff --git a/data_steward/constants/tools/create_combined_backup_dataset.py b/data_steward/constants/tools/create_combined_backup_dataset.py
index 6f0aa9c978..5ed90df4f7 100644
--- a/data_steward/constants/tools/create_combined_backup_dataset.py
+++ b/data_steward/constants/tools/create_combined_backup_dataset.py
@@ -110,6 +110,8 @@
 FROM `{{rdr_dataset_id}}.{{domain_table}}` AS t
 {% else %}
 FROM `{{rdr_dataset_id}}.{{domain_table}}`
+-- added to avoid having an empty ehr dataset for synthetic data only --
+{% if 'synthetic' not in combined_sandbox_dataset_id %}
 UNION ALL
 SELECT DISTINCT
     '{{ehr_dataset_id}}' AS src_dataset_id,
@@ -126,6 +128,8 @@
         WHERE t.person_id = c.person_id)
 {% endif %}
 {% endif %}
+-- closes the synthetic only needs if/else clause --
+{% endif %}
 """)
 
 LOAD_QUERY = JINJA_ENV.from_string("""
diff --git a/data_steward/tools/create_synthetic.py b/data_steward/tools/create_synthetic.py
index 8200135b68..92fbf3d20a 100644
--- a/data_steward/tools/create_synthetic.py
+++ b/data_steward/tools/create_synthetic.py
@@ -10,7 +10,7 @@
 # Project imports
 from cdr_cleaner import clean_cdr
 from cdr_cleaner.args_parser import add_kwargs_to_args
-from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV
+from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV, SURVEY_CONDUCT
 from constants.cdr_cleaner import clean_cdr as consts
 from constants.tools import create_combined_backup_dataset as combine_consts
 from gcloud.bq import BigQueryClient
@@ -114,91 +114,6 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str,
     return datasets
 
 
-def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
-    # sandbox and drop orphaned records
-    que = JINJA_ENV.from_string("""
-    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` AS (
-    SELECT * FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
-    WHERE NOT EXISTS (
-        SELECT 1 FROM `{{project_id}}.{{staging_ds}}.observation` o
-        WHERE sc.survey_conduct_id = o.questionnaire_response_id));
-
-    DELETE FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
-    WHERE EXISTS (
-        SELECT 1 FROM `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` sb
-        WHERE sc.survey_conduct_id = sb.survey_conduct_id
-    );""")
-    que = que.render(project_id=project_id,
-                     staging_ds=staging_ds,
-                     sandbox_ds=sandbox_ds)
-
-    # This is now done in the pipeline.
-    # IF the next fix is implemented in the pipeline, this function can be removed.
-    # resp = client.query(que)
-    # resp.result()
-
-    # fix cope survey responses
-    que = (f"""
-    -- save all cope and minute survey responses --
-    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct` AS (
-        SELECT *
-        FROM `{{project_id}}.{{staging_ds}}.survey_conduct`
-        WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)')
-    );
-
-    -- update cope and minute survey responses --
-    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
-    SET survey_concept_id = CASE WHEN m.cope_month = 'may' THEN 2100000002
-        WHEN m.cope_month = 'june' THEN 2100000003
-        WHEN m.cope_month = 'july' THEN 2100000004
-        WHEN m.cope_month = 'nov' THEN 2100000005
-        WHEN m.cope_month = 'dec' THEN 2100000006
-        WHEN m.cope_month = 'feb' THEN 2100000007
-        WHEN m.cope_month = 'vaccine1' THEN 905047
-        WHEN m.cope_month = 'vaccine2' THEN 905055
-        WHEN m.cope_month = 'vaccine3' THEN 765936
-        WHEN m.cope_month = 'vaccine4' THEN 1741006
-        ELSE s.survey_concept_id
-        END,
-    survey_source_concept_id = CASE
-        WHEN m.cope_month = 'may' THEN 2100000002
-        WHEN m.cope_month = 'june' THEN 2100000003
-        WHEN m.cope_month = 'july' THEN 2100000004
-        WHEN m.cope_month = 'nov' THEN 2100000005
-        WHEN m.cope_month = 'dec' THEN 2100000006
-        WHEN m.cope_month = 'feb' THEN 2100000007
-        WHEN m.cope_month = 'vaccine1' THEN 905047
-        WHEN m.cope_month = 'vaccine2' THEN 905055
-        WHEN m.cope_month = 'vaccine3' THEN 765936
-        WHEN m.cope_month = 'vaccine4' THEN 1741006
-        ELSE s.survey_concept_id
-        END
-    FROM `{{project_id}}.{{staging_ds}}.cope_survey_semantic_version_map` m
-    WHERE s.survey_conduct_id = m.questionnaire_response_id;
-
-    -- save all records that will be changed --
-    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct_source_value` AS (
-        SELECT *
-        FROM `{{project_id}}.{{staging_ds}}.survey_conduct` s
-        LEFT JOIN `{{project_id}}.{{staging_ds}}.concept` c
-        ON s.survey_concept_id = c.concept_id
-        AND survey_concept_id = 0
-    );
-
-    -- update the survey_source_value field --
-    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
-    SET s.survey_source_value = c.concept_code
-    FROM `{{project_id}}.{{staging_ds}}.concept` c
-    WHERE s.survey_concept_id = c.concept_id
-    AND s.survey_concept_id > 0;
-    """)
-    que = que.render(project_id=project_id,
-                     staging_ds=staging_ds,
-                     sandbox_ds=sandbox_ds)
-    resp = client.query(que)
-    return resp.result()
-
-
 def _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name):
     for table in FITBIT_TABLES:
         schema_list = fields_for(table)
@@ -254,7 +169,7 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
     # 1. add mapping tables
     # EHR consent table is not added because we are not generating
     # synthetic EHR data. There will not be any to map.
-    for domain_table in combine_consts.DOMAIN_TABLES:
+    for domain_table in combine_consts.DOMAIN_TABLES + [SURVEY_CONDUCT]:
         LOGGER.info(f'Mapping {domain_table}...')
         generate_combined_mapping_tables(bq_client, domain_table,
                                          datasets[consts.STAGING], '',
@@ -266,12 +181,6 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
             bq_client,
             f'{project_id}.{datasets[consts.STAGING]}.{domain_table}')
 
-    LOGGER.warning(
-        "Is `_fix_survey_conduct_records` still needed for generating synthetic data? "
" - "If unnecessary, remove the function and the call line.") - _fix_survey_conduct_records(bq_client, project_id, datasets[consts.STAGING], - datasets[consts.SANDBOX]) - # Run cleaning rules cleaning_args = [ '-p', project_id, '-d', datasets[consts.STAGING], '-b', @@ -378,6 +287,12 @@ def main(raw_args=None) -> dict: datasets = create_tier(args.project_id, dataset_id, args.release_tag, args.target_principal, **kwargs) + LOGGER.info( + f"Dataset with synthetic survey data created at `{args.project_id}.{dataset_id}`.\n" + f"Review dataset before publishing to CB dev environment.\n" + f"If changes are needed (schema, data, etc.), make them as appropriate. If changes are\n" + f"made to data only, be sure to make tickets to change the synthetic script." + ) return datasets