[DC-2692] synthetic script
* changes required when running the synthetic script all the way through
* the script did finish
* more changes are expected
lrwb-aou committed Sep 21, 2022
1 parent 7cb4490 commit 55bd1a6
Showing 11 changed files with 147 additions and 26 deletions.
3 changes: 2 additions & 1 deletion data_steward/cdr_cleaner/clean_cdr.py
@@ -386,7 +386,8 @@
     CONTROLLED_TIER_FITBIT_CLEANING_CLASSES,
     DataStage.SYNTHETIC.value:
         RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES +
-        REGISTERED_TIER_DEID_CLEANING_CLASSES,
+        REGISTERED_TIER_DEID_CLEANING_CLASSES +
+        REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES,
 }


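This hunk extends the stage-to-rules mapping so the synthetic stage also runs the registered tier deid base rules. A toy sketch of how such a mapping is consumed (the list contents, the dictionary name, and the lookup are illustrative assumptions; only the names shown in the hunk come from the repo):

# sketch only: stand-in rule lists; the real ones are imported at the top of
# clean_cdr.py
RDR_CLEANING_CLASSES = ['RuleA']
COMBINED_CLEANING_CLASSES = ['RuleB']
REGISTERED_TIER_DEID_CLEANING_CLASSES = ['RuleC']
REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES = ['RuleD']

DATA_STAGE_RULES_MAPPING = {
    'synthetic':
        RDR_CLEANING_CLASSES + COMBINED_CLEANING_CLASSES +
        REGISTERED_TIER_DEID_CLEANING_CLASSES +
        REGISTERED_TIER_DEID_BASE_CLEANING_CLASSES,
}

# one lookup yields every rule to run for the synthetic data stage
rules_to_run = DATA_STAGE_RULES_MAPPING['synthetic']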
11 changes: 2 additions & 9 deletions data_steward/cdr_cleaner/clean_cdr_engine.py
@@ -217,15 +217,8 @@ def infer_rule(clazz, project_id, dataset_id, sandbox_dataset_id, table_namer,
     kwargs = get_custom_kwargs(clazz, **kwargs)

     # setting default values that won't raise errors
-    def query_function():
-        """
-        Imitates base class get_query_specs()
-        Can be removed once all classes are base classed
-        :return: list of query dicts generated by rule
-        """
-        return []
-
-    setup_function = lambda: object
+    query_function = lambda: []
+    setup_function = lambda client: object
     function_name = ''
     module_name = ''
     line_no = ''
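Beyond collapsing the placeholder into a lambda, this also changes the setup default's signature: it now accepts a client, so the engine can presumably invoke it the same way it invokes a real setup function. A sketch of the pattern (the function name and the getattr lookups are assumptions about engine internals not shown in this diff):

# sketch only
def infer_legacy_functions(rule_instance):
    # defaults that are safe to call when a legacy rule provides nothing
    query_function = lambda: []             # imitates get_query_specs()
    setup_function = lambda client: object  # accepts a client like a real setup
    # prefer the rule's own callables when they exist
    query_function = getattr(rule_instance, 'get_query_specs', query_function)
    setup_function = getattr(rule_instance, 'setup_rule', setup_function)
    return query_function, setup_function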
@@ -153,7 +153,8 @@ def __init__(self,
                 SetConceptIdsForSurveyQuestionsAnswers,
                 UpdateFamilyHistoryCodes
             ],
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self, *args, **keyword_args) -> query_spec_list:
         """
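This file and the next five make the same one-line change: each cleaning rule opts in to the synthetic pipeline by passing run_for_synthetic=True to its base class. A minimal sketch of how such a flag is plausibly stored and filtered on (the base-class and engine internals are not part of this diff):

# sketch only
class BaseCleaningRule:

    def __init__(self, run_for_synthetic=False, **kwargs):
        # rules stay out of the synthetic pipeline unless they opt in
        self.run_for_synthetic = run_for_synthetic

def rules_for_synthetic(rules):
    """Keep only the rule instances flagged to run on synthetic data."""
    return [rule for rule in rules if rule.run_for_synthetic]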
@@ -53,7 +53,8 @@ def __init__(self,
             project_id=project_id,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self, *args, **keyword_args):
         """
@@ -104,7 +104,8 @@ def __init__(self,
             project_id=project_id,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self):
         """
@@ -107,7 +107,8 @@ def __init__(self,
             project_id=project_id,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self, *args, **keyword_args):
         """
@@ -96,7 +96,8 @@ def __init__(self,
             project_id=project_id,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self):
         """
@@ -64,7 +64,8 @@ def __init__(self,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
             depends_on=[GenerateExtTables, COPESurveyVersionTask],
-            table_namer=table_namer)
+            table_namer=table_namer,
+            run_for_synthetic=True)

     def get_query_specs(self):
         """
@@ -95,7 +95,7 @@ def __init__(self,
             project_id=project_id,
             dataset_id=dataset_id,
             sandbox_dataset_id=sandbox_dataset_id,
-            affected_tables=[OBSERVATION + '_ext'],
+            affected_tables=[f'{OBSERVATION}_ext'],
             depends_on=[GenerateExtTables],
             table_namer=table_namer,
             run_for_synthetic=True)
137 changes: 128 additions & 9 deletions data_steward/tools/create_synthetic.py
@@ -5,14 +5,16 @@
 from datetime import datetime

 # Third party imports
+from google.cloud import bigquery

 # Project imports
 from cdr_cleaner import clean_cdr
 from cdr_cleaner.args_parser import add_kwargs_to_args
-from common import CDR_SCOPES
+from common import CDR_SCOPES, FITBIT_TABLES
 from constants.cdr_cleaner import clean_cdr as consts
 from constants.tools import create_combined_backup_dataset as combine_consts
 from gcloud.bq import BigQueryClient
+from resources import fields_for
 from tools import import_rdr_omop
 from tools.create_combined_backup_dataset import generate_combined_mapping_tables
 from tools.recreate_person import update_person
@@ -112,6 +114,109 @@ def create_datasets(client: BigQueryClient, name: str, input_dataset: str,
    return datasets


def _fix_survey_conduct_records(client, project_id, staging_ds, sandbox_ds):
    # sandbox and drop orphaned records
    que = (f"""
    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` AS (
    SELECT * FROM `{project_id}.{staging_ds}.survey_conduct` sc
    WHERE NOT EXISTS (
        SELECT 1 FROM `{project_id}.{staging_ds}.observation` o
        WHERE sc.survey_conduct_id = o.questionnaire_response_id));

    DELETE FROM `{project_id}.{staging_ds}.survey_conduct` sc
    WHERE EXISTS (
        SELECT 1 FROM `{project_id}.{sandbox_ds}.rdr_dc2714_rm_orphan_survey_conduct` sb
        WHERE sc.survey_conduct_id = sb.survey_conduct_id
    );""")

    resp = client.query(que)
    resp.result()

    # fix cope survey responses
    que = (f"""
    /* save all cope and minute survey responses */
    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct` AS (
    SELECT *
    FROM `{project_id}.{staging_ds}.survey_conduct`
    WHERE REGEXP_CONTAINS(survey_source_value, r'(?i)(^cope$)|(^cope_)')
    );

    /* update cope and minute survey responses */
    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
    SET survey_concept_id = CASE
            WHEN m.cope_month = 'may' THEN 2100000002
            WHEN m.cope_month = 'june' THEN 2100000003
            WHEN m.cope_month = 'july' THEN 2100000004
            WHEN m.cope_month = 'nov' THEN 2100000005
            WHEN m.cope_month = 'dec' THEN 2100000006
            WHEN m.cope_month = 'feb' THEN 2100000007
            WHEN m.cope_month = 'vaccine1' THEN 905047
            WHEN m.cope_month = 'vaccine2' THEN 905055
            WHEN m.cope_month = 'vaccine3' THEN 765936
            WHEN m.cope_month = 'vaccine4' THEN 1741006
            ELSE s.survey_concept_id
        END,
        survey_source_concept_id = CASE
            WHEN m.cope_month = 'may' THEN 2100000002
            WHEN m.cope_month = 'june' THEN 2100000003
            WHEN m.cope_month = 'july' THEN 2100000004
            WHEN m.cope_month = 'nov' THEN 2100000005
            WHEN m.cope_month = 'dec' THEN 2100000006
            WHEN m.cope_month = 'feb' THEN 2100000007
            WHEN m.cope_month = 'vaccine1' THEN 905047
            WHEN m.cope_month = 'vaccine2' THEN 905055
            WHEN m.cope_month = 'vaccine3' THEN 765936
            WHEN m.cope_month = 'vaccine4' THEN 1741006
            ELSE s.survey_concept_id
        END
    FROM `{project_id}.{staging_ds}.cope_survey_semantic_version_map` m
    WHERE s.survey_conduct_id = m.questionnaire_response_id;

    /* save all records that will be changed */
    CREATE OR REPLACE TABLE `{project_id}.{sandbox_ds}.rdr_dc2713_survey_conduct_source_value` AS (
    SELECT *
    FROM `{project_id}.{staging_ds}.survey_conduct` s
    LEFT JOIN `{project_id}.{staging_ds}.concept` c
    ON s.survey_concept_id = c.concept_id
    AND survey_concept_id = 0
    );

    /* update the survey_source_value field */
    UPDATE `{project_id}.{staging_ds}.survey_conduct` s
    SET s.survey_source_value = c.concept_code
    FROM `{project_id}.{staging_ds}.concept` c
    WHERE s.survey_concept_id = c.concept_id
    AND s.survey_concept_id > 0;
    """)

    resp = client.query(que)
    resp.result()
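Both statements follow a sandbox-then-mutate pattern: rows about to be deleted or rewritten are first copied into a sandbox table, which keeps the change auditable. For example, a survey_conduct row whose survey_conduct_id matches no observation.questionnaire_response_id is preserved in rdr_dc2714_rm_orphan_survey_conduct and only then deleted from staging.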


def _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name):
    # create an empty table in the final dataset for each expected fitbit
    # table, using the schema definitions from the repo
    for table in FITBIT_TABLES:
        schema_list = fields_for(table)
        ta = bigquery.Table(f'{project_id}.{final_dataset_name}.{table}',
                            schema_list)
        bq_client.create_table(ta)
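Here fields_for presumably resolves a table name to its schema from the repo's field definitions, and bigquery.Table accepts a fully qualified table ID plus that schema. One iteration looks roughly like this ('activity_summary' is a hypothetical stand-in for an entry of FITBIT_TABLES; project and dataset names are placeholders):

# sketch only: reuses fields_for, bigquery, and bq_client from the diff above
schema = fields_for('activity_summary')
empty_table = bigquery.Table('my-project.my_final_ds.activity_summary', schema)
bq_client.create_table(empty_table)  # the table is created with zero rows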


def _remove_mapping_tables(bq_client, project_id, final_dataset_name):
    # drop every table whose name marks it as an internal mapping table
    for table in bq_client.list_tables(f'{project_id}.{final_dataset_name}'):
        if table.table_id.startswith('_mapping_'):
            LOGGER.info(f"Removing table {table.full_table_id}")
            bq_client.delete_table(table)


def update_domain_table_id(client: BigQueryClient, fq_table: str):
    # derive the id column from the table name (e.g. person -> person_id)
    # and offset every id by 10^15
    que = (f"UPDATE `{fq_table}` "
           f"SET {fq_table.split('.')[-1]}_id = {fq_table.split('.')[-1]}_id + 1000000000000000 "
           f"WHERE 1=1")

    resp = client.query(que)
    resp.result()
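update_domain_table_id leans on the OMOP convention that a domain table's key column is the table name plus _id, offsetting every key, presumably to keep synthetic IDs out of the range of source IDs. Its call site is not shown in this diff; a hedged usage sketch, with placeholder names throughout (including the client construction):

# sketch only
client = BigQueryClient('my-project')
for table_name in ['person', 'observation', 'survey_conduct']:
    # e.g. person_id becomes person_id + 1000000000000000
    update_domain_table_id(client, f'my-project.my_staging_ds.{table_name}')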


def create_tier(project_id: str, input_dataset: str, release_tag: str,
                run_as: str, **kwargs) -> dict:
    """

@@ -162,9 +267,22 @@ def create_tier(project_id: str, input_dataset: str, release_tag: str,
    update_person(bq_client, datasets[consts.STAGING])

    # Snapshot the staging dataset to final dataset
    LOGGER.info("Snapshotting the final dataset")
    bq_client.build_and_copy_contents(datasets[consts.STAGING],
                                      final_dataset_name)

    LOGGER.info(
        f"Snapshot complete. Final dataset is at `{project_id}.{final_dataset_name}`"
    )

    LOGGER.info("Generating empty fitbit tables")
    _create_empty_fitbit_tables(bq_client, project_id, final_dataset_name)
    LOGGER.info("Empty fitbit table generation complete")

    LOGGER.info(
        "Removing mapping tables because they are not part of the dataset released to researchers"
    )
    _remove_mapping_tables(bq_client, project_id, final_dataset_name)
    LOGGER.info("Done removing mapping tables")
    return datasets


Expand Down Expand Up @@ -226,20 +344,21 @@ def main(raw_args=None) -> dict:
     clean_cdr.validate_custom_params(cleaning_classes, **kwargs)

     # load synthetic data from the bucket
-    import_rdr_omop.main([
+    dataset_obj = import_rdr_omop.main([
         '--rdr_bucket', args.bucket_name, '--run_as', args.target_principal,
         '--export_date',
         datetime.now().strftime("%Y-%m-%d"), '--curation_project',
         args.project_id, '--vocab_dataset', args.vocab_dataset, '--console_log'
     ])

-    # # Creates synthetic dataset and runs a subset of cleaning rules marked for synthetic data
-    # datasets = create_tier(args.credentials_filepath, args.project_id,
-    #                        args.idataset, args.release_tag,
-    #                        args.target_principal, **kwargs)
-
-    # return datasets
+    dataset_obj = dataset_obj.dataset_id
+
+    # Creates synthetic dataset and runs a subset of cleaning rules marked for synthetic data
+    datasets = create_tier(args.project_id, dataset_obj, args.release_tag,
+                           args.target_principal, **kwargs)
+
+    return datasets


 if __name__ == '__main__':
     main()
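Taken together, main() now chains the import into tier creation: import_rdr_omop.main() returns the staging dataset it created, its dataset_id becomes create_tier()'s input dataset, and the synthetic-flagged cleaning rules, snapshot, empty fitbit tables, and mapping-table removal all run. Condensed, with placeholder bucket, project, tag, and principal values:

# sketch only: condensed from main() above
from datetime import datetime

dataset_obj = import_rdr_omop.main([
    '--rdr_bucket', 'my-synthetic-bucket', '--run_as', 'me@example.org',
    '--export_date', datetime.now().strftime('%Y-%m-%d'),
    '--curation_project', 'my-project', '--vocab_dataset', 'my_vocab',
    '--console_log'
])
datasets = create_tier('my-project', dataset_obj.dataset_id, '2022q3r1',
                       'me@example.org')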
2 changes: 2 additions & 0 deletions data_steward/tools/import_rdr_omop.py
@@ -262,6 +262,8 @@ def main(raw_args=None):
    copy_vocab_tables(bq_client, new_dataset_name, args.vocabulary)

    return dataset_object


if __name__ == '__main__':
    main()
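This added return value is what makes the chaining in create_synthetic.py possible: the caller receives the dataset object created during the import and can read its dataset_id instead of re-deriving the staging dataset name.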
