Skip to content

Commit

Permalink
[BugFix] Fix predicate push-down time dimension table error(#375)
Browse files Browse the repository at this point in the history
Signed-off-by: hhoao <[email protected]>
  • Loading branch information
hhoao committed Jul 18, 2024
1 parent cc8689d commit bc77d8e
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@
import java.text.SimpleDateFormat;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {

private static final Logger LOG = LoggerFactory.getLogger(StarRocksDynamicLRUFunction.class);

private final ColumnRichInfo[] filterRichInfos;
private final StarRocksSourceOptions sourceOptions;
private final ArrayList<String> filterList;
Expand All @@ -56,7 +58,7 @@ public class StarRocksDynamicLRUFunction extends TableFunction<RowData> {
private final long cacheExpireMs;
private final int maxRetryTimes;

public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
ColumnRichInfo[] filterRichInfos,
List<ColumnRichInfo> columnRichInfos,
SelectColumn[] selectColumns) {
Expand All @@ -72,7 +74,7 @@ public StarRocksDynamicLRUFunction(StarRocksSourceOptions sourceOptions,
this.filterList = new ArrayList<>();
this.dataReaderList = new ArrayList<>();
}

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);
Expand Down Expand Up @@ -101,14 +103,17 @@ public void eval(Object... keys) {
}
String filter = String.join(" and ", filterList);
filterList.clear();
String SQL = "select * from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
String columns = Arrays.stream(selectColumns)
.map(col -> "`" + col.getColumnName() + "`")
.collect(Collectors.joining(","));
String SQL = "select " + columns + " from " + sourceOptions.getDatabaseName() + "." + sourceOptions.getTableName() + " where " + filter;
LOG.info("LookUpFunction SQL [{}]", SQL);
this.queryInfo = StarRocksSourceCommonFunc.getQueryInfo(this.sourceOptions, SQL);
List<List<QueryBeXTablets>> lists = StarRocksSourceCommonFunc.splitQueryBeXTablets(1, queryInfo);
lists.get(0).forEach(beXTablets -> {
StarRocksSourceBeReader beReader = new StarRocksSourceBeReader(beXTablets.getBeNode(),
columnRichInfos,
selectColumns,
selectColumns,
sourceOptions);
beReader.openScanner(beXTablets.getTabletIds(), queryInfo.getQueryPlan().getOpaqued_query_plan(), sourceOptions);
beReader.startToRead();
Expand All @@ -132,7 +137,7 @@ public void eval(Object... keys) {
});
rows.trimToSize();
cache.put(keyRow, rows);
}
}
}

