[DC-3485] Update generate_ehr_upload_pids.py to automatically exclude sites without submissions #1804
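The core of the change, shown as a minimal sketch distilled from the diff below (the identifiers and the table_exists helper are taken from the new code; hpo_sites, bq_client, and ehr_dataset_id stand in for values the script builds from its arguments): an HPO site is included in the regenerated view only if every one of its per-site person-ID tables exists in the EHR dataset.

# Sketch of the new site-filtering step; mirrors the logic added in this PR.
pid_tables = [
    PERSON, CONDITION_OCCURRENCE, PROCEDURE_OCCURRENCE, DRUG_EXPOSURE,
    OBSERVATION, VISIT_OCCURRENCE
]

# hpo_sites: lowercase HPO IDs read from the site mapping lookup table.
hpo_sites_with_submission = [
    hpo_id for hpo_id in hpo_sites
    if all(
        bq_client.table_exists(f'{hpo_id}_{table}', ehr_dataset_id)
        for table in pid_tables)
]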

Open. Wants to merge 5 commits into base branch: develop. Showing changes from all commits.
3 changes: 3 additions & 0 deletions data_steward/common.py
@@ -275,6 +275,9 @@
ZIP_CODE_AGGREGATION_MAP = 'zip_code_aggregation_map'
DEID_QUESTIONNAIRE_RESPONSE_MAP = '_deid_questionnaire_response_map'

OPERATIONS_ANALYTICS = 'operations_analytics'
EHR_UPLOAD_PIDS = 'ehr_upload_pids'

AIAN_LIST = 'aian_list'

# Participant Summary
@@ -0,0 +1,26 @@
[
{
"type": "string",
"name": "Org_ID",
"mode": "nullable",
"description": "Organization ID of the HPO site."
},
{
"type": "string",
"name": "HPO_ID",
"mode": "nullable",
"description": "HPO ID of the HPO site."
},
{
"type": "string",
"name": "Site_Name",
"mode": "nullable",
"description": "HPO site name."
},
{
"type": "integer",
"name": "Display_Order",
"mode": "nullable",
"description": "Order in which HPO sites are shown in the report."
}
]
156 changes: 83 additions & 73 deletions data_steward/tools/generate_ehr_upload_pids.py
@@ -4,8 +4,10 @@

from gcloud.bq import BigQueryClient

from constants.utils import bq as bq_consts
from common import JINJA_ENV
from constants.utils.bq import LOOKUP_TABLES_DATASET_ID, HPO_SITE_ID_MAPPINGS_TABLE_ID
from common import (CONDITION_OCCURRENCE, DRUG_EXPOSURE, EHR_UPLOAD_PIDS,
JINJA_ENV, OBSERVATION, OPERATIONS_ANALYTICS, PERSON,
PROCEDURE_OCCURRENCE, VISIT_OCCURRENCE)

LOGGER = logging.getLogger(__name__)

@@ -15,76 +17,88 @@
LOGGER.addHandler(handler)
LOGGER.setLevel(logging.DEBUG)

EHR_UPLOAD_PIDS_BQ_SCRIPT = JINJA_ENV.from_string('''
SELECT ARRAY_TO_STRING(ARRAY_AGG(FORMAT
("""SELECT
person_id,
current_datetime() AS report_run_time,
Org_ID as org_id,
HPO_ID as hpo_id,
Site_Name as site_name,
TIMESTAMP_MICROS(t.last_modified_time * 1000) AS latest_upload_time
pid_tables = [
PERSON, CONDITION_OCCURRENCE, PROCEDURE_OCCURRENCE, DRUG_EXPOSURE,
OBSERVATION, VISIT_OCCURRENCE
]

HPO_IDS_QUERY = JINJA_ENV.from_string("""
SELECT LOWER(hpo_id) AS hpo_id FROM `{{project_id}}.{{dataset_id}}.{{table_id}}`
""")

EHR_UPLOAD_PIDS_BQ_QUERY = JINJA_ENV.from_string("""
CREATE OR REPLACE VIEW `{{project_id}}.{{operations_dataset_id}}.{{ehr_pids_view}}`
OPTIONS(description='A participant-level view of when ehr data was sent. NOTE: the RDR calls this view to support HealthPro (1/27/21)')
AS {% for hpo_site in hpo_sites %}
SELECT
person_id,
current_datetime() AS report_run_time,
Org_ID as org_id,
HPO_ID as hpo_id,
Site_Name as site_name,
TIMESTAMP_MICROS(t.last_modified_time * 1000) AS latest_upload_time
Review comment from a Contributor on lines +34 to +39:

@hiro-mishima Our biggest problem with the ehr_upload_pids was the "Query too large" error, which prevented us from doing this. This feels like it is making the query more complicated. Let me know if you've already tested this and it works fine.

(A sketch of a possible query-size guard follows this query template.)
FROM
`{{project_id}}.{{ehr_dataset_id}}.%s_person` p,
`{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}` m,
`{{project_id}}.{{ehr_dataset_id}}.__TABLES__` t
WHERE t.table_id = '%s_person'
AND m.HPO_ID = '%s'
AND person_id IN (
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_person` UNION DISTINCT
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_condition_occurrence` UNION DISTINCT
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_procedure_occurrence` UNION DISTINCT
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_drug_exposure` UNION DISTINCT
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_observation` UNION DISTINCT
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.%s_visit_occurrence`)""",
LOWER(HPO_ID), LOWER(HPO_ID), HPO_ID, LOWER(HPO_ID), LOWER(HPO_ID), LOWER(HPO_ID),
LOWER(HPO_ID), LOWER(HPO_ID), LOWER(HPO_ID))), "\\nUNION ALL \\n") as q
FROM `{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}`
WHERE HPO_ID NOT IN ({{excluded_sites_str}})
''')


def get_excluded_hpo_ids_str(excluded_hpo_ids):
"""
Formats list of hpo_ids or None to add to bq script, adds empty hpo_id
`{{project_id}}.{{ehr_dataset_id}}.{{hpo_site}}_person` p,
`{{project_id}}.{{lookup_dataset_id}}.{{hpo_mappings}}` m,
`{{project_id}}.{{ehr_dataset_id}}.__TABLES__` t
WHERE t.table_id = '{{hpo_site}}_person'
AND LOWER(m.HPO_ID) = '{{hpo_site}}'
AND person_id IN (
{% for pid_table in pid_tables %}
SELECT person_id FROM `{{project_id}}.{{ehr_dataset_id}}.{{hpo_site}}_{{pid_table}}`
{% if not loop.last %} UNION DISTINCT {% endif %}
{% endfor %}
){% if not loop.last %} UNION ALL {% endif %}
{% endfor %}
""")

:param excluded_hpo_ids: List output by args parser or None
:return: String of hpo_ids enclosed in single quotes along with empty hpo_ids
"""
if excluded_hpo_ids is None:
excluded_hpo_ids = []
# use uppercase for all hpo_ids as is in the table
excluded_hpo_ids = [hpo_id.upper() for hpo_id in excluded_hpo_ids]
# exclude empty site since lookup table contains it
excluded_hpo_ids.append('')
excluded_hpo_ids_str = ', '.join(
[f"'{hpo_id}'" for hpo_id in excluded_hpo_ids])
return excluded_hpo_ids_str


def generate_ehr_upload_pids_query(project_id,
ehr_dataset_id,
excluded_hpo_ids=None):

def update_ehr_upload_pids_view(project_id, ehr_dataset_id, bq_client=None):
"""
Generate query for all hpo_ids except specified
Update (i.e. create or replace) the ehr_upload_pids view.

:param project_id: Identifies the project
:param ehr_dataset_id: Identifies the ehr dataset
:param excluded_hpo_ids: List of sites
:return: Query string to use in ehr_upload_pids view
:param bq_client: BigQuery client
:return:
"""
bq_client = BigQueryClient(project_id)
excluded_hpo_ids_str = get_excluded_hpo_ids_str(excluded_hpo_ids)
query = EHR_UPLOAD_PIDS_BQ_SCRIPT.render(
if not bq_client:
bq_client = BigQueryClient(project_id)

hpo_query = HPO_IDS_QUERY.render(project_id=project_id,
dataset_id=LOOKUP_TABLES_DATASET_ID,
table_id=HPO_SITE_ID_MAPPINGS_TABLE_ID)

response = bq_client.query(hpo_query)
result = list(response.result())
hpo_sites = [row[0] for row in result]

hpo_sites_with_submission = [
hpo_id for hpo_id in hpo_sites if all(
bq_client.table_exists(f'{hpo_id}_{table}', ehr_dataset_id)
for table in pid_tables)
]
LOGGER.info(
f'The following HPO sites will be included in the view `{project_id}.{OPERATIONS_ANALYTICS}.{EHR_UPLOAD_PIDS}`. '
'These sites are listed in the site mapping table and they have submitted files: '
)
LOGGER.info(', '.join(hpo_sites_with_submission))

query = EHR_UPLOAD_PIDS_BQ_QUERY.render(
project_id=project_id,
operations_dataset_id=OPERATIONS_ANALYTICS,
ehr_pids_view=EHR_UPLOAD_PIDS,
ehr_dataset_id=ehr_dataset_id,
lookup_dataset_id=bq_consts.LOOKUP_TABLES_DATASET_ID,
hpo_mappings=bq_consts.HPO_SITE_ID_MAPPINGS_TABLE_ID,
excluded_sites_str=excluded_hpo_ids_str)
query_job = bq_client.query(query)
res = query_job.result().to_dataframe()
full_query = res["q"].to_list()[0]
return full_query
lookup_dataset_id=LOOKUP_TABLES_DATASET_ID,
hpo_mappings=HPO_SITE_ID_MAPPINGS_TABLE_ID,
hpo_sites=hpo_sites_with_submission,
pid_tables=pid_tables)

_ = bq_client.query(query).result()

LOGGER.info(
"The view is updated. Ensure the view is accessible without errors: "
f"`{project_id}.{OPERATIONS_ANALYTICS}.{EHR_UPLOAD_PIDS}`")


def get_args_parser():
@@ -103,19 +117,15 @@ def get_args_parser():
action='store',
help='Identifies the ehr dataset',
required=True)
parser.add_argument('-i',
'--excluded_hpo_ids',
action='store',
nargs='+',
dest='excluded_hpo_ids',
help='Identifies sites with no tables in ehr dataset',
required=False)
return parser


if __name__ == '__main__':
args_parser = get_args_parser()
args = args_parser.parse_args()
ehr_upload_pids_query = generate_ehr_upload_pids_query(
args.project_id, args.ehr_dataset_id, args.excluded_hpo_ids)
LOGGER.info(ehr_upload_pids_query)

bq_client = BigQueryClient(args.project_id)

update_ehr_upload_pids_view(args.project_id,
args.ehr_dataset_id,
bq_client=bq_client)
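
For reference, a minimal programmatic usage sketch of the updated entry point. The module path and the placeholder project/dataset names are assumptions; the call itself mirrors the __main__ block above.

# Hypothetical usage; substitute a real GCP project and EHR dataset.
from gcloud.bq import BigQueryClient
from tools.generate_ehr_upload_pids import update_ehr_upload_pids_view  # import path assumed

project_id = 'my-curation-project'   # placeholder
ehr_dataset_id = 'my_ehr_dataset'    # placeholder

client = BigQueryClient(project_id)
update_ehr_upload_pids_view(project_id, ehr_dataset_id, bq_client=client)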