diff --git a/protos/Crane.proto b/protos/Crane.proto index edf0b87c..09ccfb49 100644 --- a/protos/Crane.proto +++ b/protos/Crane.proto @@ -191,7 +191,15 @@ message QueryPartitionInfoRequest { } message QueryPartitionInfoReply { - repeated PartitionInfo partition_info = 1; + repeated PartitionInfo partition_info_list = 1; +} + +message QueryReservationInfoRequest { + string reservation_name = 1; +} + +message QueryReservationInfoReply { + repeated ReservationInfo reservation_info_list = 3; } message ModifyTaskRequest { @@ -449,6 +457,28 @@ message QueryTasksInfoReply{ repeated TaskInfo task_info_list = 2; } +message CreateReservationRequest { + string reservation_name = 1; + int64 start_time_seconds = 2; + int64 duration_seconds = 3; + string partition = 4; + string craned_regex = 5; +} + +message CreateReservationReply { + bool ok = 1; + string reason = 2; +} + +message DeleteReservationRequest { + string reservation_name = 1; +} + +message DeleteReservationReply { + bool ok = 1; + string reason = 2; +} + message StreamCallocRequest { enum CallocRequestType { TASK_REQUEST = 0; @@ -767,6 +797,7 @@ service CraneCtld { /* PRCs called from ccontrol */ rpc QueryCranedInfo(QueryCranedInfoRequest) returns (QueryCranedInfoReply); rpc QueryPartitionInfo(QueryPartitionInfoRequest) returns (QueryPartitionInfoReply); + rpc QueryReservationInfo(QueryReservationInfoRequest) returns (QueryReservationInfoReply); rpc ModifyTask(ModifyTaskRequest) returns (ModifyTaskReply); rpc ModifyNode(ModifyCranedStateRequest) returns (ModifyCranedStateReply); @@ -794,6 +825,8 @@ service CraneCtld { /* common RPCs */ rpc QueryTasksInfo(QueryTasksInfoRequest) returns (QueryTasksInfoReply); + rpc CreateReservation(CreateReservationRequest) returns (CreateReservationReply); + rpc DeleteReservation(DeleteReservationRequest) returns (DeleteReservationReply); } service Craned { diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 8c328d86..02a239cc 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -303,6 +303,18 @@ message CranedInfo { google.protobuf.Timestamp last_busy_time = 16; } +message ReservationInfo { + string reservation_name = 1; + google.protobuf.Timestamp start_time = 2; + google.protobuf.Duration duration = 3; + string partition = 4; + string craned_regex = 5; + + ResourceView res_total = 6; + ResourceView res_avail = 7; + ResourceView res_alloc = 8; +} + message TrimmedPartitionInfo { message TrimmedCranedInfo { CranedResourceState resource_state = 1; diff --git a/src/CraneCtld/CranedMetaContainer.cpp b/src/CraneCtld/CranedMetaContainer.cpp index 3023221c..efdbbe62 100644 --- a/src/CraneCtld/CranedMetaContainer.cpp +++ b/src/CraneCtld/CranedMetaContainer.cpp @@ -43,7 +43,7 @@ void CranedMetaContainer::CranedUp(const CranedId& craned_id) { std::vector::ExclusivePtr> part_meta_ptrs; part_meta_ptrs.reserve(part_ids.size()); - auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr(); + auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr(); // Acquire all partition locks first. for (PartitionId const& part_id : part_ids) @@ -84,7 +84,7 @@ void CranedMetaContainer::CranedDown(const CranedId& craned_id) { std::vector::ExclusivePtr> part_meta_ptrs; part_meta_ptrs.reserve(part_ids.size()); - auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr(); + auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr(); // Acquire all partition locks first. for (PartitionId const& part_id : part_ids) { @@ -123,7 +123,7 @@ bool CranedMetaContainer::CheckCranedOnline(const CranedId& craned_id) { CranedMetaContainer::PartitionMetaPtr CranedMetaContainer::GetPartitionMetasPtr( PartitionId partition_id) { - return partition_metas_map_.GetValueExclusivePtr(partition_id); + return partition_meta_map_.GetValueExclusivePtr(partition_id); } CranedMetaContainer::CranedMetaPtr CranedMetaContainer::GetCranedMetaPtr( @@ -131,9 +131,14 @@ CranedMetaContainer::CranedMetaPtr CranedMetaContainer::GetCranedMetaPtr( return craned_meta_map_.GetValueExclusivePtr(craned_id); } +CranedMetaContainer::ReservationMetaPtr +CranedMetaContainer::GetReservationMetaPtr(const ReservationId& name) { + return reservation_meta_map_.GetValueExclusivePtr(name); +} + CranedMetaContainer::AllPartitionsMetaMapConstPtr CranedMetaContainer::GetAllPartitionsMetaMapConstPtr() { - return partition_metas_map_.GetMapConstSharedPtr(); + return partition_meta_map_.GetMapConstSharedPtr(); } CranedMetaContainer::CranedMetaMapConstPtr @@ -141,6 +146,16 @@ CranedMetaContainer::GetCranedMetaMapConstPtr() { return craned_meta_map_.GetMapConstSharedPtr(); } +CranedMetaContainer::ReservationMetaMapConstPtr +CranedMetaContainer::GetReservationMetaMapConstPtr() { + return reservation_meta_map_.GetMapConstSharedPtr(); +} + +CranedMetaContainer::ReservationMetaMapPtr +CranedMetaContainer::GetReservationMetaMapPtr() { + return reservation_meta_map_.GetMapSharedPtr(); +} + void CranedMetaContainer::MallocResourceFromNode(CranedId node_id, task_id_t task_id, const ResourceV2& resources) { @@ -154,7 +169,7 @@ void CranedMetaContainer::MallocResourceFromNode(CranedId node_id, std::vector::ExclusivePtr> part_meta_ptrs; part_meta_ptrs.reserve(part_ids.size()); - auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr(); + auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr(); // Acquire all partition locks first. for (PartitionId const& part_id : part_ids) @@ -192,7 +207,7 @@ void CranedMetaContainer::FreeResourceFromNode(CranedId node_id, std::vector::ExclusivePtr> part_meta_ptrs; part_meta_ptrs.reserve(part_ids.size()); - auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr(); + auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr(); // Acquire all partition locks first. for (PartitionId const& part_id : part_ids) @@ -298,7 +313,7 @@ void CranedMetaContainer::InitFromConfig(const Config& config) { } craned_meta_map_.InitFromMap(std::move(craned_map)); - partition_metas_map_.InitFromMap(std::move(partition_map)); + partition_meta_map_.InitFromMap(std::move(partition_map)); } crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryAllCranedInfo() { @@ -316,7 +331,7 @@ crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryAllCranedInfo() { } crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryCranedInfo( - const std::string& node_name) { + const CranedId& node_name) { crane::grpc::QueryCranedInfoReply reply; auto* list = reply.mutable_craned_info_list(); @@ -335,9 +350,9 @@ crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryCranedInfo( crane::grpc::QueryPartitionInfoReply CranedMetaContainer::QueryAllPartitionInfo() { crane::grpc::QueryPartitionInfoReply reply; - auto* list = reply.mutable_partition_info(); + auto* list = reply.mutable_partition_info_list(); - auto partition_map = partition_metas_map_.GetMapConstSharedPtr(); + auto partition_map = partition_meta_map_.GetMapConstSharedPtr(); for (auto&& [part_name, part_meta_ptr] : *partition_map) { auto* part_info = list->Add(); @@ -367,13 +382,13 @@ CranedMetaContainer::QueryAllPartitionInfo() { } crane::grpc::QueryPartitionInfoReply CranedMetaContainer::QueryPartitionInfo( - const std::string& partition_name) { + const PartitionId& partition_name) { crane::grpc::QueryPartitionInfoReply reply; - auto* list = reply.mutable_partition_info(); + auto* list = reply.mutable_partition_info_list(); - if (!partition_metas_map_.Contains(partition_name)) return reply; + if (!partition_meta_map_.Contains(partition_name)) return reply; - auto part_meta = partition_metas_map_.GetValueExclusivePtr(partition_name); + auto part_meta = partition_meta_map_.GetValueExclusivePtr(partition_name); auto* part_info = list->Add(); part_info->set_name(part_meta->partition_global_meta.name); @@ -397,6 +412,74 @@ crane::grpc::QueryPartitionInfoReply CranedMetaContainer::QueryPartitionInfo( return reply; } +crane::grpc::QueryReservationInfoReply +CranedMetaContainer::QueryAllReservationInfo() { + crane::grpc::QueryReservationInfoReply reply; + auto* list = reply.mutable_reservation_info_list(); + + auto reservation_map = reservation_meta_map_.GetMapConstSharedPtr(); + for (auto&& [reservation_id, reservation_meta_ptr] : *reservation_map) { + auto reservation_meta = reservation_meta_ptr.GetExclusivePtr(); + + auto* reservation_info = list->Add(); + + reservation_info->set_reservation_name(reservation_id); + reservation_info->mutable_start_time()->set_seconds( + absl::ToUnixSeconds(reservation_meta->start_time)); + reservation_info->mutable_duration()->set_seconds( + absl::ToUnixSeconds(reservation_meta->end_time) - + absl::ToUnixSeconds(reservation_meta->start_time)); + reservation_info->set_partition(reservation_meta->partition_id); + reservation_info->set_craned_regex( + util::HostNameListToStr(reservation_meta->craned_ids)); + *reservation_info->mutable_res_total() = + static_cast( + reservation_meta->resources_total); + *reservation_info->mutable_res_avail() = + static_cast( + reservation_meta->resources_avail); + *reservation_info->mutable_res_alloc() = + static_cast( + reservation_meta->resources_in_use); + } + return reply; +} + +crane::grpc::QueryReservationInfoReply +CranedMetaContainer::QueryReservationInfo( + const ReservationId& reservation_name) { + crane::grpc::QueryReservationInfoReply reply; + auto* list = reply.mutable_reservation_info_list(); + + if (!reservation_meta_map_.Contains(reservation_name)) { + return reply; + } + + auto reservation_meta = + reservation_meta_map_.GetValueExclusivePtr(reservation_name); + + auto* reservation_info = list->Add(); + + reservation_info->set_reservation_name(reservation_name); + reservation_info->mutable_start_time()->set_seconds( + absl::ToUnixSeconds(reservation_meta->start_time)); + reservation_info->mutable_duration()->set_seconds( + absl::ToUnixSeconds(reservation_meta->end_time) - + absl::ToUnixSeconds(reservation_meta->start_time)); + reservation_info->set_partition(reservation_meta->partition_id); + reservation_info->set_craned_regex( + util::HostNameListToStr(reservation_meta->craned_ids)); + *reservation_info->mutable_res_total() = + static_cast(reservation_meta->resources_total); + *reservation_info->mutable_res_avail() = + static_cast(reservation_meta->resources_avail); + *reservation_info->mutable_res_alloc() = + static_cast( + reservation_meta->resources_in_use); + + return reply; +} + crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo( const crane::grpc::QueryClusterInfoRequest& request) { crane::grpc::QueryClusterInfoReply reply; @@ -441,8 +524,8 @@ crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo( resource_filters[static_cast(it)] = true; // Ensure that the map global read lock is held during the following filtering - // operations and partition_metas_map_ must be locked before craned_meta_map_ - auto partition_map = partition_metas_map_.GetMapConstSharedPtr(); + // operations and partition_meta_map_ must be locked before craned_meta_map_ + auto partition_map = partition_meta_map_.GetMapConstSharedPtr(); auto craned_map = craned_meta_map_.GetMapConstSharedPtr(); auto partition_rng = @@ -455,7 +538,7 @@ crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo( // we make a copy of these craned ids to lower overall latency. // The amortized cost is 1 copy for each craned node. // Although an extra copying cost is introduced, - // the time of accessing partition_metas_map_ is minimized and + // the time of accessing partition_meta_map_ is minimized and // the copying cost is taken only by this grpc thread handling // cinfo request. // Since we assume the number of cpu cores is sufficient on the @@ -583,7 +666,7 @@ void CranedMetaContainer::AddDedicatedResource( std::vector::ExclusivePtr> part_meta_ptrs; part_meta_ptrs.reserve(part_ids.size()); - auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr(); + auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr(); // Acquire all partition locks first. for (PartitionId const& part_id : part_ids) diff --git a/src/CraneCtld/CranedMetaContainer.h b/src/CraneCtld/CranedMetaContainer.h index 38d995d2..cc020709 100644 --- a/src/CraneCtld/CranedMetaContainer.h +++ b/src/CraneCtld/CranedMetaContainer.h @@ -54,10 +54,18 @@ class CranedMetaContainer final { util::AtomicHashMap; using CranedMetaRawMap = CranedMetaAtomicMap::RawMap; + using ReservationMetaAtomicMap = + util::AtomicHashMap; + using ReservationMetaRawMap = ReservationMetaAtomicMap::RawMap; + using AllPartitionsMetaMapConstPtr = util::ScopeConstSharedPtr; using CranedMetaMapConstPtr = util::ScopeConstSharedPtr; + using ReservationMetaMapConstPtr = + util::ScopeConstSharedPtr; + + using ReservationMetaMapPtr = ReservationMetaAtomicMap::MapSharedPtr; using PartitionMetaPtr = util::ManagedScopeExclusivePtr; + using ReservationMetaPtr = + util::ManagedScopeExclusivePtr; CranedMetaContainer() = default; ~CranedMetaContainer() = default; @@ -76,13 +87,17 @@ class CranedMetaContainer final { crane::grpc::QueryCranedInfoReply QueryAllCranedInfo(); - crane::grpc::QueryCranedInfoReply QueryCranedInfo( - const std::string& node_name); + crane::grpc::QueryCranedInfoReply QueryCranedInfo(const CranedId& node_name); crane::grpc::QueryPartitionInfoReply QueryAllPartitionInfo(); crane::grpc::QueryPartitionInfoReply QueryPartitionInfo( - const std::string& partition_name); + const PartitionId& partition_name); + + crane::grpc::QueryReservationInfoReply QueryAllReservationInfo(); + + crane::grpc::QueryReservationInfoReply QueryReservationInfo( + const ReservationId& reservation_name); crane::grpc::QueryClusterInfoReply QueryClusterInfo( const crane::grpc::QueryClusterInfoRequest& request); @@ -100,10 +115,16 @@ class CranedMetaContainer final { CranedMetaPtr GetCranedMetaPtr(CranedId craned_id); + ReservationMetaPtr GetReservationMetaPtr(const ReservationId& name); + AllPartitionsMetaMapConstPtr GetAllPartitionsMetaMapConstPtr(); CranedMetaMapConstPtr GetCranedMetaMapConstPtr(); + ReservationMetaMapConstPtr GetReservationMetaMapConstPtr(); + + ReservationMetaMapPtr GetReservationMetaMapPtr(); + bool CheckCranedAllowed(const std::string& hostname) { return craned_meta_map_.Contains(hostname); }; @@ -116,12 +137,13 @@ class CranedMetaContainer final { private: // In this part of code, the following lock sequence MUST be held // to avoid deadlock: - // 1. lock elements in partition_metas_map_ + // 1. lock elements in partition_meta_map_ // 2. lock elements in craned_meta_map_ // 3. unlock elements in craned_meta_map_ - // 4. unlock elements in partition_metas_map_ + // 4. unlock elements in partition_meta_map_ CranedMetaAtomicMap craned_meta_map_; - AllPartitionsMetaAtomicMap partition_metas_map_; + AllPartitionsMetaAtomicMap partition_meta_map_; + ReservationMetaAtomicMap reservation_meta_map_; // A craned node may belong to multiple partitions. // Use this map as a READ-ONLY index, so multi-thread reading is ok. diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index 724fcf2b..6435b29a 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -151,6 +151,20 @@ grpc::Status CraneCtldServiceImpl::QueryPartitionInfo( return grpc::Status::OK; } +grpc::Status CraneCtldServiceImpl::QueryReservationInfo( + grpc::ServerContext *context, + const crane::grpc::QueryReservationInfoRequest *request, + crane::grpc::QueryReservationInfoReply *response) { + if (request->reservation_name().empty()) { + *response = g_meta_container->QueryAllReservationInfo(); + } else { + *response = + g_meta_container->QueryReservationInfo(request->reservation_name()); + } + + return grpc::Status::OK; +} + grpc::Status CraneCtldServiceImpl::ModifyTask( grpc::ServerContext *context, const crane::grpc::ModifyTaskRequest *request, crane::grpc::ModifyTaskReply *response) { @@ -694,6 +708,22 @@ grpc::Status CraneCtldServiceImpl::QueryClusterInfo( return grpc::Status::OK; } +grpc::Status CraneCtldServiceImpl::CreateReservation( + grpc::ServerContext *context, + const crane::grpc::CreateReservationRequest *request, + crane::grpc::CreateReservationReply *response) { + *response = g_task_scheduler->CreateReservation(*request); + return grpc::Status::OK; +} + +grpc::Status CraneCtldServiceImpl::DeleteReservation( + grpc::ServerContext *context, + const crane::grpc::DeleteReservationRequest *request, + crane::grpc::DeleteReservationReply *response) { + *response = g_task_scheduler->DeleteReservation(*request); + return grpc::Status::OK; +} + grpc::Status CraneCtldServiceImpl::CforedStream( grpc::ServerContext *context, grpc::ServerReaderWriter running_task_resource_map; + absl::flat_hash_map reservation_resource_map; }; struct PartitionGlobalMeta { @@ -261,6 +262,21 @@ struct InteractiveMetaInTask { std::atomic has_been_terminated_on_craned{false}; }; +struct ReservationMeta { + ReservationId name; + ResourceView resources_total; + ResourceView resources_avail; + ResourceView resources_in_use; + + absl::Time start_time; + absl::Time end_time; + + PartitionId partition_id; + std::list craned_ids; + + ResourceV2 allocatable_res; +}; + struct BatchMetaInTask { std::string sh_script; std::string output_file_pattern; diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 58efa354..eca54a9d 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1255,7 +1255,7 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask( operator_uid, task->Username(), false); if (!result) { reply.add_not_cancelled_tasks(task_id); - reply.add_not_cancelled_reasons("Permission Denied."); + reply.add_not_cancelled_reasons("Permission Denied"); } else { reply.add_cancelled_tasks(task_id); @@ -1277,7 +1277,7 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask( operator_uid, task->Username(), false); if (!result) { reply.add_not_cancelled_tasks(task_id); - reply.add_not_cancelled_reasons("Permission Denied."); + reply.add_not_cancelled_reasons("Permission Denied"); } else { bool is_calloc = false; if (task->type == crane::grpc::Interactive) { @@ -1338,6 +1338,195 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask( return reply; } +crane::grpc::CreateReservationReply TaskScheduler::CreateReservation( + const crane::grpc::CreateReservationRequest& request) { + crane::grpc::CreateReservationReply reply; + + ReservationId reservation_name = request.reservation_name(); + absl::Time start_time = absl::FromUnixSeconds(request.start_time_seconds()); + if (start_time < absl::Now() + absl::Seconds(kReservationMinAdvanceSec)) { + reply.set_ok(false); + reply.set_reason("Reservation start time is too close"); + return reply; + } + absl::Duration duration = absl::Seconds(request.duration_seconds()); + absl::Time end_time = start_time + duration; + PartitionId partition = request.partition(); + // TODO: Add support for partial node reservation + ResourceView resources; + bool whole_node = true; + + std::list craned_ids; + if (!util::ParseHostList(request.craned_regex(), &craned_ids)) { + reply.set_ok(false); + reply.set_reason("Invalid craned_regex"); + return reply; + } + + auto all_partitions_meta_map = + g_meta_container->GetAllPartitionsMetaMapConstPtr(); + if (!all_partitions_meta_map->contains(partition)) { + reply.set_ok(false); + reply.set_reason(fmt::format("Partition {} not found", partition)); + return reply; + } + const util::Synchronized& partition_meta_ptr = + all_partitions_meta_map->at(partition); + auto craned_meta_map = g_meta_container->GetCranedMetaMapConstPtr(); + auto reservation_meta_map = g_meta_container->GetReservationMetaMapPtr(); + + if (reservation_meta_map->contains(reservation_name)) { + reply.set_ok(false); + reply.set_reason("Reservation name already exists"); + return reply; + } + + std::vector> + craned_meta_res_vec; + + ResourceV2 allocated_res; + for (CranedId const& craned_id : craned_ids) { + if (!partition_meta_ptr.GetExclusivePtr()->craned_ids.contains(craned_id)) { + reply.set_ok(false); + reply.set_reason( + fmt::format("Node {} is not in partition {}", craned_id, partition)); + return reply; + } + } + + { + LockGuard running_guard(&m_running_task_map_mtx_); + + for (CranedId const& craned_id : craned_ids) { + auto craned_meta = g_meta_container->GetCranedMetaPtr(craned_id); + if (craned_meta.get() == nullptr) { + reply.set_ok(false); + reply.set_reason(fmt::format("Node {} not found", craned_id)); + return reply; + } + ResourceInNode res_avail = craned_meta->res_total; + for (const auto& [task_id, res] : + craned_meta->running_task_resource_map) { + const auto& task = m_running_task_map_.at(task_id); + absl::Time task_end_time = task->StartTime() + task->time_limit; + if (task_end_time > start_time) { + if (whole_node) { + reply.set_ok(false); + reply.set_reason( + fmt::format("Node {} has running tasks that end after the " + "reservation start time", + craned_id)); + return reply; + } + res_avail -= res; + } + } + for (const auto& [reservation_name, res] : + craned_meta->reservation_resource_map) { + const auto& reservation = + reservation_meta_map->at(reservation_name).GetExclusivePtr(); + if (reservation->start_time < end_time && + reservation->end_time > start_time) { + if (whole_node) { + reply.set_ok(false); + reply.set_reason( + fmt::format("Node {} has reservations that overlap with the " + "new reservation", + craned_id)); + return reply; + } + res_avail -= res; + } + } + ResourceInNode feasible_res; + if (whole_node) { + feasible_res = res_avail; + } else { + bool ok = resources.GetFeasibleResourceInNode(res_avail, &feasible_res); + if (!ok) { + reply.set_ok(false); + reply.set_reason(fmt::format( + "Node {} does not have enough resources for the reservation", + craned_id)); + return reply; + } + } + + allocated_res.AddResourceInNode(craned_id, feasible_res); + craned_meta_res_vec.emplace_back(std::move(craned_meta), + std::move(feasible_res)); + } + } + + const auto& [it, ok] = reservation_meta_map->emplace( + reservation_name, ReservationMeta{ + .name = reservation_name, + .resources_total = resources, + .resources_avail = resources, + .resources_in_use = ResourceView(), + .start_time = start_time, + .end_time = end_time, + .partition_id = partition, + .craned_ids = craned_ids, + .allocatable_res = allocated_res, + }); + if (!ok) { + CRANE_ERROR("Failed to insert reservation meta for reservation {}", + reservation_name); + reply.set_ok(false); + reply.set_reason("Failed to insert reservation meta"); + } + for (auto& [craned_meta, res] : craned_meta_res_vec) { + const auto& [it, ok] = craned_meta->reservation_resource_map.emplace( + reservation_name, std::move(res)); + if (!ok) { + CRANE_ERROR("Failed to insert reservation resource to {}", + craned_meta->static_meta.hostname); + continue; + } + } + + reply.set_ok(true); + return reply; +} + +crane::grpc::DeleteReservationReply TaskScheduler::DeleteReservation( + const crane::grpc::DeleteReservationRequest& request) { + crane::grpc::DeleteReservationReply reply; + + ReservationId reservation_name = request.reservation_name(); + auto reservation_meta_map = g_meta_container->GetReservationMetaMapPtr(); + + if (!reservation_meta_map->contains(reservation_name)) { + reply.set_ok(false); + reply.set_reason(fmt::format("Reservation {} not found", reservation_name)); + return reply; + } + + auto& reservation_meta = reservation_meta_map->at(reservation_name); + for (const auto& craned_id : reservation_meta.GetExclusivePtr()->craned_ids) { + auto craned_meta = g_meta_container->GetCranedMetaPtr(craned_id); + if (craned_meta.get() == nullptr) { + CRANE_ERROR("Node {} not found when deleting reservation {}", craned_id, + reservation_name); + continue; + } + auto& reservation_resource_map = craned_meta->reservation_resource_map; + if (!reservation_resource_map.contains(reservation_name)) { + CRANE_ERROR( + "Reservation not found on node {} when deleting reservation {}", + craned_id, reservation_name); + continue; + } + reservation_resource_map.erase(reservation_name); + } + + reservation_meta_map->erase(reservation_name); + + reply.set_ok(true); + return reply; +} + void TaskScheduler::CleanTaskTimerCb_() { m_clean_task_timer_queue_handle_->send(); } @@ -1859,6 +2048,8 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( // Sort all running task in this node by ending time. std::vector> end_time_task_id_vec; + std::vector>> + time_res_vec; node_selection_info_ref.task_num_node_id_map.emplace( craned_meta->running_task_resource_map.size(), craned_id); @@ -1877,6 +2068,7 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( absl::Time end_time = std::max(task->StartTime() + task->time_limit, now + absl::Seconds(1)); end_time_task_id_vec.emplace_back(end_time, task_id); + time_res_vec.emplace_back(end_time, std::make_pair(true, res)); running_task_ids_str.emplace_back(std::to_string(task_id)); } @@ -1886,11 +2078,11 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( absl::StrJoin(running_task_ids_str, ", ")); } - std::sort( - end_time_task_id_vec.begin(), end_time_task_id_vec.end(), - [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); - if constexpr (kAlgoTraceOutput) { + std::sort(end_time_task_id_vec.begin(), end_time_task_id_vec.end(), + [](const auto& lhs, const auto& rhs) { + return lhs.first < rhs.first; + }); if (!end_time_task_id_vec.empty()) { std::string str; str.append( @@ -1903,6 +2095,26 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( } } + for (const auto& [ReservationId, res] : + craned_meta->reservation_resource_map) { + const auto& reservation = + g_meta_container->GetReservationMetaPtr(ReservationId); + if (reservation.get() == nullptr) { + CRANE_ERROR("Failed to get reservation meta for reservation {}", + ReservationId); + continue; + } + // TODO: expired reservation should be removed from the map. + time_res_vec.emplace_back(std::max(now, reservation->start_time), + std::make_pair(false, res)); + time_res_vec.emplace_back(std::max(now, reservation->end_time), + std::make_pair(true, res)); + } + + std::sort( + time_res_vec.begin(), time_res_vec.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + // Calculate how many resources are available at [now, first task end, // second task end, ...] in this node. auto& time_avail_res_map = @@ -1922,8 +2134,8 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( { // Limit the scope of `iter` auto cur_time_iter = time_avail_res_map.find(now); bool ok; - for (auto& [end_time, task_id] : end_time_task_id_vec) { - const auto& running_task = running_tasks.at(task_id); + for (auto& [end_time, res_info] : time_res_vec) { + const auto& [is_end, res] = res_info; if (!time_avail_res_map.contains(end_time)) { /** * If there isn't any task that ends at the `end_time`, @@ -1955,7 +2167,11 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_( * {{now+1+1: available_res(now) + available_res(1) + * available_res(2)}, ...} */ - cur_time_iter->second += running_task->Resources().at(craned_id); + if (is_end) { + cur_time_iter->second += res; + } else { + cur_time_iter->second -= res; + } if constexpr (kAlgoTraceOutput) { CRANE_TRACE( diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 32eab650..4c45cd79 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -282,6 +282,12 @@ class TaskScheduler { return TerminateRunningTaskNoLock_(iter->second.get()); } + crane::grpc::CreateReservationReply CreateReservation( + const crane::grpc::CreateReservationRequest& request); + + crane::grpc::DeleteReservationReply DeleteReservation( + const crane::grpc::DeleteReservationRequest& request); + static CraneErr AcquireTaskAttributes(TaskInCtld* task); static CraneErr CheckTaskValidity(TaskInCtld* task); diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index 8802d4a8..b9dc0838 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -90,6 +90,8 @@ constexpr uint64_t kTaskMinTimeLimitSec = 11; constexpr int64_t kTaskMaxTimeLimitSec = google::protobuf::util::TimeUtil::kDurationMaxSeconds; +constexpr uint64_t kReservationMinAdvanceSec = 0; + namespace ExitCode { inline constexpr size_t kExitStatusNum = 256; @@ -153,6 +155,7 @@ inline std::string_view CraneErrStr(CraneErr err) { using PartitionId = std::string; using CranedId = std::string; +using ReservationId = std::string; using cpu_t = fpm::fixed_24_8; // Device path. e.g. /dev/nvidia0