From d84d082a7a9863fd067a7ac9197b56de6ddbc0b7 Mon Sep 17 00:00:00 2001 From: Noah Misch Date: Mon, 18 Dec 2023 12:11:54 -0800 Subject: [PATCH 1/3] Consistently #include "pglogical_compat.h" after postgresql headers. The compatibility macros mess up some postgresql headers, yielding compilation failures. pglogical doesn't use affected headers now, so there's no live bug. The next change will add use of such a header. --- pglogical_conflict.c | 1 + pglogical_output_plugin.h | 3 --- pglogical_output_proto.c | 1 + pglogical_proto_json.c | 1 + pglogical_proto_native.c | 1 + 5 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pglogical_conflict.c b/pglogical_conflict.c index a37e53f..4a5a626 100644 --- a/pglogical_conflict.c +++ b/pglogical_conflict.c @@ -47,6 +47,7 @@ #include "pglogical_conflict.h" #include "pglogical_proto_native.h" +#include "pglogical.h" int pglogical_conflict_resolver = PGLOGICAL_RESOLVE_APPLY_REMOTE; int pglogical_conflict_log_level = LOG; diff --git a/pglogical_output_plugin.h b/pglogical_output_plugin.h index aa843ae..406f9f1 100644 --- a/pglogical_output_plugin.h +++ b/pglogical_output_plugin.h @@ -16,9 +16,6 @@ #include "nodes/pg_list.h" #include "nodes/primnodes.h" -/* summon cross-PG-version compatibility voodoo */ -#include "pglogical_compat.h" - /* typedef appears in pglogical_output_plugin.h */ typedef struct PGLogicalOutputData { diff --git a/pglogical_output_proto.c b/pglogical_output_proto.c index 15e00bd..9739d7f 100644 --- a/pglogical_output_proto.c +++ b/pglogical_output_proto.c @@ -17,6 +17,7 @@ #include "pglogical_output_proto.h" #include "pglogical_proto_native.h" #include "pglogical_proto_json.h" +#include "pglogical.h" PGLogicalProtoAPI * pglogical_init_api(PGLogicalProtoType typ) diff --git a/pglogical_proto_json.c b/pglogical_proto_json.c index 6650cba..d1b7dd5 100644 --- a/pglogical_proto_json.c +++ b/pglogical_proto_json.c @@ -37,6 +37,7 @@ #include "pglogical_output_plugin.h" #include "pglogical_proto_json.h" +#include "pglogical.h" #ifdef HAVE_REPLICATION_ORIGINS #include "replication/origin.h" diff --git a/pglogical_proto_native.c b/pglogical_proto_native.c index 7a94036..696f809 100644 --- a/pglogical_proto_native.c +++ b/pglogical_proto_native.c @@ -29,6 +29,7 @@ #include "pglogical_output_plugin.h" #include "pglogical_output_proto.h" #include "pglogical_proto_native.h" +#include "pglogical.h" #define IS_REPLICA_IDENTITY 1 From 10ac262aa4ff898570a9e281f84a5b17a783e810 Mon Sep 17 00:00:00 2001 From: Noah Misch Date: Mon, 18 Dec 2023 12:11:54 -0800 Subject: [PATCH 2/3] Call PQgetResult() to get the final result status of each COPY. https://www.postgresql.org/docs/current/libpq-copy.html directs COPY callers to do this. By not doing this, pglogical risked ignoring a late failure in the COPY FROM query. This change unblocks replacing PQexec() with PQsendQuery(). The former silently discards any prior query result, but the latter fails. Since plain PQgetResult() blocks without responding to interrupts, import the PostgreSQL v17 libpq/libpq-be-fe-helpers.h for its interruptible helper. Adapt the header to v9.4+ with PG_VERSION_NUM conditions and changing WL_EXIT_ON_PM_DEATH to explicit proc_exit(). Divide the header into two include-guarded regions, one for v16 content and one for v17+ content. --- compat10/pglogical_compat.h | 2 + compat11/pglogical_compat.h | 3 + compat12/pglogical_compat.h | 3 + compat13/pglogical_compat.h | 3 + compat14/pglogical_compat.h | 3 + compat15/pglogical_compat.h | 3 + compat16/pglogical_compat.h | 4 + compat94/pglogical_compat.h | 3 + compat94/pglogical_libpq-be-fe-helpers.h | 438 +++++++++++++++++++++++ compat95/pglogical_compat.h | 3 + compat96/pglogical_compat.h | 3 + pglogical.h | 2 - pglogical_sync.c | 21 +- 13 files changed, 486 insertions(+), 5 deletions(-) create mode 100644 compat94/pglogical_libpq-be-fe-helpers.h diff --git a/compat10/pglogical_compat.h b/compat10/pglogical_compat.h index 6893861..f0970bc 100644 --- a/compat10/pglogical_compat.h +++ b/compat10/pglogical_compat.h @@ -8,6 +8,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define PGLCreateTrigger CreateTrigger #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ diff --git a/compat11/pglogical_compat.h b/compat11/pglogical_compat.h index a555c12..4a1493f 100644 --- a/compat11/pglogical_compat.h +++ b/compat11/pglogical_compat.h @@ -1,9 +1,12 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat12/pglogical_compat.h b/compat12/pglogical_compat.h index af2ce82..e0f3e03 100644 --- a/compat12/pglogical_compat.h +++ b/compat12/pglogical_compat.h @@ -1,6 +1,7 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "access/amapi.h" #include "access/heapam.h" #include "access/table.h" @@ -8,6 +9,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat13/pglogical_compat.h b/compat13/pglogical_compat.h index bbb50ca..0d295c6 100644 --- a/compat13/pglogical_compat.h +++ b/compat13/pglogical_compat.h @@ -1,6 +1,7 @@ #ifndef PG_LOGICAL_COMPAT_H #define PG_LOGICAL_COMPAT_H +#include "pgstat.h" #include "access/amapi.h" #include "access/heapam.h" #include "access/table.h" @@ -8,6 +9,8 @@ #include "replication/origin.h" #include "utils/varlena.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat14/pglogical_compat.h b/compat14/pglogical_compat.h index 990eb75..6e43eee 100644 --- a/compat14/pglogical_compat.h +++ b/compat14/pglogical_compat.h @@ -6,6 +6,9 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat15/pglogical_compat.h b/compat15/pglogical_compat.h index 3591d94..ed40c1b 100644 --- a/compat15/pglogical_compat.h +++ b/compat15/pglogical_compat.h @@ -6,6 +6,9 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat16/pglogical_compat.h b/compat16/pglogical_compat.h index a607519..b781e78 100644 --- a/compat16/pglogical_compat.h +++ b/compat16/pglogical_compat.h @@ -6,6 +6,10 @@ #include "access/table.h" #include "access/tableam.h" #include "utils/varlena.h" +#include "utils/wait_event.h" + +#include "libpq/libpq-be-fe-helpers.h" +#include "compat94/pglogical_libpq-be-fe-helpers.h" #define WaitLatchOrSocket(latch, wakeEvents, sock, timeout) \ WaitLatchOrSocket(latch, wakeEvents, sock, timeout, PG_WAIT_EXTENSION) diff --git a/compat94/pglogical_compat.h b/compat94/pglogical_compat.h index e5973a9..b749017 100644 --- a/compat94/pglogical_compat.h +++ b/compat94/pglogical_compat.h @@ -12,6 +12,9 @@ #include "storage/lwlock.h" #include "utils/array.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + /* 9.4 lacks PG_*_MAX */ #ifndef PG_UINT32_MAX #define PG_UINT32_MAX (0xFFFFFFFF) diff --git a/compat94/pglogical_libpq-be-fe-helpers.h b/compat94/pglogical_libpq-be-fe-helpers.h new file mode 100644 index 0000000..f6fb167 --- /dev/null +++ b/compat94/pglogical_libpq-be-fe-helpers.h @@ -0,0 +1,438 @@ +/*------------------------------------------------------------------------- + * + * libpq-be-fe-helpers.h + * Helper functions for using libpq in extensions + * + * Code built directly into the backend is not allowed to link to libpq + * directly. Extension code is allowed to use libpq however. However, libpq + * used in extensions has to be careful not to block inside libpq, otherwise + * interrupts will not be processed, leading to issues like unresolvable + * deadlocks. Backend code also needs to take care to acquire/release an + * external fd for the connection, otherwise fd.c's accounting of fd's is + * broken. + * + * This file provides helper functions to make it easier to comply with these + * rules. It is a header only library as it needs to be linked into each + * extension using libpq, and it seems too small to be worth adding a + * dedicated static library for. + * + * TODO: For historical reasons the connections established here are not put + * into non-blocking mode. That can lead to blocking even when only the async + * libpq functions are used. This should be fixed. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/libpq/libpq-be-fe-helpers.h + * + *------------------------------------------------------------------------- + */ +#ifndef LIBPQ_BE_FE_HELPERS_H +#define LIBPQ_BE_FE_HELPERS_H + +/* + * Despite the name, BUILDING_DLL is set only when building code directly part + * of the backend. Which also is where libpq isn't allowed to be + * used. Obviously this doesn't protect against libpq-fe.h getting included + * otherwise, but perhaps still protects against a few mistakes... + */ +#ifdef BUILDING_DLL +#error "libpq may not be used code directly built into the backend" +#endif + +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" + + +#if PG_VERSION_NUM < 100000 +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout) +#else +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) +#endif + +#if PG_VERSION_NUM < 130000 +#define AcquireExternalFD() (true) +#define ReleaseExternalFD() do {} while (0) +#endif + +#if PG_VERSION_NUM < 90500 +#define MyLatch (&MyProc->procLatch) +#endif + + +static inline void libpqsrv_connect_prepare(void); +static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info); + + +/* + * PQconnectdb() wrapper that reserves a file descriptor and processes + * interrupts during connection establishment. + * + * Throws an error if AcquireExternalFD() fails, but does not throw if + * connection establishment itself fails. Callers need to use PQstatus() to + * check if connection establishment succeeded. + */ +static inline PGconn * +libpqsrv_connect(const char *conninfo, uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStart(conninfo); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * Like libpqsrv_connect(), except that this is a wrapper for + * PQconnectdbParams(). + */ +static inline PGconn * +libpqsrv_connect_params(const char *const *keywords, + const char *const *values, + int expand_dbname, + uint32 wait_event_info) +{ + PGconn *conn = NULL; + + libpqsrv_connect_prepare(); + + conn = PQconnectStartParams(keywords, values, expand_dbname); + + libpqsrv_connect_internal(conn, wait_event_info); + + return conn; +} + +/* + * PQfinish() wrapper that additionally releases the reserved file descriptor. + * + * It is allowed to call this with a NULL pgconn iff NULL was returned by + * libpqsrv_connect*. + */ +static inline void +libpqsrv_disconnect(PGconn *conn) +{ + /* + * If no connection was established, we haven't reserved an FD for it (or + * already released it). This rule makes it easier to write PG_CATCH() + * handlers for this facility's users. + * + * See also libpqsrv_connect_internal(). + */ + if (conn == NULL) + return; + + ReleaseExternalFD(); + PQfinish(conn); +} + + +/* internal helper functions follow */ + + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_prepare(void) +{ + /* + * We must obey fd.c's limit on non-virtual file descriptors. Assume that + * a PGconn represents one long-lived FD. (Doing this here also ensures + * that VFDs are closed if needed to make room.) + */ + if (!AcquireExternalFD()) + { +#ifndef WIN32 /* can't write #if within ereport() macro */ + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits."))); +#else + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("There are too many open files on the local server."), + errhint("Raise the server's max_files_per_process setting."))); +#endif + } +} + +/* + * Helper function for all connection establishment functions. + */ +static inline void +libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info) +{ + /* + * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do + * that here. + */ + if (conn == NULL) + { + ReleaseExternalFD(); + return; + } + + /* + * Can't wait without a socket. Note that we don't want to close the libpq + * connection yet, so callers can emit a useful error. + */ + if (PQstatus(conn) == CONNECTION_BAD) + return; + + /* + * WaitLatchOrSocket() can conceivably fail, handle that case here instead + * of requiring all callers to do so. + */ + PG_TRY(); + { + PostgresPollingStatusType status; + + /* + * Poll connection until we have OK or FAILED status. + * + * Per spec for PQconnectPoll, first wait till socket is write-ready. + */ + status = PGRES_POLLING_WRITING; + while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED) + { + int io_flag; + int rc; + + if (status == PGRES_POLLING_READING) + io_flag = WL_SOCKET_READABLE; +#ifdef WIN32 + + /* + * Windows needs a different test while waiting for + * connection-made + */ + else if (PQstatus(conn) == CONNECTION_STARTED) + io_flag = WL_SOCKET_CONNECTED; +#endif + else + io_flag = WL_SOCKET_WRITEABLE; + + rc = WaitLatchOrSocket(MyLatch, + WL_POSTMASTER_DEATH | WL_LATCH_SET | io_flag, + PQsocket(conn), + 0, + wait_event_info); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* If socket is ready, advance the libpq state machine */ + if (rc & io_flag) + status = PQconnectPoll(conn); + } + } + PG_CATCH(); + { + /* + * If an error is thrown here, the callers won't call + * libpqsrv_disconnect() with a conn, so release resources + * immediately. + */ + ReleaseExternalFD(); + PQfinish(conn); + + PG_RE_THROW(); + } + PG_END_TRY(); +} + +#undef WaitLatchOrSocket +#undef AcquireExternalFD +#undef ReleaseExternalFD +#undef MyLatch + +#endif /* LIBPQ_BE_FE_HELPERS_H */ + +/* + * Preceding half of file is v16+ header content, and following half is v17+ + * header content. By using separate include guards, v16 can include both + * this header and PostgreSQL's libpq/libpq-be-fe-helpers.h. + */ + +#ifndef PGLOGICAL_LIBPQ_BE_FE_HELPERS_H +#define PGLOGICAL_LIBPQ_BE_FE_HELPERS_H + +#include "libpq-fe.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/proc.h" + + +#if PG_VERSION_NUM >= 100000 +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) +#else +#define WaitLatchOrSocket(latch, wakeEvents, sock, timeout, wait_event_info) \ + WaitLatchOrSocket(latch, wakeEvents, sock, timeout) +#endif + +#if PG_VERSION_NUM < 90500 +#define MyLatch (&MyProc->procLatch) +#endif + + +static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info); +static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info); + +/* + * PQexec() wrapper that processes interrupts. + * + * Unless PQsetnonblocking(conn, 1) is in effect, this can't process + * interrupts while pushing the query text to the server. Consider that + * setting if query strings can be long relative to TCP buffer size. + * + * This has the preconditions of PQsendQuery(), not those of PQexec(). Most + * notably, PQexec() would silently discard any prior query results. + */ +static inline PGresult * +libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info) +{ + if (!PQsendQuery(conn, query)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * PQexecParams() wrapper that processes interrupts. + * + * See notes at libpqsrv_exec(). + */ +static inline PGresult * +libpqsrv_exec_params(PGconn *conn, + const char *command, + int nParams, + const Oid *paramTypes, + const char *const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat, + uint32 wait_event_info) +{ + if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues, + paramLengths, paramFormats, resultFormat)) + return NULL; + return libpqsrv_get_result_last(conn, wait_event_info); +} + +/* + * Like PQexec(), loop over PQgetResult() until it returns NULL or another + * terminal state. Return the last non-NULL result or the terminal state. + */ +static inline PGresult * +libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info) +{ + PGresult *volatile lastResult = NULL; + + /* In what follows, do not leak any PGresults on an error. */ + PG_TRY(); + { + for (;;) + { + /* Wait for, and collect, the next PGresult. */ + PGresult *result; + + result = libpqsrv_get_result(conn, wait_event_info); + if (result == NULL) + break; /* query is complete, or failure */ + + /* + * Emulate PQexec()'s behavior of returning the last result when + * there are many. + */ + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || + PQstatus(conn) == CONNECTION_BAD) + break; + } + } + PG_CATCH(); + { + PQclear(lastResult); + PG_RE_THROW(); + } + PG_END_TRY(); + + return lastResult; +} + +/* + * Perform the equivalent of PQgetResult(), but watch for interrupts. + */ +static inline PGresult * +libpqsrv_get_result(PGconn *conn, uint32 wait_event_info) +{ + /* + * Collect data until PQgetResult is ready to get the result without + * blocking. + */ + while (PQisBusy(conn)) + { + int rc; + + rc = WaitLatchOrSocket(MyLatch, + WL_POSTMASTER_DEATH | WL_LATCH_SET | + WL_SOCKET_READABLE, + PQsocket(conn), + 0, + wait_event_info); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + /* Interrupted? */ + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + /* Consume whatever data is available from the socket */ + if (PQconsumeInput(conn) == 0) + { + /* trouble; expect PQgetResult() to return NULL */ + break; + } + } + + /* Now we can collect and return the next PGresult */ + return PQgetResult(conn); +} + + +#undef WaitLatchOrSocket +#undef MyLatch + +#endif /* PGLOGICAL_LIBPQ_BE_FE_HELPERS_H */ diff --git a/compat95/pglogical_compat.h b/compat95/pglogical_compat.h index 85cb766..7c19d05 100644 --- a/compat95/pglogical_compat.h +++ b/compat95/pglogical_compat.h @@ -9,6 +9,9 @@ #include "replication/origin.h" #include "storage/lwlock.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + extern LWLockPadded *GetNamedLWLockTranche(const char *tranche_name); extern void RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks); diff --git a/compat96/pglogical_compat.h b/compat96/pglogical_compat.h index 509b262..5035420 100644 --- a/compat96/pglogical_compat.h +++ b/compat96/pglogical_compat.h @@ -7,6 +7,9 @@ #include "executor/executor.h" #include "replication/origin.h" +#define PG_WAIT_EXTENSION 0 +#include "compat94/pglogical_libpq-be-fe-helpers.h" + #define PGLCreateTrigger CreateTrigger #define RawStmt Node diff --git a/pglogical.h b/pglogical.h index fc7ebb3..52e0371 100644 --- a/pglogical.h +++ b/pglogical.h @@ -20,8 +20,6 @@ #include "executor/executor.h" #include "miscadmin.h" -#include "libpq-fe.h" - #include "pglogical_fe.h" #include "pglogical_node.h" diff --git a/pglogical_sync.c b/pglogical_sync.c index 8e63815..e292601 100644 --- a/pglogical_sync.c +++ b/pglogical_sync.c @@ -21,8 +21,6 @@ #include #endif -#include "libpq-fe.h" - #include "miscadmin.h" #include "access/genam.h" @@ -649,6 +647,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("Query '%s': %s", query.data, PQerrorMessage(origin_conn)))); } + PQclear(res); while ((bytes = PQgetCopyData(origin_conn, ©buf, false)) > 0) { @@ -671,6 +670,15 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("source connection returned %d: %s", bytes, PQerrorMessage(origin_conn)))); } + res = libpqsrv_get_result_last(origin_conn, PG_WAIT_EXTENSION); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + ereport(ERROR, + (errmsg("reading from origin table failed"), + errdetail("Query '%s': %s", query.data, + PQerrorMessage(origin_conn)))); + } + PQclear(res); /* Send local finish */ if (PQputCopyEnd(target_conn, NULL) != 1) @@ -680,7 +688,14 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, errdetail("destination connection reported: %s", PQerrorMessage(target_conn)))); } - + res = libpqsrv_get_result_last(target_conn, PG_WAIT_EXTENSION); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + ereport(ERROR, + (errmsg("writing to target table failed"), + errdetail("destination connection reported: %s", + PQerrorMessage(target_conn)))); + } PQclear(res); elog(INFO, "finished synchronization of data for table %s.%s", From e39216093e051c00102b29ccc150b8562daf9870 Mon Sep 17 00:00:00 2001 From: Noah Misch Date: Mon, 18 Dec 2023 12:11:54 -0800 Subject: [PATCH 3/3] Replace backend PQexec() and PQexecParams() with interruptible helpers. This prevents v15+ DROP DATABASE hanging when a pglogical process waits inside one of these calls. (See https://github.com/2ndQuadrant/pglogical/issues/418 for fixes of that symptom under different conditions.) In all versions, it brings responsiveness of these calls to query cancel, backend termination, and fast shutdown. --- pglogical.c | 5 +++-- pglogical_rpc.c | 37 +++++++++++++++++++++---------------- pglogical_sync.c | 17 +++++++++-------- 3 files changed, 33 insertions(+), 26 deletions(-) diff --git a/pglogical.c b/pglogical.c index 2e1209c..958471e 100644 --- a/pglogical.c +++ b/pglogical.c @@ -431,7 +431,8 @@ pglogical_identify_system(PGconn *streamConn, uint64* sysid, { PGresult *res; - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + res = libpqsrv_exec(streamConn, "IDENTIFY_SYSTEM", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { elog(ERROR, "could not send replication command \"%s\": %s", @@ -578,7 +579,7 @@ pglogical_start_replication(PGconn *streamConn, const char *slot_name, appendStringInfoChar(&command, ')'); - res = PQexec(streamConn, command.data); + res = libpqsrv_exec(streamConn, command.data, PG_WAIT_EXTENSION); sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); if (PQresultStatus(res) != PGRES_COPY_BOTH) elog(FATAL, "could not send replication command \"%s\": %s\n, sqlstate: %s", diff --git a/pglogical_rpc.c b/pglogical_rpc.c index ddb2c12..d77d5dd 100644 --- a/pglogical_rpc.c +++ b/pglogical_rpc.c @@ -80,7 +80,7 @@ pg_logical_get_remote_repset_tables(PGconn *conn, List *replication_sets) repsetarr.data); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); /* TODO: better error message? */ if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not get table list: %s", PQresultErrorMessage(res)); @@ -162,7 +162,7 @@ pg_logical_get_remote_repset_table(PGconn *conn, RangeVar *rv, repsetarr.data); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); /* TODO: better error message? */ if (PQresultStatus(res) != PGRES_TUPLES_OK || PQntuples(res) != 1) elog(ERROR, "could not get table list: %s", PQresultErrorMessage(res)); @@ -195,11 +195,12 @@ pglogical_remote_slot_active(PGconn *conn, const char *slot_name) values[0] = slot_name; - res = PQexecParams(conn, - "SELECT plugin, active " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_name = $1", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, + "SELECT plugin, active " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_name = $1", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -245,11 +246,12 @@ pglogical_drop_remote_slot(PGconn *conn, const char *slot_name) values[0] = slot_name; /* Check if the slot exists */ - res = PQexecParams(conn, - "SELECT plugin " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_name = $1", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, + "SELECT plugin " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_name = $1", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -277,8 +279,9 @@ pglogical_drop_remote_slot(PGconn *conn, const char *slot_name) PQclear(res); - res = PQexecParams(conn, "SELECT pg_drop_replication_slot($1)", - 1, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, "SELECT pg_drop_replication_slot($1)", + 1, types, values, NULL, NULL, 0, + PG_WAIT_EXTENSION); /* And finally, drop the slot. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) @@ -297,7 +300,8 @@ pglogical_remote_node_info(PGconn *conn, Oid *nodeid, char **node_name, char **s { PGresult *res; - res = PQexec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()"); + res = libpqsrv_exec(conn, "SELECT node_id, node_name, sysid, dbname, replication_sets FROM pglogical.pglogical_node_info()", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not fetch remote node info: %s\n", PQerrorMessage(conn)); @@ -351,7 +355,8 @@ pglogical_remote_function_exists(PGconn *conn, const char *nspname, " AND %s = ANY (proargnames)", PQescapeLiteral(conn, argname, strlen(argname))); - res = PQexecParams(conn, query.data, 2, types, values, NULL, NULL, 0); + res = libpqsrv_exec_params(conn, query.data, 2, types, values, NULL, NULL, + 0, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(ERROR, "could not fetch remote function info: %s\n", diff --git a/pglogical_sync.c b/pglogical_sync.c index e292601..6ba159b 100644 --- a/pglogical_sync.c +++ b/pglogical_sync.c @@ -308,7 +308,7 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, use_failover_slot ? " FAILOVER" : ""); - res = PQexec(repl_conn, query.data); + res = libpqsrv_exec(repl_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) { @@ -385,7 +385,7 @@ start_copy_origin_tx(PGconn *conn, const char *snapshot) appendStringInfo(&query, "SET TRANSACTION SNAPSHOT %s;\n", s); } - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "BEGIN on origin node failed: %s", PQresultErrorMessage(res)); @@ -425,7 +425,7 @@ start_copy_target_tx(PGconn *conn, const char *origin_name) appendStringInfoString(&query, setup_query); - res = PQexec(conn, query.data); + res = libpqsrv_exec(conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "BEGIN on target node failed: %s", PQresultErrorMessage(res)); @@ -438,7 +438,7 @@ finish_copy_origin_tx(PGconn *conn) PGresult *res; /* Close the transaction and connection on origin node. */ - res = PQexec(conn, "ROLLBACK"); + res = libpqsrv_exec(conn, "ROLLBACK", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(WARNING, "ROLLBACK on origin node failed: %s", PQresultErrorMessage(res)); @@ -452,7 +452,7 @@ finish_copy_target_tx(PGconn *conn) PGresult *res; /* Close the transaction and connection on target node. */ - res = PQexec(conn, "COMMIT"); + res = libpqsrv_exec(conn, "COMMIT", PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COMMAND_OK) elog(ERROR, "COMMIT on target node failed: %s", PQresultErrorMessage(res)); @@ -464,7 +464,8 @@ finish_copy_target_tx(PGconn *conn) */ if (PQserverVersion(conn) >= 90500) { - res = PQexec(conn, "SELECT pg_catalog.pg_replication_origin_session_reset();\n"); + res = libpqsrv_exec(conn, "SELECT pg_catalog.pg_replication_origin_session_reset();\n", + PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_TUPLES_OK) elog(WARNING, "Resetting session origin on target node failed: %s", PQresultErrorMessage(res)); @@ -618,7 +619,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, /* Execute COPY TO. */ - res = PQexec(origin_conn, query.data); + res = libpqsrv_exec(origin_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COPY_OUT) { ereport(ERROR, @@ -639,7 +640,7 @@ copy_table_data(PGconn *origin_conn, PGconn *target_conn, appendStringInfoString(&query, "FROM stdin"); /* Execute COPY FROM. */ - res = PQexec(target_conn, query.data); + res = libpqsrv_exec(target_conn, query.data, PG_WAIT_EXTENSION); if (PQresultStatus(res) != PGRES_COPY_IN) { ereport(ERROR,