Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Crun support pty #362

Merged
merged 11 commits into from
Dec 11, 2024
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ set(DEPENDENCIES_PRE_INSTALLED_DIR ${CMAKE_CURRENT_SOURCE_DIR}/dependencies/pre_
add_subdirectory(${DEPENDENCIES_PRE_INSTALLED_DIR})

find_package(Threads REQUIRED)
find_library(LIBUTIL_LIBRARY util)

# New in version cmake3.24:
# Set ZLIB_USE_STATIC_LIBS to ON to look for static libraries. Default is OFF.
Expand Down
1 change: 1 addition & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ message InteractiveTaskAdditionalMeta {
string sh_script = 2;
string term_env = 3;
InteractiveTaskType interactive_type = 4;
bool pty = 5;
}

message TaskInfo {
Expand Down
1 change: 1 addition & 0 deletions src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequests(
mutable_meta->set_sh_script(meta_in_ctld.sh_script);
mutable_meta->set_term_env(meta_in_ctld.term_env);
mutable_meta->set_interactive_type(meta_in_ctld.interactive_type);
mutable_meta->set_pty(meta_in_ctld.pty);
}
}

Expand Down
46 changes: 23 additions & 23 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks(
results.emplace_back(std::move(result));
}

for (auto& res : results) {
for (auto &res : results) {
if (res.has_value())
response->mutable_task_id_list()->Add(res.value().get());
else
Expand Down Expand Up @@ -732,21 +732,22 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
CRANE_ERROR("Expect type CFORED_REGISTRATION from peer {}.",
context->peer());
return Status::CANCELLED;
} else {
cfored_name = cfored_request.payload_cfored_reg().cfored_name();
CRANE_INFO("Cfored {} registered.", cfored_name);
}

ok = stream_writer->WriteCforedRegistrationAck({});
if (ok) {
state = StreamState::kWaitMsg;
} else {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}
cfored_name = cfored_request.payload_cfored_reg().cfored_name();
CRANE_INFO("Cfored {} registered.", cfored_name);

ok = stream_writer->WriteCforedRegistrationAck({});
if (ok) {
state = StreamState::kWaitMsg;
} else {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}

} else {
state = StreamState::kCleanData;
}
Expand Down Expand Up @@ -776,15 +777,16 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
};

meta.cb_task_cancel = [writer_weak_ptr](task_id_t task_id) {
CRANE_TRACE("Sending TaskCancelRequest in task_cancel", task_id);
if (auto writer = writer_weak_ptr.lock(); writer)
writer->WriteTaskCancelRequest(task_id);
};

meta.cb_task_completed = [this, i_type, cfored_name,
writer_weak_ptr](task_id_t task_id) {
CRANE_TRACE("Sending TaskCompletionAckReply in task_completed",
task_id);
if (auto writer = writer_weak_ptr.lock(); writer)
meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr](
task_id_t task_id,
bool send_completion_ack) {
if (auto writer = writer_weak_ptr.lock();
writer && send_completion_ack)
writer->WriteTaskCompletionAckReply(task_id);
m_ctld_server_->m_mtx_.Lock();

Expand Down Expand Up @@ -830,8 +832,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
case StreamCforedRequest::TASK_COMPLETION_REQUEST: {
auto const &payload = cfored_request.payload_task_complete_req();
CRANE_TRACE("Recv TaskCompletionReq of Task #{}", payload.task_id());

if (g_task_scheduler->TerminatePendingOrRunningTask(
if (g_task_scheduler->TerminatePendingOrRunningIaTask(
payload.task_id()) != CraneErr::kOk)
stream_writer->WriteTaskCompletionAckReply(payload.task_id());
} break;
Expand Down Expand Up @@ -965,8 +966,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr<TaskInCtld> task) {
task->Username(), task->partition_id, task->account));
}

