Skip to content

Commit

Permalink
squash. Launching multiple agent factories and delay servers for relo…
Browse files Browse the repository at this point in the history
…ading seems to work. Need to plumb shutdown through agent read-write loop for graceful shutdown.
  • Loading branch information
korydraughn committed Sep 26, 2024
1 parent 5778a7f commit 1d59f2f
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 110 deletions.
1 change: 1 addition & 0 deletions lib/core/include/irods/rodsErrorTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ NEW_ERROR(TYPE_NOT_SUPPORTED, -177000)
NEW_ERROR(AUTHENTICATION_ERROR, -178000)
NEW_ERROR(SOCKET_ERROR, -179000)
NEW_ERROR(CONFIGURATION_ERROR, -180000)
NEW_ERROR(SHUTDOWN_SEQUENCE_INITIATED, -181000)

/** @} */

Expand Down
4 changes: 0 additions & 4 deletions server/core/src/initServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,8 @@ initRsCommWithStartupPack( rsComm_t *rsComm, startupPack_t *startupPack, bool& r

if (startupPack) {
rsComm->connectCnt = startupPack->connectCnt;
log_agent::info("{}: startupPack->connectCnt = [{}]", __func__, startupPack->connectCnt);
rsComm->irodsProt = startupPack->irodsProt;
log_agent::info("{}: startupPack->irodsProt = [{}]", __func__, startupPack->irodsProt);
rsComm->reconnFlag = startupPack->reconnFlag;
log_agent::info("{}: startupPack->reconnFlag = [{}]", __func__, startupPack->reconnFlag);
rstrcpy( rsComm->proxyUser.userName, startupPack->proxyUser, NAME_LEN );
if ( strcmp( startupPack->proxyUser, PUBLIC_USER_NAME ) == 0 ) {
rsComm->proxyUser.authInfo.authFlag = PUBLIC_USER_AUTH;
Expand All @@ -731,7 +728,6 @@ initRsCommWithStartupPack( rsComm_t *rsComm, startupPack_t *startupPack, bool& r
else {
opt_str.copy(rsComm->option, sizeof(RsComm::option));
}
log_agent::info("{}: rsComm->option = [{}]", __func__, rsComm->option);
}
else { /* have to depend on env variable */
tmpStr = getenv( SP_NEW_SOCK );
Expand Down
175 changes: 98 additions & 77 deletions server/main_server/src/agent_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ namespace
auto make_agent_pid_file() -> std::string;
auto create_agent_pid_file_for_ips(const RsComm& _comm, std::time_t _created_at) -> void;

auto wait_for_agents_to_terminate(bool _use_wnohang) -> void;
auto reap_agent_processes(bool _shutting_down) -> void;
} // anonymous namespace

int main(int _argc, char* _argv[])
Expand Down Expand Up @@ -345,22 +345,9 @@ int main(int _argc, char* _argv[])
while (true) {
if (g_terminate) {
log_af::info("{}: Received shutdown instruction. Exiting agent factory main loop.", __func__);
// TODO Send shutdown message to main server process.
break;
}

if (g_reload_config) {
log_af::info("{}: Received configuration reload instruction. Reloading configuration.", __func__);
irods::environment_properties::instance().capture(); // TODO This MUST NOT assume /var/lib/irods.

// Must use .reload() to avoid accidentally removing the reCacheSalt
// from server_properties.
irods::server_properties::instance().reload(); // TODO This MUST NOT assume /etc/irods/server_config.json.

load_log_levels_for_loggers();
g_reload_config = 0;
}

// NOLINTNEXTLINE(hicpp-signed-bitwise, cppcoreguidelines-pro-bounds-constant-array-index)
FD_SET(svrComm.sock, &sockMask);

Expand All @@ -371,6 +358,11 @@ int main(int _argc, char* _argv[])

// FIXME Replace use of select() with epoll() or Boost.Asio.
while ((numSock = select(svrComm.sock + 1, &sockMask, nullptr, nullptr, &time_out)) == -1) {
if (g_terminate) {
log_af::info("{}: Received shutdown instruction. Exiting agent factory select() loop.", __func__);
break;
}

// "select" modifies the timeval structure, so reset it.
time_out.tv_sec = 0;
time_out.tv_usec = 500 * 1000;
Expand All @@ -386,6 +378,11 @@ int main(int _argc, char* _argv[])
return 1;
}

if (g_terminate) {
log_af::info("{}: Received shutdown instruction. Exiting agent factory main loop.", __func__);
break;
}

if (0 == numSock) {
// "select" modifies the timeval structure, so reset it.
time_out.tv_sec = 0;
Expand Down Expand Up @@ -423,7 +420,14 @@ int main(int _argc, char* _argv[])

// Start shutting everything down.

wait_for_agents_to_terminate(false);
// Do not accept new client requests (i.e. close the listening socket).
close(svrComm.sock);

// Instruct all agents to shut down gracefully.
// To avoid unnecessary complexity, we use SIGUSR1 as the termination signal for agents.
kill(0, SIGUSR1);

reap_agent_processes(true);

log_af::info("{}: Shutdown complete.", __func__);

Expand Down Expand Up @@ -471,47 +475,35 @@ namespace
{
// DO NOT memset sigaction structures!

// SIGINT
std::signal(SIGINT, SIG_IGN);

// SIGTERM
struct sigaction sa_terminate; // NOLINT(cppcoreguidelines-pro-type-member-init)
sigemptyset(&sa_terminate.sa_mask);
sa_terminate.sa_flags = 0;
sa_terminate.sa_flags = SA_SIGINFO;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
sa_terminate.sa_handler = [](int) { g_terminate = 1; };
if (sigaction(SIGINT, &sa_terminate, nullptr) == -1) {
return -1;
}

// SIGTERM
if (sigaction(SIGTERM, &sa_terminate, nullptr) == -1) {
return -1;
}
sa_terminate.sa_sigaction = [](int, siginfo_t* _siginfo, void*) {
const auto saved_errno = errno;

// SIGHUP
struct sigaction sa_sighup; // NOLINT(cppcoreguidelines-pro-type-member-init)
sigemptyset(&sa_sighup.sa_mask);
sa_sighup.sa_flags = SA_SIGINFO;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
sa_sighup.sa_sigaction = [](int, siginfo_t* _siginfo, void*) {
// Only respond to SIGHUP if the main server process triggered it.
// This keeps the main server and its children in sync.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
// Only respond to SIGTERM if the main server process sent it.
if (getppid() == _siginfo->si_pid) {
g_reload_config = 1;
g_terminate = 1;
}

errno = saved_errno;
};
if (sigaction(SIGHUP, &sa_sighup, nullptr) == -1) {
if (sigaction(SIGTERM, &sa_terminate, nullptr) == -1) {
return -1;
}

// SIGCHLD
// This signal is disabled by default.
struct sigaction sa_sigchld; // NOLINT(cppcoreguidelines-pro-type-member-init)
sigemptyset(&sa_sigchld.sa_mask);
sa_sigchld.sa_flags = 0;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access, cppcoreguidelines-pro-type-cstyle-cast)
sa_sigchld.sa_handler = [](int) {
const auto saved_errno = errno;
wait_for_agents_to_terminate(true);
reap_agent_processes(false);
errno = saved_errno;
};
if (sigaction(SIGCHLD, &sa_sigchld, nullptr) == -1) {
Expand Down Expand Up @@ -684,16 +676,15 @@ namespace
} // initServer

auto initServerMain(RsComm& _comm,
const bool enable_test_mode = false,
const bool write_to_stdout = false) -> int
const bool enable_test_mode = false,
const bool write_to_stdout = false) -> int
{
std::memset(&_comm, 0, sizeof(RsComm));
int status = getRodsEnv(&_comm.myEnv);
if (status < 0) {
log_af::error("{}: getRodsEnv error. status = {}", __func__, status);
return status;
}
//initAndClearProcLog();

// TODO Verify this is reading from the correct location.
// Consider non-pkg installs and pkg installs.
Expand Down Expand Up @@ -747,27 +738,49 @@ namespace

log_af::info("{}: Server Release version {} - API Version {} is up", __func__, RODS_REL_VERSION, RODS_API_VERSION);

// TODO Likely unnecessary.
// Record port, PID, and CWD into a well-known file.
//recordServerProcess(_comm);

#if 0
// Setup the delay server CRON task.
// The delay server will launch just before we enter the server's main loop.
ix::cron::cron_builder delay_server;
const auto migrate_delay_server_sleep_time =
get_advanced_setting(irods::KW_CFG_MIGRATE_DELAY_SERVER_SLEEP_TIME_IN_SECONDS, 5);
delay_server.interval(migrate_delay_server_sleep_time).task([enable_test_mode, write_to_stdout] {
migrate_delay_server(enable_test_mode, write_to_stdout);
});
ix::cron::cron::instance().add_task(delay_server.build());
#endif
return 0;
} // initServerMain

auto handle_client_request(int _socket_fd, std::time_t _created_at) -> int
{
// TODO Reset signal dispositions.
// Setup signal handlers for agent.

// SIGINT is ignored to protect the agents from being killed when running the
// server in the foreground (i.e. no daemonization). The described behavior is
// observed by running the server in the foreground, enabling the default behavior
// of SIGINT, and entering CTRL-C in the bash terminal.
std::signal(SIGINT, SIG_IGN);

// SIGTERM is ignored to avoid unwanted termination of the agent during a full
// shutdown of the iRODS server.
std::signal(SIGTERM, SIG_IGN);

// To avoid unnecessary complexity for configuration reload, shutdown, and signals,
// we use SIGUSR1 as the signal for instructing the agents to shut down.
struct sigaction sa_terminate; // NOLINT(cppcoreguidelines-pro-type-member-init)
sigemptyset(&sa_terminate.sa_mask);
sa_terminate.sa_flags = SA_SIGINFO;
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
sa_terminate.sa_sigaction = [](int, siginfo_t* _siginfo, void*) {
// Only respond to SIGUSR1 if the agent factory triggered it.
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access)
if (getppid() == _siginfo->si_pid) {
g_terminate = 1;
}
};
if (sigaction(SIGUSR1, &sa_terminate, nullptr) == -1) {
return -1; // TODO Use a better error code or log instead.
}

// SIGPIPE is ignored so that agents can detect socket issues to other servers.
std::signal(SIGPIPE, SIG_IGN);

std::signal(SIGCHLD, SIG_DFL);

std::signal(SIGHUP, SIG_IGN);

//std::signal(SIGABRT, SIG_DFL);
//std::signal(SIGUSR1, SIG_DFL);

irods::experimental::log::set_server_type("agent");

Expand All @@ -783,7 +796,6 @@ namespace
return 1;
}

//net_obj->socket_handle(_comm.sock);
net_obj->socket_handle(_socket_fd);
startupPack_t* startupPack = nullptr;
struct timeval tv;
Expand Down Expand Up @@ -957,21 +969,7 @@ namespace
cleanupAndExit(status);
}

// Add agent information to the agent pid table for ips.
// TODO Remove this
//logAgentProc(&rsComm);
#if 0
namespace apt = irods::experimental::agent_pid_table;
apt::agent_pid_info pid_info{};
pid_info.pid = getpid();
pid_info.created_at = _created_at;
std::strncpy(pid_info.client_addr, rsComm.clientAddr, sizeof(apt::agent_pid_info::client_addr));
std::strncpy(pid_info.client_username, rsComm.clientUser.userName, sizeof(apt::agent_pid_info::client_username));
std::strncpy(pid_info.client_zone, rsComm.clientUser.rodsZone, sizeof(apt::agent_pid_info::client_zone));
apt::insert(pid_info);
#else
create_agent_pid_file_for_ips(rsComm, _created_at);
#endif

// call initialization for network plugin as negotiated
irods::network_object_ptr new_net_obj;
Expand Down Expand Up @@ -1098,6 +1096,11 @@ namespace
irods::dynamic_cast_hack();

while (status >= 0) {
if (g_terminate) {
log_agent::trace("{}: Received shutdown instruction. Agent is shutting down.", __func__);
break;
}

// set default to the native auth scheme here.
if (!_comm.auth_scheme) {
_comm.auth_scheme = strdup("native");
Expand Down Expand Up @@ -1257,16 +1260,34 @@ namespace
}
} // create_agent_pid_file_for_ips

auto wait_for_agents_to_terminate(bool _use_wnohang) -> void
auto reap_agent_processes(bool _shutting_down) -> void
{
pid_t pid;
int status;
char agent_pid[16];
char agent_pid_file_path[1024]; // TODO _POSIX_PATH_MAX?

const auto flags = _use_wnohang ? WNOHANG : 0;
const auto flags = _shutting_down ? 0: WNOHANG;

while (true) {
pid = waitpid(-1, &status, flags);

// This branch is for when we're shutting down (i.e. not in a signal handler).
// That is - continue to loop until all agents have terminated.
if (_shutting_down) {
if (-1 == pid && ECHILD == errno) {
//log_af::info("{}: NO MORE AGENTS TO REAP!", __func__); // TODO Remove
// All agents have terminated.
break;
}
}
// This branch is for when we're in a signal handler.
// That is - continue to loop as long as waitpid returns a valid pid.
else if (pid <= 0) {
break;
}

while ((pid = waitpid(-1, &status, flags)) > 0) {
//log_af::info("{}: REAPING AGENT [{}]", __func__, pid); // TODO Remove
auto [p, ec] = std::to_chars(agent_pid, agent_pid + sizeof(agent_pid), pid);
if (std::errc{} != ec) {
continue;
Expand All @@ -1282,5 +1303,5 @@ namespace

unlink(agent_pid_file_path);
}
} // wait_for_agents_to_terminate
} // reap_agent_processes
} // anonymous namespace
Loading

0 comments on commit 1d59f2f

Please sign in to comment.