Skip to content

Commit

Permalink
Add a remote call for task completion for plugins running on craned (#…
Browse files Browse the repository at this point in the history
…378)

* Add a remote call for task completion for plugins running on craned

* update CreateCgroupHookRequest proto struct
  • Loading branch information
huyongqii authored Dec 6, 2024
1 parent ede7741 commit 5be1b79
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 32 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
generated
cmake-build-*
build/
.cache/

third_party/*

Expand Down
19 changes: 16 additions & 3 deletions protos/Plugin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,31 @@ message EndHookReply {
repeated TaskIdReply result = 1;
}

message JobMonitorHookRequest {
message CreateCgroupHookRequest {
uint32 task_id = 1;
string cgroup = 2;
DedicatedResourceInNode request_res = 3;
}

message JobMonitorHookReply {
message CreateCgroupHookReply {
bool ok = 1;
}

// Request of the DestroyCgroupHook RPC (declared in service CranePluginD under
// "Called from Craned").
message DestroyCgroupHookRequest {
// Id of the task whose cgroup is being released.
uint32 task_id = 1;
// Cgroup string of that task; craned fills it from GetCgroupString().
string cgroup = 2;
}

// Reply of the DestroyCgroupHook RPC.
message DestroyCgroupHookReply {
// NOTE(review): presumably reports whether the plugin handled the hook
// successfully; the client code shown here discards the reply -- confirm
// the exact meaning with the plugin daemon implementation.
bool ok = 1;
}

service CranePluginD {
/* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */
rpc StartHook(StartHookRequest) returns (StartHookReply);
rpc EndHook(EndHookRequest) returns (EndHookReply);
rpc JobMonitorHook(JobMonitorHookRequest) returns (JobMonitorHookReply);

