From 93555a7d55dc27571122cccb7c4d8af2c5db54cb Mon Sep 17 00:00:00 2001 From: zhouyisha Date: Fri, 1 Mar 2024 17:11:32 +0800 Subject: [PATCH] [FLINK-16627][json] Support ignore null fields when serializing into JSON Close apache/flink#24430 --- .../docs/connectors/table/formats/debezium.md | 7 ++ .../docs/connectors/table/formats/json.md | 7 ++ .../docs/connectors/table/formats/maxwell.md | 7 ++ .../docs/connectors/table/formats/ogg.md | 9 +- .../docs/connectors/table/formats/debezium.md | 7 ++ .../docs/connectors/table/formats/json.md | 8 ++ .../docs/connectors/table/formats/maxwell.md | 7 ++ .../docs/connectors/table/formats/ogg.md | 7 ++ .../hive/util/ThriftObjectConversions.java | 2 +- .../flink/formats/json/JsonFormatFactory.java | 7 +- .../flink/formats/json/JsonFormatOptions.java | 7 ++ .../json/JsonRowDataSerializationSchema.java | 21 +++-- .../formats/json/RowDataToJsonConverters.java | 15 +++- .../json/canal/CanalJsonFormatFactory.java | 7 +- .../canal/CanalJsonSerializationSchema.java | 6 +- .../debezium/DebeziumJsonFormatFactory.java | 6 +- .../DebeziumJsonSerializationSchema.java | 6 +- .../maxwell/MaxwellJsonFormatFactory.java | 7 +- .../MaxwellJsonSerializationSchema.java | 6 +- .../json/ogg/OggJsonFormatFactory.java | 7 +- .../json/ogg/OggJsonSerializationSchema.java | 6 +- .../formats/json/JsonFormatFactoryTest.java | 2 + .../json/JsonRowDataSerDeSchemaTest.java | 84 +++++++++++++++++-- .../canal/CanalJsonFormatFactoryTest.java | 3 + .../json/canal/CanalJsonSerDeSchemaTest.java | 3 +- .../DebeziumJsonFormatFactoryTest.java | 2 + .../debezium/DebeziumJsonSerDeSchemaTest.java | 3 +- .../maxwell/MaxwellJsonFormatFactoryTest.java | 2 + .../json/maxwell/MaxwellJsonSerDerTest.java | 3 +- .../json/ogg/OggJsonFormatFactoryTest.java | 2 + .../json/ogg/OggJsonSerDeSchemaTest.java | 3 +- .../rest/serde/ResultInfoSerializer.java | 5 +- 32 files changed, 236 insertions(+), 38 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/formats/debezium.md b/docs/content.zh/docs/connectors/table/formats/debezium.md index a6ac486f0f015..7ba62dd408de9 100644 --- a/docs/content.zh/docs/connectors/table/formats/debezium.md +++ b/docs/content.zh/docs/connectors/table/formats/debezium.md @@ -424,6 +424,13 @@ Flink 提供了 `debezium-avro-confluent` 和 `debezium-json` 两种 format 来 Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 + +
debezium-json.encode.ignore-null-fields
+ 选填 + false + Boolean + 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 + diff --git a/docs/content.zh/docs/connectors/table/formats/json.md b/docs/content.zh/docs/connectors/table/formats/json.md index f1acdd7a00170..005485a7a0a8e 100644 --- a/docs/content.zh/docs/connectors/table/formats/json.md +++ b/docs/content.zh/docs/connectors/table/formats/json.md @@ -135,6 +135,13 @@ Format 参数 Boolean 将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。 + +
json.encode.ignore-null-fields
+ 选填 + false + Boolean + 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 +
decode.json-parser.enabled
选填 diff --git a/docs/content.zh/docs/connectors/table/formats/maxwell.md b/docs/content.zh/docs/connectors/table/formats/maxwell.md index a3ac161f23164..0bdedeac6821a 100644 --- a/docs/content.zh/docs/connectors/table/formats/maxwell.md +++ b/docs/content.zh/docs/connectors/table/formats/maxwell.md @@ -251,6 +251,13 @@ Format Options Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. + +
maxwell-json.encode.ignore-null-fields
+ optional + false + Boolean + Encode only non-null fields. By default, all fields will be included. + diff --git a/docs/content.zh/docs/connectors/table/formats/ogg.md b/docs/content.zh/docs/connectors/table/formats/ogg.md index c8e8a7a6c6d55..61ec97b60fdfb 100644 --- a/docs/content.zh/docs/connectors/table/formats/ogg.md +++ b/docs/content.zh/docs/connectors/table/formats/ogg.md @@ -216,7 +216,7 @@ Format Options 当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。 -
debezium-json.timestamp-format.standard
+
ogg-json.timestamp-format.standard
可选 'SQL' String @@ -247,6 +247,13 @@ Format Options String 当 'ogg-json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 + +
ogg-json.encode.ignore-null-fields
+ 选填 + false + Boolean + 仅序列化非 Null 的列,默认情况下,会序列化所有列无论是否为 Null。 + diff --git a/docs/content/docs/connectors/table/formats/debezium.md b/docs/content/docs/connectors/table/formats/debezium.md index 790196e258800..f69e3dc5d8f0a 100644 --- a/docs/content/docs/connectors/table/formats/debezium.md +++ b/docs/content/docs/connectors/table/formats/debezium.md @@ -445,6 +445,13 @@ Use format `debezium-avro-confluent` to interpret Debezium Avro messages and for Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. + +
debezium-json.encode.ignore-null-fields
+ optional + false + Boolean + Encode only non-null fields. By default, all fields will be included. + diff --git a/docs/content/docs/connectors/table/formats/json.md b/docs/content/docs/connectors/table/formats/json.md index 52345a42ea10c..64592ac28bea7 100644 --- a/docs/content/docs/connectors/table/formats/json.md +++ b/docs/content/docs/connectors/table/formats/json.md @@ -146,6 +146,14 @@ Format Options Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. + +
json.encode.ignore-null-fields
+ optional + yes + false + Boolean + Encode only non-null fields. By default, all fields will be included. +
decode.json-parser.enabled
optional diff --git a/docs/content/docs/connectors/table/formats/maxwell.md b/docs/content/docs/connectors/table/formats/maxwell.md index a7a98270f38ff..47c87442c73af 100644 --- a/docs/content/docs/connectors/table/formats/maxwell.md +++ b/docs/content/docs/connectors/table/formats/maxwell.md @@ -251,6 +251,13 @@ Format Options Boolean Encode all decimals as plain numbers instead of possible scientific notations. By default, decimals may be written using scientific notation. For example, 0.000000027 is encoded as 2.7E-8 by default, and will be written as 0.000000027 if set this option to true. + +
maxwell-json.encode.ignore-null-fields
+ optional + false + Boolean + Encode only non-null fields. By default, all fields will be included. + diff --git a/docs/content/docs/connectors/table/formats/ogg.md b/docs/content/docs/connectors/table/formats/ogg.md index 482273af8ce3e..3b53916e36d95 100644 --- a/docs/content/docs/connectors/table/formats/ogg.md +++ b/docs/content/docs/connectors/table/formats/ogg.md @@ -260,6 +260,13 @@ Format Options String Specify string literal to replace null key when 'ogg-json.map-null-key.mode' is LITERAL. + +
ogg-json.encode.ignore-null-fields
+ optional + false + Boolean + Encode only non-null fields. By default, all fields will be included. + diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java index 35a40ea187fdd..fbc7d89d6ad57 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/ThriftObjectConversions.java @@ -108,7 +108,7 @@ public class ThriftObjectConversions { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final RowDataToJsonConverters TO_JSON_CONVERTERS = new RowDataToJsonConverters( - TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null"); + TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", false); private static final Map TABLE_TYPE_MAPPINGS = buildTableTypeMapping(); // -------------------------------------------------------------------------------------------- diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 562b99e6bc807..6ecc9a0f086db 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -46,6 +46,7 @@ import static org.apache.flink.formats.json.JsonFormatOptions.DECODE_JSON_PARSER_ENABLED; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.JsonFormatOptions.FAIL_ON_MISSING_FIELD; import static org.apache.flink.formats.json.JsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.JsonFormatOptions.MAP_NULL_KEY_LITERAL; @@ -147,6 +148,7 @@ public EncodingFormat> createEncodingFormat( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); return new EncodingFormat>() { @Override @@ -158,7 +160,8 @@ public SerializationSchema createRuntimeEncoder( timestampOption, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override @@ -187,6 +190,7 @@ public Set> optionalOptions() { options.add(MAP_NULL_KEY_MODE); options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } @@ -197,6 +201,7 @@ public Set> forwardOptions() { options.add(MAP_NULL_KEY_MODE); options.add(MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java index 5c9e61068ac39..cc40b325d915c 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatOptions.java @@ -73,6 +73,13 @@ public class JsonFormatOptions { .withDescription( "Optional flag to specify whether to encode all decimals as plain numbers instead of possible scientific notations, false by default."); + public static final ConfigOption ENCODE_IGNORE_NULL_FIELDS = + ConfigOptions.key("encode.ignore-null-fields") + .booleanType() + .defaultValue(false) + .withDescription( + "Optional flag to specify whether to ignore null fields when encoding, false by default."); + public static final ConfigOption DECODE_JSON_PARSER_ENABLED = ConfigOptions.key("decode.json-parser.enabled") .booleanType() diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java index 376d0d568a33d..4b68bb0c2af74 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java @@ -68,19 +68,28 @@ public class JsonRowDataSerializationSchema implements SerializationSchema> createEncodingFormat( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat>() { @Override public ChangelogMode getChangelogMode() { @@ -111,7 +114,8 @@ public SerializationSchema createRuntimeEncoder( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -136,6 +140,7 @@ public Set> optionalOptions() { options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java index 362b9df6e6ab2..aaa292ef9df1c 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonSerializationSchema.java @@ -59,14 +59,16 @@ public CanalJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java index d72fcd23debd9..7fec2f43c1e16 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java @@ -45,6 +45,7 @@ import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.debezium.DebeziumJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -92,6 +93,7 @@ public EncodingFormat> createEncodingFormat( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); return new EncodingFormat>() { @@ -114,7 +116,8 @@ public SerializationSchema createRuntimeEncoder( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -138,6 +141,7 @@ public Set> optionalOptions() { options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java index 0dc9a96b01297..7312b30593aee 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerializationSchema.java @@ -56,14 +56,16 @@ public DebeziumJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } @Override diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java index 1bbbec8441457..e56966753a203 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.maxwell.MaxwellJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -86,6 +87,8 @@ public EncodingFormat> createEncodingFormat( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat>() { @Override @@ -107,7 +110,8 @@ public SerializationSchema createRuntimeEncoder( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -130,6 +134,7 @@ public Set> optionalOptions() { options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java index 1fe567b08c3cb..ad1accdddd611 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerializationSchema.java @@ -56,14 +56,16 @@ public MaxwellJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { this.jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); this.timestampFormat = timestampFormat; } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java index f853983d43e61..11182e9380616 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactory.java @@ -44,6 +44,7 @@ import java.util.Set; import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_DECIMAL_AS_PLAIN_NUMBER; +import static org.apache.flink.formats.json.JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.IGNORE_PARSE_ERRORS; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_LITERAL; import static org.apache.flink.formats.json.ogg.OggJsonFormatOptions.JSON_MAP_NULL_KEY_MODE; @@ -99,6 +100,8 @@ public EncodingFormat> createEncodingFormat( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = formatOptions.get(ENCODE_IGNORE_NULL_FIELDS); + return new EncodingFormat>() { @Override @@ -120,7 +123,8 @@ public SerializationSchema createRuntimeEncoder( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } }; } @@ -143,6 +147,7 @@ public Set> optionalOptions() { options.add(JSON_MAP_NULL_KEY_MODE); options.add(JSON_MAP_NULL_KEY_LITERAL); options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + options.add(ENCODE_IGNORE_NULL_FIELDS); return options; } } diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java index 635ff3dc7e39d..f44387a586301 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/ogg/OggJsonSerializationSchema.java @@ -57,14 +57,16 @@ public OggJsonSerializationSchema( TimestampFormat timestampFormat, JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { jsonSerializer = new JsonRowDataSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } private static RowType createJsonRowType(DataType databaseSchema) { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java index 3559e2b2c8727..4430203b28e46 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonFormatFactoryTest.java @@ -176,6 +176,7 @@ private void testSchemaSerializationSchema(Map options) { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); SerializationSchema actualSer = @@ -227,6 +228,7 @@ private Map getAllOptions() { options.put("json.map-null-key.mode", "LITERAL"); options.put("json.map-null-key.literal", "null"); options.put("json.encode.decimal-as-plain-number", "true"); + options.put("json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index ced449e09365c..916b04f50f8be 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -216,7 +216,8 @@ void testSerDe() throws Exception { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); byte[] actualBytes = serializationSchema.serialize(rowData); @@ -300,7 +301,8 @@ void testSerDeMultiRows() throws Exception { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); // the first row @@ -381,7 +383,8 @@ void testSerDeMultiRowsWithNullValues() throws Exception { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); for (int i = 0; i < jsons.length; i++) { @@ -496,7 +499,8 @@ void testSerDeSQLTimestampFormat() throws Exception { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); ObjectNode root = OBJECT_MAPPER.createObjectNode(); @@ -538,7 +542,8 @@ void testSerializationMapNullKey() { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", - true); + true, + false); open(serializationSchema1); // expect message for serializationSchema1 String errorMessage1 = @@ -551,7 +556,8 @@ void testSerializationMapNullKey() { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.DROP, "null", - true); + true, + false); open(serializationSchema2); // expect result for serializationSchema2 String expectResult2 = "{\"nestedMap\":{\"no-null key\":{\"no-null key\":1}}}"; @@ -562,7 +568,8 @@ void testSerializationMapNullKey() { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "nullKey", - true); + true, + false); open(serializationSchema3); // expect result for serializationSchema3 String expectResult3 = @@ -601,7 +608,8 @@ void testSerializationDecimalEncode() throws Exception { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); plainDecimalSerializer.open(new DummyInitializationContext()); JsonRowDataSerializationSchema scientificDecimalSerializer = new JsonRowDataSerializationSchema( @@ -609,6 +617,7 @@ void testSerializationDecimalEncode() throws Exception { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + false, false); scientificDecimalSerializer.open(new DummyInitializationContext()); @@ -626,6 +635,62 @@ void testSerializationDecimalEncode() throws Exception { assertThat(scientificDecimalResult).isEqualTo(scientificDecimalJson); } + @TestTemplate + void testSerDeMultiRowsWithNullValuesIgnored() throws Exception { + String[] jsons = + new String[] { + "{\"ops\":null,\"ids\":null,\"metrics\":{\"k1\":10.01,\"k2\":null}}", + "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\", \"svt\":\"2020-02-24T12:58:09.209+0800\"}, " + + "\"ids\":[1, 2, 3]}", + "{\"ops\":{\"id\":null, \"svt\":\"2020-02-24T12:58:09.209+0800\"}, " + + "\"ids\":[1, 2, null]}", + "{\"ops\":{},\"ids\":[],\"metrics\":{}}", + }; + + String[] expected = + new String[] { + "{\"metrics\":{\"k1\":10.01,\"k2\":null}}", + "{\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\",\"svt\":\"2020-02-24T12:58:09.209+0800\"}," + + "\"ids\":[1,2,3]}", + "{\"ops\":{\"svt\":\"2020-02-24T12:58:09.209+0800\"},\"ids\":[1,2,null]}", + "{\"ops\":{},\"ids\":[],\"metrics\":{}}", + }; + + RowType rowType = + (RowType) + ROW( + FIELD( + "ops", + ROW(FIELD("id", STRING()), FIELD("svt", STRING()))), + FIELD("ids", ARRAY(INT())), + FIELD("metrics", MAP(STRING(), DOUBLE()))) + .getLogicalType(); + + JsonRowDataDeserializationSchema deserializationSchema = + new JsonRowDataDeserializationSchema( + rowType, + InternalTypeInfo.of(rowType), + false, + true, + TimestampFormat.ISO_8601); + open(deserializationSchema); + JsonRowDataSerializationSchema serializationSchema = + new JsonRowDataSerializationSchema( + rowType, + TimestampFormat.ISO_8601, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + false, + true); + open(serializationSchema); + for (int i = 0; i < jsons.length; i++) { + String json = jsons[i]; + RowData row = deserializationSchema.deserialize(json.getBytes()); + String result = new String(serializationSchema.serialize(row)); + assertThat(result).isEqualTo(expected[i]); + } + } + @TestTemplate void testJsonParse() throws Exception { for (TestSpec spec : testData) { @@ -648,7 +713,8 @@ void testSerializationWithTypesMismatch() { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", - true); + true, + false); open(serializationSchema); String errorMessage = "Fail to serialize at field: f1."; diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java index 00bd5a0625e33..bf2b95dac98fa 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactoryTest.java @@ -74,6 +74,7 @@ void testDefaultOptions() { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.FAIL, "null", + false, false); SerializationSchema actualSer = createSerializationSchema(options); assertThat(actualSer).isEqualTo(expectedSer); @@ -89,6 +90,7 @@ void testUserDefinedOptions() { options.put("canal-json.map-null-key.mode", "LITERAL"); options.put("canal-json.map-null-key.literal", "nullKey"); options.put("canal-json.encode.decimal-as-plain-number", "true"); + options.put("canal-json.encode.ignore-null-fields", "true"); // test Deser CanalJsonDeserializationSchema expectedDeser = @@ -109,6 +111,7 @@ void testUserDefinedOptions() { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "nullKey", + true, true); SerializationSchema actualSer = createSerializationSchema(options); assertThat(actualSer).isEqualTo(expectedSer); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java index e45bfcc5eeede..cf326f2a3394d 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/canal/CanalJsonSerDeSchemaTest.java @@ -218,7 +218,8 @@ public void runTest(List lines, CanalJsonDeserializationSchema deseriali TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); serializationSchema.open(new DummyInitializationContext()); List result = new ArrayList<>(); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java index d000877b2f993..c469e0b2f95f4 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactoryTest.java @@ -81,6 +81,7 @@ void testSeDeSchema() { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); @@ -200,6 +201,7 @@ private Map getAllOptions() { options.put("debezium-json.map-null-key.mode", "LITERAL"); options.put("debezium-json.map-null-key.literal", "null"); options.put("debezium-json.encode.decimal-as-plain-number", "true"); + options.put("debezium-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java index 3b9151f33a9f8..ffe0007f522a4 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonSerDeSchemaTest.java @@ -249,7 +249,8 @@ private void testSerializationDeserialization(String resourceFile, boolean schem TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); actual = new ArrayList<>(); diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java index bc47d1e68f008..54fe0804a5bd0 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactoryTest.java @@ -69,6 +69,7 @@ void testSeDeSchema() { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final Map options = getAllOptions(); @@ -165,6 +166,7 @@ private Map getAllOptions() { options.put("maxwell-json.map-null-key.mode", "LITERAL"); options.put("maxwell-json.map-null-key.literal", "null"); options.put("maxwell-json.encode.decimal-as-plain-number", "true"); + options.put("maxwell-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java index 12d64fd99d0f5..d17d6a83534ee 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/maxwell/MaxwellJsonSerDerTest.java @@ -187,7 +187,8 @@ void testSerializationDeserialization() throws Exception { TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); List result = new ArrayList<>(); for (RowData rowData : collector.list) { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java index c04e991a2de22..f840783ca95b0 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonFormatFactoryTest.java @@ -55,6 +55,7 @@ void testSeDeSchema() { TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", + true, true); final DynamicTableSink actualSink = createTableSink(SCHEMA, options); @@ -137,6 +138,7 @@ private Map getAllOptions() { options.put("ogg-json.map-null-key.mode", "LITERAL"); options.put("ogg-json.map-null-key.literal", "null"); options.put("ogg-json.encode.decimal-as-plain-number", "true"); + options.put("ogg-json.encode.ignore-null-fields", "true"); return options; } } diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java index 2fa78c8941216..76e417d4ac11b 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/ogg/OggJsonSerDeSchemaTest.java @@ -216,7 +216,8 @@ private void testSerializationDeserialization(String resourceFile) throws Except TimestampFormat.SQL, JsonFormatOptions.MapNullKeyMode.LITERAL, "null", - true); + true, + false); open(serializationSchema); actual = new ArrayList<>(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java index b796d32ba3e48..fb43f6d62b8b5 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java @@ -68,7 +68,10 @@ public ResultInfoSerializer() { private static final RowDataToJsonConverters TO_JSON_CONVERTERS = new RowDataToJsonConverters( - TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null"); + TimestampFormat.ISO_8601, + JsonFormatOptions.MapNullKeyMode.LITERAL, + "null", + false); @Override public void serialize(