Skip to content

Commit

Permalink
Make Zstrong compressor pluggable
Browse files Browse the repository at this point in the history
Summary:
Preparing for OSS, we remove the name Zstrong from the public source code.
We now refer to it as "MetaInternal".

This required making compressors pluggable.

Reviewed By: sdruzkin

Differential Revision: D56396138

fbshipit-source-id: 265d8314b087ce5c404989813fafd42b364a4072
  • Loading branch information
helfman authored and facebook-github-bot committed Apr 25, 2024
1 parent c9f198c commit 9dba0cd
Show file tree
Hide file tree
Showing 22 changed files with 190 additions and 407 deletions.
16 changes: 9 additions & 7 deletions dwio/nimble/common/MetricsLogger.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,25 @@ struct FileCloseMetrics {
folly::dynamic serialize() const;
};

enum class LogOperation {
StripeLoad,
StripeFlush,
FileClose,
CompressionContext,
};

class MetricsLogger {
public:
constexpr static std::string_view kStripeLoadOperation{"STRIPE_LOAD"};
constexpr static std::string_view kStripeFlushOperation{"STRIPE_FLUSH"};
constexpr static std::string_view kFileCloseOperation{"FILE_CLOSE"};
constexpr static std::string_view kZstrong{"ZSTRONG"};

virtual ~MetricsLogger() = default;

virtual void logException(
std::string_view /* operation */,
LogOperation /* operation */,
const std::string& /* errorMessage */) const {}

virtual void logStripeLoad(const StripeLoadMetrics& /* metrics */) const {}
virtual void logStripeFlush(const StripeFlushMetrics& /* metrics */) const {}
virtual void logFileClose(const FileCloseMetrics& /* metrics */) const {}
virtual void logZstrongContext(const std::string&) const {}
virtual void logCompressionContext(const std::string&) const {}
};

class LoggingScope {
Expand Down
4 changes: 2 additions & 2 deletions dwio/nimble/common/Types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ std::string toString(CompressionType compressionType) {
return "Uncompressed";
case CompressionType::Zstd:
return "Zstd";
case CompressionType::Zstrong:
return "Zstrong";
case CompressionType::MetaInternal:
return "MetaInternal";
default:
return fmt::format(
"Unknown compression type: {}",
Expand Down
2 changes: 1 addition & 1 deletion dwio/nimble/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ enum class CompressionType : uint8_t {
Uncompressed = 0,
// Zstd doesn't require us to externally store level or any other info.
Zstd = 1,
Zstrong = 2,
MetaInternal = 2,
};

std::string toString(CompressionType compressionType);
Expand Down
88 changes: 38 additions & 50 deletions dwio/nimble/encodings/Compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,61 @@
#include "dwio/nimble/encodings/Compression.h"
#include "dwio/nimble/common/EncodingPrimitives.h"
#include "dwio/nimble/common/Exceptions.h"
#include "dwio/nimble/encodings/CompressionInternal.h"
#include "dwio/nimble/encodings/ZstdCompressor.h"

#ifdef META_INTERNAL_COMPRESSOR
#include "dwio/nimble/encodings/fb/MetaInternalCompressor.h"
#endif

namespace facebook::nimble {

namespace {

struct CompressorRegistry {
CompressorRegistry() {
compressors.reserve(2);
compressors.emplace(
CompressionType::Zstd, std::make_unique<ZstdCompressor>());
#ifdef META_INTERNAL_COMPRESSOR
compressors.emplace(
CompressionType::MetaInternal,
std::make_unique<MetaInternalCompressor>());
#endif
}

std::unordered_map<CompressionType, std::unique_ptr<ICompressor>> compressors;
};

ICompressor& getCompressor(CompressionType compressionType) {
static CompressorRegistry registry;
auto it = registry.compressors.find(compressionType);
NIMBLE_CHECK(
it != registry.compressors.end(),
fmt::format(
"Compressor for type {} is not registered.",
toString(compressionType)));
return *it->second;
}
} // namespace

/* static */ CompressionResult Compression::compress(
velox::memory::MemoryPool& memoryPool,
std::string_view data,
DataType dataType,
int bitWidth,
const CompressionPolicy& compressionPolicy) {
auto compression = compressionPolicy.compression();
switch (compression.compressionType) {
#ifdef NIMBLE_HAS_ZSTD
case CompressionType::Zstd:
return compressZstd(
memoryPool,
data,
dataType,
bitWidth,
compressionPolicy,
compression.parameters.zstd);
#endif

#ifdef NIMBLE_HAS_ZSTRONG
case CompressionType::Zstrong:
return compressZstrong(
memoryPool,
data,
dataType,
bitWidth,
compressionPolicy,
compression.parameters.zstrong);
#endif

default:
break;
}
NIMBLE_NOT_SUPPORTED(fmt::format(
"Unsupported compression type: {}.",
toString(compression.compressionType)));
return getCompressor(compression.compressionType)
.compress(memoryPool, data, dataType, bitWidth, compressionPolicy);
}

/* static */ Vector<char> Compression::uncompress(
velox::memory::MemoryPool& memoryPool,
CompressionType compressionType,
std::string_view data) {
switch (compressionType) {
case CompressionType::Uncompressed: {
NIMBLE_UNREACHABLE(
"uncompress() shouldn't be called on uncompressed buffer.");
}

#ifdef NIMBLE_HAS_ZSTD
case CompressionType::Zstd:
return uncompressZstd(memoryPool, compressionType, data);
#endif

#ifdef NIMBLE_HAS_ZSTRONG
case CompressionType::Zstrong:
return uncompressZstrong(memoryPool, compressionType, data);
#endif

default:
break;
}
NIMBLE_NOT_SUPPORTED(fmt::format(
"Unsupported decompression type: {}.", toString(compressionType)));
return getCompressor(compressionType)
.uncompress(memoryPool, compressionType, data);
}

} // namespace facebook::nimble
20 changes: 20 additions & 0 deletions dwio/nimble/encodings/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,24 @@ struct CompressionResult {
std::optional<Vector<char>> buffer;
};

struct ICompressor {
virtual CompressionResult compress(
velox::memory::MemoryPool& memoryPool,
std::string_view data,
DataType dataType,
int bitWidth,
const CompressionPolicy& compressionPolicy) = 0;

virtual Vector<char> uncompress(
velox::memory::MemoryPool& memoryPool,
CompressionType compressionType,
std::string_view data) = 0;

virtual CompressionType compressionType() = 0;

virtual ~ICompressor() = default;
};

class Compression {
public:
static CompressionResult compress(
Expand All @@ -41,6 +59,8 @@ class Compression {
velox::memory::MemoryPool& memoryPool,
CompressionType compressionType,
std::string_view data);

static void registerCompressor(std::unique_ptr<ICompressor>&& compressor);
};

// Encodings using compression repeat the same pattern, which involves trying to
Expand Down
49 changes: 0 additions & 49 deletions dwio/nimble/encodings/CompressionInternal.h

This file was deleted.

Loading

0 comments on commit 9dba0cd

Please sign in to comment.