Commit fb8cc08

converts to many-to-many instead of one-to-one for file mappings

jmbhughes committed Nov 27, 2023
1 parent e3eb63d commit fb8cc08
Showing 12 changed files with 206 additions and 357 deletions.
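The change replaces the scheduler's one-parent-one-child assumption with groups of parent files mapping to groups of child files, recorded as (parent, child) rows in the FileRelationship table. The actual model in punchpipe/controlsegment/db.py is not shown in this diff, so the minimal sketch below of such a join table is an assumption about its shape, not the real schema:

# Minimal sketch of a (parent, child) join table expressing a many-to-many
# file mapping; table and column details are assumptions, not punchpipe's
# actual schema (only the parent/child fields appear in the diff below).
from sqlalchemy import Column, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class FileRelationship(Base):
    __tablename__ = "file_relationships"  # hypothetical table name

    relationship_id = Column(Integer, primary_key=True)
    parent = Column(Integer)  # file_id of an input (parent) file
    child = Column(Integer)   # file_id of an output (child) file

Because the table holds arbitrary pairs, one child can cite many parents and one parent can feed many children, which is exactly what the updated scheduler below produces.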
49 changes: 31 additions & 18 deletions config.yaml
@@ -1,29 +1,42 @@
 root: "/home/marcus.hughes/running_test/"
+file_version: "0.0.1"
 
 launcher:
   max_seconds_waiting: 100
+  escalated_priority: 100
   max_flows_running: 30
 
-priority:
+levels:
   level0_process_flow:
-    initial: 5
-    seconds: [30, 120, 600]
-    escalation: [10, 20, 30]
+    priority:
+      initial: 5
+      seconds: [ 30, 120, 600 ]
+      escalation: [ 10, 20, 30 ]
+    schedule:
+      latency: 3
+      window_duration_seconds: 3
+      cron_rule: "******"
+    options:
+      num_quads: 100
   level1_process_flow:
-    initial: 6
-    seconds: [ 30, 120, 600 ]
-    escalation: [ 11, 21, 31]
+    priority:
+      initial: 6
+      seconds: [ 30, 120, 600 ]
+      escalation: [ 11, 21, 31 ]
+    schedule:
+      latency: 3
+      window_duration_seconds: 3
+      cron_rule: "******"
+    options:
   level2_process_flow:
-    initial: 7
-    seconds: [30, 120, 600 ]
-    escalation: [ 12, 22, 32 ]
-
-scheduler:
-  level2_process_flow:
-    latency: 3
-    window_duration: 3
-
-process_options:
-  pointing:
-    num_quads: 100
+    priority:
+      initial: 7
+      seconds: [ 30, 120, 600 ]
+      escalation: [ 12, 22, 32 ]
+    schedule:
+      latency: 3
+      window_duration_seconds: 3
+      cron_rule: "******"
+    options:
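The restructured file nests each flow's priority, schedule, and options under a single entry in levels, so consumers index pipeline_config['levels'][flow_type]['priority'] rather than the old pipeline_config['priority'][flow_type]. A minimal sketch of reading the new layout, using PyYAML directly for illustration (punchpipe's own entry point is load_pipeline_configuration):

# Illustrative only: load the new-style config and walk the per-level
# settings; punchpipe itself goes through load_pipeline_configuration.
import yaml

with open("config.yaml") as f:
    pipeline_config = yaml.safe_load(f)

for flow_type, level in pipeline_config["levels"].items():
    priority = level["priority"]
    schedule = level["schedule"]
    print(f"{flow_type}: initial priority {priority['initial']}, "
          f"window {schedule['window_duration_seconds']} s")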
28 changes: 28 additions & 0 deletions old_config.yaml
@@ -0,0 +1,28 @@
+root: "/home/marcus.hughes/running_test/"
+
+launcher:
+  max_seconds_waiting: 100
+  max_flows_running: 30
+
+priority:
+  level0_process_flow:
+    initial: 5
+    seconds: [30, 120, 600]
+    escalation: [10, 20, 30]
+  level1_process_flow:
+    initial: 6
+    seconds: [ 30, 120, 600 ]
+    escalation: [ 11, 21, 31]
+  level2_process_flow:
+    initial: 7
+    seconds: [30, 120, 600 ]
+    escalation: [ 12, 22, 32 ]
+
+scheduler:
+  level2_process_flow:
+    latency: 3
+    window_duration: 3
+
+process_options:
+  pointing:
+    num_quads: 100
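For comparison with the new layout, a hypothetical helper (not part of this commit) that folds the old flat sections into the nested levels shape; how process_options entries map onto per-level options is inferred, not specified by the diff, so it is left empty here:

# Hypothetical migration helper from the old config shape to the new one.
def migrate_config(old: dict) -> dict:
    new = {"root": old["root"], "launcher": dict(old["launcher"]), "levels": {}}
    for flow_type, priority in old.get("priority", {}).items():
        new["levels"][flow_type] = {
            "priority": dict(priority),
            # old scheduler entries existed only for some flows
            "schedule": dict(old.get("scheduler", {}).get(flow_type, {})),
            "options": {},  # mapping from process_options is unclear; left empty
        }
    return new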
238 changes: 0 additions & 238 deletions punchpipe/controlsegment/flows.py

This file was deleted.

6 changes: 3 additions & 3 deletions punchpipe/controlsegment/launcher.py
@@ -22,9 +22,9 @@ def count_running_flows(session):
 
 @task
 def escalate_long_waiting_flows(session, pipeline_config):
-    for flow_type in pipeline_config['priority']:
-        for max_seconds_waiting, escalated_priority in zip(pipeline_config['priority'][flow_type]['seconds'],
-                                                           pipeline_config['priority'][flow_type]['escalation']):
+    for flow_type in pipeline_config['levels']:
+        for max_seconds_waiting, escalated_priority in zip(pipeline_config['levels'][flow_type]['priority']['seconds'],
+                                                           pipeline_config['levels'][flow_type]['priority']['escalation']):
             since = datetime.now() - timedelta(seconds=max_seconds_waiting)
             session.query(Flow).where(and_(Flow.state == "planned",
                                            Flow.creation_time < since,
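The escalation task walks each level's priority settings and raises the priority of planned flows that have waited past each threshold. A self-contained sketch of that rule under the new config shape (pure Python, no database; the function name and return convention are illustrative, since the rest of the task is not shown above):

# Pure-Python sketch of the escalation rule; names are illustrative.
from datetime import datetime, timedelta

def escalated_priority_for(creation_time, priority_config, now=None):
    """Return the priority a planned flow should have after waiting."""
    now = now or datetime.now()
    priority = priority_config["initial"]
    for max_seconds_waiting, escalated in zip(priority_config["seconds"],
                                              priority_config["escalation"]):
        if creation_time < now - timedelta(seconds=max_seconds_waiting):
            priority = escalated
    return priority

With level0_process_flow's settings (initial 5, thresholds [30, 120, 600], escalations [10, 20, 30]), a flow planned 150 seconds ago would come out at priority 20.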
27 changes: 17 additions & 10 deletions punchpipe/controlsegment/scheduler.py
@@ -1,3 +1,5 @@
+import itertools
+
 from punchpipe.controlsegment.db import File, FileRelationship
 from punchpipe.controlsegment.util import get_database_session, update_file_state, load_pipeline_configuration
 
@@ -15,25 +17,30 @@ def generic_scheduler_flow_logic(query_ready_files_func,
     session = get_database_session()
 
     # find all files that are ready to run
+    parent_files = []
     ready_file_ids = query_ready_files_func(session, pipeline_config)
-    for file_id in ready_file_ids:
-        # mark the file as progressed so that there aren't duplicate processing flows
-        update_file_state(session, file_id, "progressed")
+    if ready_file_ids:
+        for file_id in ready_file_ids:
+            # mark the file as progressed so that there aren't duplicate processing flows
+            update_file_state(session, file_id, "progressed")
 
-        # get the prior level file's information
-        parent_file = session.query(File).where(File.file_id == file_id).one()
+            # get the prior level file's information
+            parent_files += session.query(File).where(File.file_id == file_id).all()
 
         # prepare the new level flow and file
-        child_file = construct_child_file_info(parent_file)
-        database_flow_info = construct_child_flow_info(parent_file, child_file, pipeline_config)
-        session.add(child_file)
+        children_files = construct_child_file_info(parent_files, pipeline_config)
+        database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config)
+        session.add(*children_files)
         session.add(database_flow_info)
         session.commit()
 
         # set the processing flow now that we know the flow_id after committing the flow info
-        child_file.processing_flow = database_flow_info.flow_id
+        for child_file in children_files:
+            child_file.processing_flow = database_flow_info.flow_id
         session.commit()
 
         # create a file relationship between the prior and next levels
-        session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id))
+        for parent_file in parent_files:
+            for child_file in children_files:
+                session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id))
         session.commit()
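The nested loops at the end enumerate the full cross product of parents and children, which is the many-to-many relationship the commit title describes. The same pairing can be written with itertools.product, which this commit imports at the top of scheduler.py, though whether the module uses it this way is not visible in the shown hunks:

# Equivalent cross-product pairing via itertools.product (illustrative).
import itertools

for parent_file, child_file in itertools.product(parent_files, children_files):
    session.add(FileRelationship(parent=parent_file.file_id,
                                 child=child_file.file_id))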