diff --git a/src/CraneCtld/AccountManager.cpp b/src/CraneCtld/AccountManager.cpp index 5dc623c3..09c84487 100644 --- a/src/CraneCtld/AccountManager.cpp +++ b/src/CraneCtld/AccountManager.cpp @@ -834,12 +834,6 @@ AccountManager::CraneExpected AccountManager::ModifyQos( Qos qos; g_db_client->SelectQos("name", name, &qos); - // Modify QosResource when max_jobs_per_user or max_cpus_per_user is changed. - if (modify_field == crane::grpc::ModifyField::MaxJobsPerUser || - modify_field == crane::grpc::ModifyField::MaxCpusPerUser) - g_account_meta_container->ModifyQosResourceOnUser( - name, QosResource{qos.max_cpus_per_user, qos.max_jobs_per_user}); - *m_qos_map_[name] = std::move(qos); return {}; @@ -977,15 +971,10 @@ result::result AccountManager::CheckAndApplyQosLimitOnTask( } else if (task->time_limit > qos_share_ptr->max_time_limit_per_task) return result::fail("time-limit reached the user's limit."); - if (static_cast(task->cpus_per_task) > - qos_share_ptr->max_cpus_per_user) + if (!g_account_meta_container->CheckAndMallocQosResourceFromUser( + user_share_ptr->name, *task, *qos_share_ptr)) return result::fail("cpus-per-task reached the user's limit."); - g_account_meta_container->AddQosResourceToUser( - user_share_ptr->name, qos_share_ptr->name, - QosResource{qos_share_ptr->max_cpus_per_user, - qos_share_ptr->max_jobs_per_user}); - return {}; } diff --git a/src/CraneCtld/AccountMetaContainer.cpp b/src/CraneCtld/AccountMetaContainer.cpp index ffac6fe4..0a113762 100644 --- a/src/CraneCtld/AccountMetaContainer.cpp +++ b/src/CraneCtld/AccountMetaContainer.cpp @@ -22,123 +22,44 @@ namespace Ctld { -AccountMetaContainer::AccountMetaContainer() { InitFromDB_(); } +bool AccountMetaContainer::CheckAndMallocQosResourceFromUser( + const std::string& username, const TaskInCtld& task, const Qos& qos) { + if (static_cast(task.cpus_per_task) > qos.max_cpus_per_user || + qos.max_jobs_per_user == 0) + return false; -void AccountMetaContainer::AddQosResourceToUser( - const std::string& username, const std::string& qos_name, - const QosResource& qos_resource) { - if (!user_meta_map_.contains(username)) - user_meta_map_.emplace(username, QosToQosResourceMap{}); + bool result = true; - TryEmplace_(username, qos_name, qos_resource); -} - -void AccountMetaContainer::ModifyQosResourceOnUser( - const std::string& qos_name, const QosResource& qos_resource) { - for (const auto& [username, _] : user_meta_map_) { - TryEmplace_(username, qos_name, qos_resource); - } -} + ResourceView resource_view{}; + resource_view.GetAllocatableRes().cpu_count = task.cpus_per_task; -void AccountMetaContainer::FreeQosResource(const std::string& username, - const TaskInCtld& task) { - uint32_t cpus_per_task = static_cast(task.cpus_per_task); - user_meta_map_[username].modify_if( - task.qos, [&](std::pair& pair) { + user_meta_map_[username].qos_resource_in_use.try_emplace_l( + task.qos, + [&](std::pair& pair) { auto& val = pair.second; - val.res_avail.cpus_per_user += cpus_per_task; - val.res_avail.jobs_per_user += 1; - val.res_in_use.cpus_per_user -= cpus_per_task; - val.res_in_use.jobs_per_user -= 1; - - if (val.res_avail.cpus_per_user > val.res_total.cpus_per_user) - val.res_avail.cpus_per_user = val.res_total.cpus_per_user; - if (val.res_avail.jobs_per_user > val.res_total.jobs_per_user) - val.res_avail.jobs_per_user = val.res_total.jobs_per_user; - }); -} - -bool AccountMetaContainer::CheckQosLimitOnUser(const std::string& username, - const TaskInCtld& task) { - uint32_t cpus_per_task = static_cast(task.cpus_per_task); + if (val.resource.CpuCount() + static_cast(task.cpus_per_task) > + qos.max_cpus_per_user || + val.jobs_per_user >= qos.max_jobs_per_user) { + result = false; + return; + } - bool result = false; - user_meta_map_[username].if_contains( - task.qos, [&](const std::pair& pair) { - const auto& val = pair.second; - if (val.res_avail.cpus_per_user >= cpus_per_task && - val.res_avail.jobs_per_user > 0) - result = true; - }); + val.resource.GetAllocatableRes().cpu_count += task.cpus_per_task; + val.jobs_per_user++; + }, + QosResource{resource_view, 1}); return result; } -void AccountMetaContainer::MallocQosResourceFromUser( - const std::string& username, const TaskInCtld& task) { - uint32_t cpus_per_task = static_cast(task.cpus_per_task); - user_meta_map_[username].modify_if( - task.qos, [&](std::pair& pair) { +void AccountMetaContainer::FreeQosResource(const std::string& username, + const TaskInCtld& task) { + user_meta_map_[username].qos_resource_in_use.modify_if( + task.qos, [&](std::pair& pair) { auto& val = pair.second; - val.res_avail.cpus_per_user -= cpus_per_task; - val.res_avail.jobs_per_user--; - val.res_in_use.cpus_per_user += cpus_per_task; - val.res_in_use.jobs_per_user++; + val.resource.GetAllocatableRes().cpu_count -= task.cpus_per_task; + val.jobs_per_user--; }); } -void AccountMetaContainer::InitFromDB_() { - AccountManager::UserMapMutexSharedPtr all_user = - g_account_manager->GetAllUserInfo(); - // all users in user_map - for (const auto& [username, user] : *all_user) { - // query all qos in user account->partitioin->qos - for (const auto& [account, attrs_in_account] : user->account_to_attrs_map) { - for (const auto& [part, qos_list] : - attrs_in_account.allowed_partition_qos_map) { - // user qos list - for (const auto& qos_name : qos_list.second) { - // initialize - AccountManager::QosMutexSharedPtr qos = - g_account_manager->GetExistedQosInfo(qos_name); - QosResource qos_resource = - QosResource{qos->max_cpus_per_user, qos->max_jobs_per_user}; - TryEmplace_(username, qos_name, qos_resource); - } - } - } - } -} - -void AccountMetaContainer::TryEmplace_(const std::string& username, - const std::string& qos_name, - const QosResource& qos_resource) { - user_meta_map_[username].try_emplace_l( - qos_name, - [&](std::pair& pair) { - QosResourceLimit& val = pair.second; - if (val.res_total.cpus_per_user == qos_resource.cpus_per_user && - val.res_total.jobs_per_user == qos_resource.jobs_per_user) - return; - auto& avail = val.res_avail; - auto& total = val.res_total; - auto& in_use = val.res_in_use; - - if (qos_resource.cpus_per_user >= in_use.cpus_per_user) - avail.cpus_per_user = - qos_resource.cpus_per_user - in_use.cpus_per_user; - else - avail.cpus_per_user = 0; - - if (qos_resource.jobs_per_user >= in_use.jobs_per_user) - avail.jobs_per_user = - qos_resource.jobs_per_user - in_use.jobs_per_user; - else - avail.jobs_per_user = 0; - - total = qos_resource; - }, - QosResourceLimit{qos_resource, qos_resource, QosResource{}}); -} - } // namespace Ctld \ No newline at end of file diff --git a/src/CraneCtld/AccountMetaContainer.h b/src/CraneCtld/AccountMetaContainer.h index ac568ee6..b8da1875 100644 --- a/src/CraneCtld/AccountMetaContainer.h +++ b/src/CraneCtld/AccountMetaContainer.h @@ -25,40 +25,20 @@ namespace Ctld { class AccountMetaContainer final { public: - using QosToQosResourceMap = phmap::parallel_flat_hash_map< - std::string, // QosName - QosResourceLimit, phmap::priv::hash_default_hash, - phmap::priv::hash_default_eq, - std::allocator>, 4, - std::shared_mutex>; - using UserResourceMetaMap = std::unordered_map; + ResourcePerUser>; - AccountMetaContainer(); + AccountMetaContainer() = default; ~AccountMetaContainer() = default; - void AddQosResourceToUser(const std::string& username, - const std::string& qos_name, - const QosResource& qos_resource); - - void ModifyQosResourceOnUser(const std::string& qos_name, - const QosResource& qos_resource); + bool CheckAndMallocQosResourceFromUser(const std::string& username, + const TaskInCtld& task, + const Qos& qos); void FreeQosResource(const std::string& username, const TaskInCtld& task); - bool CheckQosLimitOnUser(const std::string& username, const TaskInCtld& task); - - void MallocQosResourceFromUser(const std::string& username, - const TaskInCtld& task); - private: UserResourceMetaMap user_meta_map_; - - void InitFromDB_(); - - void TryEmplace_(const std::string& username, const std::string& qos_name, - const QosResource& qos_resource); }; inline std::unique_ptr g_account_meta_container; diff --git a/src/CraneCtld/CraneCtld.cpp b/src/CraneCtld/CraneCtld.cpp index c2bb9d06..8818604c 100644 --- a/src/CraneCtld/CraneCtld.cpp +++ b/src/CraneCtld/CraneCtld.cpp @@ -25,7 +25,6 @@ #include #include -#include #include "AccountManager.h" #include "AccountMetaContainer.h" diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index 2449741e..a4b9d36e 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -959,8 +959,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr task) { task->Username(), task->partition_id, task->account)); } - auto enable_res = - g_account_manager->CheckIfUserOfAccountIsEnabled( + auto enable_res = g_account_manager->CheckIfUserOfAccountIsEnabled( task->Username(), task->account); if (enable_res.has_error()) { return result::fail(enable_res.error()); diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index b7a39439..9aecd838 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -704,14 +704,19 @@ inline bool CheckIfTimeLimitIsValid(absl::Duration d) { } struct QosResource { - uint32_t cpus_per_user; + ResourceView resource; uint32_t jobs_per_user; }; -struct QosResourceLimit { - QosResource res_total; - QosResource res_avail; - QosResource res_in_use; +struct ResourcePerUser { + using QosToQosResourceMap = phmap::parallel_flat_hash_map< + std::string, // QosName + QosResource, phmap::priv::hash_default_hash, + phmap::priv::hash_default_eq, + std::allocator>, 4, + std::shared_mutex>; + + QosToQosResourceMap qos_resource_in_use; }; } // namespace Ctld diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index cb683dfc..7da6fbee 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -522,7 +522,6 @@ void TaskScheduler::PutRecoveredTaskIntoRunningQueueLock_( for (const CranedId& craned_id : task->CranedIds()) g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(), task->Resources()); - g_account_meta_container->MallocQosResourceFromUser(task->Username(), *task); // The order of LockGuards matters. LockGuard running_guard(&m_running_task_map_mtx_); LockGuard indexes_guard(&m_task_indexes_mtx_); @@ -2463,10 +2462,6 @@ void MinLoadFirst::NodeSelect( continue; } - bool apply_qos_result = g_account_meta_container->CheckQosLimitOnUser( - task->Username(), *task); - if (!apply_qos_result) continue; - // For pending tasks, the `start time` field in TaskInCtld means expected // start time and the `end time` is expected end time. // For running tasks, the `start time` means the time when it starts and @@ -2515,8 +2510,6 @@ void MinLoadFirst::NodeSelect( for (CranedId const& craned_id : craned_ids) g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(), task->Resources()); - g_account_meta_container->MallocQosResourceFromUser(task->Username(), - *task); std::unique_ptr moved_task; // Move task out of pending_task_map and insert it to the