Skip to content

Commit

Permalink
[flink] supports upgrading from versions below 0.8.0 (excluding 0.8.0…
Browse files Browse the repository at this point in the history
…) to 0.8.1+.
  • Loading branch information
liming30 committed Jun 4, 2024
1 parent 41df78a commit 4b33dbb
Show file tree
Hide file tree
Showing 16 changed files with 459 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,11 @@
<td>Integer</td>
<td>Defines a custom parallelism for the unaware-bucket table compaction job. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration.</td>
</tr>
<tr>
<td><h5>state.compatible-for-less-than-08</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>When upgrading from a Paimon version earlier than 0.8.0 to 0.8.1 or later, set this option to true to keep Flink's state compatible.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.index;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.utils.SerializationUtils.newStringType;

/**
 * A legacy version serializer for {@link IndexFileMeta}.
 *
 * <p>The row layout handled here has exactly four columns, in this order: {@code _INDEX_TYPE},
 * {@code _FILE_NAME}, {@code _FILE_SIZE} and {@code _ROW_COUNT}. Anything else carried by an
 * {@link IndexFileMeta} is not written by {@link #toRow(IndexFileMeta)}, and the fifth
 * constructor argument is passed as {@code null} by {@link #fromRow(InternalRow)}.
 */
public class IndexFileMetaLegacyV2Serializer extends ObjectSerializer<IndexFileMeta> {

    // Declared explicitly to match the other legacy serializer built on ObjectSerializer,
    // so the serialized identity of this class does not depend on a compiler-derived value.
    private static final long serialVersionUID = 1L;

    /** Creates a serializer bound to the four-column legacy schema. */
    public IndexFileMetaLegacyV2Serializer() {
        super(schema());
    }

    /**
     * Converts an index file meta into a four-column row.
     *
     * @param record the meta to convert
     * @return a row holding index type, file name, file size and row count, in that order
     */
    @Override
    public InternalRow toRow(IndexFileMeta record) {
        return GenericRow.of(
                BinaryString.fromString(record.indexType()),
                BinaryString.fromString(record.fileName()),
                record.fileSize(),
                record.rowCount());
    }

    /**
     * Rebuilds an index file meta from a four-column legacy row.
     *
     * @param row a row laid out according to the legacy schema
     * @return the meta described by the row
     */
    @Override
    public IndexFileMeta fromRow(InternalRow row) {
        return new IndexFileMeta(
                row.getString(0).toString(),
                row.getString(1).toString(),
                row.getLong(2),
                row.getLong(3),
                // The legacy layout has no fifth column, so nothing can be restored here.
                null);
    }

    /** Builds the legacy row type; field ids equal the column positions. */
    private static RowType schema() {
        List<DataField> fields = new ArrayList<>();
        fields.add(new DataField(0, "_INDEX_TYPE", newStringType(false)));
        fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
        fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
        fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
        return new RowType(fields);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.io;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

import java.util.ArrayList;
import java.util.List;

import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
import static org.apache.paimon.utils.SerializationUtils.newBytesType;
import static org.apache.paimon.utils.SerializationUtils.newStringType;
import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;

/** A legacy version serializer for {@link DataFileMeta}. */
public class DataFileMetaLegacyV2Serializer extends ObjectSerializer<DataFileMeta> {

private static final long serialVersionUID = 1L;

public DataFileMetaLegacyV2Serializer() {
super(schemaForLessThan08());
}

private static RowType schemaForLessThan08() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(0, "_FILE_NAME", newStringType(false)));
fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false)));
fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false)));
fields.add(new DataField(3, "_MIN_KEY", newBytesType(false)));
fields.add(new DataField(4, "_MAX_KEY", newBytesType(false)));
fields.add(new DataField(5, "_KEY_STATS", SimpleStatsConverter.schema()));
fields.add(new DataField(6, "_VALUE_STATS", SimpleStatsConverter.schema()));
fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false)));
fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false)));
fields.add(new DataField(10, "_LEVEL", new IntType(false)));
fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false))));
fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS()));
return new RowType(fields);
}

@Override
public InternalRow toRow(DataFileMeta meta) {
return GenericRow.of(
BinaryString.fromString(meta.fileName()),
meta.fileSize(),
meta.rowCount(),
serializeBinaryRow(meta.minKey()),
serializeBinaryRow(meta.maxKey()),
meta.keyStats().toRow(),
meta.valueStats().toRow(),
meta.minSequenceNumber(),
meta.maxSequenceNumber(),
meta.schemaId(),
meta.level(),
toStringArrayData(meta.extraFiles()),
meta.creationTime());
}

