Skip to content

Commit

Permalink
Use the same shared FileSystem instance across calls in HadoopTableOperations.
Browse files Browse the repository at this point in the history

Closes Netflix#92.
  • Loading branch information
mccheah committed Nov 17, 2018
1 parent 17e9666 commit 5848996
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf) {
}
}

/**
 * Package-private factory that builds a {@code HadoopInputFile} from a FileSystem the caller
 * already holds, passing {@code fs}, {@code path} and {@code conf} straight to the private
 * constructor.
 *
 * @param fs the FileSystem instance to store in the new input file
 * @param path the path of the file
 * @param conf the Hadoop configuration to store alongside the path
 * @return a new {@code HadoopInputFile} wrapping the three arguments
 */
static HadoopInputFile fromFsPath(FileSystem fs, Path path, Configuration conf) {
return new HadoopInputFile(fs, path, conf);
}

private HadoopInputFile(FileSystem fs, Path path, Configuration conf) {
this.fs = fs;
this.path = path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,25 @@
*/
public class HadoopOutputFile implements OutputFile {
/**
 * Creates an {@link OutputFile} for {@code path}.
 *
 * <p>This diff hunk shows two versions of the return statement: one constructs the file from
 * {@code path} and {@code conf} only; the other first obtains a FileSystem through
 * {@code Util.getFS(path, conf)} and hands it to the three-argument constructor.
 *
 * @param path the path of the file to be written
 * @param conf the Hadoop configuration passed to {@code Util.getFS} and to the constructor
 * @return a new {@code HadoopOutputFile} for {@code path}
 */
public static OutputFile fromPath(Path path, Configuration conf) {
return new HadoopOutputFile(path, conf);
return new HadoopOutputFile(Util.getFS(path, conf), path, conf);
}

/**
 * Package-private factory that builds a {@code HadoopOutputFile} from a FileSystem the caller
 * already holds, so no {@code Util.getFS} call is made here.
 *
 * @param fs the FileSystem instance to store in the new output file
 * @param path the path of the file to be written
 * @param conf the Hadoop configuration to store alongside the path
 * @return a new {@code HadoopOutputFile} wrapping the three arguments
 */
static OutputFile fromFsPath(FileSystem fs, Path path, Configuration conf) {
return new HadoopOutputFile(fs, path, conf);
}

private final Path path;
private final Configuration conf;
private final FileSystem fs;

/**
 * Private constructor; instances are created through the static factories of this class.
 *
 * <p>This diff hunk shows two versions of the constructor header: a two-argument form
 * ({@code path}, {@code conf}) and a three-argument form that additionally receives the
 * FileSystem and stores it in the {@code fs} field.
 */
private HadoopOutputFile(Path path, Configuration conf) {
private HadoopOutputFile(FileSystem fs, Path path, Configuration conf) {
this.path = path;
this.conf = conf;
// keep the caller-supplied FileSystem so create() can use the field
this.fs = fs;
}

@Override
public PositionOutputStream create() {
FileSystem fs = Util.getFS(path, conf);
try {
return HadoopStreams.wrap(fs.create(path, false /* createOrOverwrite */));
} catch (FileAlreadyExistsException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ class HadoopTableOperations implements TableOperations {

private final Configuration conf;
private final Path location;
private final FileSystem metadataFs;
private TableMetadata currentMetadata = null;
private Integer version = null;
private boolean shouldRefresh = true;

/**
 * Creates table operations for the given location.
 *
 * <p>The FileSystem is obtained once here via {@code Util.getFS(location, conf)} and kept in
 * {@code metadataFs}; the other methods shown in this diff use that field instead of calling
 * {@code Util.getFS} again per operation.
 *
 * @param location the path stored as this instance's location
 * @param conf the Hadoop configuration stored on this instance and passed to {@code Util.getFS}
 */
HadoopTableOperations(Path location, Configuration conf) {
this.conf = conf;
this.location = location;
this.metadataFs = Util.getFS(location, conf);
}

public TableMetadata current() {
Expand All @@ -67,18 +69,17 @@ public TableMetadata current() {
public TableMetadata refresh() {
int ver = version != null ? version : readVersionHint();
Path metadataFile = metadataFile(ver);
FileSystem fs = Util.getFS(metadataFile, conf);
try {
// don't check if the file exists if version is non-null because it was already checked
if (version == null && !fs.exists(metadataFile)) {
if (version == null && !metadataFs.exists(metadataFile)) {
if (ver == 0) {
// no v0 metadata means the table doesn't exist yet
return null;
}
throw new ValidationException("Metadata file is missing: %s", metadataFile);
}

while (fs.exists(metadataFile(ver + 1))) {
while (metadataFs.exists(metadataFile(ver + 1))) {
ver += 1;
metadataFile = metadataFile(ver);
}
Expand All @@ -88,7 +89,7 @@ public TableMetadata refresh() {
}
this.version = ver;
this.currentMetadata = TableMetadataParser.read(this,
HadoopInputFile.fromPath(metadataFile, conf));
HadoopInputFile.fromFsPath(metadataFs, metadataFile, conf));
this.shouldRefresh = false;
return currentMetadata;
}
Expand All @@ -105,14 +106,13 @@ public void commit(TableMetadata base, TableMetadata metadata) {
}

Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + getFileExtension(conf));
TableMetadataParser.write(metadata, HadoopOutputFile.fromPath(tempMetadataFile, conf));
TableMetadataParser.write(metadata, HadoopOutputFile.fromFsPath(metadataFs, tempMetadataFile, conf));

int nextVersion = (version != null ? version : 0) + 1;
Path finalMetadataFile = metadataFile(nextVersion);
FileSystem fs = Util.getFS(tempMetadataFile, conf);

try {
if (fs.exists(finalMetadataFile)) {
if (metadataFs.exists(finalMetadataFile)) {
throw new CommitFailedException(
"Version %d already exists: %s", nextVersion, finalMetadataFile);
}
Expand All @@ -123,7 +123,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {

try {
// this rename operation is the atomic commit operation
if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
if (!metadataFs.rename(tempMetadataFile, finalMetadataFile)) {
throw new CommitFailedException(
"Failed to commit changes using rename: %s", finalMetadataFile);
}
Expand All @@ -140,20 +140,19 @@ public void commit(TableMetadata base, TableMetadata metadata) {

/**
 * Returns an {@link InputFile} for the given path string.
 *
 * <p>This diff hunk shows two versions of the return statement: one goes through
 * {@code HadoopInputFile.fromPath} with only the path and configuration; the other passes the
 * shared {@code metadataFs} to {@code HadoopInputFile.fromFsPath}.
 *
 * <p>NOTE(review): {@code metadataFs} is obtained from the table {@code location} in the
 * constructor. If {@code path} can name a file on a different filesystem or scheme than
 * {@code location}, the {@code fromFsPath} version would use the wrong FileSystem; confirm
 * against callers.
 */
@Override
public InputFile newInputFile(String path) {
return HadoopInputFile.fromPath(new Path(path), conf);
return HadoopInputFile.fromFsPath(metadataFs, new Path(path), conf);
}

/**
 * Returns an {@link OutputFile} for the path that {@code metadataPath(filename)} produces.
 *
 * <p>This diff hunk shows two versions of the return statement: one goes through
 * {@code HadoopOutputFile.fromPath} with only the path and configuration; the other passes the
 * shared {@code metadataFs} to {@code HadoopOutputFile.fromFsPath}.
 */
@Override
public OutputFile newMetadataFile(String filename) {
return HadoopOutputFile.fromPath(metadataPath(filename), conf);
return HadoopOutputFile.fromFsPath(metadataFs, metadataPath(filename), conf);
}

@Override
public void deleteFile(String path) {
Path toDelete = new Path(path);
FileSystem fs = Util.getFS(toDelete, conf);
try {
fs.delete(toDelete, false /* not recursive */ );
metadataFs.delete(toDelete, false /* not recursive */ );
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to delete file: %s", path);
}
Expand All @@ -178,9 +177,7 @@ private Path versionHintFile() {

private void writeVersionHint(int version) {
Path versionHintFile = versionHintFile();
FileSystem fs = Util.getFS(versionHintFile, conf);

try (FSDataOutputStream out = fs.create(versionHintFile, true /* overwrite */ )) {
try (FSDataOutputStream out = metadataFs.create(versionHintFile, true /* overwrite */ )) {
out.write(String.valueOf(version).getBytes("UTF-8"));

} catch (IOException e) {
Expand Down

0 comments on commit 5848996

Please sign in to comment.