From 76ca253d198224438426bda8a6e7c92bc387844d Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Sep 2016 15:52:13 -0700 Subject: [PATCH 1/3] Make GenericIndexedWriter guava 14 friendly --- .../segment/data/GenericIndexedWriter.java | 43 +++++++++++-------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index 020fccd461f5..2d6d93b07224 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -25,14 +25,12 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; -import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; @@ -136,26 +134,35 @@ public long getSerializedSize() public ByteSource combineStreams() { - return ByteSource.concat( - Iterables.transform( - Arrays.asList("meta", "header", "values"), - new Function() { - - @Override - public ByteSource apply(final String input) - { - return new ByteSource() + // ByteSource.concat is only available in guava 15 and higher + return new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + // When we no longer have to maintain compat with Guava 14, this can be upgraded + return ByteStreams.join( + Iterables.transform( + Arrays.asList("meta", "header", "values"), + new Function() { @Override - public InputStream openStream() throws IOException + public ByteSource apply(final String input) { - return ioPeon.makeInputStream(makeFilename(input)); + return new ByteSource() + { + @Override + public InputStream openStream() throws IOException + { + return ioPeon.makeInputStream(makeFilename(input)); + } + }; } - }; - } - } - ) - ); + } + ) + ).getInput(); + } + }; } public void writeToChannel(WritableByteChannel channel) throws IOException From d97127ab2eef08bb9551115d3f440ba0b5e4fc14 Mon Sep 17 00:00:00 2001 From: Charles Allen Date: Tue, 27 Sep 2016 16:48:33 -0700 Subject: [PATCH 2/3] Move to raw InputSupplier even though it is deprecated --- .../java/io/druid/segment/data/GenericIndexedWriter.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java index 2d6d93b07224..c91d73ffbf32 100644 --- a/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/io/druid/segment/data/GenericIndexedWriter.java @@ -25,6 +25,7 @@ import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.CountingOutputStream; +import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; import java.io.Closeable; @@ -144,15 +145,15 @@ public InputStream openStream() throws IOException return ByteStreams.join( Iterables.transform( Arrays.asList("meta", "header", "values"), - new Function() + new Function>() { @Override - public ByteSource apply(final String input) + public InputSupplier apply(final String input) { - return new ByteSource() + return new InputSupplier() { @Override - public InputStream openStream() throws IOException + public InputStream getInput() throws IOException { return ioPeon.makeInputStream(makeFilename(input)); } From d030c084b62540c8987a4f206975b17cfa6cce31 Mon Sep 17 00:00:00 2001 From: leventov Date: Tue, 27 Sep 2016 18:26:25 +0300 Subject: [PATCH 3/3] Avoid Offset object wrapping in QueryableIndexStorageAdapter when query granularity / interval is coarser than data (i. e. segment) interval, that is typical for topN queries (BACKEND-319) --- .../segment/QueryableIndexStorageAdapter.java | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index db724373a51a..2c35a3b905dd 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -417,19 +417,28 @@ public Cursor apply(final Long input) } } - final Offset offset = descending ? - new DescendingTimestampCheckingOffset( - baseOffset, - timestamps, - timeStart, - minDataTimestamp >= timeStart - ) : - new AscendingTimestampCheckingOffset( - baseOffset, - timestamps, - timeEnd, - maxDataTimestamp < timeEnd - ); + final Offset offset; + if (descending) { + if (minDataTimestamp >= timeStart) { + offset = baseOffset; + } else { + offset = new DescendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeStart + ); + } + } else { + if (maxDataTimestamp < timeEnd) { + offset = baseOffset; + } else { + offset = new AscendingTimestampCheckingOffset( + baseOffset, + timestamps, + timeEnd + ); + } + } final Offset initOffset = offset.clone(); @@ -1178,20 +1187,16 @@ private abstract static class TimestampCheckingOffset implements Offset protected final Offset baseOffset; protected final GenericColumn timestamps; protected final long timeLimit; - protected final boolean allWithinThreshold; public TimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { this.baseOffset = baseOffset; this.timestamps = timestamps; this.timeLimit = timeLimit; - // checks if all the values are within the Threshold specified, skips timestamp lookups and checks if all values are within threshold. - this.allWithinThreshold = allWithinThreshold; } @Override @@ -1206,9 +1211,6 @@ public boolean withinBounds() if (!baseOffset.withinBounds()) { return false; } - if (allWithinThreshold) { - return true; - } return timeInRange(timestamps.getLongSingleValueRow(baseOffset.getOffset())); } @@ -1231,11 +1233,10 @@ private static class AscendingTimestampCheckingOffset extends TimestampCheckingO public AscendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { - super(baseOffset, timestamps, timeLimit, allWithinThreshold); + super(baseOffset, timestamps, timeLimit); } @Override @@ -1254,7 +1255,7 @@ public String toString() @Override public Offset clone() { - return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + return new AscendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit); } } @@ -1263,11 +1264,10 @@ private static class DescendingTimestampCheckingOffset extends TimestampChecking public DescendingTimestampCheckingOffset( Offset baseOffset, GenericColumn timestamps, - long timeLimit, - boolean allWithinThreshold + long timeLimit ) { - super(baseOffset, timestamps, timeLimit, allWithinThreshold); + super(baseOffset, timestamps, timeLimit); } @Override @@ -1287,7 +1287,7 @@ public String toString() @Override public Offset clone() { - return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit, allWithinThreshold); + return new DescendingTimestampCheckingOffset(baseOffset.clone(), timestamps, timeLimit); } }