@Override
public DataFileMeta fromRow(InternalRow row) {
return new DataFileMeta(
row.getString(0).toString(),
row.getLong(1),
row.getLong(2),
deserializeBinaryRow(row.getBinary(3)),
deserializeBinaryRow(row.getBinary(4)),
SimpleStats.fromRow(row.getRow(5, 3)),
SimpleStats.fromRow(row.getRow(6, 3)),
row.getLong(7),
row.getLong(8),
row.getLong(9),
row.getInt(10),
fromStringArrayData(row.getArray(11)),
row.getTimestamp(12, 3),
null,
null,
null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.manifest;

import org.apache.paimon.io.DataInputView;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageLegacyV2Serializer;

import java.io.IOException;
import java.util.List;

/**
 * A legacy version serializer for {@link ManifestCommittable}.
 *
 * <p>Behaves like {@link ManifestCommittableSerializer} except that the commit-message list is
 * read through a {@link CommitMessageLegacyV2Serializer}.
 */
public class ManifestCommittableLegacyV2Serializer extends ManifestCommittableSerializer {

    /** Delegate used for reading the commit-message list. */
    private final CommitMessageLegacyV2Serializer legacyMessageSerializer =
            new CommitMessageLegacyV2Serializer();

    public ManifestCommittableLegacyV2Serializer() {
        super();
    }

    /**
     * Reads the commit-message list using the legacy commit-message serializer.
     *
     * @param version the commit-message serializer version passed in by the caller
     * @param view the input positioned at the start of the commit-message list
     * @return the deserialized commit messages
     * @throws IOException if reading from {@code view} fails
     */
    @Override
    protected List<CommitMessage> deserializeCommitMessage(int version, DataInputView view)
            throws IOException {
        List<CommitMessage> messages = legacyMessageSerializer.deserializeList(version, view);
        return messages;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
Expand Down Expand Up @@ -90,10 +91,15 @@ public ManifestCommittable deserialize(int version, byte[] serialized) throws IO
Map<Integer, Long> offsets = deserializeOffsets(view);
int fileCommittableSerializerVersion = view.readInt();
List<CommitMessage> fileCommittables =
commitMessageSerializer.deserializeList(fileCommittableSerializerVersion, view);
deserializeCommitMessage(fileCommittableSerializerVersion, view);
return new ManifestCommittable(identifier, watermark, offsets, fileCommittables);
}

/**
 * Reads the list of commit messages from {@code view}. Declared {@code protected} so that a
 * subclass can substitute a different commit-message serializer.
 *
 * @param version the commit-message serializer version to decode with
 * @param view the input positioned at the start of the commit-message list
 * @return the deserialized commit messages
 * @throws IOException if reading from {@code view} fails
 */
protected List<CommitMessage> deserializeCommitMessage(int version, DataInputView view)
        throws IOException {
    List<CommitMessage> messages = commitMessageSerializer.deserializeList(version, view);
    return messages;
}

private Map<Integer, Long> deserializeOffsets(DataInputDeserializer view) throws IOException {
int size = view.readInt();
Map<Integer, Long> offsets = new HashMap<>(size);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table.sink;

import org.apache.paimon.index.IndexFileMetaLegacyV2Serializer;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMetaLegacyV2Serializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.IndexIncrement;

import java.io.IOException;
import java.util.Collections;

import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;

/**
 * A legacy version serializer for {@link CommitMessage}.
 *
 * <p>Only version {@code 2} is decoded here, using {@link DataFileMetaLegacyV2Serializer} and
 * {@link IndexFileMetaLegacyV2Serializer}; every other version is handed to the superclass.
 */
public class CommitMessageLegacyV2Serializer extends CommitMessageSerializer {

    // Both serializers are created together on the first version 2 message.
    private DataFileMetaLegacyV2Serializer dataFileSerializer;
    private IndexFileMetaLegacyV2Serializer indexEntrySerializer;

    /**
     * Deserializes one commit message.
     *
     * @param version the serializer version the message was written with
     * @param view the input positioned at the start of the message
     * @return the deserialized message
     * @throws IOException if reading from {@code view} fails
     */
    @Override
    protected CommitMessage deserialize(int version, DataInputView view) throws IOException {
        if (version != 2) {
            return super.deserialize(version, view);
        }
        return deserializeLegacyV2(view);
    }

    /**
     * Decodes a version 2 message. The constructor arguments below are evaluated left to right,
     * which fixes the order in which the sections are consumed from {@code view}.
     */
    private CommitMessage deserializeLegacyV2(DataInputView view) throws IOException {
        if (dataFileSerializer == null) {
            dataFileSerializer = new DataFileMetaLegacyV2Serializer();
            indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
        }
        return new CommitMessageImpl(
                deserializeBinaryRow(view),
                view.readInt(),
                new DataIncrement(
                        dataFileSerializer.deserializeList(view),
                        // Nothing is read for the second argument; it is always empty here.
                        Collections.emptyList(),
                        dataFileSerializer.deserializeList(view)),
                new CompactIncrement(
                        dataFileSerializer.deserializeList(view),
                        dataFileSerializer.deserializeList(view),
                        dataFileSerializer.deserializeList(view)),
                new IndexIncrement(
                        indexEntrySerializer.deserializeList(view),
                        // Likewise, no second list is read for the index increment.
                        Collections.emptyList()));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public List<CommitMessage> deserializeList(int version, DataInputView view) thro
return list;
}

private CommitMessage deserialize(int version, DataInputView view) throws IOException {
protected CommitMessage deserialize(int version, DataInputView view) throws IOException {
ObjectSerializer<DataFileMeta> dataFileSerializer;
if (version == CURRENT_VERSION) {
dataFileSerializer = this.dataFileSerializer;
Expand Down
Loading

0 comments on commit 4b33dbb

Please sign in to comment.