auto enable_res =
g_account_manager->CheckIfUserOfAccountIsEnabled(
auto enable_res = g_account_manager->CheckIfUserOfAccountIsEnabled(
task->Username(), task->account);
if (!enable_res) {
return std::unexpected(enable_res.error());
Expand Down
6 changes: 2 additions & 4 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ class CforedStreamWriter {

bool WriteTaskResAllocReply(
task_id_t task_id,
std::expected<std::pair<std::string, std::list<std::string>>,
std::string>
std::expected<std::pair<std::string, std::list<CranedId>>, std::string>
res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;
Expand Down Expand Up @@ -121,8 +120,7 @@ class CforedStreamWriter {
return m_stream_->Write(reply);
}

bool WriteCforedRegistrationAck(
const std::expected<void, std::string> &res) {
bool WriteCforedRegistrationAck(const std::expected<void, std::string> &res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand Down
10 changes: 7 additions & 3 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,14 @@ struct InteractiveMetaInTask {

std::string sh_script;
std::string term_env;
bool pty;
std::function<void(task_id_t, std::string const&,
std::list<std::string> const&)>
cb_task_res_allocated;
std::function<void(task_id_t)> cb_task_completed;

// only for calloc.
std::function<void(task_id_t, bool)> cb_task_completed;

// This will ask front end like crun/calloc to exit
std::function<void(task_id_t)> cb_task_cancel;

// only for crun.
Expand Down Expand Up @@ -493,8 +495,10 @@ struct TaskInCtld {
InteractiveMeta.interactive_type =
val.interactive_meta().interactive_type();
if (InteractiveMeta.interactive_type ==
crane::grpc::InteractiveTaskType::Crun)
crane::grpc::InteractiveTaskType::Crun) {
InteractiveMeta.term_env = val.interactive_meta().term_env();
InteractiveMeta.pty = val.interactive_meta().pty();
}
}

node_num = val.node_num();
Expand Down
92 changes: 56 additions & 36 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,22 +686,37 @@ void TaskScheduler::ScheduleThread_() {
task->allocated_craneds_regex =
util::HostNameListToStr(task->CranedIds());

// Task execute on all node, otherwise on the first node
bool launch_on_all_nodes;
if (task->type == crane::grpc::Batch) {
// For cbatch tasks whose --node > 1,
// only execute the command at the first allocated node.
task->executing_craned_ids.emplace_back(task->CranedIds().front());
launch_on_all_nodes = false;
} else {
const auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc)
// For calloc tasks we still need to execute a dummy empty task to
// set up a timer.
task->executing_craned_ids.emplace_back(task->CranedIds().front());
else
// For crun tasks we need to execute tasks on all allocated nodes.
for (auto const& craned_id : task->CranedIds())
task->executing_craned_ids.emplace_back(craned_id);
launch_on_all_nodes = false;
else {
// For crun tasks we need to execute tasks on all allocated
// nodes.

// Crun task with pty only launch on first node
if (task->TaskToCtld().interactive_meta().pty())
launch_on_all_nodes = false;
else
launch_on_all_nodes = true;
}
}

if (launch_on_all_nodes) {
for (auto const& craned_id : task->CranedIds())
task->executing_craned_ids.emplace_back(craned_id);
} else
task->executing_craned_ids.emplace_back(task->CranedIds().front());
}

end = std::chrono::steady_clock::now();
CRANE_TRACE(
"Set task fields costed {} ms",
Expand Down Expand Up @@ -1273,18 +1288,12 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask(
reply.add_not_cancelled_tasks(task_id);
reply.add_not_cancelled_reasons("Permission Denied.");
} else {
bool is_calloc = false;
if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc) is_calloc = true;

if (is_calloc && !meta.has_been_cancelled_on_front_end) {
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task_id);
}
}

if (is_calloc) {
reply.add_cancelled_tasks(task_id);
} else {
CraneErr err = TerminateRunningTaskNoLock_(task);
Expand Down Expand Up @@ -1457,8 +1466,18 @@ void TaskScheduler::CleanCancelQueueCb_() {

if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
g_thread_pool->detach_task([cb = meta.cb_task_cancel,
task_id = task->TaskId()] { cb(task_id); });
// Cancel request may not come from crun/calloc, ask them to exit
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
g_thread_pool->detach_task([cb = meta.cb_task_cancel,
task_id = task->TaskId()] { cb(task_id); });
} else {
// Cancel request from crun/calloc, reply CompletionAck
g_thread_pool->detach_task(
[cb = meta.cb_task_completed, task_id = task->TaskId()] {
cb(task_id, true);
});
}
}
}

Expand Down Expand Up @@ -1619,34 +1638,35 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() {
task->SetStatus(new_status);
} else {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Calloc) {
// TaskStatusChange may indicate the time limit has been reached and
// the task has been terminated. No more TerminateTask RPC should be
// sent to the craned node if any further CancelTask or
// TaskCompletionRequest RPC is received.
meta.has_been_terminated_on_craned = true;

if (new_status == crane::grpc::ExceedTimeLimit ||
exit_code == ExitCode::kExitCodeCranedDown) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task->TaskId());
task->SetStatus(new_status);
} else {
task->SetStatus(crane::grpc::Completed);
}
meta.cb_task_completed(task->TaskId());
} else { // Crun
if (++meta.status_change_cnt < task->node_num) {
if (meta.interactive_type == crane::grpc::Crun) { // Crun
if (++meta.status_change_cnt < task->executing_craned_ids.size()) {
CRANE_TRACE(
"{}/{} TaskStatusChanges of Crun task #{} were received. "
"Keep waiting...",
meta.status_change_cnt, task->node_num, task->TaskId());
meta.status_change_cnt, task->executing_craned_ids.size(),
task->TaskId());
continue;
}
}

task->SetStatus(new_status);
meta.cb_task_completed(task->TaskId());
// TaskStatusChange may indicate the time limit has been reached and
// the task has been terminated. No more TerminateTask RPC should be
// sent to the craned node if any further CancelTask or
// TaskCompletionRequest RPC is received.

// Task end triggered by craned.
if (!meta.has_been_cancelled_on_front_end) {
meta.has_been_cancelled_on_front_end = true;
meta.cb_task_cancel(task->TaskId());
// Completion ack will send in grpc server triggered by task complete
// req
meta.cb_task_completed(task->TaskId(), false);
} else {
// Send Completion Ack to frontend now.
meta.cb_task_completed(task->TaskId(), true);
}

task->SetStatus(new_status);
}

task->SetExitCode(exit_code);
Expand Down
19 changes: 16 additions & 3 deletions src/CraneCtld/TaskScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,34 @@ class TaskScheduler {
crane::grpc::CancelTaskReply CancelPendingOrRunningTask(
const crane::grpc::CancelTaskRequest& request);

CraneErr TerminatePendingOrRunningTask(uint32_t task_id) {
CraneErr TerminatePendingOrRunningIaTask(uint32_t task_id) {
LockGuard pending_guard(&m_pending_task_map_mtx_);
LockGuard running_guard(&m_running_task_map_mtx_);

auto pd_it = m_pending_task_map_.find(task_id);
if (pd_it != m_pending_task_map_.end()) {
auto& task = pd_it->second;
if (task->type == crane::grpc::TaskType::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
meta.has_been_cancelled_on_front_end = true;
}
m_cancel_task_queue_.enqueue(
CancelPendingTaskQueueElem{.task = std::move(pd_it->second)});
CancelPendingTaskQueueElem{.task = std::move(task)});
m_cancel_task_async_handle_->send();
m_pending_task_map_.erase(pd_it);
return CraneErr::kOk;
}

auto rn_it = m_running_task_map_.find(task_id);
if (rn_it == m_running_task_map_.end()) return CraneErr::kNonExistent;
if (rn_it == m_running_task_map_.end())
return CraneErr::kNonExistent;
else {
auto& task = rn_it->second;
if (task->type == crane::grpc::TaskType::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
meta.has_been_cancelled_on_front_end = true;
}
}

return TerminateRunningTaskNoLock_(rn_it->second.get());
}
Expand Down
1 change: 1 addition & 0 deletions src/Craned/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ target_link_libraries(craned

cxxopts
Threads::Threads
${LIBUTIL_LIBRARY}
nlohmann_json::nlohmann_json

absl::flat_hash_map
Expand Down
Loading
Loading