Skip to content

Commit

Permalink
Fixes skupperproject#1145: Implement idle connection timeout for tcpL…
Browse files Browse the repository at this point in the history
…istener
  • Loading branch information
kgiusti committed Jul 3, 2023
1 parent 7895c27 commit cfb59de
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 5 deletions.
7 changes: 7 additions & 0 deletions python/skupper_router/management/skrouter.json
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,13 @@
"type": ["up", "down"],
"description": "The operational status of TCP socket listener: up - the service is active and incoming connections are permitted; down - the service is not active and incoming connection attempts will be refused.",
"create": false
},
"idleTimeoutSeconds": {
"type": "integer",
"default": 300,
"description": "The idle timeout, in seconds, for connections through this listener. If no data is transferred over the connection for this time interval, the connection shall be closed. A value of zero disables the timeout",
"required": false,
"create": true
}
}
},
Expand Down
13 changes: 13 additions & 0 deletions src/adaptors/adaptor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ qd_error_t qd_load_adaptor_config(qd_adaptor_config_t *config, qd_entity_t *enti
if (config->backlog <= 0 || config->backlog > SOMAXCONN)
config->backlog = SOMAXCONN;

// Currently idleTimeoutSeconds is only defined for tcpListeners. Since this function is used for all adaptor
// listener and connector configurations the call to get idleTimeoutSeconds will fail in those cases where the
// adaptor configuration is not a tcpListener. Ignore these failures. The schema will provide the default value.
//
long timeout = qd_entity_get_long(entity, "idleTimeoutSeconds");
if (timeout < 0) {
config->idle_timeout = 0;
qd_error_clear();
} else {
// TODO(kgiusti): enforce a minimum value to avoid high CPU load:
config->idle_timeout = timeout;
}

int hplen = strlen(config->host) + strlen(config->port) + 2;
config->host_port = malloc(hplen);
snprintf(config->host_port, hplen, "%s:%s", config->host, config->port);
Expand Down
1 change: 1 addition & 0 deletions src/adaptors/adaptor_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct qd_adaptor_config_t
char *site_id;
char *host_port;
int backlog;
int idle_timeout;

//TLS related info
char *ssl_profile_name;
Expand Down
56 changes: 51 additions & 5 deletions src/adaptors/tcp/tcp_adaptor.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,17 @@ struct qdr_tcp_connection_t {
bool flow_enabled;
bool is_egress_dispatcher_conn;
bool connector_closed;//only used if egress_dispatcher=true
bool in_list; // This connection is in the adaptor's connections list
sys_atomic_t raw_closed_read; // proton event seen
sys_atomic_t raw_closed_write; // proton event seen or write_close called
bool in_list; // This connection is in the adaptor's connections list
bool raw_read_shutdown; // stream closed
bool read_eos_seen;
bool window_disabled; // true: ignore unacked byte window
bool require_tls; // Is TLS required on this connection ?
sys_atomic_t check_idle_conn; // check if connection has been idle
sys_atomic_t raw_closed_read; // proton event seen
sys_atomic_t raw_closed_write; // proton event seen or write_close called
qdr_delivery_t *initial_delivery;
qd_timer_t *activate_timer;
qd_timer_t *idle_timer;
qd_tcp_adaptor_config_t *config; // config
qd_server_t *server;
qd_tls_t *tls;
Expand All @@ -118,6 +121,7 @@ struct qdr_tcp_connection_t {
uint64_t opened_time;
uint64_t last_in_time;
uint64_t last_out_time;
uint64_t idle_snapshot; // bytes_in + bytes_out at last idle check

qd_adaptor_buffer_list_t out_buffs; // Buffers for writing

Expand All @@ -127,10 +131,10 @@ struct qdr_tcp_connection_t {
size_t outgoing_body_bytes; // bytes received from current segment
int outgoing_body_offset; // buffer offset into current segment

pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];
int outgoing_buff_count; // number of buffers with data
int outgoing_buff_idx; // first buffer with data
bool require_tls; // Is TLS required on this connection ?
pn_raw_buffer_t outgoing_buffs[WRITE_BUFFERS];

DEQ_LINKS(qdr_tcp_connection_t);
};

Expand Down Expand Up @@ -239,6 +243,17 @@ static void on_activate(void *context)
}
}

