From 4b33dbb720f4410a486938ca203cb99e95af03ef Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Tue, 4 Jun 2024 20:06:17 +0800 Subject: [PATCH] [flink] supports upgrading from versions below 0.8.0 (excluding 0.8.0) to 0.8.1+. --- .../flink_connector_configuration.html | 6 + .../IndexFileMetaLegacyV2Serializer.java | 68 +++++++++++ .../io/DataFileMetaLegacyV2Serializer.java | 109 +++++++++++++++++ ...ManifestCommittableLegacyV2Serializer.java | 42 +++++++ .../ManifestCommittableSerializer.java | 8 +- .../sink/CommitMessageLegacyV2Serializer.java | 62 ++++++++++ .../table/sink/CommitMessageSerializer.java | 2 +- ...leLegacyV2SerializerCompatibilityTest.java | 113 ++++++++++++++++++ .../manifest-committable-legacy-v2 | Bin 0 -> 2245 bytes .../sink/cdc/FlinkCdcMultiTableSink.java | 12 +- .../cdc/FlinkCdcSyncDatabaseSinkBuilder.java | 9 +- .../sink/cdc/FlinkCdcMultiTableSinkTest.java | 3 +- .../paimon/flink/FlinkConnectorOptions.java | 7 ++ .../sink/CombinedTableCompactorSink.java | 8 ++ .../paimon/flink/sink/FlinkWriteSink.java | 9 ++ .../WrappedManifestCommittableSerializer.java | 7 +- 16 files changed, 459 insertions(+), 6 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaLegacyV2Serializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaLegacyV2Serializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2Serializer.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2SerializerCompatibilityTest.java create mode 100644 paimon-core/src/test/resources/compatibility/manifest-committable-legacy-v2 diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html index 
4fb5fc2713fce..356600e03957b 100644 --- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html +++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html @@ -236,5 +236,11 @@ Integer Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. + +
state.compatible-for-less-than-08
+ false + Boolean + If the user attempts to upgrade from a version less than Paimon 0.8.0 to 0.8.1(+), this option can be set to true to ensure flink's state compatibility. + diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaLegacyV2Serializer.java new file mode 100644 index 0000000000000..c44e27aa36d30 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaLegacyV2Serializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.SerializationUtils.newStringType; + +/** A legacy version serializer for {@link IndexFileMeta}. 
*/ +public class IndexFileMetaLegacyV2Serializer extends ObjectSerializer { + + public IndexFileMetaLegacyV2Serializer() { + super(schema()); + } + + @Override + public InternalRow toRow(IndexFileMeta record) { + return GenericRow.of( + BinaryString.fromString(record.indexType()), + BinaryString.fromString(record.fileName()), + record.fileSize(), + record.rowCount()); + } + + @Override + public IndexFileMeta fromRow(InternalRow row) { + return new IndexFileMeta( + row.getString(0).toString(), + row.getString(1).toString(), + row.getLong(2), + row.getLong(3), + null); + } + + private static RowType schema() { + List fields = new ArrayList<>(); + fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false))); + fields.add(new DataField(1, "_FILE_NAME", newStringType(false))); + fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false))); + fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false))); + return new RowType(fields); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaLegacyV2Serializer.java new file mode 100644 index 0000000000000..85b84b938aa22 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaLegacyV2Serializer.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.stats.SimpleStatsConverter; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** A legacy version serializer for {@link DataFileMeta}. 
*/ +public class DataFileMetaLegacyV2Serializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + public DataFileMetaLegacyV2Serializer() { + super(schemaForLessThan08()); + } + + private static RowType schemaForLessThan08() { + List fields = new ArrayList<>(); + fields.add(new DataField(0, "_FILE_NAME", newStringType(false))); + fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false))); + fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false))); + fields.add(new DataField(3, "_MIN_KEY", newBytesType(false))); + fields.add(new DataField(4, "_MAX_KEY", newBytesType(false))); + fields.add(new DataField(5, "_KEY_STATS", SimpleStatsConverter.schema())); + fields.add(new DataField(6, "_VALUE_STATS", SimpleStatsConverter.schema())); + fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false))); + fields.add(new DataField(10, "_LEVEL", new IntType(false))); + fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false)))); + fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS())); + return new RowType(fields); + } + + @Override + public InternalRow toRow(DataFileMeta meta) { + return GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime()); + } + + @Override + public DataFileMeta fromRow(InternalRow row) { + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + 
SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + null, + null, + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2Serializer.java new file mode 100644 index 0000000000000..1c883b78942e1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2Serializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageLegacyV2Serializer; + +import java.io.IOException; +import java.util.List; + +/** A legacy version serializer for {@link ManifestCommittable}. 
*/ +public class ManifestCommittableLegacyV2Serializer extends ManifestCommittableSerializer { + + private final CommitMessageLegacyV2Serializer commitMessageLegacyV2Serializer; + + public ManifestCommittableLegacyV2Serializer() { + this.commitMessageLegacyV2Serializer = new CommitMessageLegacyV2Serializer(); + } + + @Override + protected List deserializeCommitMessage(int version, DataInputView view) + throws IOException { + return commitMessageLegacyV2Serializer.deserializeList(version, view); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java index c73c12ffa2068..7a3975c44422f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.serializer.VersionedSerializer; import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageSerializer; @@ -90,10 +91,15 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO Map offsets = deserializeOffsets(view); int fileCommittableSerializerVersion = view.readInt(); List fileCommittables = - commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view); + deserializeCommitMessage(fileCommittableSerializerVersion, view); return new ManifestCommittable(identifier, watermark, offsets, fileCommittables); } + protected List deserializeCommitMessage(int version, DataInputView view) + throws IOException { + return commitMessageSerializer.deserializeList(version, view); + } + private Map deserializeOffsets(DataInputDeserializer view) throws IOException { int size = 
view.readInt(); Map offsets = new HashMap<>(size); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java new file mode 100644 index 0000000000000..838bd266ca6ff --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.sink; + +import org.apache.paimon.index.IndexFileMetaLegacyV2Serializer; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMetaLegacyV2Serializer; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.IndexIncrement; + +import java.io.IOException; +import java.util.Collections; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; + +/** A legacy version serializer for {@link CommitMessage}. 
*/ +public class CommitMessageLegacyV2Serializer extends CommitMessageSerializer { + + private DataFileMetaLegacyV2Serializer dataFileSerializer; + private IndexFileMetaLegacyV2Serializer indexEntrySerializer; + + @Override + protected CommitMessage deserialize(int version, DataInputView view) throws IOException { + if (version == 2) { + if (dataFileSerializer == null) { + dataFileSerializer = new DataFileMetaLegacyV2Serializer(); + indexEntrySerializer = new IndexFileMetaLegacyV2Serializer(); + } + return new CommitMessageImpl( + deserializeBinaryRow(view), + view.readInt(), + new DataIncrement( + dataFileSerializer.deserializeList(view), + Collections.emptyList(), + dataFileSerializer.deserializeList(view)), + new CompactIncrement( + dataFileSerializer.deserializeList(view), + dataFileSerializer.deserializeList(view), + dataFileSerializer.deserializeList(view)), + new IndexIncrement( + indexEntrySerializer.deserializeList(view), Collections.emptyList())); + } + return super.deserialize(version, view); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 53a1f9455c8bd..fff4e66faf4bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -105,7 +105,7 @@ public List deserializeList(int version, DataInputView view) thro return list; } - private CommitMessage deserialize(int version, DataInputView view) throws IOException { + protected CommitMessage deserialize(int version, DataInputView view) throws IOException { ObjectSerializer dataFileSerializer; if (version == CURRENT_VERSION) { dataFileSerializer = this.dataFileSerializer; diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2SerializerCompatibilityTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2SerializerCompatibilityTest.java new file mode 100644 index 0000000000000..080054a2a73dc --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableLegacyV2SerializerCompatibilityTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.IOUtils; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.paimon.data.BinaryArray.fromLongArray; +import static org.apache.paimon.data.BinaryRow.singleColumn; +import static org.assertj.core.api.Assertions.assertThat; + +/** Compatibility Test for {@link ManifestCommittableLegacyV2Serializer}. */ +public class ManifestCommittableLegacyV2SerializerCompatibilityTest + extends ManifestCommittableSerializerCompatibilityTest { + + @Test + public void testCompatibilityToLegacyVersion2() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + null, + null, + null); + List dataFiles = Collections.singletonList(dataFile); + + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( +
singleColumn("my_partition"), + 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableLegacyV2Serializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + + byte[] v2Bytes = + IOUtils.readFully( + ManifestCommittableLegacyV2SerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream( + "compatibility/manifest-committable-legacy-v2"), + true); + deserialized = serializer.deserialize(2, v2Bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } +} diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-legacy-v2 b/paimon-core/src/test/resources/compatibility/manifest-committable-legacy-v2 new file mode 100644 index 0000000000000000000000000000000000000000..5e1fb2f447553748ae234085c3e36b139e08b337 GIT binary patch literal 2245 zcmeH}J5Iwu5QaAi6bKL$AW^2Eh(rYyM?feAH*hSmkt|}ziVU(VQUyv%N{+)>I0fIo z>ue$;MI2y9`geAo`}WuvNBf=)&M)coK&$9q(eJmqwGGjjBXA1xCMnZJl~viikRCb& zz%UoWV*U{thPK3oL;$oQOl;o~aTeT5s!}QR5GR_$|=79ssn7`$xXu>|;Z#jT-{-JD~hpHxxn+5Tw_< zU((sLm32dZ{a8oWFF3VvIyjN7t3`TkR*v786)Ifg`MAxYiRTmFkM2TzKfAw|-TjI0 wC)S_sI@P@&yX%iDh3*O+(~f|+>aY3mnSR crea } protected CommittableStateManager createCommittableStateManager() { + if (stateCompatibleForLegacyV2) { + return new RestoreAndFailCommittableStateManager<>( + () -> + new WrappedManifestCommittableSerializer( + new ManifestCommittableLegacyV2Serializer())); + } return new RestoreAndFailCommittableStateManager<>( WrappedManifestCommittableSerializer::new); } diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java index 298e06ba3068a..f832668c6f83f 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java @@ -66,6 +66,7 @@ public class FlinkCdcSyncDatabaseSinkBuilder { private double committerCpu; @Nullable private MemorySize committerMemory; private boolean commitChaining; + private boolean stateCompatibleForLegacyV2; // Paimon catalog used to check and create tables. There will be two // places where this catalog is used. 1) in processing function, @@ -102,6 +103,8 @@ public FlinkCdcSyncDatabaseSinkBuilder withTableOptions(Options options) { this.committerCpu = options.get(FlinkConnectorOptions.SINK_COMMITTER_CPU); this.committerMemory = options.get(FlinkConnectorOptions.SINK_COMMITTER_MEMORY); this.commitChaining = options.get(FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING); + this.stateCompatibleForLegacyV2 = + options.get(FlinkConnectorOptions.STATE_COMPATIBLE_FOR_LEGACY_V2); return this; } @@ -163,7 +166,11 @@ private void buildCombinedCdcSink() { FlinkCdcMultiTableSink sink = new FlinkCdcMultiTableSink( - catalogLoader, committerCpu, committerMemory, commitChaining); + catalogLoader, + committerCpu, + committerMemory, + commitChaining, + stateCompatibleForLegacyV2); sink.sinkFrom(partitioned); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java index ab4ac26bc30df..8bc976c311d4a 100644 --- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSinkTest.java @@ -59,7 +59,8 @@ public void cancel() {} () -> FlinkCatalogFactory.createPaimonCatalog(new Options()), FlinkConnectorOptions.SINK_COMMITTER_CPU.defaultValue(), null, - true); + true, + false); DataStreamSink dataStreamSink = sink.sinkFrom(input); // check the transformation graph diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java index 275ce836a05e4..12fee0244a4b2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java @@ -387,6 +387,13 @@ public class FlinkConnectorOptions { "Both can be configured at the same time: 'done-partition,success-file'.") .build()); + public static final ConfigOption STATE_COMPATIBLE_FOR_LEGACY_V2 = + key("state.compatible-for-less-than-08") + .booleanType() + .defaultValue(false) + .withDescription( + "If the user attempts to upgrade from a version less than Paimon 0.8.0 to 0.8.1(+), this option can be set to true to ensure flink's state compatibility."); + public static List> getOptions() { final Field[] fields = FlinkConnectorOptions.class.getFields(); final List> list = new ArrayList<>(fields.length); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index 02351a0371cde..0e0df37f5063c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.manifest.ManifestCommittableLegacyV2Serializer; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.Options; @@ -43,6 +44,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_OPERATOR_CHAINING; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_MANAGED_WRITER_BUFFER_MEMORY; import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_USE_MANAGED_MEMORY; +import static org.apache.paimon.flink.FlinkConnectorOptions.STATE_COMPATIBLE_FOR_LEGACY_V2; import static org.apache.paimon.flink.sink.FlinkSink.assertBatchConfiguration; import static org.apache.paimon.flink.sink.FlinkSink.assertStreamingConfiguration; import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory; @@ -179,6 +181,12 @@ protected DataStreamSink doCommit( } protected CommittableStateManager createCommittableStateManager() { + if (options.get(STATE_COMPATIBLE_FOR_LEGACY_V2)) { + return new RestoreAndFailCommittableStateManager<>( + () -> + new WrappedManifestCommittableSerializer( + new ManifestCommittableLegacyV2Serializer())); + } return new RestoreAndFailCommittableStateManager<>( WrappedManifestCommittableSerializer::new); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index f36dae4a83ced..982140aa66a67 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -19,13 +19,17 @@ 
package org.apache.paimon.flink.sink; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.manifest.ManifestCommittableLegacyV2Serializer; import org.apache.paimon.manifest.ManifestCommittableSerializer; +import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import javax.annotation.Nullable; import java.util.Map; +import static org.apache.paimon.flink.FlinkConnectorOptions.STATE_COMPATIBLE_FOR_LEGACY_V2; + /** A {@link FlinkSink} to write records. */ public abstract class FlinkWriteSink extends FlinkSink { @@ -55,6 +59,11 @@ protected Committer.Factory createCommitterFac @Override protected CommittableStateManager createCommittableStateManager() { + Options options = Options.fromMap(table.options()); + if (options.get(STATE_COMPATIBLE_FOR_LEGACY_V2)) { + return new RestoreAndFailCommittableStateManager<>( + ManifestCommittableLegacyV2Serializer::new); + } return new RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java index 324452ce51133..9f33f81d03b9c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializer.java @@ -41,7 +41,12 @@ public class WrappedManifestCommittableSerializer private final ManifestCommittableSerializer manifestCommittableSerializer; public WrappedManifestCommittableSerializer() { - this.manifestCommittableSerializer = new ManifestCommittableSerializer(); + this(new ManifestCommittableSerializer()); + } + + public WrappedManifestCommittableSerializer( + ManifestCommittableSerializer manifestCommittableSerializer) { + 
this.manifestCommittableSerializer = manifestCommittableSerializer; } @Override