diff --git a/Makefile b/Makefile index 584efa3..c9c6d4f 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ MODULES = wal2json REGRESS = cmdline insert1 update1 update2 update3 update4 delete1 delete2 \ delete3 delete4 savepoint specialvalue toast bytea message typmod \ filtertable selecttable include_timestamp include_lsn include_xids \ - include_domain_data_type truncate actions position default + include_domain_data_type truncate actions position default pk PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/README.md b/README.md index 111f5bc..79d983c 100644 --- a/README.md +++ b/README.md @@ -107,6 +107,7 @@ Parameters * `include-column-positions`: add column position (_pg_attribute.attnum_). Default is _false_. * `include-not-null`: add _not null_ information as _columnoptionals_. Default is _false_. * `include-default`: add default expression. Default is _false_. +* `include-pk`: add _primary key_ information as _pk_. Column name and data type is included. Default is _false_. * `pretty-print`: add spaces and indentation to JSON structures. Default is _false_. * `write-in-chunks`: write after every change instead of every changeset. Default is _false_. * `include-lsn`: add _nextlsn_ to each changeset. Default is _false_. diff --git a/expected/pk.out b/expected/pk.out new file mode 100644 index 0000000..c999059 --- /dev/null +++ b/expected/pk.out @@ -0,0 +1,209 @@ +\set VERBOSITY terse +-- predictability +SET synchronous_commit = on; +SET extra_float_digits = 0; +CREATE TABLE w2j_pk_with_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3), +PRIMARY KEY(b, d, e) +); +CREATE TABLE w2j_pk_without_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3) +); +CREATE TABLE w2j_pk_with_ri ( +a int NOT NULL, +b timestamp, +c text, +d boolean, +e numeric(5,3), +UNIQUE(a) +); +ALTER TABLE w2j_pk_with_ri REPLICA IDENTITY USING INDEX w2j_pk_with_ri_a_key; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + ?column? +---------- + init +(1 row) + +INSERT INTO w2j_pk_with_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_with_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_pk; +INSERT INTO w2j_pk_without_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_without_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_without_pk; +INSERT INTO w2j_pk_with_ri (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Inia Araguaiaensis', true, 4.56); +UPDATE w2j_pk_with_ri SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_ri; +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0', 'include-pk', '1'); +WARNING: table "w2j_pk_without_pk" without primary key or replica identity is nothing +WARNING: table "w2j_pk_without_pk" without primary key or replica identity is nothing + data +--------------------------------------------------------------------------------------------------------------- + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Melanosuchus Niger", true, 4.560],+ + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [456, "Sun Apr 26 16:23:59 2020", "Panthera Onca", false, 4.560], + + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + }, + + "oldkeys": { + + "keynames": ["b", "d", "e"], + + "keytypes": ["timestamp", "bool", "numeric"], + + "keyvalues": ["Sun Apr 26 16:23:59 2020", true, 4.560] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "delete", + + "schema": "public", + + "table": "w2j_pk_with_pk", + + "pk": { + + "pknames": ["b", "d", "e"], + + "pktypes": ["timestamp", "bool", "numeric"] + + }, + + "oldkeys": { + + "keynames": ["b", "d", "e"], + + "keytypes": ["timestamp", "bool", "numeric"], + + "keyvalues": ["Sun Apr 26 16:23:59 2020", false, 4.560] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_without_pk", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Melanosuchus Niger", true, 4.560] + + } + + ] + + } + { + + "change": [ + + ] + + } + { + + "change": [ + + ] + + } + { + + "change": [ + + { + + "kind": "insert", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [123, "Sun Apr 26 16:23:59 2020", "Inia Araguaiaensis", true, 4.560] + + } + + ] + + } + { + + "change": [ + + { + + "kind": "update", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "columnnames": ["a", "b", "c", "d", "e"], + + "columntypes": ["int4", "timestamp", "text", "bool", "numeric"], + + "columnvalues": [456, "Sun Apr 26 16:23:59 2020", "Panthera Onca", false, 4.560], + + "oldkeys": { + + "keynames": ["a"], + + "keytypes": ["int4"], + + "keyvalues": [123] + + } + + } + + ] + + } + { + + "change": [ + + { + + "kind": "delete", + + "schema": "public", + + "table": "w2j_pk_with_ri", + + "oldkeys": { + + "keynames": ["a"], + + "keytypes": ["int4"], + + "keyvalues": [456] + + } + + } + + ] + + } +(9 rows) + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-pk', '1'); +WARNING: no tuple identifier for UPDATE in table "public"."w2j_pk_without_pk" +WARNING: no tuple identifier for DELETE in table "public"."w2j_pk_without_pk" + data{"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_with_pk","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Melanosuchus Niger"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"U","schema":"public","table":"w2j_pk_with_pk","columns":[{"name":"a","type":"integer","value":456},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Panthera Onca"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"identity":[{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"D","schema":"public","table":"w2j_pk_with_pk","identity":[{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[{"name":"b","type":"timestamp without time zone"},{"name":"d","type":"boolean"},{"name":"e","type":"numeric(5,3)"}]} + {"action":"C"} + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_without_pk","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Melanosuchus Niger"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"C"} + {"action":"B"} + {"action":"I","schema":"public","table":"w2j_pk_with_ri","columns":[{"name":"a","type":"integer","value":123},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Inia Araguaiaensis"},{"name":"d","type":"boolean","value":true},{"name":"e","type":"numeric(5,3)","value":4.560}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"U","schema":"public","table":"w2j_pk_with_ri","columns":[{"name":"a","type":"integer","value":456},{"name":"b","type":"timestamp without time zone","value":"Sun Apr 26 16:23:59 2020"},{"name":"c","type":"text","value":"Panthera Onca"},{"name":"d","type":"boolean","value":false},{"name":"e","type":"numeric(5,3)","value":4.560}],"identity":[{"name":"a","type":"integer","value":123}],"pk":[]} + {"action":"C"} + {"action":"B"} + {"action":"D","schema":"public","table":"w2j_pk_with_ri","identity":[{"name":"a","type":"integer","value":456}],"pk":[]} + {"action":"C"} +(25 rows) + +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + ?column? +---------- + stop +(1 row) + +DROP TABLE w2j_pk_with_pk; +DROP TABLE w2j_pk_without_pk; +DROP TABLE w2j_pk_with_ri; diff --git a/sql/pk.sql b/sql/pk.sql new file mode 100644 index 0000000..160454f --- /dev/null +++ b/sql/pk.sql @@ -0,0 +1,54 @@ +\set VERBOSITY terse + +-- predictability +SET synchronous_commit = on; +SET extra_float_digits = 0; + +CREATE TABLE w2j_pk_with_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3), +PRIMARY KEY(b, d, e) +); + +CREATE TABLE w2j_pk_without_pk ( +a int, +b timestamp, +c text, +d boolean, +e numeric(5,3) +); + +CREATE TABLE w2j_pk_with_ri ( +a int NOT NULL, +b timestamp, +c text, +d boolean, +e numeric(5,3), +UNIQUE(a) +); +ALTER TABLE w2j_pk_with_ri REPLICA IDENTITY USING INDEX w2j_pk_with_ri_a_key; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); + +INSERT INTO w2j_pk_with_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_with_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_pk; + +INSERT INTO w2j_pk_without_pk (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Melanosuchus Niger', true, 4.56); +UPDATE w2j_pk_without_pk SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_without_pk; + +INSERT INTO w2j_pk_with_ri (a, b, c, d, e) VALUES(123, '2020-04-26 16:23:59', 'Inia Araguaiaensis', true, 4.56); +UPDATE w2j_pk_with_ri SET a = 456, c = 'Panthera Onca', d = false; +DELETE FROM w2j_pk_with_ri; + +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '1', 'pretty-print', '1', 'include-typmod', '0', 'include-pk', '1'); +SELECT data FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'format-version', '2', 'include-pk', '1'); +SELECT 'stop' FROM pg_drop_replication_slot('regression_slot'); + +DROP TABLE w2j_pk_with_pk; +DROP TABLE w2j_pk_without_pk; +DROP TABLE w2j_pk_with_ri; diff --git a/wal2json.c b/wal2json.c index b4d3950..c3465a4 100644 --- a/wal2json.c +++ b/wal2json.c @@ -64,6 +64,7 @@ typedef struct bool include_column_positions; /* include column numbers */ bool include_not_null; /* include not-null constraints */ bool include_default; /* include default expressions */ + bool include_pk; /* include primary key */ bool pretty_print; /* pretty-print JSON? */ bool write_in_chunks; /* write in chunks? */ @@ -96,7 +97,8 @@ typedef struct typedef enum { PGOUTPUTJSON_CHANGE, - PGOUTPUTJSON_IDENTITY + PGOUTPUTJSON_IDENTITY, + PGOUTPUTJSON_PK } PGOutputJsonKind; typedef struct SelectTable @@ -132,8 +134,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx, ReorderBufferChange *change); #endif -static void columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool hasreplident, Oid reloid); -static void tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool hasreplident, Oid reloid); +static void columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool addcomma, Oid reloid); +static void tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool addcomma, Oid reloid); +static void pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool addcomma); static bool parse_table_identifier(List *qualified_tables, char separator, List **select_tables); static bool string_to_SelectTable(char *rawstring, char separator, List **select_tables); static bool split_string_to_list(char *rawstring, char separator, List **sl); @@ -244,6 +247,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is data->include_transaction = true; data->include_xids = false; data->include_timestamp = false; + data->include_pk = false; data->include_origin = false; data->include_schemas = true; data->include_types = true; @@ -340,6 +344,19 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-pk") == 0) + { + if (elem->arg == NULL) + { + elog(DEBUG1, "include-pk argument is null"); + data->include_pk = true; + } + else if (!parse_bool(strVal(elem->arg), &data->include_pk)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else if (strcmp(elem->defname, "include-origin") == 0) { if (elem->arg == NULL) @@ -924,10 +941,9 @@ pg_decode_commit_txn_v2(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * Accumulate tuple information and stores it at the end * * replident: is this tuple a replica identity? - * hasreplident: does this tuple has an associated replica identity? */ static void -tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool hasreplident, Oid reloid) +tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool replident, bool addcomma, Oid reloid) { JsonDecodingData *data; int natt; @@ -1304,7 +1320,7 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu appendStringInfo(&colnotnulls, "],%s", data->nl); if (data->include_default) appendStringInfo(&coldefaults, "],%s", data->nl); - if (hasreplident) + if (addcomma) appendStringInfo(&colvalues, "],%s", data->nl); else appendStringInfo(&colvalues, "]%s", data->nl); @@ -1339,9 +1355,9 @@ tuple_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tu /* Print columns information */ static void -columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool hasreplident, Oid reloid) +columns_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, bool addcomma, Oid reloid) { - tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, hasreplident, reloid); + tuple_to_stringinfo(ctx, tupdesc, tuple, NULL, false, addcomma, reloid); } /* Print replica identity information */ @@ -1352,6 +1368,153 @@ identity_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple_to_stringinfo(ctx, tupdesc, tuple, indexdesc, true, false, InvalidOid); } +/* Print primary key information */ +static void +pk_to_stringinfo(LogicalDecodingContext *ctx, TupleDesc tupdesc, HeapTuple tuple, TupleDesc indexdesc, bool addcomma) +{ + JsonDecodingData *data; + int natt; + char comma[3] = ""; + + StringInfoData pknames; + StringInfoData pktypes; + + data = ctx->output_plugin_private; + + /* no primary key */ + if (indexdesc == NULL) + return; + + initStringInfo(&pknames); + initStringInfo(&pktypes); + + appendStringInfo(&pknames, "%s%s%s\"pk\":%s{%s", data->ht, data->ht, data->ht, data->sp, data->nl); + appendStringInfo(&pknames, "%s%s%s%s\"pknames\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp); + appendStringInfo(&pktypes, "%s%s%s%s\"pktypes\":%s[", data->ht, data->ht, data->ht, data->ht, data->sp); + + /* Print column information (name, type, value) */ + for (natt = 0; natt < tupdesc->natts; natt++) + { + Form_pg_attribute attr; /* the attribute itself */ + Oid typid; /* type of current attribute */ + HeapTuple type_tuple; /* information about a type */ + + /* + * Commit d34a74dd064af959acd9040446925d9d53dff15b introduced + * TupleDescAttr() in back branches. If the version supports + * this macro, use it. Version 10 and later already support it. + */ +#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90605) || (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90509) || (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90414) + attr = tupdesc->attrs[natt]; +#else + attr = TupleDescAttr(tupdesc, natt); +#endif + + /* Do not print dropped or system columns */ + if (attr->attisdropped || attr->attnum < 0) + continue; + + /* Search pk columns in whole heap tuple */ + if (indexdesc != NULL) + { + int j; + bool found_col = false; + + for (j = 0; j < indexdesc->natts; j++) + { + Form_pg_attribute iattr; + + /* See explanation a few lines above. */ +#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 90605) || (PG_VERSION_NUM >= 90500 && PG_VERSION_NUM < 90509) || (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90414) + iattr = indexdesc->attrs[j]; +#else + iattr = TupleDescAttr(indexdesc, j); +#endif + + if (strcmp(NameStr(attr->attname), NameStr(iattr->attname)) == 0) + found_col = true; + } + + /* Print only indexed columns */ + if (!found_col) + continue; + } + + typid = attr->atttypid; + + /* Figure out type name */ + type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid)); + if (!HeapTupleIsValid(type_tuple)) + elog(ERROR, "cache lookup failed for type %u", typid); + + /* Accumulate each column info */ + appendStringInfo(&pknames, "%s", comma); + escape_json(&pknames, NameStr(attr->attname)); + + if (data->include_types) + { + char *type_str; + Form_pg_type type_form = (Form_pg_type) GETSTRUCT(type_tuple); + + /* + * It is a domain. Replace domain name with base data type if + * include_domain_data_type is enabled. + */ + if (type_form->typtype == TYPTYPE_DOMAIN && data->include_domain_data_type) + { + typid = type_form->typbasetype; + if (data->include_typmod) + { + type_str = format_type_with_typemod(type_form->typbasetype, type_form->typtypmod); + } + else + { + /* + * Since we are not using a format function, grab base type + * name from Form_pg_type. + */ + type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid)); + if (!HeapTupleIsValid(type_tuple)) + elog(ERROR, "cache lookup failed for type %u", typid); + type_form = (Form_pg_type) GETSTRUCT(type_tuple); + type_str = pstrdup(NameStr(type_form->typname)); + } + } + else + { + if (data->include_typmod) + type_str = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod)); + else + type_str = pstrdup(NameStr(type_form->typname)); + } + + appendStringInfo(&pktypes, "%s", comma); + escape_json(&pktypes, type_str); + + pfree(type_str); + } + + ReleaseSysCache(type_tuple); + + /* The first column does not have comma */ + if (strcmp(comma, "") == 0) + snprintf(comma, 3, ",%s", data->sp); + } + + appendStringInfo(&pknames, "],%s", data->nl); + appendStringInfo(&pktypes, "]%s", data->nl); + if (addcomma) + appendStringInfo(&pktypes, "%s%s%s},%s", data->ht, data->ht, data->ht, data->nl); + else + appendStringInfo(&pktypes, "%s%s%s}%s", data->ht, data->ht, data->ht, data->nl); + + appendStringInfoString(ctx->out, pknames.data); + appendStringInfoString(ctx->out, pktypes.data); + + pfree(pknames.data); + pfree(pktypes.data); +} + /* Callback for individual changed tuples */ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, @@ -1379,6 +1542,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation indexrel; TupleDesc indexdesc; + Relation pkrel = NULL; + TupleDesc pkdesc = NULL; + char *schemaname; char *tablename; @@ -1584,15 +1750,42 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, escape_json(ctx->out, NameStr(class_form->relname)); appendStringInfo(ctx->out, ",%s", data->nl); + if (data->include_pk) + { +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) /* 10+ */ + { + pkrel = RelationIdGetRelation(relation->rd_pkindex); + pkdesc = RelationGetDescr(pkrel); + } +#else + if (OidIsValid(relation->rd_replidindex) && relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) + { + pkrel = RelationIdGetRelation(relation->rd_replidindex); + pkdesc = RelationGetDescr(pkrel); + } +#endif + } + switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: /* Print the new tuple */ - columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, false, change->data.tp.relnode.relNode); + if (data->include_pk && pkrel != NULL) + { + columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, true, change->data.tp.relnode.relNode); + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, false); + } + else + { + columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, false, change->data.tp.relnode.relNode); + } break; case REORDER_BUFFER_CHANGE_UPDATE: /* Print the new tuple */ columns_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, true, change->data.tp.relnode.relNode); + if (data->include_pk && pkrel != NULL) + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, true); /* * The old tuple is available when: @@ -1626,6 +1819,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } break; case REORDER_BUFFER_CHANGE_DELETE: + if (data->include_pk && pkrel != NULL) + pk_to_stringinfo(ctx, tupdesc, &change->data.tp.newtuple->tuple, pkdesc, true); + /* Print the replica identity */ indexrel = RelationIdGetRelation(relation->rd_replidindex); if (indexrel != NULL) @@ -1648,6 +1844,9 @@ pg_decode_change_v1(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + if (data->include_pk && pkrel != NULL) + RelationClose(pkrel); + appendStringInfo(ctx->out, "%s%s}", data->ht, data->ht); MemoryContextSwitchTo(old); @@ -1785,6 +1984,22 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple else if (relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL) elog(ERROR, "table does not have primary key or replica identity"); } + else if (kind == PGOUTPUTJSON_PK) + { +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) /* 10+ */ + { + idxrel = RelationIdGetRelation(relation->rd_pkindex); + idxdesc = RelationGetDescr(idxrel); + } +#else + if (OidIsValid(relation->rd_replidindex) && relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) + { + idxrel = RelationIdGetRelation(relation->rd_replidindex); + idxdesc = RelationGetDescr(idxrel); + } +#endif + } /* open pg_attrdef in preparation to get default values from columns */ if (kind == PGOUTPUTJSON_CHANGE && data->include_default) @@ -1817,7 +2032,7 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple * oldtuple contains NULL on those values that are not defined by * REPLICA IDENTITY. In this case, print only non-null values. */ - if (nulls[i] && kind == PGOUTPUTJSON_IDENTITY) + if (nulls[i] && (kind == PGOUTPUTJSON_PK || kind == PGOUTPUTJSON_IDENTITY)) continue; /* don't send unchanged TOAST Datum */ @@ -1828,7 +2043,7 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple * Is it replica identity column? Print only those columns or all * columns if REPLICA IDENTITY FULL is set. */ - if (kind == PGOUTPUTJSON_IDENTITY && relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL) + if (kind == PGOUTPUTJSON_PK || (kind == PGOUTPUTJSON_IDENTITY && relation->rd_rel->relreplident != REPLICA_IDENTITY_FULL)) { for (j = 0; j < idxdesc->natts; j++) { @@ -1880,8 +2095,11 @@ pg_decode_write_tuple(LogicalDecodingContext *ctx, Relation relation, HeapTuple ReleaseSysCache(type_tuple); } - appendStringInfoString(ctx->out, ",\"value\":"); - pg_decode_write_value(ctx, values[i], nulls[i], attr->atttypid); + if (kind != PGOUTPUTJSON_PK) + { + appendStringInfoString(ctx->out, ",\"value\":"); + pg_decode_write_value(ctx, values[i], nulls[i], attr->atttypid); + } /* * Print optional for columns. This information is redundant for @@ -2147,6 +2365,23 @@ pg_decode_write_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relat } } + if (data->include_pk) + { + appendStringInfoString(ctx->out, ",\"pk\":["); +#if PG_VERSION_NUM >= 100000 + if (OidIsValid(relation->rd_pkindex)) +#else + if (OidIsValid(relation->rd_replidindex) && relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT) +#endif + { + if (change->data.tp.oldtuple != NULL) + pg_decode_write_tuple(ctx, relation, &change->data.tp.oldtuple->tuple, PGOUTPUTJSON_PK); + else + pg_decode_write_tuple(ctx, relation, &change->data.tp.newtuple->tuple, PGOUTPUTJSON_PK); + } + appendStringInfoChar(ctx->out, ']'); + } + appendStringInfoChar(ctx->out, '}'); OutputPluginWrite(ctx, true);