// This runs on a proactor timer thread. This thread may run in parallel with a proactor I/O thread that is handling the
// raw connection. However this timer callback is only active when the raw connection is connected and is disabled when
// the connection disconnects (see handle_connection_event). So it should be safe to access the connection's raw_conn.
//
static void on_idle_timer(void *context)
{
qdr_tcp_connection_t *conn = (qdr_tcp_connection_t *) context;
SET_ATOMIC_FLAG(&conn->check_idle_conn);
pn_raw_connection_wake(conn->pn_raw_conn);
}

/**
* Grants read buffers to the pn raw connection.
* This is the ONLY function that should be called if we want to grant read buffers to the raw connection.
Expand Down Expand Up @@ -574,9 +589,11 @@ static void free_qdr_tcp_connection(qdr_tcp_connection_t *tc)
free(tc->remote_address);
free(tc->global_id);
free(tc->alpn_protocol);
sys_atomic_destroy(&tc->check_idle_conn);
sys_atomic_destroy(&tc->raw_closed_read);
sys_atomic_destroy(&tc->raw_closed_write);
qd_timer_free(tc->activate_timer);
qd_timer_free(tc->idle_timer);
sys_mutex_free(&tc->activation_lock);

// Free tls related stuff if need be.
Expand Down Expand Up @@ -988,6 +1005,12 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
qd_set_vflow_netaddr_string(conn->vflow, conn->pn_raw_conn, conn->ingress);
if (conn->ingress) {
qdr_tcp_connection_ingress_accept(conn);
if (conn->idle_timer) {
qd_duration_t timeout_ms = conn->config->adaptor_config->idle_timeout * 1000;
assert(timeout_ms);
conn->idle_snapshot = conn->bytes_in + conn->bytes_out;
qd_timer_schedule(conn->idle_timer, timeout_ms);
}
qd_log(LOG_TCP_ADAPTOR, QD_LOG_INFO,
"[C%" PRIu64 "] PN_RAW_CONNECTION_CONNECTED Listener ingress accepted to %s from %s (global_id=%s)",
conn->conn_id, conn->config->adaptor_config->host_port, conn->remote_address, conn->global_id);
Expand Down Expand Up @@ -1043,6 +1066,9 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
break;
}
case PN_RAW_CONNECTION_DISCONNECTED: {
if (conn->idle_timer)
qd_timer_cancel(conn->idle_timer);

pn_condition_t *cond = pn_raw_connection_condition(conn->pn_raw_conn);
if (!!cond) {
const char *cname = pn_condition_get_name(cond);
Expand Down Expand Up @@ -1089,6 +1115,21 @@ static void handle_connection_event(pn_event_t *e, qd_server_t *qd_server, void
case PN_RAW_CONNECTION_WAKE: {
qd_log(LOG_TCP_ADAPTOR, QD_LOG_DEBUG, "[C%" PRIu64 "] PN_RAW_CONNECTION_WAKE %s", conn->conn_id,
qdr_tcp_connection_role_name(conn));
if (conn->idle_timer && CLEAR_ATOMIC_FLAG(&conn->check_idle_conn)) {
uint64_t new_snapshot = conn->bytes_in + conn->bytes_out;
if (new_snapshot == conn->idle_snapshot) {
// TODO(kgiusti): need vflow update!
qd_log(LOG_TCP_ADAPTOR, QD_LOG_WARNING,
"[C%" PRIu64 "] idle TCP client detected, closing the connection", conn->conn_id);
pn_raw_connection_close(conn->pn_raw_conn); // thread-safe call
break;
} else {
qd_duration_t timeout_ms = conn->config->adaptor_config->idle_timeout * 1000;
assert(timeout_ms);
conn->idle_snapshot = new_snapshot;
qd_timer_schedule(conn->idle_timer, timeout_ms);
}
}
while (qdr_connection_process(conn->qdr_conn)) {}
break;
}
Expand Down Expand Up @@ -1209,9 +1250,14 @@ static qdr_tcp_connection_t *qdr_tcp_connection(qd_tcp_listener_t *listener, qd_
assert(tcp_stats);
assert(server);

if (tc->config->adaptor_config->idle_timeout) {
tc->idle_timer = qd_timer(tcp_adaptor->core->qd, on_idle_timer, tc);
}

tc->conn_id = qd_server_allocate_connection_id(server);
tc->context.context = tc;
tc->context.handler = &handle_connection_event;
sys_atomic_init(&tc->check_idle_conn, 0);
sys_atomic_init(&tc->raw_closed_read, 0);
sys_atomic_init(&tc->raw_closed_write, 0);
sys_mutex_init(&tc->activation_lock);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ foreach(py_test_module
system_tests_http1_over_tcp
system_tests_tcp_adaptor
system_tests_tcp_adaptor_tls
system_tests_tcp_adaptor_idle_timeout
system_tests_http2_tls
system_tests_heartbeats
system_tests_address_watch
Expand Down
127 changes: 127 additions & 0 deletions tests/system_tests_tcp_adaptor_idle_timeout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import socket

from system_test import Qdrouterd, TIMEOUT, TestCase, unittest
from system_test import main_module, retry_exception, retry


class TcpAdaptorIdleTimeoutTest(TestCase):
"""
Test the TCP adaptor idle connection timeout functionality
"""
@classmethod
def setUpClass(cls):
# Create a single router with a tcpConnector and tcpListener.
# Configure a short idle timeout on the tcpListener
super(TcpAdaptorIdleTimeoutTest, cls).setUpClass()

cls.idle_timeout = 5
cls.service_addr = "idle/timeout"
cls.listener_port = cls.tester.get_port()
cls.connector_port = cls.tester.get_port()
config = [
('router', {'mode': 'interior', 'id': 'TcpIdleTimeout'}),
('listener', {'role': 'normal',
'port': cls.tester.get_port()}),
('tcpListener', {'port': cls.listener_port,
'address': cls.service_addr,
'idleTimeoutSeconds': cls.idle_timeout}),
('tcpConnector', {'host': '127.0.0.1',
'port': cls.connector_port,
'address': cls.service_addr}),
('address', {'prefix': 'closest', 'distribution': 'closest'}),
('address', {'prefix': 'multicast', 'distribution': 'multicast'}),
]

config = Qdrouterd.Config(config)

cls.router = cls.tester.qdrouterd('TcpIdleTimeout',
Qdrouterd.Config(config),
wait=False)
cls.router.wait_startup_message()

def _get_tcp_conn_count(self):
"""
Return the number of currently active TCP connections
"""
CONNECTION_TYPE = 'io.skupper.router.connection'
mgmt = self.router.management
conns = mgmt.query(type=CONNECTION_TYPE, attribute_names=['protocol',
'container',
'host']).get_dicts()
results = [c for c in filter(lambda c:
c['protocol'] == 'tcp' and
c['container'] == 'TcpAdaptor' and
c['host'] != 'egress-dispatch', conns)]
return len(results)

def _is_socket_closed(self, sock):
sock.settimeout(TIMEOUT)
try:
data = sock.recv(4096)
if data == b'':
return True
except Exception as exc:
print(f"Socket Recv Failed! Error={exc}", flush=True)
return False

def test_01_detect_idle_conn(self):
"""
Connect a client and server that do not transfer any data. Wait for the
idle timeout to expire and verify the TCP connections have been
force-closed by the router.
"""

# create the server listening socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind(("", self.connector_port))
listener.settimeout(TIMEOUT)
listener.listen(1)

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
retry_exception(lambda cs=client:
cs.connect(("localhost",
self.listener_port)),
delay=0.25,
exception=ConnectionRefusedError)
client.settimeout(TIMEOUT)

# accept the client connection
server, _ = listener.accept()
try:
# Ensure both conns are established
self.assertTrue(retry(lambda: self._get_tcp_conn_count() == 2))

# Now wait until both conns are torn down
self.assertTrue(retry(lambda: self._get_tcp_conn_count() == 0))

# verify that the client and server sockets are closed
self.assertTrue(retry(lambda sock=client:
self._is_socket_closed(sock)))
self.assertTrue(retry(lambda sock=server:
self._is_socket_closed(sock)))
finally:
server.close()


if __name__ == '__main__':
unittest.main(main_module())

0 comments on commit cfb59de

Please sign in to comment.