Skip to content

Commit

Permalink
feat(ABFS): Support SAS and OAuth config (facebookincubator#11623)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: facebookincubator#11623

Reviewed By: Yuhta

Differential Revision: D66707634

Pulled By: xiaoxmeng

fbshipit-source-id: 86e8543a6be3e906e424ecee8762860a32192641
  • Loading branch information
zhli1142015 authored and facebook-github-bot committed Dec 3, 2024
1 parent 4dd6499 commit a0bbea2
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 53 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ if(VELOX_ENABLE_ABFS)
endif()
# files-datalake is built on blobs
find_package(azure-storage-files-datalake-cpp CONFIG REQUIRED)
find_package(azure-identity-cpp CONFIG REQUIRED)
add_definitions(-DVELOX_ENABLE_ABFS)
endif()

Expand Down
128 changes: 109 additions & 19 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

#include <azure/identity/client_secret_credential.hpp>

namespace facebook::velox::filesystems {

AbfsConfig::AbfsConfig(
std::string_view path,
const config::ConfigBase& config) {
std::string_view file;
bool isHttps = true;
isHttps_ = true;
if (path.find(kAbfssScheme) == 0) {
file = path.substr(kAbfssScheme.size());
} else if (path.find(kAbfsScheme) == 0) {
file = path.substr(kAbfsScheme.size());
isHttps = false;
isHttps_ = false;
} else {
VELOX_FAIL("Invalid ABFS Path {}", path);
}
Expand All @@ -39,30 +41,118 @@ AbfsConfig::AbfsConfig(
fileSystem_ = file.substr(0, firstAt);
auto firstSep = file.find_first_of("/");
filePath_ = file.substr(firstSep + 1);
accountNameWithSuffix_ = file.substr(firstAt + 1, firstSep - firstAt - 1);

auto accountNameWithSuffix = file.substr(firstAt + 1, firstSep - firstAt - 1);
auto firstDot = accountNameWithSuffix.find_first_of(".");
auto accountName = accountNameWithSuffix.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix.substr(firstDot + 5);
auto credKey = fmt::format("fs.azure.account.key.{}", accountNameWithSuffix);
std::stringstream ss;
ss << "DefaultEndpointsProtocol=" << (isHttps ? "https" : "http");
ss << ";AccountName=" << accountName;

if (config.valueExists(credKey)) {
auto authTypeKey =
fmt::format("{}.{}", kAzureAccountAuthType, accountNameWithSuffix_);
authType_ = kAzureSharedKeyAuthType;
if (config.valueExists(authTypeKey)) {
authType_ = config.get<std::string>(authTypeKey).value();
}
if (authType_ == kAzureSharedKeyAuthType) {
auto credKey =
fmt::format("{}.{}", kAzureAccountKey, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(credKey), "Config {} not found", credKey);
auto firstDot = accountNameWithSuffix_.find_first_of(".");
auto accountName = accountNameWithSuffix_.substr(0, firstDot);
auto endpointSuffix = accountNameWithSuffix_.substr(firstDot + 5);
std::stringstream ss;
ss << "DefaultEndpointsProtocol=" << (isHttps_ ? "https" : "http");
ss << ";AccountName=" << accountName;
ss << ";AccountKey=" << config.get<std::string>(credKey).value();
ss << ";EndpointSuffix=" << endpointSuffix;

if (config.valueExists(kAzureBlobEndpoint)) {
ss << ";BlobEndpoint="
<< config.get<std::string>(kAzureBlobEndpoint).value();
}
ss << ";";
connectionString_ = ss.str();
} else if (authType_ == kAzureOAuthAuthType) {
auto clientIdKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientId, accountNameWithSuffix_);
auto clientSecretKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientSecret, accountNameWithSuffix_);
auto clientEndpointKey = fmt::format(
"{}.{}", kAzureAccountOAuth2ClientEndpoint, accountNameWithSuffix_);
VELOX_USER_CHECK(
config.valueExists(clientIdKey), "Config {} not found", clientIdKey);
VELOX_USER_CHECK(
config.valueExists(clientSecretKey),
"Config {} not found",
clientSecretKey);
VELOX_USER_CHECK(
config.valueExists(clientEndpointKey),
"Config {} not found",
clientEndpointKey);
auto clientEndpoint = config.get<std::string>(clientEndpointKey).value();
auto firstSep = clientEndpoint.find_first_of("/", /* https:// */ 8);
authorityHost_ = clientEndpoint.substr(0, firstSep + 1);
auto sedondSep = clientEndpoint.find_first_of("/", firstSep + 1);
tenentId_ = clientEndpoint.substr(firstSep + 1, sedondSep - firstSep - 1);
Azure::Identity::ClientSecretCredentialOptions options;
options.AuthorityHost = authorityHost_;
tokenCredential_ =
std::make_shared<Azure::Identity::ClientSecretCredential>(
tenentId_,
config.get<std::string>(clientIdKey).value(),
config.get<std::string>(clientSecretKey).value(),
options);
} else if (authType_ == kAzureSASAuthType) {
auto sasKey = fmt::format("{}.{}", kAzureSASKey, accountNameWithSuffix_);
VELOX_USER_CHECK(config.valueExists(sasKey), "Config {} not found", sasKey);
sas_ = config.get<std::string>(sasKey).value();
} else {
VELOX_USER_FAIL(
"Unsupported auth type {}, supported auth types are SharedKey, OAuth and SAS.",
authType_);
}
}

/// Creates the blob client used to read the file this config points at.
///
/// The client is built according to the auth type resolved by the
/// constructor:
///  - SAS: the blob URL with the SAS token appended as the query string.
///  - OAuth: the blob URL together with the token credential.
///  - anything else (SharedKey): the connection string, container name and
///    file path.
std::unique_ptr<BlobClient> AbfsConfig::getReadFileClient() {
  if (authType_ == kAzureSASAuthType) {
    // Blob clients need the blob flavour of the host, hence getUrl(true).
    auto url = getUrl(true);
    return std::make_unique<BlobClient>(fmt::format("{}?{}", url, sas_));
  }
  if (authType_ == kAzureOAuthAuthType) {
    auto url = getUrl(true);
    return std::make_unique<BlobClient>(url, tokenCredential_);
  }
  // SharedKey. No "config not found" failure belongs here: the constructor
  // already checks that the account key exists before it assembles
  // connectionString_, and no credential key is in scope in this function.
  return std::make_unique<BlobClient>(BlobClient::CreateFromConnectionString(
      connectionString_, fileSystem_, filePath_));
}

ss << ";EndpointSuffix=" << endpointSuffix;
/// Creates the Data Lake file client used to write the file this config
/// points at.
///
/// SAS and OAuth clients are addressed by URL (with the host left as
/// configured, i.e. getUrl(false)); every other auth type falls back to the
/// connection string.
std::unique_ptr<DataLakeFileClient> AbfsConfig::getWriteFileClient() {
  const bool useSas = (authType_ == kAzureSASAuthType);
  const bool useOAuth = !useSas && (authType_ == kAzureOAuthAuthType);

  if (!useSas && !useOAuth) {
    // Connection-string based client; no URL is needed on this path.
    return std::make_unique<DataLakeFileClient>(
        DataLakeFileClient::CreateFromConnectionString(
            connectionString_, fileSystem_, filePath_));
  }

  auto fileUrl = getUrl(false);
  if (useSas) {
    // The SAS token travels as the query string of the file URL.
    return std::make_unique<DataLakeFileClient>(
        fmt::format("{}?{}", fileUrl, sas_));
  }
  return std::make_unique<DataLakeFileClient>(fileUrl, tokenCredential_);
}

if (config.valueExists(kAzureBlobEndpoint)) {
ss << ";BlobEndpoint="
<< config.get<std::string>(kAzureBlobEndpoint).value();
std::string AbfsConfig::getUrl(bool withblobSuffix) {
std::string accountNameWithSuffixForUrl(accountNameWithSuffix_);
if (withblobSuffix) {
// We should use correct suffix for blob client.
size_t start_pos = accountNameWithSuffixForUrl.find("dfs");
if (start_pos != std::string::npos) {
accountNameWithSuffixForUrl.replace(start_pos, 3, "blob");
}
}
ss << ";";
connectionString_ = ss.str();
return fmt::format(
"{}{}/{}/{}",
isHttps_ ? "https://" : "http://",
accountNameWithSuffixForUrl,
fileSystem_,
filePath_);
}

} // namespace facebook::velox::filesystems
72 changes: 64 additions & 8 deletions velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,100 @@

#pragma once

#include <azure/core/credentials/credentials.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/files/datalake.hpp>
#include <folly/hash/Hash.h>
#include <string>

using namespace Azure::Storage::Blobs;
using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::config {
class ConfigBase;
}

namespace facebook::velox::filesystems {

// This is used to specify the Azurite endpoint in testing. It is read without
// an account suffix and, for SharedKey auth, appended to the connection string
// as `BlobEndpoint`.
static constexpr const char* kAzureBlobEndpoint = "fs.azure.blob-endpoint";

// The authentication mechanism. AbfsConfig looks up the account-specific key
// `fs.azure.account.auth.type.<account host>` and falls back to SharedKey when
// it is not set. The supported values are SharedKey, OAuth and SAS.
static constexpr const char* kAzureAccountAuthType =
    "fs.azure.account.auth.type";

// Prefix of the account-specific key holding the storage account key
// (SharedKey auth): `fs.azure.account.key.<account host>`.
static constexpr const char* kAzureAccountKey = "fs.azure.account.key";

// Prefix of the account-specific key holding the SAS token (SAS auth):
// `fs.azure.sas.fixed.token.<account host>`.
static constexpr const char* kAzureSASKey = "fs.azure.sas.fixed.token";

// Prefix of the account-specific key holding the OAuth client id.
static constexpr const char* kAzureAccountOAuth2ClientId =
    "fs.azure.account.oauth2.client.id";

// Prefix of the account-specific key holding the OAuth client secret.
static constexpr const char* kAzureAccountOAuth2ClientSecret =
    "fs.azure.account.oauth2.client.secret";

// Prefix of the account-specific key holding the OAuth token endpoint, which
// can be found in the Azure portal. For example:
// https://login.microsoftonline.com/{TENANTID}/oauth2/token
// AbfsConfig derives the authority host and the tenant id from this URL.
static constexpr const char* kAzureAccountOAuth2ClientEndpoint =
    "fs.azure.account.oauth2.client.endpoint";

// Accepted values of the auth type setting.
static constexpr const char* kAzureSharedKeyAuthType = "SharedKey";

static constexpr const char* kAzureOAuthAuthType = "OAuth";

static constexpr const char* kAzureSASAuthType = "SAS";

class AbfsConfig {
public:
explicit AbfsConfig(std::string_view path, const config::ConfigBase& config);

std::string identity() const {
const auto hash = folly::Hash();
return std::to_string(hash(connectionString_));
std::unique_ptr<BlobClient> getReadFileClient();

std::unique_ptr<DataLakeFileClient> getWriteFileClient();

std::string filePath() const {
return filePath_;
}

/// Test only.
std::string fileSystem() const {
return fileSystem_;
}

/// Test only.
std::string connectionString() const {
return connectionString_;
}

std::string fileSystem() const {
return fileSystem_;
/// Test only.
std::string tenentId() const {
return tenentId_;
}

std::string filePath() const {
return filePath_;
/// Test only.
std::string authorityHost() const {
return authorityHost_;
}

private:
std::string getUrl(bool withblobSuffix);

std::string authType_;

// Container name is called FileSystem in some Azure API.
std::string fileSystem_;
std::string filePath_;
std::string connectionString_;

bool isHttps_;
std::string accountNameWithSuffix_;

std::string sas_;

std::string tenentId_;
std::string authorityHost_;
std::shared_ptr<Azure::Core::Credentials::TokenCredential> tokenCredential_;
};

} // namespace facebook::velox::filesystems
12 changes: 3 additions & 9 deletions velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/connectors/hive/storage_adapters/abfs/AbfsFileSystem.h"

#include <azure/storage/blobs/blob_client.hpp>
#include <fmt/format.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <glog/logging.h>
Expand All @@ -27,19 +26,16 @@
#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"

namespace facebook::velox::filesystems {
using namespace Azure::Storage::Blobs;

class AbfsReadFile::Impl {
constexpr static uint64_t kNaturalReadSize = 4 << 20; // 4M
constexpr static uint64_t kReadConcurrency = 8;

public:
/// Resolves `path` against `config` and creates the blob client for it.
/// The client comes from AbfsConfig::getReadFileClient() so that the
/// configured auth type (SharedKey, OAuth or SAS) is honoured, instead of
/// always building the client from a connection string.
explicit Impl(std::string_view path, const config::ConfigBase& config) {
  auto abfsConfig = AbfsConfig(path, config);
  filePath_ = abfsConfig.filePath();
  fileClient_ = abfsConfig.getReadFileClient();
}

void initialize(const FileOptions& options) {
Expand All @@ -59,7 +55,6 @@ class AbfsReadFile::Impl {
} catch (Azure::Storage::StorageException& e) {
throwStorageExceptionWithOperationDetails("GetProperties", filePath_, e);
}

VELOX_CHECK_GE(length_, 0);
}

Expand Down Expand Up @@ -141,7 +136,6 @@ class AbfsReadFile::Impl {

Azure::Storage::Blobs::DownloadBlobOptions blob;
blob.Range = range;

auto response = fileClient_->Download(blob);
response.Value.BodyStream->ReadToCount(
reinterpret_cast<uint8_t*>(position), length);
Expand Down
19 changes: 6 additions & 13 deletions velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
*/

#include "velox/connectors/hive/storage_adapters/abfs/AbfsWriteFile.h"
#include <azure/storage/files/datalake.hpp>
#include "velox/connectors/hive/storage_adapters/abfs/AbfsConfig.h"
#include "velox/connectors/hive/storage_adapters/abfs/AbfsUtil.h"

using namespace Azure::Storage::Files::DataLake;

namespace facebook::velox::filesystems {
class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
public:
Expand All @@ -31,7 +28,8 @@ class DataLakeFileClientWrapper final : public AzureDataLakeFileClient {
client_->Create();
}

Models::PathProperties getProperties() override {
Azure::Storage::Files::DataLake::Models::PathProperties getProperties()
override {
return client_->GetProperties().Value;
}

Expand Down Expand Up @@ -120,16 +118,11 @@ class AbfsWriteFile::Impl {
/// Opens `path` for writing using the credentials found in `config`.
/// The Data Lake client is obtained from AbfsConfig::getWriteFileClient() so
/// that the configured auth type (SharedKey, OAuth or SAS) is honoured, and is
/// wrapped in DataLakeFileClientWrapper before being handed to the Impl.
AbfsWriteFile::AbfsWriteFile(
    std::string_view path,
    const config::ConfigBase& config) {
  auto abfsConfig = AbfsConfig(path, config);
  std::unique_ptr<AzureDataLakeFileClient> clientWrapper =
      std::make_unique<DataLakeFileClientWrapper>(
          abfsConfig.getWriteFileClient());
  impl_ = std::make_unique<Impl>(path, clientWrapper);
}

AbfsWriteFile::AbfsWriteFile(
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/storage_adapters/abfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ if(VELOX_ENABLE_ABFS)
velox_core
velox_hive_config
velox_dwio_common_exception
Azure::azure-identity
Azure::azure-storage-blobs
Azure::azure-storage-files-datalake
Folly::folly
Expand Down
Loading

0 comments on commit a0bbea2

Please sign in to comment.