Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/reservation #351

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,15 @@ message QueryPartitionInfoRequest {
}

message QueryPartitionInfoReply {
repeated PartitionInfo partition_info = 1;
repeated PartitionInfo partition_info_list = 1;
}

message QueryReservationInfoRequest {
string reservation_name = 1;
}

message QueryReservationInfoReply {
repeated ReservationInfo reservation_info_list = 3;
}

message ModifyTaskRequest {
Expand Down Expand Up @@ -449,6 +457,28 @@ message QueryTasksInfoReply{
repeated TaskInfo task_info_list = 2;
}

message CreateReservationRequest {
string reservation_name = 1;
int64 start_time_seconds = 2;
int64 duration_seconds = 3;
string partition = 4;
string craned_regex = 5;
}

message CreateReservationReply {
bool ok = 1;
string reason = 2;
}

message DeleteReservationRequest {
string reservation_name = 1;
}

message DeleteReservationReply {
bool ok = 1;
string reason = 2;
}

message StreamCallocRequest {
enum CallocRequestType {
TASK_REQUEST = 0;
Expand Down Expand Up @@ -767,6 +797,7 @@ service CraneCtld {
/* PRCs called from ccontrol */
rpc QueryCranedInfo(QueryCranedInfoRequest) returns (QueryCranedInfoReply);
rpc QueryPartitionInfo(QueryPartitionInfoRequest) returns (QueryPartitionInfoReply);
rpc QueryReservationInfo(QueryReservationInfoRequest) returns (QueryReservationInfoReply);
rpc ModifyTask(ModifyTaskRequest) returns (ModifyTaskReply);
rpc ModifyNode(ModifyCranedStateRequest) returns (ModifyCranedStateReply);

Expand Down Expand Up @@ -794,6 +825,8 @@ service CraneCtld {

/* common RPCs */
rpc QueryTasksInfo(QueryTasksInfoRequest) returns (QueryTasksInfoReply);
rpc CreateReservation(CreateReservationRequest) returns (CreateReservationReply);
rpc DeleteReservation(DeleteReservationRequest) returns (DeleteReservationReply);
}

service Craned {
Expand Down
12 changes: 12 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,18 @@ message CranedInfo {
google.protobuf.Timestamp last_busy_time = 16;
}

message ReservationInfo {
string reservation_name = 1;
google.protobuf.Timestamp start_time = 2;
google.protobuf.Duration duration = 3;
string partition = 4;
string craned_regex = 5;

ResourceView res_total = 6;
ResourceView res_avail = 7;
ResourceView res_alloc = 8;
}

message TrimmedPartitionInfo {
message TrimmedCranedInfo {
CranedResourceState resource_state = 1;
Expand Down
119 changes: 101 additions & 18 deletions src/CraneCtld/CranedMetaContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ void CranedMetaContainer::CranedUp(const CranedId& craned_id) {
std::vector<util::Synchronized<PartitionMeta>::ExclusivePtr> part_meta_ptrs;
part_meta_ptrs.reserve(part_ids.size());

auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr();
auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr();

// Acquire all partition locks first.
for (PartitionId const& part_id : part_ids)
Expand Down Expand Up @@ -84,7 +84,7 @@ void CranedMetaContainer::CranedDown(const CranedId& craned_id) {
std::vector<util::Synchronized<PartitionMeta>::ExclusivePtr> part_meta_ptrs;
part_meta_ptrs.reserve(part_ids.size());

auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr();
auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr();

// Acquire all partition locks first.
for (PartitionId const& part_id : part_ids) {
Expand Down Expand Up @@ -123,24 +123,39 @@ bool CranedMetaContainer::CheckCranedOnline(const CranedId& craned_id) {

CranedMetaContainer::PartitionMetaPtr CranedMetaContainer::GetPartitionMetasPtr(
PartitionId partition_id) {
return partition_metas_map_.GetValueExclusivePtr(partition_id);
return partition_meta_map_.GetValueExclusivePtr(partition_id);
}

CranedMetaContainer::CranedMetaPtr CranedMetaContainer::GetCranedMetaPtr(
CranedId craned_id) {
return craned_meta_map_.GetValueExclusivePtr(craned_id);
}

CranedMetaContainer::ReservationMetaPtr
CranedMetaContainer::GetReservationMetaPtr(const ReservationId& name) {
return reservation_meta_map_.GetValueExclusivePtr(name);
}

CranedMetaContainer::AllPartitionsMetaMapConstPtr
CranedMetaContainer::GetAllPartitionsMetaMapConstPtr() {
return partition_metas_map_.GetMapConstSharedPtr();
return partition_meta_map_.GetMapConstSharedPtr();
}

CranedMetaContainer::CranedMetaMapConstPtr
CranedMetaContainer::GetCranedMetaMapConstPtr() {
return craned_meta_map_.GetMapConstSharedPtr();
}

CranedMetaContainer::ReservationMetaMapConstPtr
CranedMetaContainer::GetReservationMetaMapConstPtr() {
return reservation_meta_map_.GetMapConstSharedPtr();
}

CranedMetaContainer::ReservationMetaMapPtr
CranedMetaContainer::GetReservationMetaMapPtr() {
return reservation_meta_map_.GetMapSharedPtr();
}

void CranedMetaContainer::MallocResourceFromNode(CranedId node_id,
task_id_t task_id,
const ResourceV2& resources) {
Expand All @@ -154,7 +169,7 @@ void CranedMetaContainer::MallocResourceFromNode(CranedId node_id,
std::vector<util::Synchronized<PartitionMeta>::ExclusivePtr> part_meta_ptrs;
part_meta_ptrs.reserve(part_ids.size());

auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr();
auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr();

// Acquire all partition locks first.
for (PartitionId const& part_id : part_ids)
Expand Down Expand Up @@ -192,7 +207,7 @@ void CranedMetaContainer::FreeResourceFromNode(CranedId node_id,
std::vector<util::Synchronized<PartitionMeta>::ExclusivePtr> part_meta_ptrs;
part_meta_ptrs.reserve(part_ids.size());

auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr();
auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr();

// Acquire all partition locks first.
for (PartitionId const& part_id : part_ids)
Expand Down Expand Up @@ -298,7 +313,7 @@ void CranedMetaContainer::InitFromConfig(const Config& config) {
}

craned_meta_map_.InitFromMap(std::move(craned_map));
partition_metas_map_.InitFromMap(std::move(partition_map));
partition_meta_map_.InitFromMap(std::move(partition_map));
}

crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryAllCranedInfo() {
Expand All @@ -316,7 +331,7 @@ crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryAllCranedInfo() {
}

crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryCranedInfo(
const std::string& node_name) {
const CranedId& node_name) {
crane::grpc::QueryCranedInfoReply reply;
auto* list = reply.mutable_craned_info_list();

Expand All @@ -335,9 +350,9 @@ crane::grpc::QueryCranedInfoReply CranedMetaContainer::QueryCranedInfo(
crane::grpc::QueryPartitionInfoReply
CranedMetaContainer::QueryAllPartitionInfo() {
crane::grpc::QueryPartitionInfoReply reply;
auto* list = reply.mutable_partition_info();
auto* list = reply.mutable_partition_info_list();

auto partition_map = partition_metas_map_.GetMapConstSharedPtr();
auto partition_map = partition_meta_map_.GetMapConstSharedPtr();

for (auto&& [part_name, part_meta_ptr] : *partition_map) {
auto* part_info = list->Add();
Expand Down Expand Up @@ -367,13 +382,13 @@ CranedMetaContainer::QueryAllPartitionInfo() {
}

crane::grpc::QueryPartitionInfoReply CranedMetaContainer::QueryPartitionInfo(
const std::string& partition_name) {
const PartitionId& partition_name) {
crane::grpc::QueryPartitionInfoReply reply;
auto* list = reply.mutable_partition_info();
auto* list = reply.mutable_partition_info_list();

if (!partition_metas_map_.Contains(partition_name)) return reply;
if (!partition_meta_map_.Contains(partition_name)) return reply;

auto part_meta = partition_metas_map_.GetValueExclusivePtr(partition_name);
auto part_meta = partition_meta_map_.GetValueExclusivePtr(partition_name);

auto* part_info = list->Add();
part_info->set_name(part_meta->partition_global_meta.name);
Expand All @@ -397,6 +412,74 @@ crane::grpc::QueryPartitionInfoReply CranedMetaContainer::QueryPartitionInfo(
return reply;
}

crane::grpc::QueryReservationInfoReply
CranedMetaContainer::QueryAllReservationInfo() {
crane::grpc::QueryReservationInfoReply reply;
auto* list = reply.mutable_reservation_info_list();

auto reservation_map = reservation_meta_map_.GetMapConstSharedPtr();
for (auto&& [reservation_id, reservation_meta_ptr] : *reservation_map) {
auto reservation_meta = reservation_meta_ptr.GetExclusivePtr();

auto* reservation_info = list->Add();

reservation_info->set_reservation_name(reservation_id);
reservation_info->mutable_start_time()->set_seconds(
absl::ToUnixSeconds(reservation_meta->start_time));
reservation_info->mutable_duration()->set_seconds(
absl::ToUnixSeconds(reservation_meta->end_time) -
absl::ToUnixSeconds(reservation_meta->start_time));
reservation_info->set_partition(reservation_meta->partition_id);
reservation_info->set_craned_regex(
util::HostNameListToStr(reservation_meta->craned_ids));
*reservation_info->mutable_res_total() =
static_cast<crane::grpc::ResourceView>(
reservation_meta->resources_total);
*reservation_info->mutable_res_avail() =
static_cast<crane::grpc::ResourceView>(
reservation_meta->resources_avail);
*reservation_info->mutable_res_alloc() =
static_cast<crane::grpc::ResourceView>(
reservation_meta->resources_in_use);
}
return reply;
}

crane::grpc::QueryReservationInfoReply
CranedMetaContainer::QueryReservationInfo(
const ReservationId& reservation_name) {
crane::grpc::QueryReservationInfoReply reply;
auto* list = reply.mutable_reservation_info_list();

if (!reservation_meta_map_.Contains(reservation_name)) {
return reply;
}

auto reservation_meta =
reservation_meta_map_.GetValueExclusivePtr(reservation_name);

auto* reservation_info = list->Add();

reservation_info->set_reservation_name(reservation_name);
reservation_info->mutable_start_time()->set_seconds(
absl::ToUnixSeconds(reservation_meta->start_time));
reservation_info->mutable_duration()->set_seconds(
absl::ToUnixSeconds(reservation_meta->end_time) -
absl::ToUnixSeconds(reservation_meta->start_time));
reservation_info->set_partition(reservation_meta->partition_id);
reservation_info->set_craned_regex(
util::HostNameListToStr(reservation_meta->craned_ids));
*reservation_info->mutable_res_total() =
static_cast<crane::grpc::ResourceView>(reservation_meta->resources_total);
*reservation_info->mutable_res_avail() =
static_cast<crane::grpc::ResourceView>(reservation_meta->resources_avail);
*reservation_info->mutable_res_alloc() =
static_cast<crane::grpc::ResourceView>(
reservation_meta->resources_in_use);

return reply;
}

crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo(
const crane::grpc::QueryClusterInfoRequest& request) {
crane::grpc::QueryClusterInfoReply reply;
Expand Down Expand Up @@ -441,8 +524,8 @@ crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo(
resource_filters[static_cast<int>(it)] = true;

// Ensure that the map global read lock is held during the following filtering
// operations and partition_metas_map_ must be locked before craned_meta_map_
auto partition_map = partition_metas_map_.GetMapConstSharedPtr();
// operations and partition_meta_map_ must be locked before craned_meta_map_
auto partition_map = partition_meta_map_.GetMapConstSharedPtr();
auto craned_map = craned_meta_map_.GetMapConstSharedPtr();

auto partition_rng =
Expand All @@ -455,7 +538,7 @@ crane::grpc::QueryClusterInfoReply CranedMetaContainer::QueryClusterInfo(
// we make a copy of these craned ids to lower overall latency.
// The amortized cost is 1 copy for each craned node.
// Although an extra copying cost is introduced,
// the time of accessing partition_metas_map_ is minimized and
// the time of accessing partition_meta_map_ is minimized and
// the copying cost is taken only by this grpc thread handling
// cinfo request.
// Since we assume the number of cpu cores is sufficient on the
Expand Down Expand Up @@ -583,7 +666,7 @@ void CranedMetaContainer::AddDedicatedResource(
std::vector<util::Synchronized<PartitionMeta>::ExclusivePtr> part_meta_ptrs;
part_meta_ptrs.reserve(part_ids.size());

auto raw_part_metas_map_ = partition_metas_map_.GetMapSharedPtr();
auto raw_part_metas_map_ = partition_meta_map_.GetMapSharedPtr();

// Acquire all partition locks first.
for (PartitionId const& part_id : part_ids)
Expand Down
34 changes: 28 additions & 6 deletions src/CraneCtld/CranedMetaContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,28 @@ class CranedMetaContainer final {
util::AtomicHashMap<HashMap, CranedId, CranedMeta>;
using CranedMetaRawMap = CranedMetaAtomicMap::RawMap;

using ReservationMetaAtomicMap =
util::AtomicHashMap<HashMap, std::string, ReservationMeta>;
using ReservationMetaRawMap = ReservationMetaAtomicMap::RawMap;

using AllPartitionsMetaMapConstPtr =
util::ScopeConstSharedPtr<AllPartitionsMetaRawMap, util::rw_mutex>;
using CranedMetaMapConstPtr =
util::ScopeConstSharedPtr<CranedMetaRawMap, util::rw_mutex>;
using ReservationMetaMapConstPtr =
util::ScopeConstSharedPtr<ReservationMetaRawMap, util::rw_mutex>;

using ReservationMetaMapPtr = ReservationMetaAtomicMap::MapSharedPtr;

using PartitionMetaPtr =
util::ManagedScopeExclusivePtr<PartitionMeta,
AllPartitionsMetaAtomicMap::CombinedLock>;
using CranedMetaPtr =
util::ManagedScopeExclusivePtr<CranedMeta,
CranedMetaAtomicMap::CombinedLock>;
using ReservationMetaPtr =
util::ManagedScopeExclusivePtr<ReservationMeta,
ReservationMetaAtomicMap::CombinedLock>;

CranedMetaContainer() = default;
~CranedMetaContainer() = default;
Expand All @@ -76,13 +87,17 @@ class CranedMetaContainer final {

crane::grpc::QueryCranedInfoReply QueryAllCranedInfo();

crane::grpc::QueryCranedInfoReply QueryCranedInfo(
const std::string& node_name);
crane::grpc::QueryCranedInfoReply QueryCranedInfo(const CranedId& node_name);

crane::grpc::QueryPartitionInfoReply QueryAllPartitionInfo();

crane::grpc::QueryPartitionInfoReply QueryPartitionInfo(
const std::string& partition_name);
const PartitionId& partition_name);

crane::grpc::QueryReservationInfoReply QueryAllReservationInfo();

crane::grpc::QueryReservationInfoReply QueryReservationInfo(
const ReservationId& reservation_name);

crane::grpc::QueryClusterInfoReply QueryClusterInfo(
const crane::grpc::QueryClusterInfoRequest& request);
Expand All @@ -100,10 +115,16 @@ class CranedMetaContainer final {

CranedMetaPtr GetCranedMetaPtr(CranedId craned_id);

ReservationMetaPtr GetReservationMetaPtr(const ReservationId& name);

AllPartitionsMetaMapConstPtr GetAllPartitionsMetaMapConstPtr();

CranedMetaMapConstPtr GetCranedMetaMapConstPtr();

ReservationMetaMapConstPtr GetReservationMetaMapConstPtr();

ReservationMetaMapPtr GetReservationMetaMapPtr();

bool CheckCranedAllowed(const std::string& hostname) {
return craned_meta_map_.Contains(hostname);
};
Expand All @@ -116,12 +137,13 @@ class CranedMetaContainer final {
private:
// In this part of code, the following lock sequence MUST be held
// to avoid deadlock:
// 1. lock elements in partition_metas_map_
// 1. lock elements in partition_meta_map_
// 2. lock elements in craned_meta_map_
// 3. unlock elements in craned_meta_map_
// 4. unlock elements in partition_metas_map_
// 4. unlock elements in partition_meta_map_
CranedMetaAtomicMap craned_meta_map_;
AllPartitionsMetaAtomicMap partition_metas_map_;
AllPartitionsMetaAtomicMap partition_meta_map_;
ReservationMetaAtomicMap reservation_meta_map_;

// A craned node may belong to multiple partitions.
// Use this map as a READ-ONLY index, so multi-thread reading is ok.
Expand Down
Loading