Skip to content

Commit

Permalink
Capture KeepAlive instead of Executor in WriterOptions (#115)
Browse files Browse the repository at this point in the history
Summary:

folly::Executor::KeepAlive<> is the recommended way of holding
references to Executors, as they ensure the executor is kept alive until the
KeepAlive object is destroyed. Because of this, some folly APIs can only return
KeepAlive (and not shared_ptr), such as Global pools.

These APIs cannot use WriterOption is it takes a shared_ptr

Reviewed By: xiaoxmeng, HuamengJiang

Differential Revision: D66741079
  • Loading branch information
pedroerp authored and facebook-github-bot committed Dec 13, 2024
1 parent 2fa1587 commit 8f69f60
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 11 deletions.
24 changes: 19 additions & 5 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,25 @@ struct VeloxWriterOptions {

const velox::common::SpillConfig* spillConfig{nullptr};

// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal ingestion operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;
// If provided, internal writing/encoding operations will happen in parallel
// using the specified executors.
//
// The KeepAlive wrappers ensures that the executor object will be kept alive
// (allocated), and that the pool will be open for receiving new tasks. A
// shared_ptr would only guarantee that the object is still allocated, but not
// necessarily open for new task (e.g. it could have been .join()'ed through a
// different reference). Because of that, many libraries only provide
// KeepAlive references to executors, not shared_ptr, so taking a KeepAlive
// also makes it more convenient to clients.
//
// As a result, if a KeepAlive is still being held, clients trying to destruct
// the last reference of a shared_ptr to that executor will block until all
// KeepAlive references are destructed.
//
// - encodingExecutor: execute stream encoding operations in parallel.
// - writeExecutor: execute FieldWriter::write() operations in parallel.
folly::Executor::KeepAlive<> encodingExecutor;
folly::Executor::KeepAlive<> writeExecutor;

bool enableChunking = false;

Expand Down
17 changes: 11 additions & 6 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2426,14 +2426,17 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
auto iterations = 20;
auto batches = 20;
std::mt19937 rng{seed};

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
nimble::VeloxWriterOptions writerOptions;
std::shared_ptr<folly::CPUThreadPoolExecutor> executor;

if (parallelismFactor > 0) {
auto executor =
executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor);
writerOptions.writeExecutor = folly::getKeepAliveToken(*executor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -2527,11 +2530,13 @@ TEST_F(VeloxReaderTests, FuzzComplex) {

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
std::shared_ptr<folly::CPUThreadPoolExecutor> executor;

if (parallelismFactor > 0) {
auto executor =
executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor);
writerOptions.writeExecutor = folly::getKeepAliveToken(*executor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down

0 comments on commit 8f69f60

Please sign in to comment.