Skip to content

Commit

Permalink
squash. Implemented graceful shutdown of agent factory and agents. TO…
Browse files Browse the repository at this point in the history
…DO: Make SSL socket logic handle graceful shutdown.
  • Loading branch information
korydraughn committed Oct 5, 2024
1 parent 9328578 commit d9679da
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 16 deletions.
1 change: 1 addition & 0 deletions lib/core/include/irods/rodsErrorTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ NEW_ERROR(AUTHENTICATION_ERROR, -178000)
NEW_ERROR(SOCKET_ERROR, -179000)
NEW_ERROR(CONFIGURATION_ERROR, -180000)
NEW_ERROR(SHUTDOWN_SEQUENCE_INITIATED, -181000)
NEW_ERROR(INTERRUPT_DETECTED, -182000)

/** @} */

Expand Down
45 changes: 30 additions & 15 deletions plugins/network/src/tcp.cpp
Original file line number Diff line number Diff line change
@@ -1,42 +1,44 @@
#include "irods/irods_at_scope_exit.hpp"
#include "irods/rodsDef.h"
#include "irods/msParam.h"
#include "irods/rcConnect.h"
#include "irods/rodsErrorTable.h"
#include "irods/sockComm.h"
#include "irods/irods_network_plugin.hpp"
#include "irods/irods_network_constants.hpp"
#include "irods/irods_tcp_object.hpp"
#include "irods/irods_stacktrace.hpp"
#include "irods/sockCommNetworkInterface.hpp"
#include "irods/rcMisc.h"
#include "irods/irods_logger.hpp"

#include <cstdio>
#include <sstream>
#include <string>
#include <iostream>

namespace
{
using log_net = irods::experimental::log::network;
} // anonymous namespace

