Commit e7cdbe8: refactor

huerni committed Dec 5, 2024 (1 parent: 475f71f)

Showing 7 changed files with 45 additions and 159 deletions.
15 changes: 2 additions & 13 deletions src/CraneCtld/AccountManager.cpp
@@ -834,12 +834,6 @@ AccountManager::CraneExpected<void> AccountManager::ModifyQos(
   Qos qos;
   g_db_client->SelectQos("name", name, &qos);
 
-  // Modify QosResource when max_jobs_per_user or max_cpus_per_user is changed.
-  if (modify_field == crane::grpc::ModifyField::MaxJobsPerUser ||
-      modify_field == crane::grpc::ModifyField::MaxCpusPerUser)
-    g_account_meta_container->ModifyQosResourceOnUser(
-        name, QosResource{qos.max_cpus_per_user, qos.max_jobs_per_user});
-
   *m_qos_map_[name] = std::move(qos);
 
   return {};
@@ -977,15 +971,10 @@ result::result<void, std::string> AccountManager::CheckAndApplyQosLimitOnTask(
   } else if (task->time_limit > qos_share_ptr->max_time_limit_per_task)
     return result::fail("time-limit reached the user's limit.");
 
-  if (static_cast<double>(task->cpus_per_task) >
-      qos_share_ptr->max_cpus_per_user)
+  if (!g_account_meta_container->CheckAndMallocQosResourceFromUser(
+          user_share_ptr->name, *task, *qos_share_ptr))
     return result::fail("cpus-per-task reached the user's limit.");
 
-  g_account_meta_container->AddQosResourceToUser(
-      user_share_ptr->name, qos_share_ptr->name,
-      QosResource{qos_share_ptr->max_cpus_per_user,
-                  qos_share_ptr->max_jobs_per_user});
-
   return {};
 }
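For context, the separate check and bookkeeping calls are now one atomic per-user, per-QoS reservation. A minimal sketch of how the two entry points pair up (the release hook OnTaskFinish is hypothetical; only the check call is taken from this diff):

// Submit path (CheckAndApplyQosLimitOnTask): reserve CPUs and a job slot
// under the task's QoS, or reject the task.
if (!g_account_meta_container->CheckAndMallocQosResourceFromUser(
        user_share_ptr->name, *task, *qos_share_ptr))
  return result::fail("cpus-per-task reached the user's limit.");

// Release path (hypothetical hook, e.g. on completion or cancellation):
void OnTaskFinish(const TaskInCtld& task) {
  g_account_meta_container->FreeQosResource(task.Username(), task);
}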

133 changes: 27 additions & 106 deletions src/CraneCtld/AccountMetaContainer.cpp
@@ -22,123 +22,44 @@

 namespace Ctld {
 
-AccountMetaContainer::AccountMetaContainer() { InitFromDB_(); }
-
-void AccountMetaContainer::AddQosResourceToUser(
-    const std::string& username, const std::string& qos_name,
-    const QosResource& qos_resource) {
-  if (!user_meta_map_.contains(username))
-    user_meta_map_.emplace(username, QosToQosResourceMap{});
-
-  TryEmplace_(username, qos_name, qos_resource);
-}
-
-void AccountMetaContainer::ModifyQosResourceOnUser(
-    const std::string& qos_name, const QosResource& qos_resource) {
-  for (const auto& [username, _] : user_meta_map_) {
-    TryEmplace_(username, qos_name, qos_resource);
-  }
-}
+bool AccountMetaContainer::CheckAndMallocQosResourceFromUser(
+    const std::string& username, const TaskInCtld& task, const Qos& qos) {
+  if (static_cast<double>(task.cpus_per_task) > qos.max_cpus_per_user ||
+      qos.max_jobs_per_user == 0)
+    return false;
+
+  bool result = true;
+
+  ResourceView resource_view{};
+  resource_view.GetAllocatableRes().cpu_count = task.cpus_per_task;
+
+  user_meta_map_[username].qos_resource_in_use.try_emplace_l(
+      task.qos,
+      [&](std::pair<const std::string, QosResource>& pair) {
+        auto& val = pair.second;
+        if (val.resource.CpuCount() + static_cast<double>(task.cpus_per_task) >
+                qos.max_cpus_per_user ||
+            val.jobs_per_user >= qos.max_jobs_per_user) {
+          result = false;
+          return;
+        }
+        val.resource.GetAllocatableRes().cpu_count += task.cpus_per_task;
+        val.jobs_per_user++;
+      },
+      QosResource{resource_view, 1});
+
+  return result;
+}
 
 void AccountMetaContainer::FreeQosResource(const std::string& username,
                                            const TaskInCtld& task) {
-  uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);
-  user_meta_map_[username].modify_if(
-      task.qos, [&](std::pair<const std::string, QosResourceLimit>& pair) {
+  user_meta_map_[username].qos_resource_in_use.modify_if(
+      task.qos, [&](std::pair<const std::string, QosResource>& pair) {
         auto& val = pair.second;
-        val.res_avail.cpus_per_user += cpus_per_task;
-        val.res_avail.jobs_per_user += 1;
-        val.res_in_use.cpus_per_user -= cpus_per_task;
-        val.res_in_use.jobs_per_user -= 1;
-
-        if (val.res_avail.cpus_per_user > val.res_total.cpus_per_user)
-          val.res_avail.cpus_per_user = val.res_total.cpus_per_user;
-        if (val.res_avail.jobs_per_user > val.res_total.jobs_per_user)
-          val.res_avail.jobs_per_user = val.res_total.jobs_per_user;
+        val.resource.GetAllocatableRes().cpu_count -= task.cpus_per_task;
+        val.jobs_per_user--;
       });
 }
 
-bool AccountMetaContainer::CheckQosLimitOnUser(const std::string& username,
-                                               const TaskInCtld& task) {
-  uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);
-
-  bool result = false;
-  user_meta_map_[username].if_contains(
-      task.qos, [&](const std::pair<std::string, QosResourceLimit>& pair) {
-        const auto& val = pair.second;
-        if (val.res_avail.cpus_per_user >= cpus_per_task &&
-            val.res_avail.jobs_per_user > 0)
-          result = true;
-      });
-
-  return result;
-}
-
-void AccountMetaContainer::MallocQosResourceFromUser(
-    const std::string& username, const TaskInCtld& task) {
-  uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);
-  user_meta_map_[username].modify_if(
-      task.qos, [&](std::pair<const std::string, QosResourceLimit>& pair) {
-        auto& val = pair.second;
-        val.res_avail.cpus_per_user -= cpus_per_task;
-        val.res_avail.jobs_per_user--;
-        val.res_in_use.cpus_per_user += cpus_per_task;
-        val.res_in_use.jobs_per_user++;
-      });
-}
-
-void AccountMetaContainer::InitFromDB_() {
-  AccountManager::UserMapMutexSharedPtr all_user =
-      g_account_manager->GetAllUserInfo();
-  // all users in user_map
-  for (const auto& [username, user] : *all_user) {
-    // query all qos in user account->partitioin->qos
-    for (const auto& [account, attrs_in_account] : user->account_to_attrs_map) {
-      for (const auto& [part, qos_list] :
-           attrs_in_account.allowed_partition_qos_map) {
-        // user qos list
-        for (const auto& qos_name : qos_list.second) {
-          // initialize
-          AccountManager::QosMutexSharedPtr qos =
-              g_account_manager->GetExistedQosInfo(qos_name);
-          QosResource qos_resource =
-              QosResource{qos->max_cpus_per_user, qos->max_jobs_per_user};
-          TryEmplace_(username, qos_name, qos_resource);
-        }
-      }
-    }
-  }
-}
-
-void AccountMetaContainer::TryEmplace_(const std::string& username,
-                                       const std::string& qos_name,
-                                       const QosResource& qos_resource) {
-  user_meta_map_[username].try_emplace_l(
-      qos_name,
-      [&](std::pair<const std::string, QosResourceLimit>& pair) {
-        QosResourceLimit& val = pair.second;
-        if (val.res_total.cpus_per_user == qos_resource.cpus_per_user &&
-            val.res_total.jobs_per_user == qos_resource.jobs_per_user)
-          return;
-        auto& avail = val.res_avail;
-        auto& total = val.res_total;
-        auto& in_use = val.res_in_use;
-
-        if (qos_resource.cpus_per_user >= in_use.cpus_per_user)
-          avail.cpus_per_user =
-              qos_resource.cpus_per_user - in_use.cpus_per_user;
-        else
-          avail.cpus_per_user = 0;
-
-        if (qos_resource.jobs_per_user >= in_use.jobs_per_user)
-          avail.jobs_per_user =
-              qos_resource.jobs_per_user - in_use.jobs_per_user;
-        else
-          avail.jobs_per_user = 0;
-
-        total = qos_resource;
-      },
-      QosResourceLimit{qos_resource, qos_resource, QosResource{}});
-}
-
 }  // namespace Ctld
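Both replacement methods lean on phmap's locking accessors, so the limit check and the counter update happen under one submap lock with no external mutex. A standalone sketch of the three primitives used in this file (a sketch assuming parallel-hashmap's <parallel_hashmap/phmap.h>; the payload is simplified from QosResource to a bare counter):

#include <parallel_hashmap/phmap.h>

#include <cstdint>
#include <iostream>
#include <shared_mutex>
#include <string>

int main() {
  // Same template shape as QosToQosResourceMap: 2^4 submaps, each guarded
  // by a std::shared_mutex, so writers contend per submap, not globally.
  phmap::parallel_flat_hash_map<
      std::string, uint32_t, phmap::priv::hash_default_hash<std::string>,
      phmap::priv::hash_default_eq<std::string>,
      std::allocator<std::pair<const std::string, uint32_t>>, 4,
      std::shared_mutex>
      jobs_in_use;

  // try_emplace_l: if the key is absent, insert it with the trailing
  // argument (1); if present, run the lambda on the entry under the lock.
  jobs_in_use.try_emplace_l(
      "qos_a", [](auto& pair) { ++pair.second; }, 1);  // inserts 1
  jobs_in_use.try_emplace_l(
      "qos_a", [](auto& pair) { ++pair.second; }, 1);  // bumps to 2

  // modify_if: mutate only when the key already exists; no-op otherwise.
  jobs_in_use.modify_if("qos_a", [](auto& pair) { --pair.second; });

  // if_contains: read-only access under the submap's shared lock.
  jobs_in_use.if_contains("qos_a", [](const auto& pair) {
    std::cout << pair.second << '\n';  // prints 1
  });
}

This locked-lambda shape is why CheckAndMallocQosResourceFromUser reports failure through the captured result flag: the limit test and the increment run inside a single critical section, so two racing submissions cannot both pass the check.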
30 changes: 5 additions & 25 deletions src/CraneCtld/AccountMetaContainer.h
@@ -25,40 +25,20 @@ namespace Ctld {

 class AccountMetaContainer final {
  public:
-  using QosToQosResourceMap = phmap::parallel_flat_hash_map<
-      std::string,  // QosName
-      QosResourceLimit, phmap::priv::hash_default_hash<std::string>,
-      phmap::priv::hash_default_eq<std::string>,
-      std::allocator<std::pair<const std::string, QosResourceLimit>>, 4,
-      std::shared_mutex>;
-
   using UserResourceMetaMap = std::unordered_map<std::string,  // username
-                                                 QosToQosResourceMap>;
+                                                 ResourcePerUser>;
 
-  AccountMetaContainer();
+  AccountMetaContainer() = default;
   ~AccountMetaContainer() = default;
 
-  void AddQosResourceToUser(const std::string& username,
-                            const std::string& qos_name,
-                            const QosResource& qos_resource);
-
-  void ModifyQosResourceOnUser(const std::string& qos_name,
-                               const QosResource& qos_resource);
+  bool CheckAndMallocQosResourceFromUser(const std::string& username,
+                                         const TaskInCtld& task,
+                                         const Qos& qos);
 
   void FreeQosResource(const std::string& username, const TaskInCtld& task);
 
-  bool CheckQosLimitOnUser(const std::string& username, const TaskInCtld& task);
-
-  void MallocQosResourceFromUser(const std::string& username,
-                                 const TaskInCtld& task);
-
  private:
   UserResourceMetaMap user_meta_map_;
-
-  void InitFromDB_();
-
-  void TryEmplace_(const std::string& username, const std::string& qos_name,
-                   const QosResource& qos_resource);
 };
 
 inline std::unique_ptr<Ctld::AccountMetaContainer> g_account_meta_container;
1 change: 0 additions & 1 deletion src/CraneCtld/CraneCtld.cpp
@@ -25,7 +25,6 @@

 #include <cxxopts.hpp>
 #include <filesystem>
-#include <memory>
 
 #include "AccountManager.h"
 #include "AccountMetaContainer.h"
3 changes: 1 addition & 2 deletions src/CraneCtld/CtldGrpcServer.cpp
@@ -959,8 +959,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
         task->Username(), task->partition_id, task->account));
   }
 
-  auto enable_res =
-      g_account_manager->CheckIfUserOfAccountIsEnabled(
+  auto enable_res = g_account_manager->CheckIfUserOfAccountIsEnabled(
       task->Username(), task->account);
   if (enable_res.has_error()) {
     return result::fail(enable_res.error());
15 changes: 10 additions & 5 deletions src/CraneCtld/CtldPublicDefs.h
@@ -704,14 +704,19 @@ inline bool CheckIfTimeLimitIsValid(absl::Duration d) {
 }
 
 struct QosResource {
-  uint32_t cpus_per_user;
+  ResourceView resource;
   uint32_t jobs_per_user;
 };
 
-struct QosResourceLimit {
-  QosResource res_total;
-  QosResource res_avail;
-  QosResource res_in_use;
+struct ResourcePerUser {
+  using QosToQosResourceMap = phmap::parallel_flat_hash_map<
+      std::string,  // QosName
+      QosResource, phmap::priv::hash_default_hash<std::string>,
+      phmap::priv::hash_default_eq<std::string>,
+      std::allocator<std::pair<const std::string, QosResource>>, 4,
+      std::shared_mutex>;
+
+  QosToQosResourceMap qos_resource_in_use;
 };
 
 }  // namespace Ctld
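The meta container now tracks only resources in use, nested per user and then per QoS; the limits themselves stay on the Qos object and are compared at admission time, which is what made the old res_total/res_avail mirroring (and its resynchronization on ModifyQos) unnecessary. The nesting, sketched with made-up keys ("alice", "normal"):

// user_meta_map_                        username -> ResourcePerUser
// ResourcePerUser::qos_resource_in_use  QoS name -> QosResource in use
auto& held = user_meta_map_["alice"].qos_resource_in_use;
held.if_contains("normal", [](const auto& pair) {
  // pair.second.resource.CpuCount()  -> CPUs this user holds under "normal"
  // pair.second.jobs_per_user        -> jobs this user holds under "normal"
});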
7 changes: 0 additions & 7 deletions src/CraneCtld/TaskScheduler.cpp
@@ -522,7 +522,6 @@ void TaskScheduler::PutRecoveredTaskIntoRunningQueueLock_(
   for (const CranedId& craned_id : task->CranedIds())
     g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(),
                                              task->Resources());
-  g_account_meta_container->MallocQosResourceFromUser(task->Username(), *task);
   // The order of LockGuards matters.
   LockGuard running_guard(&m_running_task_map_mtx_);
   LockGuard indexes_guard(&m_task_indexes_mtx_);
@@ -2463,10 +2462,6 @@ void MinLoadFirst::NodeSelect(
       continue;
     }
 
-    bool apply_qos_result = g_account_meta_container->CheckQosLimitOnUser(
-        task->Username(), *task);
-    if (!apply_qos_result) continue;
-
     // For pending tasks, the `start time` field in TaskInCtld means expected
     // start time and the `end time` is expected end time.
     // For running tasks, the `start time` means the time when it starts and
@@ -2515,8 +2510,6 @@
     for (CranedId const& craned_id : craned_ids)
       g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(),
                                                task->Resources());
-    g_account_meta_container->MallocQosResourceFromUser(task->Username(),
-                                                        *task);
     std::unique_ptr<TaskInCtld> moved_task;
 
     // Move task out of pending_task_map and insert it to the