Skip to content

Commit

Permalink
[flink] Upgraded sink connector to new API version
Browse files Browse the repository at this point in the history
- Upgraded sink connector from deprecated RichSinkFunction to new Sink interface
- Adapted corresponding unit test cases

Issue alibaba#132
  • Loading branch information
michaelkoepf committed Dec 16, 2024
1 parent 5c62f3d commit 41e592e
Show file tree
Hide file tree
Showing 6 changed files with 256 additions and 80 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.sink.writer.AppendSinkWriter;
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
import com.alibaba.fluss.connector.flink.sink.writer.UpsertSinkWriter;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;

/** Flink sink for Fluss. */
/**
 * Flink sink for Fluss, implementing the Sink V2 {@link Sink} interface.
 *
 * <p>The sink itself is serialized and shipped to the task managers; the actual writing is done by
 * a {@link FlinkSinkWriter} created through the {@link SinkWriterBuilder} supplied at construction
 * time (append-only writer for log tables, upsert writer for primary-key tables).
 */
public class FlinkSink implements Sink<RowData> {

    private static final long serialVersionUID = 1L;

    /** Serializable factory for the concrete writer; invoked on the task side in createWriter. */
    private final SinkWriterBuilder<? extends FlinkSinkWriter> builder;

    /**
     * Creates a Fluss sink.
     *
     * @param builder factory producing the concrete {@link FlinkSinkWriter}; must not be null
     */
    public FlinkSink(SinkWriterBuilder<? extends FlinkSinkWriter> builder) {
        // fail fast here rather than with an NPE on the task side after serialization
        this.builder = Objects.requireNonNull(builder, "builder must not be null");
    }

    /**
     * @deprecated this sink only supports the {@link WriterInitContext}-based factory method;
     *     always throws {@link UnsupportedOperationException}.
     */
    @Deprecated
    @Override
    public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
        throw new UnsupportedOperationException(
                "Not supported. Use FlinkSink#createWriter(WriterInitContext context)");
    }

    /**
     * Creates and initializes the writer for this subtask.
     *
     * @param context writer init context provided by the Flink runtime
     * @return an initialized {@link FlinkSinkWriter}
     */
    @Override
    public SinkWriter<RowData> createWriter(WriterInitContext context) throws IOException {
        FlinkSinkWriter flinkSinkWriter = builder.createWriter();
        // initialize() wires runtime facilities (e.g. metrics) before any write() call
        flinkSinkWriter.initialize(context);
        return flinkSinkWriter;
    }

    /** Serializable factory that defers writer creation until the sink runs on the task side. */
    @Internal
    interface SinkWriterBuilder<W extends FlinkSinkWriter> extends Serializable {
        W createWriter();
    }

    /** Builder for {@link AppendSinkWriter}, used for append-only (log) tables. */
    @Internal
    static class AppendSinkSinkWriterBuilder implements SinkWriterBuilder<AppendSinkWriter> {

        private static final long serialVersionUID = 1L;

        private final TablePath tablePath;
        private final Configuration flussConfig;
        private final RowType tableRowType;

        public AppendSinkSinkWriterBuilder(
                TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
            this.tablePath = Objects.requireNonNull(tablePath, "tablePath must not be null");
            this.flussConfig = Objects.requireNonNull(flussConfig, "flussConfig must not be null");
            this.tableRowType =
                    Objects.requireNonNull(tableRowType, "tableRowType must not be null");
        }

        @Override
        public AppendSinkWriter createWriter() {
            return new AppendSinkWriter(tablePath, flussConfig, tableRowType);
        }
    }

    /** Builder for {@link UpsertSinkWriter}, used for primary-key tables. */
    @Internal
    static class UpsertSinkWriterBuilder implements SinkWriterBuilder<UpsertSinkWriter> {

        private static final long serialVersionUID = 1L;

        private final TablePath tablePath;
        private final Configuration flussConfig;
        private final RowType tableRowType;
        // null means full-row upsert; otherwise the column indexes for partial update
        private final @Nullable int[] targetColumnIndexes;

        UpsertSinkWriterBuilder(
                TablePath tablePath,
                Configuration flussConfig,
                RowType tableRowType,
                @Nullable int[] targetColumnIndexes) {
            this.tablePath = Objects.requireNonNull(tablePath, "tablePath must not be null");
            this.flussConfig = Objects.requireNonNull(flussConfig, "flussConfig must not be null");
            this.tableRowType =
                    Objects.requireNonNull(tableRowType, "tableRowType must not be null");
            this.targetColumnIndexes = targetColumnIndexes;
        }

        @Override
        public UpsertSinkWriter createWriter() {
            return new UpsertSinkWriter(tablePath, flussConfig, tableRowType, targetColumnIndexes);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.fluss.connector.flink.sink;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.sink.writer.FlinkSinkWriter;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.FieldEqual;
import com.alibaba.fluss.connector.flink.utils.PushdownUtils.ValueConversion;
Expand All @@ -28,7 +29,7 @@
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.RowLevelModificationScanContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
Expand Down Expand Up @@ -144,13 +145,16 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
}
}