/* ----------------------------------- Called from Craned ---------------------------------------------------- */
rpc CreateCgroupHook(CreateCgroupHookRequest) returns (CreateCgroupHookReply);
rpc DestroyCgroupHook(DestroyCgroupHookRequest) returns (DestroyCgroupHookReply);
}
26 changes: 16 additions & 10 deletions src/Craned/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ std::unique_ptr<CgroupInterface> CgroupManager::CreateOrOpen_(
changed_cgroup)) {
return nullptr;
}
// if ((preferred_controllers & Controller::BLOCK_CONTROLLER) &&
// initialize_controller(*native_cgroup, Controller::BLOCK_CONTROLLER,
// required_controllers &
// Controller::BLOCK_CONTROLLER, has_cgroup,
// changed_cgroup)) {
// return nullptr;
// }
if ((preferred_controllers & Controller::BLOCK_CONTROLLER) &&
InitializeController_(
*native_cgroup, Controller::BLOCK_CONTROLLER,
required_controllers & Controller::BLOCK_CONTROLLER, has_cgroup,
changed_cgroup)) {
return nullptr;
}
if ((preferred_controllers & Controller::CPU_CONTROLLER) &&
InitializeController_(*native_cgroup, Controller::CPU_CONTROLLER,
required_controllers & Controller::CPU_CONTROLLER,
Expand Down Expand Up @@ -479,7 +479,8 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id,
CgroupStrByTaskId_(task_id),
NO_CONTROLLER_FLAG | CgroupConstant::Controller::CPU_CONTROLLER |
CgroupConstant::Controller::MEMORY_CONTROLLER |
CgroupConstant::Controller::DEVICES_CONTROLLER,
CgroupConstant::Controller::DEVICES_CONTROLLER |
CgroupConstant::Controller::BLOCK_CONTROLLER,
NO_CONTROLLER_FLAG, false);
} else if (GetCgroupVersion() ==
CgroupConstant::CgroupVersion::CGROUP_V2) {
Expand All @@ -499,9 +500,10 @@ bool CgroupManager::AllocateAndGetCgroup(task_id_t task_id,
if (cg) *cg = pcg;
}

// JobMonitorHook
if (g_config.Plugin.Enabled) {
g_plugin_client->JobMonitorHookAsync(task_id, pcg->GetCgroupString());
g_plugin_client->CreateCgroupHookAsync(task_id,
pcg->GetCgroupString(),
res.dedicated_res_in_node());
}

CRANE_TRACE(
Expand Down Expand Up @@ -593,6 +595,10 @@ bool CgroupManager::ReleaseCgroup(uint32_t task_id, uid_t uid) {
}
CgroupInterface *cgroup = it->second.GetExclusivePtr()->release();

// Tell the plugin daemon that this task's cgroup is being destroyed.
// `cgroup` is the raw pointer obtained from release() and may be null (the
// code further down checks for that as well), so it must not be dereferenced
// unconditionally here.
if (g_config.Plugin.Enabled && cgroup != nullptr) {
  g_plugin_client->DestroyCgroupHookAsync(task_id, cgroup->GetCgroupString());
}

task_id_to_cg_map_ptr->erase(task_id);

if (cgroup != nullptr) {
Expand Down
60 changes: 46 additions & 14 deletions src/Utilities/PluginClient/PluginClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ grpc::Status PluginClient::SendStartHook_(grpc::ClientContext* context,
using crane::grpc::plugin::StartHookReply;
using crane::grpc::plugin::StartHookRequest;

auto request = dynamic_cast<StartHookRequest*>(msg);
auto* request = dynamic_cast<StartHookRequest*>(msg);
CRANE_ASSERT(request != nullptr);

StartHookReply reply;

CRANE_TRACE("[Plugin] Sending StartHook.");
Expand All @@ -142,23 +144,41 @@ grpc::Status PluginClient::SendEndHook_(grpc::ClientContext* context,
using crane::grpc::plugin::EndHookReply;
using crane::grpc::plugin::EndHookRequest;

auto request = dynamic_cast<EndHookRequest*>(msg);
auto* request = dynamic_cast<EndHookRequest*>(msg);
CRANE_ASSERT(request != nullptr);

EndHookReply reply;

CRANE_TRACE("[Plugin] Sending EndHook.");
return m_stub_->EndHook(context, *request, &reply);
}

grpc::Status PluginClient::SendJobMonitorHook_(grpc::ClientContext* context,
// Dispatch function for CREATE_CGROUP hook events: downcasts the queued
// message to a CreateCgroupHookRequest and issues the CreateCgroupHook RPC
// through the stub. Only the gRPC status is returned; the reply message is
// not inspected here.
grpc::Status PluginClient::SendCreateCgroupHook_(
    grpc::ClientContext* context, google::protobuf::Message* msg) {
  namespace plugin_pb = crane::grpc::plugin;

  // The event queue stores messages as google::protobuf::Message, so the
  // concrete request type has to be recovered before sending.
  auto* create_req = dynamic_cast<plugin_pb::CreateCgroupHookRequest*>(msg);
  CRANE_ASSERT(create_req != nullptr);

  plugin_pb::CreateCgroupHookReply unused_reply;

  CRANE_TRACE("[Plugin] Sending CreateCgroupHook.");
  return m_stub_->CreateCgroupHook(context, *create_req, &unused_reply);
}

grpc::Status PluginClient::SendDestroyCgroupHook_(grpc::ClientContext* context,
google::protobuf::Message* msg) {
using crane::grpc::plugin::JobMonitorHookReply;
using crane::grpc::plugin::JobMonitorHookRequest;
using crane::grpc::plugin::DestroyCgroupHookReply;
using crane::grpc::plugin::DestroyCgroupHookRequest;

auto request = dynamic_cast<JobMonitorHookRequest*>(msg);
JobMonitorHookReply reply;
auto* request = dynamic_cast<DestroyCgroupHookRequest*>(msg);
CRANE_ASSERT(request != nullptr);

DestroyCgroupHookReply reply;

CRANE_TRACE("[Plugin] Sending JobMonitorHook.");
return m_stub_->JobMonitorHook(context, *request, &reply);
CRANE_TRACE("[Plugin] Sending DestroyCgroupHook.");
return m_stub_->DestroyCgroupHook(context, *request, &reply);
}

void PluginClient::StartHookAsync(std::vector<crane::grpc::TaskInfo> tasks) {
Expand Down Expand Up @@ -191,15 +211,27 @@ void PluginClient::EndHookAsync(std::vector<crane::grpc::TaskInfo> tasks) {
m_event_queue_.enqueue(std::move(e));
}

void PluginClient::JobMonitorHookAsync(task_id_t task_id,
std::string cgroup_path) {
auto request = std::make_unique<crane::grpc::plugin::JobMonitorHookRequest>();
void PluginClient::CreateCgroupHookAsync(task_id_t task_id,
const std::string& cgroup,
const crane::grpc::DedicatedResourceInNode &request_resource) {
auto request = std::make_unique<crane::grpc::plugin::CreateCgroupHookRequest>();
request->set_task_id(task_id);
request->set_cgroup(cgroup_path);
request->set_cgroup(cgroup);
request->mutable_request_res()->CopyFrom(request_resource);

HookEvent e{HookType::JOB_MONITOR,
HookEvent e{HookType::CREATE_CGROUP,
std::unique_ptr<google::protobuf::Message>(std::move(request))};
m_event_queue_.enqueue(std::move(e));
}

// Builds a DestroyCgroupHookRequest for `task_id` / `cgroup` and pushes it
// onto the event queue as a DESTROY_CGROUP hook event.
void PluginClient::DestroyCgroupHookAsync(task_id_t task_id,
                                          const std::string& cgroup) {
  using crane::grpc::plugin::DestroyCgroupHookRequest;

  auto destroy_req = std::make_unique<DestroyCgroupHookRequest>();
  destroy_req->set_task_id(task_id);
  destroy_req->set_cgroup(cgroup);

  // Hook events carry their payload as a base-class protobuf message.
  std::unique_ptr<google::protobuf::Message> payload = std::move(destroy_req);
  m_event_queue_.enqueue(
      HookEvent{HookType::DESTROY_CGROUP, std::move(payload)});
}

Expand Down
19 changes: 14 additions & 5 deletions src/Utilities/PluginClient/include/crane/PluginClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class PluginClient {
enum class HookType {
START,
END,
JOB_MONITOR,
CREATE_CGROUP,
DESTROY_CGROUP,
HookTypeCount,
};

Expand All @@ -62,9 +63,14 @@ class PluginClient {
void InitChannelAndStub(const std::string& endpoint);

// These functions are used to add HookEvent into the event queue.
// Launched by Ctld
void StartHookAsync(std::vector<crane::grpc::TaskInfo> tasks);
void EndHookAsync(std::vector<crane::grpc::TaskInfo> tasks);
void JobMonitorHookAsync(task_id_t task_id, std::string cgroup_path);

// Launched by Craned
// Builds a CreateCgroupHookRequest from the task id, the cgroup string and a
// copy of `request_resource`, then enqueues it as a CREATE_CGROUP hook event.
void CreateCgroupHookAsync(task_id_t task_id, const std::string& cgroup,
const crane::grpc::DedicatedResourceInNode &request_resource);
// Builds a DestroyCgroupHookRequest from the task id and the cgroup string,
// then enqueues it as a DESTROY_CGROUP hook event.
void DestroyCgroupHookAsync(task_id_t task_id, const std::string& cgroup);

private:
// HookDispatchFunc is a function pointer type that handles different
Expand All @@ -75,8 +81,10 @@ class PluginClient {
google::protobuf::Message* msg);
grpc::Status SendEndHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);
grpc::Status SendJobMonitorHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);
grpc::Status SendCreateCgroupHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);
grpc::Status SendDestroyCgroupHook_(grpc::ClientContext* context,
google::protobuf::Message* msg);

void AsyncSendThread_();

Expand All @@ -93,7 +101,8 @@ class PluginClient {
static constexpr std::array<HookDispatchFunc, size_t(HookType::HookTypeCount)>
s_hook_dispatch_funcs_{{&PluginClient::SendStartHook_,
&PluginClient::SendEndHook_,
&PluginClient::SendJobMonitorHook_}};
&PluginClient::SendCreateCgroupHook_,
&PluginClient::SendDestroyCgroupHook_}};
};

} // namespace plugin
Expand Down

0 comments on commit 5be1b79

Please sign in to comment.