Skip to content

Commit

Permalink
[Improve] [Connector-V2] Remove scheduler in InfluxDB sink (apache#5271)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: gdliu3 <[email protected]>
  • Loading branch information
liugddx and gdliu3 authored Aug 24, 2023
1 parent 8bbda25 commit f459f50
Show file tree
Hide file tree
Showing 4 changed files with 1 addition and 52 deletions.
7 changes: 1 addition & 6 deletions docs/en/connector-v2/sink/InfluxDB.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ Write data to InfluxDB.
| key_time | string | no | processing time |
| key_tags | array | no | exclude `field` & `key_time` |
| batch_size | int | no | 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| connect_timeout_ms | long | no | 15000 |
Expand Down Expand Up @@ -63,11 +62,7 @@ If not specified, include all fields with `influxDB` measurement field

### batch_size [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB

### batch_interval_ms [int]

For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the influxDB
For batch writing, when the buffered records reach `batch_size` or the time reaches `checkpoint.interval`, the data will be flushed into InfluxDB

### max_retries [int]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ public SinkConfig(Config config) {
.defaultValue(1024)
.withDescription("batch size of the influxdb client");

public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
.noDefaultValue()
.withDescription("batch interval ms of the influxdb client");

public static final Option<Integer> MAX_RETRIES =
Options.key("max_retries")
.intType()
Expand Down Expand Up @@ -104,7 +98,6 @@ public SinkConfig(Config config) {
private String keyTime;
private List<String> keyTags;
private int batchSize = BATCH_SIZE.defaultValue();
private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
Expand All @@ -119,9 +112,6 @@ public static SinkConfig loadConfig(Config config) {
if (config.hasPath(KEY_TAGS.key())) {
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
}
if (config.hasPath(BATCH_INTERVAL_MS.key())) {
sinkConfig.setBatchIntervalMs(config.getInt(BATCH_INTERVAL_MS.key()));
}
if (config.hasPath(MAX_RETRIES.key())) {
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.PASSWORD;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.USERNAME;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_TAGS;
Expand All @@ -54,7 +53,6 @@ public OptionRule optionRule() {
KEY_TAGS,
KEY_TIME,
BATCH_SIZE,
BATCH_INTERVAL_MS,
MAX_RETRIES,
RETRY_BACKOFF_MULTIPLIER_MS)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -43,10 +42,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
Expand All @@ -55,15 +50,11 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private InfluxDB influxdb;
private final SinkConfig sinkConfig;
private final List<Point> batchList;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private volatile Exception flushException;
private final Integer batchIntervalMs;

public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType)
throws ConnectException {
this.sinkConfig = SinkConfig.loadConfig(pluginConfig);
this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
this.serializer =
new DefaultSerializer(
seaTunnelRowType,
Expand All @@ -73,26 +64,6 @@ public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType
sinkConfig.getMeasurement());
this.batchList = new ArrayList<>();

if (batchIntervalMs != null) {
scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("influxDB-sink-output-%s")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
() -> {
try {
flush();
} catch (IOException e) {
flushException = e;
}
},
batchIntervalMs,
batchIntervalMs,
TimeUnit.MILLISECONDS);
}

connect();
}

Expand All @@ -112,11 +83,6 @@ public Optional<Void> prepareCommit() {

@Override
public void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}

flush();

if (influxdb != null) {
Expand Down

0 comments on commit f459f50

Please sign in to comment.