Keep one thread-local HdfsFile per HdfsReadFile instead of one per pread call.

preadInternal() declared a function-local folly::ThreadLocal<HdfsFile>, so a
fresh, unopened HdfsFile was created on every call. HdfsFile moves into the
header so that HdfsReadFile can hold the thread-local as the member file_,
which lets an opened handle be reused by later reads on the same thread.

The read loop now also fails on a zero-byte read instead of looping forever:
HdfsFile::read() only rejects negative counts, and a zero count advances
neither totalBytesRead nor pos.

diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
index d48dd373d344..ab07649fde40 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
@@ -16,51 +16,9 @@
 
 #include "HdfsReadFile.h"
 #include <folly/synchronization/CallOnce.h>
-#include "velox/external/hdfs/ArrowHdfsInternal.h"
 
 namespace facebook::velox {
 
-struct HdfsFile {
-  filesystems::arrow::io::internal::LibHdfsShim* driver_;
-  hdfsFS client_;
-  hdfsFile handle_;
-
-  HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
-  ~HdfsFile() {
-    if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
-      LOG(ERROR) << "Unable to close file, errno: " << errno;
-    }
-  }
-
-  void open(
-      filesystems::arrow::io::internal::LibHdfsShim* driver,
-      hdfsFS client,
-      const std::string& path) {
-    driver_ = driver;
-    client_ = client;
-    handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
-    VELOX_CHECK_NOT_NULL(
-        handle_,
-        "Unable to open file {}. got error: {}",
-        path,
-        driver_->GetLastExceptionRootCause());
-  }
-
-  void seek(uint64_t offset) const {
-    VELOX_CHECK_EQ(
-        driver_->Seek(client_, handle_, offset),
-        0,
-        "Cannot seek through HDFS file, error is : {}",
-        driver_->GetLastExceptionRootCause());
-  }
-
-  int32_t read(char* pos, uint64_t length) const {
-    auto bytesRead = driver_->Read(client_, handle_, pos, length);
-    VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
-    return bytesRead;
-  }
-};
-
 HdfsReadFile::HdfsReadFile(
     filesystems::arrow::io::internal::LibHdfsShim* driver,
     hdfsFS hdfs,
@@ -91,14 +49,19 @@ HdfsReadFile::~HdfsReadFile() {
 void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
     const {
   checkFileReadParameters(offset, length);
-  folly::ThreadLocal<HdfsFile> file;
-  if (!file->handle_) {
-    file->open(driver_, hdfsClient_, filePath_);
+  if (!file_->handle_) {
+    file_->open(driver_, hdfsClient_, filePath_);
   }
-  file->seek(offset);
+  file_->seek(offset);
   uint64_t totalBytesRead = 0;
   while (totalBytesRead < length) {
-    auto bytesRead = file->read(pos, length - totalBytesRead);
+    auto bytesRead = file_->read(pos, length - totalBytesRead);
+    // libhdfs reports end of file as a zero-byte read; fail instead of
+    // spinning forever, since neither counter below would advance.
+    VELOX_CHECK(
+        bytesRead > 0,
+        "Unexpected end of file while reading {}.",
+        filePath_);
     totalBytesRead += bytesRead;
     pos += bytesRead;
   }
diff --git a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
index b63c2dd933dd..535870e6b7ac 100644
--- a/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
+++ b/velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
@@ -15,6 +15,7 @@
  */
 
 #include "velox/common/file/File.h"
+#include "velox/external/hdfs/ArrowHdfsInternal.h"
 #include "velox/external/hdfs/hdfs.h"
 
 namespace facebook::velox {
@@ -23,6 +24,55 @@ namespace filesystems::arrow::io::internal {
 class LibHdfsShim;
 }
 
+/// Handle to one open HDFS file. Stores the libhdfs driver and client it was
+/// opened with; the destructor closes the handle if one was opened and logs
+/// an error if closing fails.
+struct HdfsFile {
+  filesystems::arrow::io::internal::LibHdfsShim* driver_;
+  hdfsFS client_;
+  hdfsFile handle_;
+
+  HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
+  ~HdfsFile() {
+    if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
+      LOG(ERROR) << "Unable to close file, errno: " << errno;
+    }
+  }
+
+  /// Opens 'path' read-only through 'driver' and 'client'. Fails a Velox
+  /// check if the driver returns a null handle.
+  void open(
+      filesystems::arrow::io::internal::LibHdfsShim* driver,
+      hdfsFS client,
+      const std::string& path) {
+    driver_ = driver;
+    client_ = client;
+    handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
+    VELOX_CHECK_NOT_NULL(
+        handle_,
+        "Unable to open file {}. got error: {}",
+        path,
+        driver_->GetLastExceptionRootCause());
+  }
+
+  /// Seeks the open handle to 'offset'. Fails a Velox check on error.
+  void seek(uint64_t offset) const {
+    VELOX_CHECK_EQ(
+        driver_->Seek(client_, handle_, offset),
+        0,
+        "Cannot seek through HDFS file, error is : {}",
+        driver_->GetLastExceptionRootCause());
+  }
+
+  /// Reads up to 'length' bytes into 'pos' and returns the byte count
+  /// reported by the driver. Fails a Velox check on a negative count.
+  int32_t read(char* pos, uint64_t length) const {
+    auto bytesRead = driver_->Read(client_, handle_, pos, length);
+    VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
+    return bytesRead;
+  }
+};
+
 /**
  * Implementation of hdfs read file.
  */
@@ -61,6 +111,8 @@ class HdfsReadFile final : public ReadFile {
   hdfsFS hdfsClient_;
   hdfsFileInfo* fileInfo_;
   std::string filePath_;
+  // Per-thread file handle, opened lazily by preadInternal().
+  folly::ThreadLocal<HdfsFile> file_;
 };
 
 } // namespace facebook::velox