Skip to content

Commit

Permalink
feat: reduce lock granularity
Browse files Browse the repository at this point in the history
  • Loading branch information
huerni committed Dec 3, 2024
1 parent a83d4d1 commit 00c1401
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 211 deletions.
47 changes: 7 additions & 40 deletions src/CraneCtld/AccountManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ AccountManager::CraneExpected<void> AccountManager::AddUser(

util::write_lock_guard user_guard(m_rw_user_mutex_);
util::write_lock_guard account_guard(m_rw_account_mutex_);
util::read_lock_guard qos_quard(m_rw_qos_mutex_);

auto user_result = GetUserInfoByUidNoLock_(uid);
if (!user_result) return std::unexpected(user_result.error());
Expand Down Expand Up @@ -842,7 +841,8 @@ AccountManager::CraneExpected<void> AccountManager::ModifyQos(
g_db_client->SelectQos("name", name, &qos);

// Modify QosResource when max_jobs_per_user or max_cpus_per_user is changed.
if (item == "max_jobs_per_user" || item == "max_cpus_per_user")
if (modify_field == crane::grpc::ModifyField::MaxJobsPerUser ||
modify_field == crane::grpc::ModifyField::MaxCpusPerUser)
g_account_meta_container->ModifyQosResourceOnUser(
name, QosResource{qos.max_cpus_per_user, qos.max_jobs_per_user});

Expand Down Expand Up @@ -987,6 +987,11 @@ result::result<void, std::string> AccountManager::CheckAndApplyQosLimitOnTask(
qos_share_ptr->max_cpus_per_user)
return result::fail("cpus-per-task reached the user's limit.");

g_account_meta_container->AddQosResourceToUser(
user_share_ptr->name, qos_share_ptr->name,
QosResource{qos_share_ptr->max_cpus_per_user,
qos_share_ptr->max_jobs_per_user});

return {};
}

Expand Down Expand Up @@ -1590,18 +1595,6 @@ AccountManager::CraneExpected<void> AccountManager::AddUser_(
}
res_user.account_to_attrs_map[object_account].blocked = false;

AccountMetaContainer::QosResourceList qos_resource_list;
for (const auto& [partition, qos] :
res_user.account_to_attrs_map[object_account]
.allowed_partition_qos_map) {
for (const auto& qos_name : qos.second) {
const Qos* qos_content = GetExistedQosInfoNoLock_(qos_name);
qos_resource_list.emplace_back(
qos_name, QosResource{qos_content->max_cpus_per_user,
qos_content->max_jobs_per_user});
}
}

mongocxx::client_session::with_transaction_cb callback =
[&](mongocxx::client_session* session) {
// Update the user's account
Expand Down Expand Up @@ -1631,8 +1624,6 @@ AccountManager::CraneExpected<void> AccountManager::AddUser_(
return std::unexpected(CraneErrCode::ERR_UPDATE_DATABASE);
}

g_account_meta_container->AddQosResourceToUser(name, qos_resource_list);

m_account_map_[object_account]->users.emplace_back(name);
if (add_coordinator) {
m_account_map_[object_account]->coordinators.emplace_back(name);
Expand Down Expand Up @@ -1788,8 +1779,6 @@ AccountManager::CraneExpected<void> AccountManager::DeleteUser_(
return std::unexpected(CraneErrCode::ERR_UPDATE_DATABASE);
}

g_account_meta_container->EraseUserResource(name);

for (auto& remove_account : remove_accounts) {
m_account_map_[remove_account]->users.remove(name);
}
Expand Down Expand Up @@ -1922,13 +1911,6 @@ AccountManager::CraneExpected<void> AccountManager::AddUserAllowedQos_(
return std::unexpected(CraneErrCode::ERR_UPDATE_DATABASE);
}

AccountMetaContainer::QosResourceList qos_resource_list;
const Qos* qos_content = GetExistedQosInfoNoLock_(qos);
qos_resource_list.emplace_back(qos,
QosResource{qos_content->max_cpus_per_user,
qos_content->max_jobs_per_user});
g_account_meta_container->AddQosResourceToUser(name, qos_resource_list);

m_user_map_[name]
->account_to_attrs_map[account_name]
.allowed_partition_qos_map =
Expand Down Expand Up @@ -2069,18 +2051,6 @@ AccountManager::CraneExpected<void> AccountManager::SetUserAllowedQos_(
return std::unexpected(CraneErrCode::ERR_UPDATE_DATABASE);
}

AccountMetaContainer::QosResourceList qos_resource_list;
for (const auto& [partition, qos] :
res_user.account_to_attrs_map[account_name].allowed_partition_qos_map) {
for (const auto& qos_name : qos.second) {
const Qos* qos_content = GetExistedQosInfoNoLock_(qos_name);
qos_resource_list.emplace_back(
qos_name, QosResource{qos_content->max_cpus_per_user,
qos_content->max_jobs_per_user});
}
}
g_account_meta_container->AddQosResourceToUser(name, qos_resource_list);

m_user_map_[name]
->account_to_attrs_map[account_name]
.allowed_partition_qos_map =
Expand Down Expand Up @@ -2157,8 +2127,6 @@ AccountManager::CraneExpected<void> AccountManager::DeleteUserAllowedQos_(
return std::unexpected(CraneErrCode::ERR_UPDATE_DATABASE);
}

g_account_meta_container->EraseQosResourceOnUser(name, qos);

m_user_map_[name]->account_to_attrs_map[account].allowed_partition_qos_map =
res_user.account_to_attrs_map[account].allowed_partition_qos_map;

Expand Down Expand Up @@ -2563,7 +2531,6 @@ bool AccountManager::DeleteAccountAllowedQosFromMapNoLock_(

for (const auto& user : account->users) {
DeleteUserAllowedQosOfAllPartitionFromMapNoLock_(user, name, qos);
g_account_meta_container->EraseQosResourceOnUser(user, qos);
}
m_account_map_[name]->allowed_qos_list.remove(qos);
if (account->default_qos == qos) {
Expand Down
187 changes: 69 additions & 118 deletions src/CraneCtld/AccountMetaContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,150 +24,74 @@ namespace Ctld {

AccountMetaContainer::AccountMetaContainer() { InitFromDB_(); }

AccountMetaContainer::UserResourceMetaPtr
AccountMetaContainer::GetUserResourceMetaPtr(const std::string& username) {
return user_meta_map_.GetValueExclusivePtr(username);
}

AccountMetaContainer::UserResourceMetaMapConstPtr
AccountMetaContainer::GetUserResourceMetaMapConstPtr() {
return user_meta_map_.GetMapConstSharedPtr();
}

AccountMetaContainer::UserResourceMetaMapExclusivePtr
AccountMetaContainer::GetUserResourceMetaMapExclusivePtr() {
return user_meta_map_.GetMapExclusivePtr();
}

void AccountMetaContainer::AddQosResourceToUser(
const std::string& username, const QosResourceList& qos_resource_list) {
auto user_meta_map_ptr = user_meta_map_.GetMapExclusivePtr();
if (!user_meta_map_ptr->contains(username))
user_meta_map_ptr->emplace(username, UserResourceMeta{});

auto& qos_resource_map =
user_meta_map_ptr->find(username)->second.RawPtr()->qos_to_resource_map;
for (const auto& pair : qos_resource_list)
qos_resource_map.emplace(
pair.first, QosResourceLimit{pair.second, pair.second, QosResource{}});
}

void AccountMetaContainer::EraseQosResourceOnUser(const std::string& username,
const std::string& qos_name) {
auto user_meta = user_meta_map_[username];
user_meta->qos_to_resource_map.erase(qos_name);
}
const std::string& username, const std::string& qos_name,
const QosResource& qos_resource) {
if (!user_meta_map_.contains(username))
user_meta_map_.emplace(username, QosToQosResourceMap{});

void AccountMetaContainer::EraseUserResource(const std::string& username) {
user_meta_map_.Erase(username);
TryEmplace_(username, qos_name, qos_resource);
}

// Modify QosResource when max_jobs_per_user or max_cpus_per_user is changed.
void AccountMetaContainer::ModifyQosResourceOnUser(
const std::string& qos_name, const QosResource& qos_resource) {
auto user_meta_map_ptr = user_meta_map_.GetMapExclusivePtr();
for (auto iter = user_meta_map_ptr->begin(); iter != user_meta_map_ptr->end();
iter++) {
auto& qos_resource_map = iter->second.RawPtr()->qos_to_resource_map;
auto qos_map_iter = qos_resource_map.find(qos_name);
if (qos_map_iter == qos_resource_map.end()) continue;
// Total is changed to the new QoS resources.
qos_map_iter->second.res_total = qos_resource;

auto& avail = qos_map_iter->second.res_avail;
auto& in_use = qos_map_iter->second.res_in_use;

if (qos_resource.cpus_per_user >= in_use.cpus_per_user)
avail.cpus_per_user = qos_resource.cpus_per_user - in_use.cpus_per_user;
else
avail.cpus_per_user = 0;

if (qos_resource.jobs_per_user >= in_use.jobs_per_user)
avail.jobs_per_user = qos_resource.jobs_per_user - in_use.jobs_per_user;
else
avail.jobs_per_user = 0;
for (const auto& [username, _] : user_meta_map_) {
TryEmplace_(username, qos_name, qos_resource);
}
}

void AccountMetaContainer::FreeQosResource(const std::string& username,
const TaskInCtld& task) {
if (!user_meta_map_.Contains(username)) {
CRANE_ERROR("Try to free resource from an unknown user {}", username);
return;
}

auto user_meta = user_meta_map_[username];
auto& qos_resource_map = user_meta->qos_to_resource_map;
auto iter = qos_resource_map.find(task.qos);
if (iter == qos_resource_map.end()) return;

uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);
const auto& total = iter->second.res_total;
auto& avail = iter->second.res_avail;
auto& in_use = iter->second.res_in_use;

avail.cpus_per_user += cpus_per_task;
avail.jobs_per_user += 1;
// The QoS resources may change during the execution of the job.
if (avail.cpus_per_user > total.cpus_per_user)
avail.cpus_per_user = total.cpus_per_user;
if (avail.jobs_per_user > total.jobs_per_user)
avail.jobs_per_user = total.jobs_per_user;

in_use.cpus_per_user -= cpus_per_task;
in_use.jobs_per_user -= 1;
user_meta_map_[username].modify_if(
task.qos, [&](std::pair<const std::string, QosResourceLimit>& pair) {
auto& val = pair.second;
val.res_avail.cpus_per_user += cpus_per_task;
val.res_avail.jobs_per_user += 1;
val.res_in_use.cpus_per_user -= cpus_per_task;
val.res_in_use.jobs_per_user -= 1;

if (val.res_avail.cpus_per_user > val.res_total.cpus_per_user)
val.res_avail.cpus_per_user = val.res_total.cpus_per_user;
if (val.res_avail.jobs_per_user > val.res_total.jobs_per_user)
val.res_avail.jobs_per_user = val.res_total.jobs_per_user;
});
}

bool AccountMetaContainer::CheckQosLimitOnUser(const std::string& username,
const TaskInCtld& task) {
if (!user_meta_map_.Contains(username)) {
CRANE_ERROR("Try to check resource to an unknown user {}", username);
return false;
}

auto user_meta = user_meta_map_[username];
uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);

const auto& qos_resource_map = user_meta->qos_to_resource_map;
auto iter = qos_resource_map.find(task.qos);
if (iter == qos_resource_map.end()) return false;

if (iter->second.res_avail.jobs_per_user == 0 ||
iter->second.res_avail.cpus_per_user < cpus_per_task)
return false;
bool result = false;
user_meta_map_[username].if_contains(
task.qos, [&](const std::pair<std::string, QosResourceLimit>& pair) {
const auto& val = pair.second;
if (val.res_avail.cpus_per_user >= cpus_per_task &&
val.res_avail.jobs_per_user > 0)
result = true;
});

return true;
return result;
}

void AccountMetaContainer::MallocQosResourceFromUser(
const std::string& username, const TaskInCtld& task) {
if (!user_meta_map_.Contains(username)) {
CRANE_ERROR("Try to apply resource to an unknown user {}", username);
return;
}

auto user_meta = user_meta_map_[username];
uint32_t cpus_per_task = static_cast<uint32_t>(task.cpus_per_task);

auto& qos_resource_map = user_meta->qos_to_resource_map;
auto iter = qos_resource_map.find(task.qos);
if (iter == qos_resource_map.end()) return;

iter->second.res_avail.cpus_per_user -= cpus_per_task;
iter->second.res_avail.jobs_per_user -= 1;

iter->second.res_in_use.cpus_per_user += cpus_per_task;
iter->second.res_in_use.jobs_per_user += 1;
user_meta_map_[username].modify_if(
task.qos, [&](std::pair<const std::string, QosResourceLimit>& pair) {
auto& val = pair.second;
val.res_avail.cpus_per_user -= cpus_per_task;
val.res_avail.jobs_per_user--;
val.res_in_use.cpus_per_user += cpus_per_task;
val.res_in_use.jobs_per_user++;
});
}

void AccountMetaContainer::InitFromDB_() {
HashMap<std::string, UserResourceMeta> user_meta_map;

AccountManager::UserMapMutexSharedPtr all_user =
g_account_manager->GetAllUserInfo();
// all users in user_map
for (const auto& [username, user] : *all_user) {
UserResourceMeta::QosToQosResourceMap qos_resource_map;
// query all qos in user account->partitioin->qos
for (const auto& [account, attrs_in_account] : user->account_to_attrs_map) {
for (const auto& [part, qos_list] :
Expand All @@ -179,15 +103,42 @@ void AccountMetaContainer::InitFromDB_() {
g_account_manager->GetExistedQosInfo(qos_name);
QosResource qos_resource =
QosResource{qos->max_cpus_per_user, qos->max_jobs_per_user};
qos_resource_map.emplace(
qos_name,
QosResourceLimit{qos_resource, qos_resource, QosResource{}});
TryEmplace_(username, qos_name, qos_resource);
}
}
}
user_meta_map.emplace(username, qos_resource_map);
}
user_meta_map_.InitFromMap(std::move(user_meta_map));
}

void AccountMetaContainer::TryEmplace_(const std::string& username,
const std::string& qos_name,
const QosResource& qos_resource) {
user_meta_map_[username].try_emplace_l(
qos_name,
[&](std::pair<const std::string, QosResourceLimit>& pair) {
QosResourceLimit& val = pair.second;
if (val.res_total.cpus_per_user == qos_resource.cpus_per_user &&
val.res_total.jobs_per_user == qos_resource.jobs_per_user)
return;
auto& avail = val.res_avail;
auto& total = val.res_total;
auto& in_use = val.res_in_use;

if (qos_resource.cpus_per_user >= in_use.cpus_per_user)
avail.cpus_per_user =
qos_resource.cpus_per_user - in_use.cpus_per_user;
else
avail.cpus_per_user = 0;

if (qos_resource.jobs_per_user >= in_use.jobs_per_user)
avail.jobs_per_user =
qos_resource.jobs_per_user - in_use.jobs_per_user;
else
avail.jobs_per_user = 0;

total = qos_resource;
},
QosResourceLimit{qos_resource, qos_resource, QosResource{}});
}

} // namespace Ctld
Loading

0 comments on commit 00c1401

Please sign in to comment.