diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java index 10067cd6c5c..8f333a1cb48 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileReader.java @@ -17,18 +17,19 @@ */ package org.apache.avro.file; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.commons.compress.utils.IOUtils; + import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.File; import java.util.Arrays; -import org.apache.avro.InvalidAvroMagicException; -import org.apache.avro.io.DecoderFactory; -import org.apache.commons.compress.utils.IOUtils; -import org.apache.avro.io.DatumReader; -import static org.apache.avro.file.DataFileConstants.SYNC_SIZE; import static org.apache.avro.file.DataFileConstants.MAGIC; +import static org.apache.avro.file.DataFileConstants.SYNC_SIZE; /** * Random access to files written with {@link DataFileWriter}. @@ -170,7 +171,7 @@ public void seek(long position) throws IOException { vin = DecoderFactory.get().binaryDecoder(this.sin, vin); datumIn = null; blockRemaining = 0; - blockStart = position; + blockFinished(); } /** diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java index e9b5ed38852..aa458684635 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java @@ -17,14 +17,23 @@ */ package org.apache.avro.file; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.InvalidAvroMagicException; +import org.apache.avro.NameValidator; +import org.apache.avro.Schema; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; + +import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.Closeable; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -32,15 +41,6 @@ import java.util.Map; import java.util.NoSuchElementException; -import org.apache.avro.AvroRuntimeException; -import org.apache.avro.InvalidAvroMagicException; -import org.apache.avro.NameValidator; -import org.apache.avro.Schema; -import org.apache.avro.io.BinaryEncoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.BinaryDecoder; -import org.apache.avro.io.DatumReader; - /** * Streaming access to files written by {@link DataFileWriter}. Use * {@link DataFileReader} for file-based input. @@ -275,6 +275,7 @@ public ByteBuffer nextBlock() throws IOException { if (blockRemaining != blockCount) throw new IllegalStateException("Not at block start."); blockRemaining = 0; + blockFinished(); datumIn = null; return blockBuffer; } diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java index 01611c698bf..e411ab0effa 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java @@ -17,21 +17,6 @@ */ package org.apache.avro; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.function.Function; -import java.util.stream.Stream; - import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileStream; @@ -46,16 +31,29 @@ import org.apache.avro.io.DatumReader; import org.apache.avro.io.EncoderFactory; import org.apache.avro.util.RandomData; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class TestDataFile { private static final Logger LOG = LoggerFactory.getLogger(TestDataFile.class); @@ -89,6 +87,14 @@ public static Stream codecs() { private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": [" + "{\"name\":\"stringField\", \"type\":\"string\"}," + "{\"name\":\"longField\", \"type\":\"long\"}]}"; private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON); + private static final Object LAST_RECORD; + static { + Object lastValue = null; + for (Object object : new RandomData(SCHEMA, COUNT, SEED)) { + lastValue = object; + } + LAST_RECORD = lastValue; + } private File makeFile(CodecFactory codec) { return new File(DIR, "test-" + codec + ".avro"); @@ -109,6 +115,7 @@ public void runTestsInOrder(CodecFactory codec) throws Exception { testGenericRead(codec); testSplits(codec); testSyncDiscovery(codec); + testReadLastRecord(codec); testGenericAppend(codec, encoder); testReadWithHeader(codec); testFSync(codec, encoder, false); @@ -221,6 +228,37 @@ private void testSyncDiscovery(CodecFactory codec) throws IOException { reader.seek(sync); assertNotNull(reader.next()); } + // Lastly, confirm that reading (but not decoding) all blocks results in the + // same sync points + reader.sync(0); + ArrayList syncs2 = new ArrayList<>(); + while (reader.hasNext()) { + syncs2.add(reader.previousSync()); + reader.nextBlock(); + } + assertEquals(syncs, syncs2); + } + } + + private void testReadLastRecord(CodecFactory codec) throws IOException { + File file = makeFile(codec); + try (DataFileReader reader = new DataFileReader<>(file, new GenericDatumReader<>())) { + long lastBlockStart = -1; + while (reader.hasNext()) { + // This algorithm can be made more efficient by checking if the underlying + // SeekableFileInput has been fully read: if so, the last block is in + // memory, and calls to next() will decode it. + // NOTE: this depends on the current implementation of DataFileReader. + lastBlockStart = reader.previousSync(); + reader.nextBlock(); + } + reader.seek(lastBlockStart); + + Object lastRecord = null; + while (reader.hasNext()) { + lastRecord = reader.next(lastRecord); + } + assertEquals(LAST_RECORD, lastRecord); } }