Skip to content

Commit

Permalink
duration config
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Sep 20, 2023
1 parent ca740ca commit 1290e20
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -196,17 +197,17 @@ public Integer writeParallelism() {
*/
@Experimental
public Duration tableRefreshInterval() {
Long intervalMs =
String duration =
confParser
.longConf()
.option(FlinkWriteOptions.TABLE_REFRSH_MS.key())
.flinkConfig(FlinkWriteOptions.TABLE_REFRSH_MS)
.stringConf()
.option(FlinkWriteOptions.TABLE_REFRSH_INTERVAL.key())
.flinkConfig(FlinkWriteOptions.TABLE_REFRSH_INTERVAL)
.parseOptional();

if (intervalMs == null) {
if (duration == null) {
return null;
}

return Duration.ofMillis(intervalMs);
return TimeUtils.parseDuration(duration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,6 @@ private FlinkWriteOptions() {}
ConfigOptions.key("write-parallelism").intType().noDefaultValue();

@Experimental
public static final ConfigOption<Long> TABLE_REFRSH_MS =
ConfigOptions.key("table-refresh-ms").longType().noDefaultValue();
public static final ConfigOption<String> TABLE_REFRSH_INTERVAL =
ConfigOptions.key("table-refresh-interval").stringType().noDefaultValue();
}

0 comments on commit 1290e20

Please sign in to comment.