FlinkSinkFunction sinkFunction =
primaryKeyIndexes.length > 0
? new UpsertSinkFunction(
FlinkSink.SinkWriterBuilder<? extends FlinkSinkWriter> flinkSinkWriterBuilder =
(primaryKeyIndexes.length > 0)
? new FlinkSink.UpsertSinkWriterBuilder(
tablePath, flussConfig, tableRowType, targetColumnIndexes)
: new AppendSinkFunction(tablePath, flussConfig, tableRowType);
: new FlinkSink.AppendSinkSinkWriterBuilder(
tablePath, flussConfig, tableRowType);

return SinkFunctionProvider.of(sinkFunction);
FlinkSink flinkSink = new FlinkSink(flinkSinkWriterBuilder);

return SinkV2Provider.of(flinkSink);
}

private List<String> columns(int[] columnIndexes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,33 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.flink.utils.FlinkRowToFlussRowConverter;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** An append only sink for fluss log table. */
class AppendSinkFunction extends FlinkSinkFunction {

private static final long serialVersionUID = 1L;
/** An append only sink writer for fluss log table. */
public class AppendSinkWriter extends FlinkSinkWriter {

private transient AppendWriter appendWriter;

AppendSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
public AppendSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
super(tablePath, flussConfig, tableRowType);
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
super.open(config);
public void initialize(WriterInitContext context) {
super.initialize(context);
appendWriter = table.getAppendWriter();
LOG.info("Finished opening Fluss {}.", this.getClass().getSimpleName());
}
Expand All @@ -51,19 +50,14 @@ CompletableFuture<Void> writeRow(RowKind rowKind, InternalRow internalRow) {
return appendWriter.append(internalRow);
}

@Override
void flush() throws IOException {
appendWriter.flush();
checkAsyncException();
}

@Override
FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() {
return FlinkRowToFlussRowConverter.create(tableRowType);
}

@Override
public void close() throws Exception {
super.close();
public void flush(boolean endOfInput) throws IOException {
appendWriter.flush();
checkAsyncException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
Expand All @@ -30,14 +30,11 @@
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -48,16 +45,13 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

/** Flink's {@link SinkFunction} implementation for Fluss. */
abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
implements CheckpointedFunction, Serializable {
/** Base class for Flink {@link SinkWriter} implementations in Fluss. */
public abstract class FlinkSinkWriter implements SinkWriter<RowData> {

private static final long serialVersionUID = 1L;
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkFunction.class);
protected static final Logger LOG = LoggerFactory.getLogger(FlinkSinkWriter.class);

private final TablePath tablePath;
private final Configuration flussConfig;
Expand All @@ -75,11 +69,11 @@ abstract class FlinkSinkFunction extends RichSinkFunction<RowData>
private transient Counter numRecordsOutErrorsCounter;
private volatile Throwable asyncWriterException;

public FlinkSinkFunction(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
public FlinkSinkWriter(TablePath tablePath, Configuration flussConfig, RowType tableRowType) {
this(tablePath, flussConfig, tableRowType, null);
}

public FlinkSinkFunction(
public FlinkSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
Expand All @@ -90,14 +84,13 @@ public FlinkSinkFunction(
this.tableRowType = tableRowType;
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
public void initialize(WriterInitContext context) {
LOG.info(
"Opening Fluss {}, database: {} and table: {}",
this.getClass().getSimpleName(),
tablePath.getDatabaseName(),
tablePath.getTableName());
metricGroup = InternalSinkWriterMetricGroup.wrap(getRuntimeContext().getMetricGroup());
metricGroup = InternalSinkWriterMetricGroup.wrap(context.metricGroup());
flinkMetricRegistry =
new FlinkMetricRegistry(
metricGroup, Collections.singleton(MetricNames.WRITER_SEND_LATENCY_MS));
Expand All @@ -115,7 +108,7 @@ protected void initMetrics() {
}

@Override
public void invoke(RowData value, SinkFunction.Context context) throws IOException {
public void write(RowData value, Context context) throws IOException, InterruptedException {
checkAsyncException();
InternalRow internalRow = dataConverter.toInternalRow(value);
CompletableFuture<Void> writeFuture = writeRow(value.getRowKind(), internalRow);
Expand All @@ -131,28 +124,14 @@ public void invoke(RowData value, SinkFunction.Context context) throws IOExcepti
}

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws IOException {
flush();
}

@Override
public void initializeState(FunctionInitializationContext functionInitializationContext) {}

@Override
public void finish() throws IOException {
flush();
}

abstract void flush() throws IOException;
public abstract void flush(boolean endOfInput) throws IOException, InterruptedException;

abstract FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter();

abstract CompletableFuture<Void> writeRow(RowKind rowKind, InternalRow internalRow);

@Override
public void close() throws Exception {
super.close();

try {
if (table != null) {
table.close();
Expand Down Expand Up @@ -193,7 +172,7 @@ public void close() throws Exception {

private void sanityCheck(TableDescriptor flussTableDescriptor) {
// when it's an UpsertSinkWriter, the table has a primary key obtained from Flink's metadata
boolean hasPrimaryKey = this instanceof UpsertSinkFunction;
boolean hasPrimaryKey = this instanceof UpsertSinkWriter;
if (flussTableDescriptor.hasPrimaryKey() != hasPrimaryKey) {
throw new ValidationException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package com.alibaba.fluss.connector.flink.sink;
package com.alibaba.fluss.connector.flink.sink.writer;

import com.alibaba.fluss.client.table.writer.UpsertWrite;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
Expand All @@ -23,6 +23,7 @@
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.row.InternalRow;

import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

Expand All @@ -31,14 +32,12 @@
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** A upsert sink for fluss primary key table. */
class UpsertSinkFunction extends FlinkSinkFunction {

private static final long serialVersionUID = 1L;
/** An upsert sink writer for a fluss primary key table. */
public class UpsertSinkWriter extends FlinkSinkWriter {

private transient UpsertWriter upsertWriter;

UpsertSinkFunction(
public UpsertSinkWriter(
TablePath tablePath,
Configuration flussConfig,
RowType tableRowType,
Expand All @@ -47,8 +46,8 @@ class UpsertSinkFunction extends FlinkSinkFunction {
}

@Override
public void open(org.apache.flink.configuration.Configuration config) {
super.open(config);
public void initialize(WriterInitContext context) {
super.initialize(context);
UpsertWrite upsertOptions = new UpsertWrite();
if (targetColumnIndexes != null) {
upsertOptions = upsertOptions.withPartialUpdate(targetColumnIndexes);
Expand All @@ -75,7 +74,7 @@ FlinkRowToFlussRowConverter createFlinkRowToFlussRowConverter() {
}

@Override
void flush() throws IOException {
public void flush(boolean endOfInput) throws IOException, InterruptedException {
upsertWriter.flush();
checkAsyncException();
}
Expand Down
Loading

0 comments on commit 41e592e

Please sign in to comment.