Skip to content

Commit

Permalink
remove/convert create_or_update_job function
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewThien committed Dec 4, 2024
1 parent 1691117 commit 7f02e0e
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 59 deletions.
5 changes: 2 additions & 3 deletions app/workers/RulesConceptsActivity/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from shared.mapping.models import ScanReportConcept, ScanReportTable
from shared_code import db
from shared_code.db import (
create_or_update_job,
update_job,
JobStageType,
StageStatusType,
Expand Down Expand Up @@ -299,7 +298,7 @@ def _handle_table(
)

# Starting the concepts reusing process
create_or_update_job(
update_job(
JobStageType.REUSE_CONCEPTS,
StageStatusType.IN_PROGRESS,
scan_report_table=table,
Expand Down Expand Up @@ -342,7 +341,7 @@ def main(msg: Dict[str, str]):
_, vocab_dictionary = blob_parser.get_data_dictionary(data_dictionary_blob)

# Starting the concepts building from OMOP vocab process
create_or_update_job(
update_job(
JobStageType.BUILD_CONCEPTS_FROM_DICT,
StageStatusType.IN_PROGRESS,
scan_report_table=table,
Expand Down
3 changes: 1 addition & 2 deletions app/workers/RulesOrchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

from shared.services.rules import find_existing_concepts_count
from shared_code.db import (
create_or_update_job,
update_job,
JobStageType,
StageStatusType,
Expand Down Expand Up @@ -47,7 +46,7 @@ def orchestrator_function(context: df.DurableOrchestrationContext):
page_size = int(os.environ.get("PAGE_SIZE", "1000"))
num_pages = max((concepts_count + page_size - 1) // page_size, 1)

create_or_update_job(
update_job(
JobStageType.GENERATE_RULES,
StageStatusType.IN_PROGRESS,
scan_report_table=ScanReportTable.objects.get(id=table_id),
Expand Down
63 changes: 9 additions & 54 deletions app/workers/shared_code/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,55 +42,6 @@ class JobStageType(Enum):
DOWNLOAD_RULES = "Generate and download mapping rules JSON"


def create_or_update_job(
stage: JobStageType,
# The initial status. For a job that doesn't have any further updates, this is the final status.
status: StageStatusType,
scan_report: Optional[ScanReport] = None,
scan_report_table: Optional[ScanReportTable] = None,
details: Optional[str] = None,
) -> None:
"""
Function to create or update (if that is the second time running this function) a job record,
based on the passed stage, status and object's ID
"""
stage_entity = JobStage.objects.get(value=stage.name)
status_entity = StageStatus.objects.get(value=status.name)
# For a job related to SR
if scan_report:
# Check if the record already exists
if not Job.objects.filter(scan_report=scan_report, stage=stage_entity).exists():
# If not, create a new record
Job.objects.create(
scan_report=scan_report,
stage=stage_entity,
status=status_entity,
details=details,
)
else:
# If yes, update it, in case re-running the job
update_job(stage, status, scan_report=scan_report, details=details)
# For a job related to SR table
if scan_report_table:
# Check if the record already exists
if not Job.objects.filter(
scan_report_table=scan_report_table, stage=stage_entity
).exists():
Job.objects.create(
scan_report_table=scan_report_table,
stage=stage_entity,
status=status_entity,
details=details,
)
else:
update_job(
stage,
status,
scan_report_table=scan_report_table,
details=details,
)


def update_job(
stage: JobStageType,
status: StageStatusType,
Expand All @@ -110,15 +61,19 @@ def update_job(
# Get stage and status entities
job_stage_entity = JobStage.objects.get(value=stage.name)
stage_status_entity = StageStatus.objects.get(value=status.name)
# Get the job enity based on the passed id and stage

# Get the job entity based on the passed id and stage,
# then filter and order to get the latest job record to update
if scan_report:
scan_report_job = Job.objects.get(
scan_report_job = Job.objects.filter(
scan_report=scan_report, stage=job_stage_entity
)
).order_by("-created_at")[0]

if scan_report_table:
scan_report_job = Job.objects.get(
scan_report_job = Job.objects.filter(
scan_report_table=scan_report_table, stage=job_stage_entity
)
).order_by("-created_at")[0]

# Update status and details
scan_report_job.status = stage_status_entity
if details:
Expand Down

0 comments on commit 7f02e0e

Please sign in to comment.