From 707af38787c24ef2403ae2c3f7f6aa5422bdf1b2 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 7 Dec 2017 20:20:07 -0500 Subject: [PATCH] ARROW-1864: [Java] Upgrade Netty to 4.1.17 Upgrade Netty to 4.1.17 since the Netty community will deprecate 4.0.x soon. This PR includes the following changes: - Bump Netty version. - Implement new ByteBuf APIs added in Netty 4.1.x: a bunch of get/setXXXLE methods. They are the opposite of get/setXXX method regarding byte order. E.g., as ArrowBuf is little endian, `setInt` will put an `int` to the buffer in little endian byte order, while `setIntLE` will put `int` in big byte endian order. The method naming seems confusing anyway, and I opened a Netty issue: https://github.com/netty/netty/issues/7465. The user can call these new methods to get or set multi-byte integers in big endian byte order. - Make ArrowByteBufAllocator overwrite AbstractByteBufAllocator. Author: Shixiong Zhu Closes #1376 from zsxwing/ARROW-1864 and squashes the following commits: 96a93e18 [Shixiong Zhu] extend AbstractByteBufAllocator; add javadoc for new methods bb973335 [Shixiong Zhu] Add comment for calculateNewCapacity 555f88ae [Shixiong Zhu] Add methods back 5e09cca6 [Shixiong Zhu] Upgrade Netty to 4.1.x --- .../main/java/io/netty/buffer/ArrowBuf.java | 199 +++++++++++++++++- .../netty/buffer/MutableWrappedByteBuf.java | 116 +++++++++- .../arrow/memory/ArrowByteBufAllocator.java | 15 +- java/pom.xml | 2 +- .../arrow/vector/util/MapWithOrdinal.java | 4 +- 5 files changed, 320 insertions(+), 16 deletions(-) diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index e2bbe35480b66..23f5d65fbb550 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -23,6 +23,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; import java.nio.charset.Charset; @@ -493,6 +494,16 @@ public ArrowBuf retain() { return retain(1); } + @Override + public ByteBuf touch() { + return this; + } + + @Override + public ByteBuf touch(Object hint) { + return this; + } + @Override public long getLong(int index) { chk(index, 8); @@ -505,6 +516,17 @@ public float getFloat(int index) { return Float.intBitsToFloat(getInt(index)); } + /** + * Gets a 64-bit long integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ + @Override + public long getLongLE(int index) { + chk(index, 8); + final long v = PlatformDependent.getLong(addr(index)); + return Long.reverseBytes(v); + } + @Override public double getDouble(int index) { return Double.longBitsToDouble(getLong(index)); @@ -527,6 +549,17 @@ public int getInt(int index) { return v; } + /** + * Gets a 32-bit integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ + @Override + public int getIntLE(int index) { + chk(index, 4); + final int v = PlatformDependent.getInt(addr(index)); + return Integer.reverseBytes(v); + } + @Override public int getUnsignedShort(int index) { return getShort(index) & 0xFFFF; @@ -535,10 +568,44 @@ public int getUnsignedShort(int index) { @Override public short getShort(int index) { chk(index, 2); - short v = PlatformDependent.getShort(addr(index)); + final short v = PlatformDependent.getShort(addr(index)); return v; } + /** + * Gets a 16-bit short integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ + @Override + public short getShortLE(int index) { + final short v = PlatformDependent.getShort(addr(index)); + return Short.reverseBytes(v); + } + + /** + * Gets an unsigned 24-bit medium integer at the specified absolute + * {@code index} in this buffer. + */ + @Override + public int getUnsignedMedium(int index) { + chk(index, 3); + final long addr = addr(index); + return (PlatformDependent.getByte(addr) & 0xff) << 16 | + (PlatformDependent.getShort(addr + 1) & 0xffff); + } + + /** + * Gets an unsigned 24-bit medium integer at the specified absolute {@code index} in + * this buffer in Big Endian Byte Order. + */ + @Override + public int getUnsignedMediumLE(int index) { + chk(index, 3); + final long addr = addr(index); + return (PlatformDependent.getByte(addr) & 0xff) | + (Short.reverseBytes(PlatformDependent.getShort(addr + 1)) & 0xffff) << 8; + } + @Override public ArrowBuf setShort(int index, int value) { chk(index, 2); @@ -546,6 +613,44 @@ public ArrowBuf setShort(int index, int value) { return this; } + /** + * Sets the specified 16-bit short integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ + @Override + public ByteBuf setShortLE(int index, int value) { + chk(index, 2); + PlatformDependent.putShort(addr(index), Short.reverseBytes((short) value)); + return this; + } + + /** + * Sets the specified 24-bit medium integer at the specified absolute + * {@code index} in this buffer. + */ + @Override + public ByteBuf setMedium(int index, int value) { + chk(index, 3); + final long addr = addr(index); + PlatformDependent.putByte(addr, (byte) (value >>> 16)); + PlatformDependent.putShort(addr + 1, (short) value); + return this; + } + + + /** + * Sets the specified 24-bit medium integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ + @Override + public ByteBuf setMediumLE(int index, int value) { + chk(index, 3); + final long addr = addr(index); + PlatformDependent.putByte(addr, (byte) value); + PlatformDependent.putShort(addr + 1, Short.reverseBytes((short) (value >>> 8))); + return this; + } + @Override public ArrowBuf setInt(int index, int value) { chk(index, 4); @@ -553,6 +658,17 @@ public ArrowBuf setInt(int index, int value) { return this; } + /** + * Sets the specified 32-bit integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ + @Override + public ByteBuf setIntLE(int index, int value) { + chk(index, 4); + PlatformDependent.putInt(addr(index), Integer.reverseBytes(value)); + return this; + } + @Override public ArrowBuf setLong(int index, long value) { chk(index, 8); @@ -560,6 +676,17 @@ public ArrowBuf setLong(int index, long value) { return this; } + /** + * Sets the specified 64-bit long integer at the specified absolute {@code index} + * in this buffer with Big Endian byte order. + */ + @Override + public ByteBuf setLongLE(int index, long value) { + chk(index, 8); + PlatformDependent.putLong(addr(index), Long.reverseBytes(value)); + return this; + } + @Override public ArrowBuf setChar(int index, int value) { chk(index, 2); @@ -668,16 +795,46 @@ protected short _getShort(int index) { return getShort(index); } + /** @see {@link #getShortLE(int)} */ + @Override + protected short _getShortLE(int index) { + return getShortLE(index); + } + @Override protected int _getInt(int index) { return getInt(index); } + /** @see {@link #getIntLE(int)} */ + @Override + protected int _getIntLE(int index) { + return getIntLE(index); + } + + /** @see {@link #getUnsignedMedium(int)} */ + @Override + protected int _getUnsignedMedium(int index) { + return getUnsignedMedium(index); + } + + /** @see {@link #getUnsignedMediumLE(int)} */ + @Override + protected int _getUnsignedMediumLE(int index) { + return getUnsignedMediumLE(index); + } + @Override protected long _getLong(int index) { return getLong(index); } + /** @see {@link #getLongLE(int)} */ + @Override + protected long _getLongLE(int index) { + return getLongLE(index); + } + @Override protected void _setByte(int index, int value) { setByte(index, value); @@ -688,21 +845,45 @@ protected void _setShort(int index, int value) { setShort(index, value); } + /** @see {@link #setShortLE(int, int)} */ + @Override + protected void _setShortLE(int index, int value) { + setShortLE(index, value); + } + @Override protected void _setMedium(int index, int value) { setMedium(index, value); } + /** @see {@link #setMediumLE(int, int)} */ + @Override + protected void _setMediumLE(int index, int value) { + setMediumLE(index, value); + } + @Override protected void _setInt(int index, int value) { setInt(index, value); } + /** @see {@link #setIntLE(int, int)} */ + @Override + protected void _setIntLE(int index, int value) { + setIntLE(index, value); + } + @Override protected void _setLong(int index, long value) { setLong(index, value); } + /** @see {@link #setLongLE(int, long)} */ + @Override + public void _setLongLE(int index, long value) { + setLongLE(index, value); + } + @Override public ArrowBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { udle.getBytes(index + offset, dst, dstIndex, length); @@ -716,16 +897,13 @@ public ArrowBuf getBytes(int index, OutputStream out, int length) throws IOExcep } @Override - protected int _getUnsignedMedium(int index) { - final long addr = addr(index); - return (PlatformDependent.getByte(addr) & 0xff) << 16 | - (PlatformDependent.getByte(addr + 1) & 0xff) << 8 | - PlatformDependent.getByte(addr + 2) & 0xff; + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + return udle.getBytes(index + offset, out, length); } @Override - public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { - return udle.getBytes(index + offset, out, length); + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + return udle.getBytes(index + offset, out, position, length); } @Override @@ -776,6 +954,11 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx return udle.setBytes(index + offset, in, length); } + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + return udle.setBytes(index + offset, in, position, length); + } + @Override public byte getByte(int index) { chk(index, 1); diff --git a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java index a5683adccbc32..f0bc84cdc2db2 100644 --- a/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/MutableWrappedByteBuf.java @@ -23,9 +23,12 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; +import io.netty.util.ByteProcessor; + /** * This is basically a complete copy of DuplicatedByteBuf. We copy because we want to override * some behaviors and make @@ -128,6 +131,16 @@ protected short _getShort(int index) { return buffer.getShort(index); } + @Override + public short getShortLE(int index) { + return buffer.getShortLE(index); + } + + @Override + protected short _getShortLE(int index) { + return buffer.getShortLE(index); + } + @Override public int getUnsignedMedium(int index) { return _getUnsignedMedium(index); @@ -138,6 +151,16 @@ protected int _getUnsignedMedium(int index) { return buffer.getUnsignedMedium(index); } + @Override + public int getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + + @Override + protected int _getUnsignedMediumLE(int index) { + return buffer.getUnsignedMediumLE(index); + } + @Override public int getInt(int index) { return _getInt(index); @@ -148,6 +171,16 @@ protected int _getInt(int index) { return buffer.getInt(index); } + @Override + public int getIntLE(int index) { + return buffer.getIntLE(index); + } + + @Override + protected int _getIntLE(int index) { + return buffer.getIntLE(index); + } + @Override public long getLong(int index) { return _getLong(index); @@ -158,6 +191,16 @@ protected long _getLong(int index) { return buffer.getLong(index); } + @Override + public long getLongLE(int index) { + return buffer.getLongLE(index); + } + + @Override + protected long _getLongLE(int index) { + return buffer.getLongLE(index); + } + @Override public abstract ByteBuf copy(int index, int length); @@ -206,6 +249,17 @@ protected void _setShort(int index, int value) { buffer.setShort(index, value); } + @Override + public ByteBuf setShortLE(int index, int value) { + buffer.setShortLE(index, value); + return this; + } + + @Override + protected void _setShortLE(int index, int value) { + buffer.setShortLE(index, value); + } + @Override public ByteBuf setMedium(int index, int value) { _setMedium(index, value); @@ -217,6 +271,17 @@ protected void _setMedium(int index, int value) { buffer.setMedium(index, value); } + @Override + public ByteBuf setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + return this; + } + + @Override + protected void _setMediumLE(int index, int value) { + buffer.setMediumLE(index, value); + } + @Override public ByteBuf setInt(int index, int value) { _setInt(index, value); @@ -228,6 +293,17 @@ protected void _setInt(int index, int value) { buffer.setInt(index, value); } + @Override + public ByteBuf setIntLE(int index, int value) { + buffer.setIntLE(index, value); + return this; + } + + @Override + protected void _setIntLE(int index, int value) { + buffer.setIntLE(index, value); + } + @Override public ByteBuf setLong(int index, long value) { _setLong(index, value); @@ -239,6 +315,17 @@ protected void _setLong(int index, long value) { buffer.setLong(index, value); } + @Override + public ByteBuf setLongLE(int index, long value) { + buffer.setLongLE(index, value); + return this; + } + + @Override + protected void _setLongLE(int index, long value) { + buffer.setLongLE(index, value); + } + @Override public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { buffer.setBytes(index, src, srcIndex, length); @@ -257,6 +344,12 @@ public ByteBuf setBytes(int index, ByteBuffer src) { return this; } + @Override + public int setBytes(int index, FileChannel in, long position, int length) + throws IOException { + return buffer.setBytes(index, in, position, length); + } + @Override public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { @@ -282,6 +375,13 @@ public int setBytes(int index, ScatteringByteChannel in, int length) return buffer.setBytes(index, in, length); } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) + throws IOException { + return buffer.getBytes(index, out, position, length); + } + @Override public int nioBufferCount() { return buffer.nioBufferCount(); @@ -298,12 +398,12 @@ public ByteBuffer internalNioBuffer(int index, int length) { } @Override - public int forEachByte(int index, int length, ByteBufProcessor processor) { + public int forEachByte(int index, int length, ByteProcessor processor) { return buffer.forEachByte(index, length, processor); } @Override - public int forEachByteDesc(int index, int length, ByteBufProcessor processor) { + public int forEachByteDesc(int index, int length, ByteProcessor processor) { return buffer.forEachByteDesc(index, length, processor); } @@ -312,6 +412,18 @@ public final int refCnt() { return unwrap().refCnt(); } + @Override + public final ByteBuf touch() { + unwrap().touch(); + return this; + } + + @Override + public final ByteBuf touch(Object hint) { + unwrap().touch(hint); + return this; + } + @Override public final ByteBuf retain() { unwrap().retain(); diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java index b8b5283423c82..94102992139d8 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowByteBufAllocator.java @@ -18,8 +18,8 @@ package org.apache.arrow.memory; +import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.ExpandableByteBuf; @@ -32,7 +32,7 @@ * otherwise non-expandable * ArrowBufs to be expandable. */ -public class ArrowByteBufAllocator implements ByteBufAllocator { +public class ArrowByteBufAllocator extends AbstractByteBufAllocator { private static final int DEFAULT_BUFFER_SIZE = 4096; private static final int DEFAULT_MAX_COMPOSITE_COMPONENTS = 16; @@ -142,8 +142,17 @@ public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { throw fail(); } + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + throw fail(); + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + return buffer(initialCapacity, maxCapacity); + } + private RuntimeException fail() { throw new UnsupportedOperationException("Allocator doesn't support heap-based memory."); } - } diff --git a/java/pom.xml b/java/pom.xml index 0a0f2e0ce8f65..384ef56882fe9 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -32,7 +32,7 @@ 4.11 1.7.25 18.0 - 4.0.49.Final + 4.1.17.Final 2.7.9 2.7.1 1.2.0-3f79e055 diff --git a/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java b/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java index 6d3b390379a56..b863fa8af86fd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/util/MapWithOrdinal.java @@ -134,9 +134,9 @@ public Set keySet() { @Override public Collection values() { - return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function, V>() { + return Lists.newArrayList(Iterables.transform(secondary.entries(), new Function, V>() { @Override - public V apply(IntObjectMap.Entry entry) { + public V apply(IntObjectMap.PrimitiveEntry entry) { return Preconditions.checkNotNull(entry).value(); } }));