Skip to content

Commit

Permalink
Merge pull request cdapio#15324 from cdapio/bugfix/CDAP-20587
Browse files Browse the repository at this point in the history
fix(CDAP-20587): revert the order by clause for the default runs scan, specify the ordering for case without version info
  • Loading branch information
de-lan authored Sep 19, 2023
2 parents 98f4b53 + c3921d9 commit b7abe0d
Showing 1 changed file with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1939,22 +1939,37 @@ private int getLimitByStatus(int limit, ProgramRunStatus status) {
return limit * multiplier;
}

private CloseableIterator<RunRecordDetail> queryProgramRuns(Range range,
@Nullable Predicate<StructuredRow> keyPredicate,
@Nullable Predicate<RunRecordDetail> predicate,
int limit) throws IOException {
return queryProgramRuns(range, false, keyPredicate, predicate, limit);
}

/**
* Iterate over a range of run records, filter by predicates and pass each run record to the
* consumer.
*
* @param range to scan runRecordsTable with
* @param orderedByStartTime whether to return the result ordered by "run_start_time"
* @param keyPredicate to filter the row keys by. If null, then does not filter.
* @param predicate to filter the runRecordMetas by. If null, then does not filter.
* @param limit the maximum number of entries to return
*/
private CloseableIterator<RunRecordDetail> queryProgramRuns(Range range,
boolean orderedByStartTime,
@Nullable Predicate<StructuredRow> keyPredicate,
@Nullable Predicate<RunRecordDetail> predicate,
int limit) throws IOException {
CloseableIterator<StructuredRow> iterator = getRunRecordsTable()
.scan(range, predicate == null && keyPredicate == null ? limit : Integer.MAX_VALUE,
StoreDefinition.AppMetadataStore.RUN_START_TIME, SortOrder.ASC);
// scanLimit is different from passed-in limit(the actual maximum number of entries to return)
int scanLimit = predicate == null && keyPredicate == null ? limit : Integer.MAX_VALUE;
CloseableIterator<StructuredRow> iterator;
if (orderedByStartTime) {
iterator = getRunRecordsTable().scan(range, scanLimit,
StoreDefinition.AppMetadataStore.RUN_START_TIME, SortOrder.ASC);
} else {
iterator = getRunRecordsTable().scan(range, scanLimit);
}

return new AbstractCloseableIterator<RunRecordDetail>() {

Expand Down Expand Up @@ -2032,17 +2047,34 @@ private Map<ProgramRunId, RunRecordDetail> getAllProgramRuns(ProgramReference pr
String recordType) throws IOException {
List<Field<?>> prefix = getRunRecordProgramRefPrefix(recordType, programReference);
Range scanRange = createRunRecordScanRange(prefix, startTime, endTime);
return getRuns(scanRange, status, limit, null, filter);
// Because the version field is not provided, we need to specify that the result is ordered by
// run_start_time to make sure the returned run records are chronological. Note that this could
// have DB performance implication since specifying ORDER BY run_start_time will sort the scan
// result in database before it's returned.
return getRuns(scanRange, true, status, limit, null, filter);
}

private Map<ProgramRunId, RunRecordDetail> getRuns(Range range, ProgramRunStatus status,
int limit,
@Nullable Predicate<StructuredRow> keyFilter,
@Nullable Predicate<RunRecordDetail> valueFilter)
throws IOException {
// By default, the version field is provided and result is ordered by the full primary keys
return getRuns(range, false, status, limit, keyFilter, valueFilter);
}

private Map<ProgramRunId, RunRecordDetail> getRuns(
Range range,
boolean orderedByStartTime,
ProgramRunStatus status,
int limit,
@Nullable Predicate<StructuredRow> keyFilter,
@Nullable Predicate<RunRecordDetail> valueFilter)
throws IOException {

Map<ProgramRunId, RunRecordDetail> map = new LinkedHashMap<>();
try (CloseableIterator<RunRecordDetail> iterator = queryProgramRuns(range, keyFilter,
try (CloseableIterator<RunRecordDetail> iterator = queryProgramRuns(range, orderedByStartTime,
keyFilter,
valueFilter,
getLimitByStatus(limit, status))) {
while (iterator.hasNext() && map.size() < limit) {
Expand Down

0 comments on commit b7abe0d

Please sign in to comment.