From 5be1b79acf1ea764de5ebdd5d7eb8a3845c76af8 Mon Sep 17 00:00:00 2001 From: AABBC <129354195+huyongqii@users.noreply.github.com> Date: Fri, 6 Dec 2024 14:29:36 +0800 Subject: [PATCH] Add a remote call for task completion for plugins running on craned (#378) * Add a remote call for task completion for plugins running on craned * update CreateCgroupHookRequest proto struct --- .gitignore | 2 + protos/Plugin.proto | 19 +++++- src/Craned/CgroupManager.cpp | 26 ++++---- src/Utilities/PluginClient/PluginClient.cpp | 60 ++++++++++++++----- .../PluginClient/include/crane/PluginClient.h | 19 ++++-- 5 files changed, 94 insertions(+), 32 deletions(-) diff --git a/.gitignore b/.gitignore index 3b52cd8a3..750e10f7f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ generated cmake-build-* +build/ +.cache/ third_party/* diff --git a/protos/Plugin.proto b/protos/Plugin.proto index a270092cb..843a88bbe 100644 --- a/protos/Plugin.proto +++ b/protos/Plugin.proto @@ -51,12 +51,22 @@ message EndHookReply { repeated TaskIdReply result = 1; } -message JobMonitorHookRequest { +message CreateCgroupHookRequest { uint32 task_id = 1; string cgroup = 2; + DedicatedResourceInNode request_res = 3; } -message JobMonitorHookReply { +message CreateCgroupHookReply { + bool ok = 1; +} + +message DestroyCgroupHookRequest { + uint32 task_id = 1; + string cgroup = 2; +} + +message DestroyCgroupHookReply { bool ok = 1; } @@ -64,5 +74,8 @@ service CranePluginD { /* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */ rpc StartHook(StartHookRequest) returns (StartHookReply); rpc EndHook(EndHookRequest) returns (EndHookReply); - rpc JobMonitorHook(JobMonitorHookRequest) returns (JobMonitorHookReply); + + /* ----------------------------------- Called from Craned ---------------------------------------------------- */ + rpc CreateCgroupHook(CreateCgroupHookRequest) returns (CreateCgroupHookReply); + rpc 
DestroyCgroupHook(DestroyCgroupHookRequest) returns (DestroyCgroupHookReply); } diff --git a/src/Craned/CgroupManager.cpp b/src/Craned/CgroupManager.cpp index 75a3ddca7..6c97870cf 100644 --- a/src/Craned/CgroupManager.cpp +++ b/src/Craned/CgroupManager.cpp @@ -359,13 +359,13 @@ std::unique_ptr CgroupManager::CreateOrOpen_( changed_cgroup)) { return nullptr; } - // if ((preferred_controllers & Controller::BLOCK_CONTROLLER) && - // initialize_controller(*native_cgroup, Controller::BLOCK_CONTROLLER, - // required_controllers & - // Controller::BLOCK_CONTROLLER, has_cgroup, - // changed_cgroup)) { - // return nullptr; - // } + if ((preferred_controllers & Controller::BLOCK_CONTROLLER) && + InitializeController_( + *native_cgroup, Controller::BLOCK_CONTROLLER, + required_controllers & Controller::BLOCK_CONTROLLER, has_cgroup, + changed_cgroup)) { + return nullptr; + } if ((preferred_controllers & Controller::CPU_CONTROLLER) && InitializeController_(*native_cgroup, Controller::CPU_CONTROLLER, required_controllers & Controller::CPU_CONTROLLER, @@ -479,7 +479,8 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, CgroupStrByTaskId_(task_id), NO_CONTROLLER_FLAG | CgroupConstant::Controller::CPU_CONTROLLER | CgroupConstant::Controller::MEMORY_CONTROLLER | - CgroupConstant::Controller::DEVICES_CONTROLLER, + CgroupConstant::Controller::DEVICES_CONTROLLER | + CgroupConstant::Controller::BLOCK_CONTROLLER, NO_CONTROLLER_FLAG, false); } else if (GetCgroupVersion() == CgroupConstant::CgroupVersion::CGROUP_V2) { @@ -499,9 +500,10 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id, if (cg) *cg = pcg; } - // JobMonitorHook if (g_config.Plugin.Enabled) { - g_plugin_client->JobMonitorHookAsync(task_id, pcg->GetCgroupString()); + g_plugin_client->CreateCgroupHookAsync(task_id, + pcg->GetCgroupString(), + res.dedicated_res_in_node()); } CRANE_TRACE( @@ -593,6 +595,10 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) { } CgroupInterface *cgroup = 
it->second.GetExclusivePtr()->release(); + if (g_config.Plugin.Enabled && cgroup != nullptr) { /* cgroup can be null; see the check below */ + g_plugin_client->DestroyCgroupHookAsync(task_id, cgroup->GetCgroupString()); + } + task_id_to_cg_map_ptr->erase(task_id); if (cgroup != nullptr) { diff --git a/src/Utilities/PluginClient/PluginClient.cpp b/src/Utilities/PluginClient/PluginClient.cpp index 8b56c225b..e3bcc595d 100644 --- a/src/Utilities/PluginClient/PluginClient.cpp +++ b/src/Utilities/PluginClient/PluginClient.cpp @@ -130,7 +130,9 @@ grpc::Status PluginClient::SendStartHook_(grpc::ClientContext* context, using crane::grpc::plugin::StartHookReply; using crane::grpc::plugin::StartHookRequest; - auto request = dynamic_cast(msg); + auto* request = dynamic_cast(msg); + CRANE_ASSERT(request != nullptr); + StartHookReply reply; CRANE_TRACE("[Plugin] Sending StartHook."); @@ -142,23 +144,41 @@ grpc::Status PluginClient::SendEndHook_(grpc::ClientContext* context, using crane::grpc::plugin::EndHookReply; using crane::grpc::plugin::EndHookRequest; - auto request = dynamic_cast(msg); + auto* request = dynamic_cast(msg); + CRANE_ASSERT(request != nullptr); + EndHookReply reply; CRANE_TRACE("[Plugin] Sending EndHook."); return m_stub_->EndHook(context, *request, &reply); } -grpc::Status PluginClient::SendJobMonitorHook_(grpc::ClientContext* context, +grpc::Status PluginClient::SendCreateCgroupHook_(grpc::ClientContext* context, + google::protobuf::Message* msg) { + using crane::grpc::plugin::CreateCgroupHookReply; + using crane::grpc::plugin::CreateCgroupHookRequest; + + auto* request = dynamic_cast(msg); + CRANE_ASSERT(request != nullptr); + + CreateCgroupHookReply reply; + + CRANE_TRACE("[Plugin] Sending CreateCgroupHook."); + return m_stub_->CreateCgroupHook(context, *request, &reply); +} + +grpc::Status PluginClient::SendDestroyCgroupHook_(grpc::ClientContext* context, google::protobuf::Message* msg) { - using crane::grpc::plugin::JobMonitorHookReply; - using 
crane::grpc::plugin::JobMonitorHookRequest; + using 
crane::grpc::plugin::DestroyCgroupHookReply; + using crane::grpc::plugin::DestroyCgroupHookRequest; - auto request = dynamic_cast(msg); - JobMonitorHookReply reply; + auto* request = dynamic_cast(msg); + CRANE_ASSERT(request != nullptr); + + DestroyCgroupHookReply reply; - CRANE_TRACE("[Plugin] Sending JobMonitorHook."); - return m_stub_->JobMonitorHook(context, *request, &reply); + CRANE_TRACE("[Plugin] Sending DestroyCgroupHook."); + return m_stub_->DestroyCgroupHook(context, *request, &reply); } void PluginClient::StartHookAsync(std::vector tasks) { @@ -191,15 +211,27 @@ void PluginClient::EndHookAsync(std::vector tasks) { m_event_queue_.enqueue(std::move(e)); } -void PluginClient::JobMonitorHookAsync(task_id_t task_id, - std::string cgroup_path) { - auto request = std::make_unique(); +void PluginClient::CreateCgroupHookAsync(task_id_t task_id, + const std::string& cgroup, + const crane::grpc::DedicatedResourceInNode &request_resource) { + auto request = std::make_unique(); request->set_task_id(task_id); - request->set_cgroup(cgroup_path); + request->set_cgroup(cgroup); + request->mutable_request_res()->CopyFrom(request_resource); - HookEvent e{HookType::JOB_MONITOR, + HookEvent e{HookType::CREATE_CGROUP, std::unique_ptr(std::move(request))}; + m_event_queue_.enqueue(std::move(e)); +} + +void PluginClient::DestroyCgroupHookAsync(task_id_t task_id, + const std::string& cgroup) { + auto request = std::make_unique(); + request->set_task_id(task_id); + request->set_cgroup(cgroup); + HookEvent e{HookType::DESTROY_CGROUP, + std::unique_ptr(std::move(request))}; m_event_queue_.enqueue(std::move(e)); } diff --git a/src/Utilities/PluginClient/include/crane/PluginClient.h b/src/Utilities/PluginClient/include/crane/PluginClient.h index 4746a18eb..17ec2c2d6 100644 --- a/src/Utilities/PluginClient/include/crane/PluginClient.h +++ b/src/Utilities/PluginClient/include/crane/PluginClient.h @@ -50,7 +50,8 @@ class PluginClient { enum class HookType { START, END, - JOB_MONITOR, + 
CREATE_CGROUP, + DESTROY_CGROUP, HookTypeCount, }; @@ -62,9 +63,14 @@ class PluginClient { void InitChannelAndStub(const std::string& endpoint); // These functions are used to add HookEvent into the event queue. + // Launched by Ctld void StartHookAsync(std::vector tasks); void EndHookAsync(std::vector tasks); - void JobMonitorHookAsync(task_id_t task_id, std::string cgroup_path); + + // Launched by Craned + void CreateCgroupHookAsync(task_id_t task_id, const std::string& cgroup, + const crane::grpc::DedicatedResourceInNode &request_resource); + void DestroyCgroupHookAsync(task_id_t task_id, const std::string& cgroup); private: // HookDispatchFunc is a function pointer type that handles different @@ -75,8 +81,10 @@ class PluginClient { google::protobuf::Message* msg); grpc::Status SendEndHook_(grpc::ClientContext* context, google::protobuf::Message* msg); - grpc::Status SendJobMonitorHook_(grpc::ClientContext* context, - google::protobuf::Message* msg); + grpc::Status SendCreateCgroupHook_(grpc::ClientContext* context, + google::protobuf::Message* msg); + grpc::Status SendDestroyCgroupHook_(grpc::ClientContext* context, + google::protobuf::Message* msg); void AsyncSendThread_(); @@ -93,7 +101,8 @@ class PluginClient { static constexpr std::array s_hook_dispatch_funcs_{{&PluginClient::SendStartHook_, &PluginClient::SendEndHook_, - &PluginClient::SendJobMonitorHook_}}; + &PluginClient::SendCreateCgroupHook_, + &PluginClient::SendDestroyCgroupHook_}}; }; } // namespace plugin