From 35113849acb24038a5e3f492358f407703b6b2d1 Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Wed, 15 Apr 2020 23:23:59 -0300 Subject: [PATCH] Add parameter include-pk This parameter adds primary key information if it is available. Both formats (1 and 2) are supported. Each "pk" object provides the name and data type of every primary key column. Tests are included. Default is false. --- Makefile | 2 +- README.md | 1 + expected/pk.out | 209 ++++++++++++++++++++++++++++++++++++++ sql/pk.sql | 54 ++++++++++ wal2json.c | 261 +++++++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 513 insertions(+), 14 deletions(-) create mode 100644 expected/pk.out create mode 100644 sql/pk.sql diff --git a/Makefile b/Makefile index 584efa3..c9c6d4f 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ MODULES = wal2json REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ filtertable selecttable include_timestamp include_lsn include_xids \ - include_domain_data_type truncate actions position default + include_domain_data_type truncate actions position default pk PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 111f5bc..79d983c 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Parameters * `include-column-positions`: add column position (_pg_attribute.attnum_). Default is _false_. * `include-not-null`: add _not null_ information as _columnoptionals_. Default is _false_. * `include-default`: add default expression. Default is _false_. +* `include-pk`: add _primary key_ information as _pk_. Column name and data type are included. Default is _false_. * `pretty-print`: add spaces and indentation to JSON structures. Default is _false_. * `write-in-chunks`: write after every change instead of every changeset. Default is _false_. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_.
diff --git a/expected/pk.out b/expected/pk.out new file mode 100644 index 0000000..c999059 --- /dev/null +++ b/expected/pk.out @@ -0,0 +1,209 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SET extra_float_digits = 0; +CREATE TABLE w2j_pk_with_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3), +PRIMARY KEY(b, d, e) +); +CREATE TABLE w2j_pk_without_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3) +); +CREATE TABLE w2j_pk_with_ri ( +a int NOT NULL, +b timestamp, +c text, +d boolean, +e numeric(5,3), +UNIQUE(a) +); +ALTER TABLE w2j_pk_with_ri REPLICA IDENTITY USING INDEX w2j_pk_with_ri_a_key; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +INSERT INTO w2j_pk_with_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_with_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_pk; +INSERT INTO w2j_pk_without_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_without_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_without_pk; +INSERT INTO w2j_pk_with_ri (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Inia Araguaiaensis', true, 4.56); +UPDATE w2j_pk_with_ri SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_ri; +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0', 'include-pk', '1'); +WARNING: table "w2j_pk_without_pk" without primary key or replica identity is nothing +WARNING: table "w2j_pk_without_pk" without primary key or replica identity is nothing + data +--------------------------------------------------------------------------------------------------------------- + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + 
"columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Melanosuchus Niger", true, 4.560],+ + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [456, "Sun Apr 26 16:23:59 2020", "Panthera Onca", false, 4.560], + + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + }, + + "oldkeys": { + + "keynames": ["b", "d", "e"], + + "keytypes": ["timestamp", "bool", "numeric"], + + "keyvalues": ["Sun Apr 26 16:23:59 2020", true, 4.560] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "delete", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + }, + + "oldkeys": { + + "keynames": ["b", "d", "e"], + + "keytypes": ["timestamp", "bool", "numeric"], + + "keyvalues": ["Sun Apr 26 16:23:59 2020", false, 4.560] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_without_pk", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Melanosuchus Niger", true, 4.560] + + } + + ] + + } + { + + "change": [ + + ] + + } + { + + "change": [ + + ] + + } + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Inia Araguaiaensis", true, 4.560] + + } + + ] 
+ + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [456, "Sun Apr 26 16:23:59 2020", "Panthera Onca", false, 4.560], + + "oldkeys": { + + "keynames": ["a"], + + "keytypes": ["int4"], + + "keyvalues": [123] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "delete", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "oldkeys": { + + "keynames": ["a"], + + "keytypes": ["int4"], + + "keyvalues": [456] + + } + + } + + ] + + } +(9 rows) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-pk', '1'); +WARNING: no tuple identifier for UPDATE in table "public"."w2j_pk_without_pk" +WARNING: no tuple identifier for DELETE in table "public"."w2j_pk_without_pk" + data +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_with_pk","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Melanosuchus Niger"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without 
time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"U","schema":"public","table":"w2j_pk_with_pk","columns":[{"name":"a","type":"integer","value":456},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Panthera Onca"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"identity":[{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"D","schema":"public","table":"w2j_pk_with_pk","identity":[{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_without_pk","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Melanosuchus Niger"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_with_ri","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Inia 
Araguaiaensis"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"U","schema":"public","table":"w2j_pk_with_ri","columns":[{"name":"a","type":"integer","value":456},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Panthera Onca"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"identity":[{"name":"a","type":"integer","value":123}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"D","schema":"public","table":"w2j_pk_with_ri","identity":[{"name":"a","type":"integer","value":456}],"pk":[]} + {"action":"C"} +(25 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + +DROP TABLE w2j_pk_with_pk; +DROP TABLE w2j_pk_without_pk; +DROP TABLE w2j_pk_with_ri; diff --git a/sql/pk.sql b/sql/pk.sql new file mode 100644 index 0000000..160454f --- /dev/null +++ b/sql/pk.sql @@ -0,0 +1,54 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; +SET extra_float_digits = 0; + +CREATE TABLE w2j_pk_with_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3), +PRIMARY KEY(b, d, e) +); + +CREATE TABLE w2j_pk_without_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3) +); + +CREATE TABLE w2j_pk_with_ri ( +a int NOT NULL, +b timestamp, +c text, +d boolean, +e numeric(5,3), +UNIQUE(a) +); +ALTER TABLE w2j_pk_with_ri REPLICA IDENTITY USING INDEX w2j_pk_with_ri_a_key; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +INSERT INTO w2j_pk_with_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_with_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_pk; + +INSERT INTO w2j_pk_without_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 
4.56); +UPDATE w2j_pk_without_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_without_pk; + +INSERT INTO w2j_pk_with_ri (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Inia Araguaiaensis', true, 4.56); +UPDATE w2j_pk_with_ri SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_ri; + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0', 'include-pk', '1'); +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-pk', '1'); +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + +DROP TABLE w2j_pk_with_pk; +DROP TABLE w2j_pk_without_pk; +DROP TABLE w2j_pk_with_ri; diff --git a/wal2json.c b/wal2json.c index b4d3950..c3465a4 100644 --- a/wal2json.c +++ b/wal2json.c @@ -64,6 +64,7 @@ typedef struct bool include_column_positions; /* include column numbers */ bool include_not_null; /* include not-null constraints */ bool include_default; /* include default expressions */ + bool include_pk; /* include primary key */ bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? 
*/ @@ -96,7 +97,8 @@ typedef struct typedef enum { PGOUTPUTJSON_CHANGE, - PGOUTPUTJSON_IDENTITY + PGOUTPUTJSON_IDENTITY, + PGOUTPUTJSON_PK } PGOutputJsonKind; typedef struct SelectTable @@ -132,8 +134,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); #endif -static void columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool hasreplident, Oid reloid); -static void tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool hasreplident, Oid reloid); +static void columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool addcomma, Oid reloid); +static void tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool addcomma, Oid reloid); +static void pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool addcomma); static bool parse_table_identifier(List *qualified_tables, char separator, List **select_tables); static bool string_to_SelectTable(char *rawstring, char separator, List **select_tables); static bool split_string_to_list(char *rawstring, char separator, List **sl); @@ -244,6 +247,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->include_transaction = true; data->include_xids = false; data->include_timestamp = false; + data->include_pk = false; data->include_origin = false; data->include_schemas = true; data->include_types = true; @@ -340,6 +344,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-pk") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "include-pk argument is null"); + data->include_pk = true; + } + else if 
(!parse_bool(strVal(elem->arg), &data->include_pk)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "include-origin") == 0) { if (elem->arg == NULL) @@ -924,10 +941,9 @@ pg_decode_commit_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Accumulate tuple information and stores it at the end * * replident: is this tuple a replica identity? - * hasreplident: does this tuple has an associated replica identity? */ static void -tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool hasreplident, Oid reloid) +tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool addcomma, Oid reloid) { JsonDecodingData *data; int natt; @@ -1304,7 +1320,7 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu appendStringInfo(&colnotnulls, "],%s", data->nl); if (data->include_default) appendStringInfo(&coldefaults, "],%s", data->nl); - if (hasreplident) + if (addcomma) appendStringInfo(&colvalues, "],%s", data->nl); else appendStringInfo(&colvalues, "]%s", data->nl); @@ -1339,9 +1355,9 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu /* Print columns information */ static void -columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool hasreplident, Oid reloid) +columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool addcomma, Oid reloid) { - tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, hasreplident, reloid); + tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, addcomma, reloid); } /* Print replica identity information */ @@ -1352,6 +1368,153 @@ identity_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple 
tuple_to_stringinfo(ctx, tupdesc, tuple, indexdesc, true, false, InvalidOid); }
+/*
+ * Print primary key information: a "pk" object holding the arrays "pknames"
+ * and "pktypes" ("pktypes" stays empty unless include_types is enabled).
+ * Nothing is printed if indexdesc is NULL (no primary key).
+ *
+ * tupdesc: descriptor of the table; its columns are scanned in order
+ * tuple: not referenced here
+ * indexdesc: descriptor of the primary key index
+ * addcomma: add a comma after the closing brace of "pk"?
+ */
+static void
+pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool addcomma)
+{
+	JsonDecodingData	*data = ctx->output_plugin_private;
+	int					natt;
+	char				comma[3] = "";
+	StringInfoData		pknames;
+	StringInfoData		pktypes;
+
+	/* no primary key */
+	if (indexdesc == NULL)
+		return;
+
+	initStringInfo(&pknames);
+	initStringInfo(&pktypes);
+
+	appendStringInfo(&pknames, "%s%s%s\"pk\":%s{%s", data->ht, data->ht, data->ht, data->sp, data->nl);
+	appendStringInfo(&pknames, "%s%s%s%s\"pknames\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
+	appendStringInfo(&pktypes, "%s%s%s%s\"pktypes\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp);
+
+	/* Print key column information (name, type) */
+	for (natt = 0; natt < tupdesc->natts; natt++)
+	{
+		Form_pg_attribute	attr;		/* the attribute itself */
+		Oid					typid;		/* type of current attribute */
+		HeapTuple			type_tuple;	/* information about a type */
+		int					j;
+		bool				found_col = false;
+
+		/*
+		 * Commit d34a74dd064af959acd9040446925d9d53dff15b introduced
+		 * TupleDescAttr() in back branches. If the version supports
+		 * this macro, use it. Version 10 and later already support it.
+		 */
+#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90605) || (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90509) || (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90414)
+		attr = tupdesc->attrs[natt];
+#else
+		attr = TupleDescAttr(tupdesc, natt);
+#endif
+
+		/* Do not print dropped or system columns */
+		if (attr->attisdropped || attr->attnum < 0)
+			continue;
+
+		/*
+		 * Search pk columns in whole heap tuple (match by column name).
+		 * NOTE(review): natts may include INCLUDE columns (11+) -- confirm.
+		 */
+		for (j = 0; j < indexdesc->natts && !found_col; j++)
+		{
+			Form_pg_attribute	iattr;
+
+			/* See explanation a few lines above. */
+#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90605) || (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90509) || (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90414)
+			iattr = indexdesc->attrs[j];
+#else
+			iattr = TupleDescAttr(indexdesc, j);
+#endif
+
+			found_col = (strcmp(NameStr(attr->attname), NameStr(iattr->attname)) == 0);
+		}
+
+		/* Print only indexed columns */
+		if (!found_col)
+			continue;
+
+		/* Figure out type name */
+		typid = attr->atttypid;
+		type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+		if (!HeapTupleIsValid(type_tuple))
+			elog(ERROR, "cache lookup failed for type %u", typid);
+
+		/* Accumulate each column info */
+		appendStringInfo(&pknames, "%s", comma);
+		escape_json(&pknames, NameStr(attr->attname));
+
+		if (data->include_types)
+		{
+			char			*type_str;
+			Form_pg_type	type_form = (Form_pg_type) GETSTRUCT(type_tuple);
+
+			/*
+			 * It is a domain. Replace domain name with base data type if
+			 * include_domain_data_type is enabled.
+			 */
+			if (type_form->typtype == TYPTYPE_DOMAIN && data->include_domain_data_type)
+			{
+				typid = type_form->typbasetype;
+				if (data->include_typmod)
+					type_str = format_type_with_typemod(type_form->typbasetype, type_form->typtypmod);
+				else
+				{
+					/*
+					 * Since we are not using a format function, grab base type
+					 * name from Form_pg_type. Release the domain entry first:
+					 * type_tuple is reused and released only once below, so
+					 * the domain's cache reference would otherwise leak.
+					 */
+					ReleaseSysCache(type_tuple);
+					type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
+					if (!HeapTupleIsValid(type_tuple))
+						elog(ERROR, "cache lookup failed for type %u", typid);
+					type_form = (Form_pg_type) GETSTRUCT(type_tuple);
+					type_str = pstrdup(NameStr(type_form->typname));
+				}
+			}
+			else if (data->include_typmod)
+				type_str = TextDatumGetCString(DirectFunctionCall2(format_type, ObjectIdGetDatum(attr->atttypid), Int32GetDatum(attr->atttypmod)));
+			else
+				type_str = pstrdup(NameStr(type_form->typname));
+
+			appendStringInfo(&pktypes, "%s", comma);
+			escape_json(&pktypes, type_str);
+			pfree(type_str);
+		}
+
+		ReleaseSysCache(type_tuple);
+
+		/* The first column does not have comma */
+		if (strcmp(comma, "") == 0)
+			snprintf(comma, 3, ",%s", data->sp);
+	}
+
+	appendStringInfo(&pknames, "],%s", data->nl);
+	appendStringInfo(&pktypes, "]%s", data->nl);
+	if (addcomma)
+		appendStringInfo(&pktypes, "%s%s%s},%s", data->ht, data->ht, data->ht, data->nl);
+	else
+		appendStringInfo(&pktypes, "%s%s%s}%s", data->ht, data->ht, data->ht, data->nl);
+
+	appendStringInfoString(ctx->out, pknames.data);
+	appendStringInfoString(ctx->out, pktypes.data);
+
+	pfree(pknames.data);
+	pfree(pktypes.data);
+}
+
 /* Callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -1379,6 +1542,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation indexrel; TupleDesc indexdesc; + Relation pkrel = NULL; + TupleDesc pkdesc = NULL; + char *schemaname; char *tablename; @@ -1584,15 +1750,42 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, escape_json(ctx->out, NameStr(class_form->relname)); appendStringInfo(ctx->out, ",%s", data->nl); + if (data->include_pk) + { +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) /* 10+ */ + { + pkrel = RelationIdGetRelation(relation->rd_pkindex); + pkdesc = RelationGetDescr(pkrel); + } +#else + if (OidIsValid(relation->rd_replidindex) &&
relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) + { + pkrel = RelationIdGetRelation(relation->rd_replidindex); + pkdesc = RelationGetDescr(pkrel); + } +#endif + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: /* Print the new tuple */ - columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, false, change->data.tp.relnode.relNode); + if (data->include_pk && pkrel != NULL) + { + columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, true, change->data.tp.relnode.relNode); + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, false); + } + else + { + columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, false, change->data.tp.relnode.relNode); + } break; case REORDER_BUFFER_CHANGE_UPDATE: /* Print the new tuple */ columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, true, change->data.tp.relnode.relNode); + if (data->include_pk && pkrel != NULL) + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, true); /* * The old tuple is available when: @@ -1626,6 +1819,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } break; case REORDER_BUFFER_CHANGE_DELETE: + if (data->include_pk && pkrel != NULL) + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, true); + /* Print the replica identity */ indexrel = RelationIdGetRelation(relation->rd_replidindex); if (indexrel != NULL) @@ -1648,6 +1844,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + if (data->include_pk && pkrel != NULL) + RelationClose(pkrel); + appendStringInfo(ctx->out, "%s%s}", data->ht, data->ht); MemoryContextSwitchTo(old); @@ -1785,6 +1984,22 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple else if (relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL) elog(ERROR, "table does not have primary key or replica identity"); } + else if (kind == 
PGOUTPUTJSON_PK) + { +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) /* 10+ */ + { + idxrel = RelationIdGetRelation(relation->rd_pkindex); + idxdesc = RelationGetDescr(idxrel); + } +#else + if (OidIsValid(relation->rd_replidindex) && relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) + { + idxrel = RelationIdGetRelation(relation->rd_replidindex); + idxdesc = RelationGetDescr(idxrel); + } +#endif + } /* open pg_attrdef in preparation to get default values from columns */ if (kind == PGOUTPUTJSON_CHANGE && data->include_default) @@ -1817,7 +2032,7 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple * oldtuple contains NULL on those values that are not defined by * REPLICA IDENTITY. In this case, print only non-null values. */ - if (nulls[i] && kind == PGOUTPUTJSON_IDENTITY) + if (nulls[i] && (kind == PGOUTPUTJSON_PK || kind == PGOUTPUTJSON_IDENTITY)) continue; /* don't send unchanged TOAST Datum */ @@ -1828,7 +2043,7 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple * Is it replica identity column? Print only those columns or all * columns if REPLICA IDENTITY FULL is set. */ - if (kind == PGOUTPUTJSON_IDENTITY && relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL) + if (kind == PGOUTPUTJSON_PK || (kind == PGOUTPUTJSON_IDENTITY && relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL)) { for (j = 0; j < idxdesc->natts; j++) { @@ -1880,8 +2095,11 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple ReleaseSysCache(type_tuple); } - appendStringInfoString(ctx->out, ",\"value\":"); - pg_decode_write_value(ctx, values[i], nulls[i], attr->atttypid); + if (kind != PGOUTPUTJSON_PK) + { + appendStringInfoString(ctx->out, ",\"value\":"); + pg_decode_write_value(ctx, values[i], nulls[i], attr->atttypid); + } /* * Print optional for columns. 
This information is redundant for @@ -2147,6 +2365,23 @@ pg_decode_write_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relat } } + if (data->include_pk) + { + appendStringInfoString(ctx->out, ",\"pk\":["); +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) +#else + if (OidIsValid(relation->rd_replidindex) && relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) +#endif + { + if (change->data.tp.oldtuple != NULL) + pg_decode_write_tuple(ctx, relation, &change->data.tp.oldtuple->tuple, PGOUTPUTJSON_PK); + else + pg_decode_write_tuple(ctx, relation, &change->data.tp.newtuple->tuple, PGOUTPUTJSON_PK); + } + appendStringInfoChar(ctx->out, ']'); + } + appendStringInfoChar(ctx->out, '}'); OutputPluginWrite(ctx, true);