diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index db724373a51a..2c35a3b905dd 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -417,19 +417,28 @@ public Cursor apply(final Long input) } } - final Offset offset = descending ? - new DescendingTimestampCheckingOffset( - baseOffset, - timestamps, - timeStart, - minDataTimestamp >= timeStart - ) : - new AscendingTimestampCheckingOffset( - baseOffset, - timestamps, - timeEnd, - maxDataTimestamp < timeEnd - ); + final Offset offset; + if (descending) { + if (minDataTimestamp >= timeStart) { + offset = baseOffset; + } else { + offset = new DescendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeStart + ); + } + } else { + if (maxDataTimestamp < timeEnd) { + offset = baseOffset; + } else { + offset = new AscendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeEnd + ); + } + } final Offset initOffset = offset.clone(); @@ -1178,20 +1187,16 @@ private abstract static class TimestampCheckingOffset implements Offset protected final Offset baseOffset; protected final GenericColumn timestamps; protected final long timeLimit; - protected final boolean allWithinThreshold; public TimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { this.baseOffset = baseOffset; this.timestamps = timestamps; this.timeLimit = timeLimit; - // checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold. - this.allWithinThreshold = allWithinThreshold; } @Override @@ -1206,9 +1211,6 @@ public boolean withinBounds() if (!baseOffset.withinBounds()) { return false; } - if (allWithinThreshold) { - return true; - } return timeInRange(timestamps.getLongSingleValueRow(baseOffset.getOffset())); } @@ -1231,11 +1233,10 @@ private static class AscendingTimestampCheckingOffset extends TimestampCheckingO public AscendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { - super(baseOffset, timestamps, timeLimit, allWithinThreshold); + super(baseOffset, timestamps, timeLimit); } @Override @@ -1254,7 +1255,7 @@ public String toString() @Override public Offset clone() { - return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit); } } @@ -1263,11 +1264,10 @@ private static class DescendingTimestampCheckingOffset extends TimestampChecking public DescendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { - super(baseOffset, timestamps, timeLimit, allWithinThreshold); + super(baseOffset, timestamps, timeLimit); } @Override @@ -1287,7 +1287,7 @@ public String toString() @Override public Offset clone() { - return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit); } } diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index 020fccd461f5..c91d73ffbf32 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -32,7 +32,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -136,26 +135,35 @@ public long getSerializedSize() public ByteSource combineStreams() { - return ByteSource.concat( - Iterables.transform( - Arrays.asList("meta", "header", "values"), - new Function() { - - @Override - public ByteSource apply(final String input) - { - return new ByteSource() + // ByteSource.concat is only available in guava 15 and higher + return new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + // When we no longer have to maintain compat with Guava 14, this can be upgraded + return ByteStreams.join( + Iterables.transform( + Arrays.asList("meta", "header", "values"), + new Function>() { @Override - public InputStream openStream() throws IOException + public InputSupplier apply(final String input) { - return ioPeon.makeInputStream(makeFilename(input)); + return new InputSupplier() + { + @Override + public InputStream getInput() throws IOException + { + return ioPeon.makeInputStream(makeFilename(input)); + } + }; } - }; - } - } - ) - ); + } + ) + ).getInput(); + } + }; } public void writeToChannel(WritableByteChannel channel) throws IOException