Skip to content

Commit

Permalink
[libhdfs]Move the thread local value into the member variable
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Zhou <[email protected]>
  • Loading branch information
zhouyuan committed Nov 19, 2024
1 parent 2fa3f97 commit 88964d4
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 47 deletions.
51 changes: 4 additions & 47 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,51 +16,9 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#include "velox/external/hdfs/ArrowHdfsInternal.h"

namespace facebook::velox {

struct HdfsFile {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS client_;
hdfsFile handle_;

HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
~HdfsFile() {
if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
LOG(ERROR) << "Unable to close file, errno: " << errno;
}
}

void open(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS client,
const std::string& path) {
driver_ = driver;
client_ = client;
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
path,
driver_->GetLastExceptionRootCause());
}

void seek(uint64_t offset) const {
VELOX_CHECK_EQ(
driver_->Seek(client_, handle_, offset),
0,
"Cannot seek through HDFS file, error is : {}",
driver_->GetLastExceptionRootCause());
}

int32_t read(char* pos, uint64_t length) const {
auto bytesRead = driver_->Read(client_, handle_, pos, length);
VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
return bytesRead;
}
};

HdfsReadFile::HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
Expand Down Expand Up @@ -91,14 +49,13 @@ HdfsReadFile::~HdfsReadFile() {
void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
folly::ThreadLocal<HdfsFile> file;
if (!file->handle_) {
file->open(driver_, hdfsClient_, filePath_);
if (!file_->handle_) {
file_->open(driver_, hdfsClient_, filePath_);
}
file->seek(offset);
file_->seek(offset);
uint64_t totalBytesRead = 0;
while (totalBytesRead < length) {
auto bytesRead = file->read(pos, length - totalBytesRead);
auto bytesRead = file_->read(pos, length - totalBytesRead);
totalBytesRead += bytesRead;
pos += bytesRead;
}
Expand Down
43 changes: 43 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/common/file/File.h"
#include "velox/external/hdfs/ArrowHdfsInternal.h"
#include "velox/external/hdfs/hdfs.h"

namespace facebook::velox {
Expand All @@ -23,6 +24,47 @@ namespace filesystems::arrow::io::internal {
class LibHdfsShim;
}

struct HdfsFile {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS client_;
hdfsFile handle_;

HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
~HdfsFile() {
if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
LOG(ERROR) << "Unable to close file, errno: " << errno;
}
}

void open(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS client,
const std::string& path) {
driver_ = driver;
client_ = client;
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
path,
driver_->GetLastExceptionRootCause());
}

void seek(uint64_t offset) const {
VELOX_CHECK_EQ(
driver_->Seek(client_, handle_, offset),
0,
"Cannot seek through HDFS file, error is : {}",
driver_->GetLastExceptionRootCause());
}

int32_t read(char* pos, uint64_t length) const {
auto bytesRead = driver_->Read(client_, handle_, pos, length);
VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
return bytesRead;
}
};

/**
* Implementation of hdfs read file.
*/
Expand Down Expand Up @@ -61,6 +103,7 @@ class HdfsReadFile final : public ReadFile {
hdfsFS hdfsClient_;
hdfsFileInfo* fileInfo_;
std::string filePath_;
folly::ThreadLocal<HdfsFile> file_;
};

} // namespace facebook::velox

0 comments on commit 88964d4

Please sign in to comment.