private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
Expand All @@ -147,9 +152,9 @@ private void getFieldValue(Object obj, ColumnRichInfo columnRichInfo) {
filter = columnRichInfo.getColumnName() + " = '" + sdf.format(d).toString() + "'";
}
if (flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE ||
flinkTypeRoot == LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE) {

DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
String strDateTime = dtf.format(((TimestampData)obj).toLocalDateTime());
filter = columnRichInfo.getColumnName() + " = '" + strDateTime + "'";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.PushDownHolder;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import org.apache.flink.table.functions.TableFunction;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -58,32 +60,77 @@ public ChangelogMode getChangelogMode() {
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
StarRocksDynamicSourceFunction sourceFunction = new StarRocksDynamicSourceFunction(
options, flinkSchema,
this.pushDownHolder.getFilter(),
this.pushDownHolder.getLimit(),
this.pushDownHolder.getSelectColumns(),
options, flinkSchema,
this.pushDownHolder.getFilter(),
this.pushDownHolder.getLimit(),
this.pushDownHolder.getSelectColumns(),
this.pushDownHolder.getQueryType());
return SourceFunctionProvider.of(sourceFunction, true);
}

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
int[] projectedFields = Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
List<ColumnRichInfo> allColumnRichInfos =
StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
SelectColumn[] pushDownSelectColumns = pushDownHolder.getSelectColumns();
SelectColumn[] selectColumns;
List<ColumnRichInfo> columnRichInfos;
int[] projectedFields =
Arrays.stream(context.getKeys()).mapToInt(value -> value[0]).toArray();
ColumnRichInfo[] filerRichInfo = new ColumnRichInfo[projectedFields.length];
for (int i = 0; i < projectedFields.length; i ++) {
ColumnRichInfo columnRichInfo = new ColumnRichInfo(
this.flinkSchema.getFieldName(projectedFields[i]).get(),
projectedFields[i],
this.flinkSchema.getFieldDataType(projectedFields[i]).get()
);
filerRichInfo[i] = columnRichInfo;
StarRocksSourceQueryType queryType = pushDownHolder.getQueryType();
if (queryType == StarRocksSourceQueryType.QuerySomeColumns) {
columnRichInfos = new ArrayList<>();
selectColumns = new SelectColumn[pushDownSelectColumns.length];
for (int i = 0; i < pushDownSelectColumns.length; i++) {
ColumnRichInfo columnRichInfo =
allColumnRichInfos.get(
pushDownSelectColumns[i].getColumnIndexInFlinkTable());
columnRichInfos.add(
new ColumnRichInfo(
columnRichInfo.getColumnName(), i, columnRichInfo.getDataType()));
selectColumns[i] = new SelectColumn(columnRichInfo.getColumnName(), i);
}
for (int i = 0; i < projectedFields.length; i++) {
int columnIndexInFlinkTable = pushDownSelectColumns[i].getColumnIndexInFlinkTable();
ColumnRichInfo columnRichInfo =
new ColumnRichInfo(
this.flinkSchema.getFieldName(columnIndexInFlinkTable).get(),
i,
this.flinkSchema.getFieldDataType(columnIndexInFlinkTable).get());

filerRichInfo[i] = columnRichInfo;
}
} else {
columnRichInfos = allColumnRichInfos;
selectColumns =
StarRocksSourceCommonFunc.genSelectedColumns(
columnMap, this.options, allColumnRichInfos);
for (int i = 0; i < projectedFields.length; i++) {
ColumnRichInfo columnRichInfo =
new ColumnRichInfo(
this.flinkSchema.getFieldName(i).get(),
projectedFields[i],
this.flinkSchema.getFieldDataType(i).get());
filerRichInfo[i] = columnRichInfo;
}
}

Map<String, ColumnRichInfo> columnMap = StarRocksSourceCommonFunc.genColumnMap(flinkSchema);
List<ColumnRichInfo> ColumnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
SelectColumn[] selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, this.options, ColumnRichInfos);

StarRocksDynamicLookupFunction tableFunction = new StarRocksDynamicLookupFunction(this.options, filerRichInfo, ColumnRichInfos, selectColumns);
TableFunction<RowData> tableFunction = null;
StarRocksSourceOptions.CacheType lookupCacheType = options.getLookupCacheType();
switch (lookupCacheType) {
case ALL:
tableFunction =
new StarRocksDynamicLookupFunction(
this.options, filerRichInfo, columnRichInfos, selectColumns);
break;
case LRU:
tableFunction =
new StarRocksDynamicLRUFunction(
this.options, filerRichInfo, columnRichInfos, selectColumns);
break;
}
return TableFunctionProvider.of(tableFunction);
}

