From 088c34630f770ea780ac63e7bf5a975634b979c7 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 4 Aug 2022 16:41:33 +0100 Subject: [PATCH 1/2] AVRO-3594: FsInput to use openFile() API Boost performance reading from object stores in hadoop by using the openFile builder API and passing in the file length as an option (can save a HEAD) and asks for adaptive IO (sequential going to random if the client starts seeking) --- .../main/java/org/apache/avro/mapred/FsInput.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index e8fe1d03e1d..d7a6e147aeb 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -28,6 +28,8 @@ import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; + /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ public class FsInput implements Closeable, SeekableInput { private final FSDataInputStream stream; @@ -41,7 +43,15 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { this.len = fileSystem.getFileStatus(path).getLen(); - this.stream = fileSystem.open(path); + // use the hadoop 3.3.0 openFile API and specify length + // and read policy. object stores can use these to + // optimize read performance. + // the read policy "adaptive" means "start sequential but + // go to random IO after backwards seeks" + // Filesystems which don't recognize the options will ignore them + + this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive") + .opt("fs.option.openfile.length", Long.toString(len)).build()); } @Override From a20c7fbc5913f5d2af1bbf3a17114cad43fd8a7a Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Fri, 21 Apr 2023 16:33:42 +0100 Subject: [PATCH 2/2] AVRO-3594. openFile() API through shim Change-Id: Ie7099208b601a08823775e901359a6727f0c6fe6 --- .../java/org/apache/avro/mapred/FsInput.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java index d7a6e147aeb..5e25e3ccfe4 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java @@ -22,12 +22,15 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.avro.file.SeekableInput; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; /** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */ @@ -42,16 +45,19 @@ public FsInput(Path path, Configuration conf) throws IOException { /** Construct given a path and a {@code FileSystem}. */ public FsInput(Path path, FileSystem fileSystem) throws IOException { - this.len = fileSystem.getFileStatus(path).getLen(); - // use the hadoop 3.3.0 openFile API and specify length + final FileStatus st = fileSystem.getFileStatus(path); + this.len = st.getLen(); + // use the hadoop 3.3.0 openFile API, pass in status // and read policy. object stores can use these to // optimize read performance. // the read policy "adaptive" means "start sequential but - // go to random IO after backwards seeks" + // go to random IO when considered better" // Filesystems which don't recognize the options will ignore them - - this.stream = awaitFuture(fileSystem.openFile(path).opt("fs.option.openfile.read.policy", "adaptive") - .opt("fs.option.openfile.length", Long.toString(len)).build()); + // Importantly deployments which have switched the default read policy to + // "random" + // will not suffer when reading large avro files. + this.stream = awaitFuture(fileSystem.openFile(path) + .opt(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).withFileStatus(st).build()); } @Override