From a19de886ae38fb4a4ec57f508f14b1ef50b78e1b Mon Sep 17 00:00:00 2001
From: Lauren Biggers
Date: Mon, 10 Oct 2022 17:09:29 -0500
Subject: [PATCH] [DC-2692] more changes

* changed f-string usage to jinja2 templates
* used pre-defined variable for constant value
* removed redundant code to reuse existing dataset copy utility
* removed conflict code
---
 .../tools/create_combined_backup_dataset.py | 12 ----
 data_steward/tools/create_rdr_snapshot.py   | 40 +----
 data_steward/tools/create_synthetic.py      | 68 +++++++++++--------
 3 files changed, 40 insertions(+), 80 deletions(-)

diff --git a/data_steward/constants/tools/create_combined_backup_dataset.py b/data_steward/constants/tools/create_combined_backup_dataset.py
index 24e2263c84..5c6dd434e5 100644
--- a/data_steward/constants/tools/create_combined_backup_dataset.py
+++ b/data_steward/constants/tools/create_combined_backup_dataset.py
@@ -118,18 +118,6 @@
 ON t.{{domain_table}}_id = v.{{domain_table}}_id
 {% endif %}
 
-<<<<<<< Updated upstream
-=======
-SELECT DISTINCT
-'{{ehr_dataset_id}}' AS src_dataset_id,
-t.{{domain_table}}_id AS src_{{domain_table}}_id,
-v.src_hpo_id AS src_hpo_id,
-t.{{domain_table}}_id AS {{domain_table}}_id,
-'{{domain_table}}' as src_table_id
-FROM `{{ehr_dataset_id}}.{{domain_table}}` AS t
-JOIN `{{ehr_dataset_id}}._mapping_{{domain_table}}` AS v
-ON t.{{domain_table}}_id = v.{{domain_table}}_id
->>>>>>> Stashed changes
 {% if person_id_flag %}
 WHERE EXISTS
 (SELECT 1 FROM `{{combined_dataset_id}}.{{ehr_consent_table_id}}` AS c
diff --git a/data_steward/tools/create_rdr_snapshot.py b/data_steward/tools/create_rdr_snapshot.py
index dea7473c45..0b7524b261 100755
--- a/data_steward/tools/create_rdr_snapshot.py
+++ b/data_steward/tools/create_rdr_snapshot.py
@@ -101,11 +101,8 @@ def main(raw_args=None):
     datasets = create_datasets(bq_client, args.rdr_dataset, args.release_tag)
 
     # copy raw data into staging dataset
-    copy_raw_rdr_tables(bq_client, args.rdr_dataset, datasets.get('staging'))
-    LOGGER.info("going to sleep")
-    import time
-    time.sleep(30)
-    LOGGER.info("done sleeping")
+    bq_client.copy_dataset(args.rdr_dataset, datasets.get('staging'))
+
     # clean the RDR staging dataset
     cleaning_args = [
         '-p', args.curation_project_id, '-d',
@@ -140,39 +137,6 @@ def main(raw_args=None):
         f'`{bq_client.project}.{datasets.get("clean")}`, is complete.')
 
 
-def copy_raw_rdr_tables(client, rdr_dataset, rdr_staging):
-    LOGGER.info(
-        f'Beginning COPY of raw rdr tables from `{rdr_dataset}` to `{rdr_staging}`'
-    )
-    # get list of tables
-    src_tables = client.list_tables(rdr_dataset)
-
-    # create a copy job config
-    job_config = bigquery.job.CopyJobConfig(
-        write_disposition=bigquery.job.WriteDisposition.WRITE_EMPTY)
-
-    for table_item in src_tables:
-        job_config.labels = {
-            'table_name': table_item.table_id,
-            'copy_from': rdr_dataset,
-            'copy_to': rdr_staging
-        }
-
-        destination_table = f'{client.project}.{rdr_staging}.{table_item.table_id}'
-        # job_id defined to the second precision
-        job_id = (f'rdr_staging_copy_{table_item.table_id.lower()}_'
-                  f'{datetime.now().strftime("%Y%m%d_%H%M%S")}')
-        # copy each table to rdr dataset
-        client.copy_table(table_item.reference,
-                          destination_table,
-                          job_id=job_id,
-                          job_config=job_config)
-
-    LOGGER.info(
-        f'RDR raw table COPY from `{rdr_dataset}` to `{rdr_staging}` is complete'
-    )
-
-
 def create_datasets(client, rdr_dataset, release_tag):
     rdr_clean = f'{release_tag}_rdr'
     rdr_staging = f'{rdr_clean}_staging'
diff --git a/data_steward/tools/create_synthetic.py b/data_steward/tools/create_synthetic.py
index 1d4f43ba8f..36d7242ce1 100644
--- a/data_steward/tools/create_synthetic.py
+++ b/data_steward/tools/create_synthetic.py
@@ -10,7 +10,7 @@
 # Project imports
 from cdr_cleaner import clean_cdr
 from cdr_cleaner.args_parser import add_kwargs_to_args
-from common import CDR_SCOPES, FITBIT_TABLES
+from common import CDR_SCOPES, FITBIT_TABLES, ID_CONSTANT_FACTOR, JINJA_ENV
 from constants.cdr_cleaner import clean_cdr as consts
 from constants.tools import create_combined_backup_dataset as combine_consts
 from gcloud.bq import BigQueryClient
@@ -116,33 +116,36 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str,
 
 
 def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
     # sandbox and drop orphaned records
-    que = (f"""
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` AS (
-    SELECT * FROM `{project_id}.{staging_ds}.survey_conduct` sc
+    que = JINJA_ENV.from_string("""
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` AS (
+    SELECT * FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
     WHERE NOT EXISTS (
-        SELECT 1 FROM `{project_id}.{staging_ds}.observation` o
+        SELECT 1 FROM `{{project_id}}.{{staging_ds}}.observation` o
         WHERE sc.survey_conduct_id = o.questionnaire_response_id));
 
-    DELETE FROM `{project_id}.{staging_ds}.survey_conduct` sc
+    DELETE FROM `{{project_id}}.{{staging_ds}}.survey_conduct` sc
     WHERE EXISTS (
-        SELECT 1 FROM `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` sb
+        SELECT 1 FROM `{{project_id}}.{{sandbox_ds}}.rdr_dc2714_rm_orphan_survey_conduct` sb
         WHERE sc.survey_conduct_id = sb.survey_conduct_id
     );""")
+    que = que.render(project_id=project_id,
+                     staging_ds=staging_ds,
+                     sandbox_ds=sandbox_ds)
     resp = client.query(que)
     resp.result()
 
     # fix cope survey responses
-    que = (f"""
-    /* save all cope and minute survey responses */
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct` AS (
+    que = JINJA_ENV.from_string("""
+    -- save all cope and minute survey responses --
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct` AS (
     SELECT *
-    FROM `{project_id}.{staging_ds}.survey_conduct`
+    FROM `{{project_id}}.{{staging_ds}}.survey_conduct`
     WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)')
     );
 
-    /* update cope and minute survey responses */
-    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
+    -- update cope and minute survey responses --
+    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
     SET survey_concept_id = CASE WHEN m.cope_month = 'may' THEN 2100000002
         WHEN m.cope_month = 'june' THEN 2100000003
         WHEN m.cope_month = 'july' THEN 2100000004
@@ -168,26 +171,28 @@ def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
         WHEN m.cope_month = 'vaccine4' THEN 1741006
         ELSE s.survey_concept_id
         END
-    FROM `{project_id}.{staging_ds}.cope_survey_semantic_version_map` m
+    FROM `{{project_id}}.{{staging_ds}}.cope_survey_semantic_version_map` m
     WHERE s.survey_conduct_id = m.questionnaire_response_id;
 
-    /* save all records that will be changed */
-    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct_source_value` AS (
+    -- save all records that will be changed --
+    CREATE OR REPLACE TABLE `{{project_id}}.{{sandbox_ds}}.rdr_dc2713_survey_conduct_source_value` AS (
     SELECT *
-    FROM `{project_id}.{staging_ds}.survey_conduct` s
-    LEFT JOIN `{project_id}.{staging_ds}.concept` c
+    FROM `{{project_id}}.{{staging_ds}}.survey_conduct` s
+    LEFT JOIN `{{project_id}}.{{staging_ds}}.concept` c
     ON s.survey_concept_id = c.concept_id
     AND survey_concept_id = 0
     );
 
-    /* update the survey_source_value field */
-    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
+    -- update the survey_source_value field --
+    UPDATE `{{project_id}}.{{staging_ds}}.survey_conduct` s
     SET s.survey_source_value = c.concept_code
-    FROM `{project_id}.{staging_ds}.concept` c
+    FROM `{{project_id}}.{{staging_ds}}.concept` c
     WHERE s.survey_concept_id = c.concept_id
     AND s.survey_concept_id > 0;
     """)
-
+    que = que.render(project_id=project_id,
+                     staging_ds=staging_ds,
+                     sandbox_ds=sandbox_ds)
     resp = client.query(que)
     resp.result()
 
@@ -208,11 +213,12 @@ def _remove_mapping_tables(bq_client, project_id, final_dataset_name):
 
 
 def _update_domain_table_id(client: BigQueryClient, fq_table: str):
-    que = (
-        f"UPDATE `{fq_table}` "
-        f"SET {fq_table.split('.')[-1]}_id = {fq_table.split('.')[-1]}_id + 1000000000000000 "
-        f"WHERE 1=1")
+    que = JINJA_ENV.from_string(
+        "UPDATE `{{fq_table}}` "
+        "SET {{fq_table.split('.')[-1]}}_id = {{fq_table.split('.')[-1]}}_id + {{constant_factor}} "
+        "WHERE 1=1")
 
+    que = que.render(fq_table=fq_table, constant_factor=ID_CONSTANT_FACTOR)
     resp = client.query(que)
     resp.result()
 
@@ -260,6 +266,9 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
                 bq_client,
                 f'{project_id}.{datasets[consts.STAGING]}.{domain_table}')
 
+    LOGGER.warning(
+        "Is `_fix_survey_conduct_records` still needed for generating synthetic data? "
+        "If unnecessary, remove the function and the call line.")
     _fix_survey_conduct_records(bq_client, project_id,
                                 datasets[consts.STAGING],
                                 datasets[consts.SANDBOX])
@@ -278,11 +287,14 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
     # run synthetic data rules. will run synthetic extension table generation too.
     clean_cdr.main(args=synthetic_cleaning_args)
 
-    # TODO:
     # 2. mimic publishing guidelines so the person table looks correct. publish internally first to
     # verify all required datatypes exist. Afterward, can copy to the correct dev environment.
     update_person(bq_client, datasets[consts.STAGING])
 
+    LOGGER.info("Generating empty fitbit tables")
+    _create_empty_fitbit_tables(bq_client, project_id, datasets[consts.STAGING])
+    LOGGER.info("Empty fitbit table generation complete")
+
     # Snapshot the staging dataset to final dataset
     LOGGER.info("Snapshotting the final dataset")
     bq_client.build_and_copy_contents(datasets[consts.STAGING],
@@ -291,10 +303,6 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
         f"Snapshot complete. Final dataset is at `{project_id}.{final_dataset_name}`"
    )
 
-    LOGGER.info("Generating empty fitbit tables")
-    _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name)
-    LOGGER.info("Empty fitbit table generation complete")
-
     LOGGER.info(
         "Removing mapping tables because they are not part of the dataset released to researchers"
     )
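Note on the template pattern above (trailing text after the last hunk is ignored
when the patch is applied): this patch assumes the `JINJA_ENV` imported from
`common` is an ordinary `jinja2.Environment`, and that `ID_CONSTANT_FACTOR`
carries the 1000000000000000 literal it replaces; neither definition appears in
the diff. A minimal runnable sketch of the f-string-to-Jinja2 conversion, using
those assumptions and an illustrative table name:

    # Sketch only: JINJA_ENV and ID_CONSTANT_FACTOR stand in for the
    # definitions in common.py, which this patch does not show.
    # StrictUndefined makes a missing or misspelled template variable raise
    # jinja2.exceptions.UndefinedError instead of silently rendering as an
    # empty string, which is a common reason to prefer templates over raw
    # f-strings for generated SQL.
    from jinja2 import Environment, StrictUndefined

    JINJA_ENV = Environment(undefined=StrictUndefined)
    ID_CONSTANT_FACTOR = 1000000000000000  # the literal the patch replaces

    def build_id_shift_query(fq_table: str) -> str:
        """Render the id-shift UPDATE for one fully qualified table."""
        # Jinja2 evaluates fq_table.split('.')[-1] at render time, so the
        # table name is derived inside the template, mirroring the patch.
        tpl = JINJA_ENV.from_string(
            "UPDATE `{{fq_table}}` "
            "SET {{fq_table.split('.')[-1]}}_id = "
            "{{fq_table.split('.')[-1]}}_id + {{constant_factor}} "
            "WHERE 1=1")
        return tpl.render(fq_table=fq_table,
                          constant_factor=ID_CONSTANT_FACTOR)

    print(build_id_shift_query('my-project.my_dataset.observation'))
    # -> UPDATE `my-project.my_dataset.observation` SET observation_id =
    #    observation_id + 1000000000000000 WHERE 1=1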