Expand Down Expand Up @@ -113,7 +160,7 @@ public void applyProjection(int[][] projectedFields) {
this.pushDownHolder.setQueryType(StarRocksSourceQueryType.QuerySomeColumns);

ArrayList<String> columnList = new ArrayList<>();
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
ArrayList<SelectColumn> selectColumns = new ArrayList<SelectColumn>();
for (int index : curProjectedFields) {
String columnName = flinkSchema.getFieldName(index).get();
columnList.add(columnName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TTL_MS);
options.add(StarRocksSourceOptions.LOOKUP_CACHE_MAX_ROWS);
options.add(StarRocksSourceOptions.LOOKUP_MAX_RETRIES);
options.add(StarRocksSourceOptions.LOOKUP_CACHE_TYPE);
return options;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@


public class StarRocksSourceCommonFunc {

private static volatile StarRocksQueryVisitor starrocksQueryVisitor;

private static volatile StarRocksQueryPlanVisitor starRocksQueryPlanVisitor;


private static StarRocksQueryVisitor getStarRocksQueryVisitor(StarRocksSourceOptions sourceOptions) {
if (null == starrocksQueryVisitor) {
Expand Down Expand Up @@ -84,15 +84,15 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBeXTabletList.set(i, Collections.singletonList(queryInfo.getBeXTablets().get(i)));
}
return curBeXTabletList;
}
}
if (subTaskCount < beXTabletsListCount) {
for (int i = 0; i < beXTabletsListCount; i ++) {
List<QueryBeXTablets> tList = curBeXTabletList.get(i%subTaskCount);
tList.add(queryInfo.getBeXTablets().get(i));
curBeXTabletList.set(i%subTaskCount, tList);
}
return curBeXTabletList;
}
}
List<QueryBeXTablets> beWithSingleTabletList = new ArrayList<>();
queryInfo.getBeXTablets().forEach(beXTablets -> {
beXTablets.getTabletIds().forEach(tabletId -> {
Expand All @@ -106,7 +106,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBeXTabletList.set(i, Collections.singletonList(beWithSingleTabletList.get(i)));
}
return curBeXTabletList;
}
}
long newx = Math.round(x);
for (int i = 0; i < subTaskCount; i ++) {
int start = (int)(i * newx);
Expand All @@ -124,7 +124,7 @@ public static List<List<QueryBeXTablets>> splitQueryBeXTablets(int subTaskCount,
curBxTs = beWithSingleTabletList.subList(start, end);
Map<String, List<Long>> beXTabletsMap = new HashMap<>();
curBxTs.forEach(curBxT -> {
List<Long> tablets = new ArrayList<>();
List<Long> tablets = new ArrayList<>();
if (beXTabletsMap.containsKey(curBxT.getBeNode())) {
tablets = beXTabletsMap.get(curBxT.getBeNode());
} else {
Expand Down Expand Up @@ -174,8 +174,12 @@ public static List<ColumnRichInfo> genColumnRichInfo(Map<String, ColumnRichInfo>
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
}

public static List<ColumnRichInfo> getSelectSql(Map<String, ColumnRichInfo> columnMap) {
return columnMap.values().stream().sorted(Comparator.comparing(ColumnRichInfo::getColumnIndexInSchema)).collect(Collectors.toList());
}

public static SelectColumn[] genSelectedColumns(Map<String, ColumnRichInfo> columnMap,
StarRocksSourceOptions sourceOptions,
StarRocksSourceOptions sourceOptions,
List<ColumnRichInfo> columnRichInfos) {
List<SelectColumn> selectedColumns = new ArrayList<>();
// user selected columns from sourceOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,24 @@ public class StarRocksSourceOptions implements Serializable {

public static final ConfigOption<String> TABLE_NAME = ConfigOptions.key("table-name")
.stringType().noDefaultValue().withDescription("Table name");


// optional Options
public static final ConfigOption<Integer> SCAN_CONNECT_TIMEOUT = ConfigOptions.key("scan.connect.timeout-ms")
.intType().defaultValue(1000).withDescription("Connect timeout");

public static final ConfigOption<Integer> SCAN_BATCH_ROWS = ConfigOptions.key("scan.params.batch-rows")
.intType().defaultValue(1000).withDescription("Batch rows");

public static final ConfigOption<String> SCAN_PROPERTIES = ConfigOptions.key("scan.params.properties")
.stringType().noDefaultValue().withDescription("Reserved params for use");

public static final ConfigOption<Integer> SCAN_LIMIT = ConfigOptions.key("scan.params.limit")
.intType().defaultValue(1).withDescription("The query limit, if specified.");

public static final ConfigOption<Integer> SCAN_KEEP_ALIVE_MIN = ConfigOptions.key("scan.params.keep-alive-min")
.intType().defaultValue(10).withDescription("Max keep alive time min");

public static final ConfigOption<Integer> SCAN_QUERTY_TIMEOUT_S = ConfigOptions.key("scan.params.query-timeout-s")
.intType().defaultValue(600).withDescription("Query timeout for a single query");

Expand All @@ -88,7 +88,7 @@ public class StarRocksSourceOptions implements Serializable {

public static final ConfigOption<String> SCAN_BE_HOST_MAPPING_LIST = ConfigOptions.key("scan.be-host-mapping-list")
.stringType().defaultValue("").withDescription("List of be host mapping");

// lookup Options
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS = ConfigOptions.key("lookup.cache.max-rows")
.longType().defaultValue(-1L).withDescription(
Expand All @@ -102,6 +102,12 @@ public class StarRocksSourceOptions implements Serializable {
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES = ConfigOptions.key("lookup.max-retries")
.intType().defaultValue(1).withDescription("the max retry times if lookup database failed.");

public static final ConfigOption<CacheType> LOOKUP_CACHE_TYPE =
ConfigOptions.key("lookup.cache-type")
.enumType(CacheType.class)
.defaultValue(CacheType.ALL)
.withDescription("lookup type.");


public static final String SOURCE_PROPERTIES_PREFIX = "scan.params.";

Expand Down Expand Up @@ -150,7 +156,7 @@ public String getScanUrl() {
public String getJdbcUrl() {
return tableOptions.get(JDBC_URL);
}

public String getUsername() {
return tableOptions.get(USERNAME);
}
Expand All @@ -169,8 +175,8 @@ public String getTableName() {


// optional Options
public int getConnectTimeoutMs() {
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
public int getConnectTimeoutMs() {
return tableOptions.get(SCAN_CONNECT_TIMEOUT).intValue();
}

public int getBatchRows() {
Expand Down Expand Up @@ -236,10 +242,23 @@ public int getLookupMaxRetries() {
return tableOptions.get(LOOKUP_MAX_RETRIES).intValue();
}


public static Builder builder() {
return new Builder();
}

public CacheType getLookupCacheType() {
return tableOptions.get(LOOKUP_CACHE_TYPE);
}

/**
* Cache Type
*/
public enum CacheType {
LRU,
ALL
}

/**
* Builder for {@link StarRocksSourceOptions}.
*/
Expand Down

0 comments on commit bc77d8e

Please sign in to comment.