Skip to content

Commit

Permalink
AVRO-4006: Fix block finish while reading data files (#2969)
Browse files Browse the repository at this point in the history
  • Loading branch information
opwvhk authored Jun 25, 2024
1 parent 6956954 commit 2490231
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
*/
package org.apache.avro.file;

import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.compress.utils.IOUtils;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.File;
import java.util.Arrays;

import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.avro.io.DatumReader;
import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
import static org.apache.avro.file.DataFileConstants.MAGIC;
import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;

/**
* Random access to files written with {@link DataFileWriter}.
Expand Down Expand Up @@ -170,7 +171,7 @@ public void seek(long position) throws IOException {
vin = DecoderFactory.get().binaryDecoder(this.sin, vin);
datumIn = null;
blockRemaining = 0;
blockStart = position;
blockFinished();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,30 @@
*/
package org.apache.avro.file;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidAvroMagicException;
import org.apache.avro.NameValidator;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;

/**
* Streaming access to files written by {@link DataFileWriter}. Use
* {@link DataFileReader} for file-based input.
Expand Down Expand Up @@ -275,6 +275,7 @@ public ByteBuffer nextBlock() throws IOException {
if (blockRemaining != blockCount)
throw new IllegalStateException("Not at block start.");
blockRemaining = 0;
blockFinished();
datumIn = null;
return blockBuffer;
}
Expand Down
72 changes: 55 additions & 17 deletions lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,6 @@
*/
package org.apache.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Stream;

import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
Expand All @@ -46,16 +31,29 @@
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.RandomData;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestDataFile {
private static final Logger LOG = LoggerFactory.getLogger(TestDataFile.class);

Expand Down Expand Up @@ -89,6 +87,14 @@ public static Stream<Arguments> codecs() {
private static final String SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
+ "{\"name\":\"stringField\", \"type\":\"string\"}," + "{\"name\":\"longField\", \"type\":\"long\"}]}";
private static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
// The datum a full sequential read of the generated test file ends on.
// Recomputed here by replaying the same (SCHEMA, COUNT, SEED) sequence;
// this presumably yields the identical final record because RandomData is
// deterministic for a fixed seed — tests comparing against LAST_RECORD
// rely on that (NOTE(review): confirm RandomData's determinism contract).
private static final Object LAST_RECORD;
static {
  Object lastValue = null;
  // Iterate the whole sequence, keeping only the final element.
  for (Object object : new RandomData(SCHEMA, COUNT, SEED)) {
    lastValue = object;
  }
  LAST_RECORD = lastValue;
}

private File makeFile(CodecFactory codec) {
return new File(DIR, "test-" + codec + ".avro");
Expand All @@ -109,6 +115,7 @@ public void runTestsInOrder(CodecFactory codec) throws Exception {
testGenericRead(codec);
testSplits(codec);
testSyncDiscovery(codec);
testReadLastRecord(codec);
testGenericAppend(codec, encoder);
testReadWithHeader(codec);
testFSync(codec, encoder, false);
Expand Down Expand Up @@ -221,6 +228,37 @@ private void testSyncDiscovery(CodecFactory codec) throws IOException {
reader.seek(sync);
assertNotNull(reader.next());
}
// Lastly, confirm that reading (but not decoding) all blocks results in the
// same sync points
reader.sync(0);
ArrayList<Long> syncs2 = new ArrayList<>();
while (reader.hasNext()) {
syncs2.add(reader.previousSync());
reader.nextBlock();
}
assertEquals(syncs, syncs2);
}
}

/**
 * Verifies that after skipping through a data file block-by-block (without
 * decoding), seeking back to the final block and decoding it yields the
 * expected last record of the file.
 *
 * @param codec the codec whose previously written test file is read
 * @throws IOException if reading the data file fails
 */
private void testReadLastRecord(CodecFactory codec) throws IOException {
  File file = makeFile(codec);
  try (DataFileReader<Object> reader = new DataFileReader<>(file, new GenericDatumReader<>())) {
    // Walk the file block-by-block, remembering the sync point that
    // precedes each block; when the loop ends it holds the final block's
    // position.
    // This algorithm can be made more efficient by checking if the
    // underlying SeekableFileInput has been fully read: if so, the last
    // block is in memory, and calls to next() will decode it.
    // NOTE: this depends on the current implementation of DataFileReader.
    long finalBlockSync = -1;
    while (reader.hasNext()) {
      finalBlockSync = reader.previousSync();
      reader.nextBlock();
    }

    // Re-position at the final block and decode every record in it; the
    // last datum decoded must equal the precomputed LAST_RECORD.
    reader.seek(finalBlockSync);
    Object decoded = null;
    while (reader.hasNext()) {
      decoded = reader.next(decoded);
    }
    assertEquals(LAST_RECORD, decoded);
  }
}

Expand Down

0 comments on commit 2490231

Please sign in to comment.