From 8dedd0c0a800f77f3f727ae16967911adf230063 Mon Sep 17 00:00:00 2001 From: Adrien Faure Date: Wed, 13 Dec 2023 17:05:05 +0100 Subject: [PATCH] [WIP] make tests pass with resource span --- oar/cli/oarnodes.py | 3 +- oar/lib/basequery.py | 16 ++++++-- oar/lib/job_handling.py | 21 ++++++++--- oar/lib/node.py | 72 +++++++++++++++++++----------------- oar/lib/resource_handling.py | 36 +++++++++++++----- pyproject.toml | 4 +- tests/api/test_job.py | 9 ++++- 7 files changed, 102 insertions(+), 59 deletions(-) diff --git a/oar/cli/oarnodes.py b/oar/cli/oarnodes.py index 70d43d42..16d0c358 100644 --- a/oar/cli/oarnodes.py +++ b/oar/cli/oarnodes.py @@ -55,7 +55,8 @@ def get_resources_for_job(session): res = ( session.query(Resource, Job) .filter(Job.assigned_moldable_job == AssignedResource.moldable_id) - .filter(AssignedResource.resource_id == Resource.id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .filter(Job.state == "Running") .order_by(Resource.id) .all() diff --git a/oar/lib/basequery.py b/oar/lib/basequery.py index b1c3f569..a315a128 100644 --- a/oar/lib/basequery.py +++ b/oar/lib/basequery.py @@ -210,8 +210,10 @@ def get_assigned_jobs_resources(self, jobs): AssignedResource, Job.assigned_moldable_job == AssignedResource.moldable_id, ) - .filter(Resource.id >= AssignedResource.resource_id) - .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .filter(Job.id.in_([job.id for job in jobs])) .order_by(Job.id.asc()) ) @@ -230,7 +232,10 @@ def get_assigned_one_job_resources(self, job): AssignedResource, Job.assigned_moldable_job == AssignedResource.moldable_id, ) - .join(Resource, Resource.id == AssignedResource.resource_id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) # .filter(job_id_column == job.id) ) return query @@ -255,7 +260,10 @@ def get_jobs_resource(self, resource_id): db = self.session query = ( db.query(Job) - .filter(AssignedResource.resource_id == resource_id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .filter(MoldableJobDescription.id == AssignedResource.moldable_id) .filter(MoldableJobDescription.job_id == Job.id) ) diff --git a/oar/lib/job_handling.py b/oar/lib/job_handling.py index 5422f1fa..45ca69af 100644 --- a/oar/lib/job_handling.py +++ b/oar/lib/job_handling.py @@ -4,7 +4,6 @@ import os import random import re -from oar.lib.resource import ResourceSet from procset import ProcSet from sqlalchemy import distinct, func, insert, text @@ -36,6 +35,7 @@ WalltimeChange, ) from oar.lib.plugins import find_plugin_function +from oar.lib.resource import ResourceSet from oar.lib.resource_handling import ( get_current_resources_with_suspended_job, update_current_scheduler_priority, @@ -841,7 +841,7 @@ def add_resource_job_pairs(session, moldable_id): { "moldable_job_id": res_mld_id.moldable_id, "resource_id": res_mld_id.resource_id, - "span": res_mld_id.span + "span": res_mld_id.span, } for res_mld_id in resources_mld_ids ] @@ -1209,9 +1209,12 @@ def update_scheduler_last_job_date(session, date, moldable_id): else: session.query(Resource).filter( AssignedResource.moldable_id == moldable_id - ).filter(Resource.id == AssignedResource.resource_id).update( + 
).filter(Resource.id >= AssignedResource.resource_id).filter( + Resource.id < AssignedResource.resource_id + AssignedResource.span + ).update( {Resource.last_job_date: date}, synchronize_session=False ) + session.commit() @@ -1445,7 +1448,8 @@ def get_cpuset_values(session, config, cpuset_field, moldable_id): results = ( session.query(Resource.network_address, getattr(Resource, cpuset_field)) .filter(AssignedResource.moldable_id == moldable_id) - .filter(AssignedResource.resource_id == Resource.id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .filter(Resource.network_address != "") .filter(text(sql_where_string)) .group_by(Resource.network_address, getattr(Resource, cpuset_field)) @@ -1605,7 +1609,8 @@ def get_job_current_hostnames(session, job_id): session.query(distinct(Resource.network_address)) .filter(AssignedResource.index == "CURRENT") .filter(MoldableJobDescription.index == "CURRENT") - .filter(AssignedResource.resource_id == Resource.id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .filter(MoldableJobDescription.id == AssignedResource.moldable_id) .filter(MoldableJobDescription.job_id == job_id) .filter(Resource.network_address != "") @@ -1980,7 +1985,8 @@ def get_job_host_log(session, moldable_id): res = ( session.query(distinct(Resource.network_address)) .filter(AssignedResource.moldable_id == moldable_id) - .filter(Resource.id == AssignedResource.resource_id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .filter(Resource.network_address != "") .filter(Resource.type == "default") .all() @@ -2623,6 +2629,7 @@ def get_timer_armed_job( def archive_some_moldable_job_nodes(session, config, moldable_id, hosts): """Sets the index fields to LOG in the table assigned_resources""" + # TODO if config["DB_TYPE"] == "Pg": session.query(AssignedResource).filter( AssignedResource.moldable_id == moldable_id @@ -2641,6 +2648,8 @@ def get_job_resources_properties(session, job_id): .filter(Job.id == job_id) .filter(Job.assigned_moldable_job == AssignedResource.moldable_id) .filter(AssignedResource.resource_id == Resource.id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .order_by(Resource.id) .all() ) diff --git a/oar/lib/node.py b/oar/lib/node.py index 231b2468..4a8b564b 100644 --- a/oar/lib/node.py +++ b/oar/lib/node.py @@ -119,34 +119,35 @@ def get_gantt_hostname_to_wake_up(session: Session, date: int, wakeup_time: int) hosts = [h_tpl[0] for h_tpl in hostnames] return hosts -def get_gantt_hostname_to_wake_up_(session, date, wakeup_time): - """Get hostname that we must wake up to launch jobs""" - hostnames = ( - session.query(Resource.network_address) - .filter(GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id) - .filter(MoldableJobDescription.id == GanttJobsPrediction.moldable_id) - .filter(Job.id == MoldableJobDescription.job_id) - .filter(GanttJobsPrediction.start_time <= date + wakeup_time) - .filter(Job.state == "Waiting") - .filter(Resource.id == GanttJobsResource.resource_id) - .filter(Resource.state == "Absent") - .filter(Resource.network_address != "") - .filter(Resource.type == "default") - .filter( - (GanttJobsPrediction.start_time + MoldableJobDescription.walltime) - <= Resource.available_upto - ) - 
.group_by(Resource.network_address) - .all() - ) - hosts = [h_tpl[0] for h_tpl in hostnames] - return hosts +# TODO fail merge +# def get_gantt_hostname_to_wake_up(session, date, wakeup_time): +# """Get hostname that we must wake up to launch jobs""" +# # get save assignement +# hostnames = ( +# session.query(Resource.network_address) +# .filter(GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id) +# .filter(MoldableJobDescription.id == GanttJobsPrediction.moldable_id) +# .filter(Job.id == MoldableJobDescription.job_id) +# .filter(GanttJobsPrediction.start_time <= date + wakeup_time) +# .filter(Job.state == "Waiting") +# .filter(Resource.id == GanttJobsResource.resource_id) +# .filter(Resource.state == "Absent") +# .filter(Resource.network_address != "") +# .filter(Resource.type == "default") +# .filter( +# (GanttJobsPrediction.start_time + MoldableJobDescription.walltime) +# <= Resource.available_upto +# ) +# .group_by(Resource.network_address) +# .all() +# ) +# hosts = [h_tpl[0] for h_tpl in hostnames] +# return hosts -def get_gantt_hostname_to_wake_up(session, date, wakeup_time): - """Get hostname that we must wake up to launch jobs""" - # get save assignement +def get_gantt_hostname_to_wake_up_(session, date, wakeup_time): + """Get hostname that we must wake up to launch jobs""" hostnames = ( session.query(Resource.network_address) .filter(GanttJobsResource.moldable_id == GanttJobsPrediction.moldable_id) @@ -168,12 +169,8 @@ def get_gantt_hostname_to_wake_up(session, date, wakeup_time): hosts = [h_tpl[0] for h_tpl in hostnames] return hosts -<<<<<<< HEAD -def get_next_job_date_on_node(session: Session, hostname: str): -======= -def get_next_job_date_on_node(session, hostname): ->>>>>>> ec4caec ([test] clean after rebase) +def get_next_job_date_on_node(session: Session, hostname: str): result = ( session.query(func.min(GanttJobsPrediction.start_time)) .filter(Resource.network_address == hostname) @@ -203,8 +200,11 @@ def get_alive_nodes_with_jobs( """Returns the list of occupied nodes""" result = ( session.query(distinct(Resource.network_address)) - .filter(Resource.id == AssignedResource.resource_id) - .filter(AssignedResource.moldable_id == MoldableJobDescription.id) + # .filter(AssignedResource.moldable_id == MoldableJobDescription.id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .filter(MoldableJobDescription.job_id == Job.id) .filter( Job.state.in_( @@ -398,7 +398,8 @@ def get_current_assigned_nodes( results = ( session.query(distinct(Resource.network_address)) .filter(AssignedResource.index == "CURRENT") - .filter(Resource.id == AssignedResource.resource_id) + .filter(Resource.id >= AssignedResource.resource_id) + .filter(Resource.id < AssignedResource.resource_id + AssignedResource.span) .filter(Resource.type == "default") .all() ) @@ -428,7 +429,10 @@ def get_node_job_to_frag(session: Session, hostname: str) -> List[int]: .filter(AssignedResource.index == "CURRENT") .filter(MoldableJobDescription.index == "CURRENT") .filter(Resource.network_address == hostname) - .filter(AssignedResource.resource_id == Resource.id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .filter(AssignedResource.moldable_id == MoldableJobDescription.id) .filter(MoldableJobDescription.job_id == Job.id) .filter(Job.state != "Terminated") diff --git a/oar/lib/resource_handling.py b/oar/lib/resource_handling.py index 
132971b5..e8169f0a 100644 --- a/oar/lib/resource_handling.py +++ b/oar/lib/resource_handling.py @@ -202,7 +202,10 @@ def remove_resource(session, resource_id, user=None): if state == "Dead": results = ( session.query(Job.id, Job.assigned_moldable_job) - .filter(AssignedResource.resource_id == resource_id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .filter(AssignedResource.moldable_id == Job.assigned_moldable_job) .all() ) @@ -256,7 +259,10 @@ def get_current_assigned_job_resources(session, moldable_id): session.query(Resource) .filter(AssignedResource.index == "CURRENT") .filter(AssignedResource.moldable_id == moldable_id) - .filter(Resource.id == AssignedResource.resource_id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, + ) .all() ) return res @@ -314,7 +320,8 @@ def update_resource_nextFinaudDecision(session, resource_id, finaud_decision): def update_scheduler_last_job_date(session, date, moldable_id): session.query(Resource).filter(AssignedResource.moldable_id == moldable_id).filter( - AssignedResource.resource_id == Resource.resource_id + Resource.id >= AssignedResource.resource_id, + Resource.id < AssignedResource.resource_id + AssignedResource.span, ).update({Resource.last_job_date: date}, synchronize_session=False) @@ -380,7 +387,11 @@ def update_current_scheduler_priority(session, config, job, value, state): session.query(distinct(getattr(Resource, f))) .filter(AssignedResource.index == "CURRENT") .filter(AssignedResource.moldable_id == job.assigned_moldable_job) - .filter(AssignedResource.resource_id == Resource.id) + .filter( + Resource.id >= AssignedResource.resource_id, + Resource.id + < AssignedResource.resource_id + AssignedResource.span, + ) .all() ) @@ -516,12 +527,17 @@ def get_count_busy_resources( active_moldable_job_ids = session.query(Job.assigned_moldable_job).filter( Job.state.in_(("toLaunch", "Running", "Resuming")) ) - count_busy_resources = ( - session.query(func.count(distinct(AssignedResource.resource_id))) - .filter(AssignedResource.moldable_id.in_(active_moldable_job_ids)) - .scalar() - ) - return count_busy_resources + count_busy_resources: List[AssignedResource] = ( + session.query(AssignedResource).filter( + AssignedResource.moldable_id.in_(active_moldable_job_ids) + ) + ).all() + + total = 0 + for resource in count_busy_resources: + total += resource.span + + return total def resources_creation( diff --git a/pyproject.toml b/pyproject.toml index ced31d98..3b697bc9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -60,8 +60,8 @@ tabulate = ">=0.8.9, <1.0" Click = ">=8.0.0" pyzmq = "<25.0, >=22.0.3" requests = ">=2.24.0" -# procset = "^1.0" -procset = { git = "https://gitlab.inria.fr/bleuse/procset.py", rev="typing" } +procset = "^1.0" +# procset = { git = "https://gitlab.inria.fr/bleuse/procset.py", rev="typing" } # procset = { path = "/home/adfaure/code/procset.py", develop = false } simplejson = "^3.17.2" psutil = "^5.8.0" diff --git a/tests/api/test_job.py b/tests/api/test_job.py index de020378..a5f5811e 100644 --- a/tests/api/test_job.py +++ b/tests/api/test_job.py @@ -4,7 +4,7 @@ from oar.api.url_utils import replace_query_params from oar.kao.meta_sched import meta_schedule from oar.lib.job_handling import insert_job, set_job_state -from oar.lib.models import FragJob, Job +from oar.lib.models import AssignedResource, FragJob, Job def test_jobs_index(client, 
minimal_db_initialization): @@ -78,7 +78,7 @@ def test_app_jobs_get_one_details(client, minimal_db_initialization, setup_confi """GET /jobs/?details=true""" config, db = setup_config job_id = insert_job( - minimal_db_initialization, res=[(60, [("resource_id=8", "")])], properties="" + minimal_db_initialization, res=[(60, [("resource_id=4", "")])], properties="" ) meta_schedule(minimal_db_initialization, config, "internal") res = client.get("/jobs/{}?details=true".format(job_id)) @@ -95,6 +95,11 @@ def test_app_jobs_get_resources(client, minimal_db_initialization, setup_config) ) meta_schedule(minimal_db_initialization, config, "internal") res = client.get("/jobs/{}/resources".format(job_id)) + + for ar in minimal_db_initialization.query(AssignedResource).all(): + print(vars(ar)) + + print(res.json()) assert len(res.json()["items"]) == 4
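
Note (illustrative sketch, not part of the patch): the hunks above repeat one span test in many places. A resource row belongs to a span-based assignment when AssignedResource.resource_id <= Resource.id < AssignedResource.resource_id + AssignedResource.span. A minimal helper expressing that predicate once is sketched below; the name resource_in_span is hypothetical, and the sketch assumes the Resource and AssignedResource models from oar.lib.models together with the span column this patch introduces.

# Illustrative sketch only, not part of the patch.
from sqlalchemy import and_

from oar.lib.models import AssignedResource, Resource


def resource_in_span():
    """Predicate used throughout this patch: Resource.id falls inside the
    half-open range [resource_id, resource_id + span) of an AssignedResource row."""
    return and_(
        Resource.id >= AssignedResource.resource_id,
        Resource.id < AssignedResource.resource_id + AssignedResource.span,
    )

# Example usage, equivalent to the expanded filters in oar/cli/oarnodes.py above:
#   session.query(Resource, Job)
#       .filter(Job.assigned_moldable_job == AssignedResource.moldable_id)
#       .filter(resource_in_span())
#       .filter(Job.state == "Running")

Centralizing the predicate would keep the range convention (half-open interval counted from resource_id) in a single place if the span semantics ever change.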
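
Note (illustrative sketch, not part of the patch): get_count_busy_resources now fetches the active AssignedResource rows and sums their span attribute in Python. Under the assumption that spans assigned to active jobs do not overlap, the same total can be computed with a single SQL aggregate; count_busy_resources_sql is a hypothetical name, not an existing OAR API.

# Illustrative sketch only, not part of the patch.
from sqlalchemy import func

from oar.lib.models import AssignedResource, Job


def count_busy_resources_sql(session):
    """Sum the spans of assignments belonging to active jobs on the SQL side."""
    active_moldable_ids = session.query(Job.assigned_moldable_job).filter(
        Job.state.in_(("toLaunch", "Running", "Resuming"))
    )
    return (
        session.query(func.coalesce(func.sum(AssignedResource.span), 0))
        .filter(AssignedResource.moldable_id.in_(active_moldable_ids))
        .scalar()
    )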
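
Note (illustrative sketch, not part of the patch): when a caller needs the individual resource ids covered by a span-based assignment (for instance to build a ProcSet), one row expands to the half-open range [resource_id, resource_id + span). The helper expand_assignment below is hypothetical and assumes procset's closed-interval tuple constructor.

# Illustrative sketch only, not part of the patch.
from procset import ProcSet


def expand_assignment(assigned_resource):
    """Expand one span-based AssignedResource row into its resource ids."""
    first = assigned_resource.resource_id
    last = first + assigned_resource.span - 1  # span counts ids starting at resource_id
    return ProcSet((first, last))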