// =-=-=-=-=-=-=-
// local function to read a buffer from a socket
irods::error tcp_socket_read(
int _socket,
void* _buffer,
int _length,
int& _bytes_read,
struct timeval* _time_value ) {
// =-=-=-=-=-=-=-
// Initialize the file descriptor set
fd_set set;
struct timeval* _time_value )
{
log_net::debug("{}: BEGIN", __func__);

// =-=-=-=-=-=-=-
// local copy of time value?
struct timeval timeout;
irods::at_scope_exit log_exit{[fn = __func__] { log_net::debug("{}: END", fn); }};

// =-=-=-=-=-=-=-
// local working variables
int len_to_read = _length;
char* read_ptr = static_cast<char*>( _buffer );

// =-=-=-=-=-=-=-
// reset bytes read
fd_set set;
struct timeval timeout;
int len_to_read = _length;
char* read_ptr = static_cast<char*>( _buffer );
_bytes_read = 0;

while ( len_to_read > 0 ) {
Expand All @@ -46,30 +48,41 @@ irods::error tcp_socket_read(
FD_SET(_socket, &set);
timeout = *_time_value;

log_net::debug("{}: Calling select() with timeout [{}.{}].", __func__, timeout.tv_sec, timeout.tv_usec);
const int status = select( _socket + 1, &set, NULL, NULL, &timeout );

if ( status == 0 ) { // the select has timed out
log_net::debug("{}: select() timed out. timeout object now holds [{}.{}].", __func__, timeout.tv_sec, timeout.tv_usec);
return ERROR( SYS_SOCK_READ_TIMEDOUT, boost::format("socket timeout with [%d] bytes read") % _bytes_read);
}

if ( status < 0 ) {
log_net::debug("{}: select() encountered an error [{}], errno = [{}].", __func__, status, errno);

if ( errno == EINTR ) {
continue;
// TODO Need a way to detect that SIGUSR1 signal was received.
// Perhaps we read the value of g_terminate or simply return and handle
// termination at an earlier place in the callstack.
//continue;
return ERROR(INTERRUPT_DETECTED, fmt::format("{} interrupted by signal", __func__));
}

return ERROR( SYS_SOCK_READ_ERR - errno, boost::format("error on select after [%d] bytes read") % _bytes_read);
} // else
} // if tv

log_net::debug("{}: Reading [{}] bytes from socket [{}].", __func__, len_to_read, _socket);
int num_bytes = read( _socket, ( void * ) read_ptr, len_to_read );
if ( num_bytes < 0 ) {
log_net::debug("{}: read() encountered an error [{}], errno = [{}].", __func__, num_bytes, errno);
if ( EINTR == errno ) {
errno = 0;
num_bytes = 0;
} else {
return ERROR(SYS_SOCK_READ_ERR - errno, boost::format("error reading from socket after [%d] bytes read") % _bytes_read);
}
} else if ( num_bytes == 0 ) {
log_net::debug("{}: read() returned 0. Peer must have disconnected.", __func__);
break;
}

Expand Down Expand Up @@ -163,6 +176,7 @@ irods::error tcp_read_msg_header(
irods::plugin_context& _ctx,
void* _buffer,
struct timeval* _time_val ) {
log_net::debug("{}: BEGIN", __func__);
// =-=-=-=-=-=-=-
// check the context
irods::error ret = _ctx.valid< irods::tcp_object >();
Expand Down Expand Up @@ -433,6 +447,7 @@ irods::error read_bytes_buf(
bytesBuf_t* _buffer,
irodsProt_t _protocol,
struct timeval* _time_val ) {
log_net::debug("{}: BEGIN", __func__);
// =-=-=-=-=-=-=-
// trap input buffer ptr
if ( !_buffer || !_buffer->buf ) {
Expand Down
23 changes: 23 additions & 0 deletions server/core/include/irods/agent_globals.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef IRODS_AGENT_GLOBALS_HPP
#define IRODS_AGENT_GLOBALS_HPP

// Globals aren't the best, but given that C++17 supports initialization
// of global variable across translation units without violating ODR, it
// is okay to define globals for the agent factory here.
//
// With that said, let's keep this to a minimum. Defining global variables
// in this file should be a last resort.

#include <csignal>

// This global is the flag which allows agents to gracefully react to
// stop instructions from the agent factory.
//
// It is set by signal handlers defined by the agent factory and MUST NOT
// be used for anything else.
//
// Low-level systems which need to react to stop instructions should include
// this file and check this flag.
inline volatile std::sig_atomic_t g_terminate = 0; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

#endif // IRODS_AGENT_GLOBALS_HPP
20 changes: 20 additions & 0 deletions server/core/src/rsApiHandler.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "irods/rsApiHandler.hpp"

#include "irods/agent_globals.hpp"
#include "irods/modDataObjMeta.h"
#include "irods/rcMisc.h"
#include "irods/miscServerFunct.hpp"
Expand Down Expand Up @@ -541,12 +543,23 @@ readAndProcClientMsg( rsComm_t * rsComm, int flags ) {
while ( 1 ) {
ret = readMsgHeader( net_obj, &myHeader, &tv );
if ( !ret.ok() ) {
log_agent::info("{}: error code = [{}]", __func__, ret.code());

if (ret.code() == INTERRUPT_DETECTED) {
// Check if the agent factory requested for the agent to stop.
if (g_terminate) {
log_agent::info("{}: Received instruction to shutdown. Agent is shutting down.", __func__, ret.code());
return SHUTDOWN_SEQUENCE_INITIATED;
}
}

if ( isL1descInuse() && retryCnt < MAX_READ_HEADER_RETRY ) {
rodsLogError( LOG_ERROR, status,
"readAndProcClientMsg:readMsgHeader error. status = %d", ret.code() );
retryCnt++;
continue;
}

if ( ret.code() == USER_SOCK_CONNECT_TIMEDOUT ) {
rodsLog( LOG_ERROR,
"readAndProcClientMsg: readMsgHeader by pid %d timedout",
Expand All @@ -559,6 +572,13 @@ readAndProcClientMsg( rsComm_t * rsComm, int flags ) {
}
else {
ret = readMsgHeader( net_obj, &myHeader, NULL );
if (!ret.ok() && ret.code() == INTERRUPT_DETECTED) {
// Check if the agent factory requested for the agent to stop.
if (g_terminate) {
log_agent::info("{}: Received instruction to shutdown. Agent is shutting down.", __func__, ret.code());
return SHUTDOWN_SEQUENCE_INITIATED;
}
}
}

if ( !ret.ok() ) {
Expand Down
4 changes: 3 additions & 1 deletion server/main_server/src/agent_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// - shared memory originally initialized by the main server process
// - stacktrace watcher

#include "irods/agent_globals.hpp"
#include "irods/agent_pid_table.hpp"
#include "irods/client_api_allowlist.hpp"
#include "irods/dns_cache.hpp"
Expand Down Expand Up @@ -116,7 +117,8 @@ namespace
using log_af = irods::experimental::log::agent_factory;
using log_agent = irods::experimental::log::agent;

volatile std::sig_atomic_t g_terminate = 0; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)
// TODO Remove
//volatile std::sig_atomic_t g_terminate = 0; // NOLINT(cppcoreguidelines-avoid-non-const-global-variables)

auto init_logger(const nlohmann::json& _config) -> void;
auto load_log_levels_for_loggers() -> void;
Expand Down

0 comments on commit d9679da

Please sign in to comment.