diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index e8fe1d03e1d..5e25e3ccfe4 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -22,12 +22,17 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ public class FsInput implements Closeable, SeekableInput { private final FSDataInputStream stream; @@ -40,8 +45,19 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { - this.len = fileSystem.getFileStatus(path).getLen(); - this.stream = fileSystem.open(path); + final FileStatus st = fileSystem.getFileStatus(path); + this.len = st.getLen(); + // use the hadoop 3.3.0 openFile API, pass in status + // and read policy. object stores can use these to + // optimize read performance. + // the read policy "adaptive" means "start sequential but + // go to random IO when considered better" + // Filesystems which don't recognize the options will ignore them + // Importantly deployments which have switched the default read policy to + // "random" + // will not suffer when reading large avro files. + this.stream = awaitFuture(fileSystem.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).withFileStatus(st).build()); } @Override