From 1cfecac3ff8d3b937651a092efb23ee8da9dd287 Mon Sep 17 00:00:00 2001 From: "Qirui(Keery) Nie" Date: Mon, 18 Sep 2023 11:16:10 +0800 Subject: [PATCH] fix(postgres): close socket actively when timeout happens during query (#11480) Currently, we do set/keep socket keepalive after every Postgres SQL query, based on keepalive timeout configured or lua_socket_keepalive_timeout(default 60s). This could go wrong under some cases, when a query encounters read timeout when trying to receive data from a database with high load, the query ends on Kong's side but the query result may be sent back after timeout happens, and the result data will be lingering inside the socket buffer, and the socket itself get reused for subsequent query, then the subsequent query might get the incorrect result from the previous query. The PR checks the query result's err string, and if any error happens, it'll try to close the socket actively so that the subsequent query will establish new clean ones. Fix FTI-5322 (cherry picked from commit d2da4dbb372db3687f1dfae33ba422c384b61024) --- CHANGELOG/unreleased/kong/11480.yaml | 7 ++++ kong/db/strategies/postgres/connector.lua | 34 ++++++++++++------ spec/02-integration/03-db/01-db_spec.lua | 44 +++++++++++++++++++++++ 3 files changed, 75 insertions(+), 10 deletions(-) create mode 100644 CHANGELOG/unreleased/kong/11480.yaml diff --git a/CHANGELOG/unreleased/kong/11480.yaml b/CHANGELOG/unreleased/kong/11480.yaml new file mode 100644 index 000000000000..96f396355587 --- /dev/null +++ b/CHANGELOG/unreleased/kong/11480.yaml @@ -0,0 +1,7 @@ +message: Fix a problem that abnormal socket connection will be reused when querying Postgres database. +type: bugfix +scope: Core +prs: + - 11480 +jiras: + - "FTI-5322" diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index e0c8d886074a..3d932a518421 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -497,6 +497,7 @@ function _mt:query(sql, operation) operation = "write" end + local conn, is_new_conn local res, err, partial, num_queries local ok @@ -505,23 +506,36 @@ function _mt:query(sql, operation) return nil, "error acquiring query semaphore: " .. err end - local conn = self:get_stored_connection(operation) - if conn then - res, err, partial, num_queries = conn:query(sql) - - else - local connection + conn = self:get_stored_connection(operation) + if not conn then local config = operation == "write" and self.config or self.config_ro - connection, err = connect(config) - if not connection then + conn, err = connect(config) + if not conn then self:release_query_semaphore_resource(operation) return nil, err end + is_new_conn = true + end + + res, err, partial, num_queries = conn:query(sql) - res, err, partial, num_queries = connection:query(sql) + -- if err is string then either it is a SQL error + -- or it is a socket error, here we abort connections + -- that encounter errors instead of reusing them, for + -- safety reason + if err and type(err) == "string" then + ngx.log(ngx.DEBUG, "SQL query throw error: ", err, ", close connection") + local _, err = conn:disconnect() + if err then + -- We're at the end of the query - just logging if + -- we cannot cleanup the connection + ngx.log(ngx.ERR, "failed to disconnect: ", err) + end + self.store_connection(nil, operation) - setkeepalive(connection) + elseif is_new_conn then + setkeepalive(conn) end self:release_query_semaphore_resource(operation) diff --git a/spec/02-integration/03-db/01-db_spec.lua b/spec/02-integration/03-db/01-db_spec.lua index b7b46d7e8b2d..cd11e9bfdfd7 100644 --- a/spec/02-integration/03-db/01-db_spec.lua +++ b/spec/02-integration/03-db/01-db_spec.lua @@ -447,6 +447,50 @@ for _, strategy in helpers.each_strategy() do end) end) + describe("#testme :query() [#" .. strategy .. "]", function() + lazy_setup(function() + helpers.get_db_utils(strategy, {}) + end) + + postgres_only("establish new connection when error occurred", function() + ngx.IS_CLI = false + + local conf = utils.deep_copy(helpers.test_conf) + conf.pg_ro_host = conf.pg_host + conf.pg_ro_user = conf.pg_user + + local db, err = DB.new(conf, strategy) + + assert.is_nil(err) + assert.is_table(db) + assert(db:init_connector()) + assert(db:connect()) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local old_conn = db.connector:get_stored_connection("write") + assert.not_nil(old_conn) + + local res, err = db.connector:query("SELECT * FROM not_exist_table;") + assert.is_nil(res) + assert.not_nil(err) + + local new_conn = db.connector:get_stored_connection("write") + assert.is_nil(new_conn) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + local res, err = db.connector:query("SELECT now();") + assert.not_nil(res) + assert.is_nil(err) + + assert(db:close()) + end) + end) describe(":setkeepalive() [#" .. strategy .. "]", function() lazy_setup(function()