diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 2a1f202..4fbc842 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -126,11 +126,25 @@ struct VeloxWriterOptions { const velox::common::SpillConfig* spillConfig{nullptr}; - // If provided, internal encoding operations will happen in parallel using - // this executor. - std::shared_ptr encodingExecutor; - // If provided, internal ingestion operations will happen in parallel - std::shared_ptr writeExecutor; + // If provided, internal writing/encoding operations will happen in parallel + // using the specified executors. + // + // The KeepAlive wrappers ensure that the executor object will be kept alive + // (allocated), and that the pool will be open for receiving new tasks. A + // shared_ptr would only guarantee that the object is still allocated, but not + // necessarily open for new tasks (e.g. it could have been .join()'ed through + // a different reference). Because of that, many libraries only provide + // KeepAlive references to executors, not shared_ptr, so taking a KeepAlive + // also makes it more convenient for clients. + // + // As a result, if a KeepAlive is still being held, clients trying to destruct + // the last reference of a shared_ptr to that executor will block until all + // KeepAlive references are destructed. + // + // - encodingExecutor: execute stream encoding operations in parallel. + // - writeExecutor: execute FieldWriter::write() operations in parallel. 
+ folly::Executor::KeepAlive<> encodingExecutor; + folly::Executor::KeepAlive<> writeExecutor; bool enableChunking = false; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 8b0cf02..8f67b59 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -2426,14 +2426,17 @@ TEST_F(VeloxReaderTests, FuzzSimple) { auto iterations = 20; auto batches = 20; std::mt19937 rng{seed}; + for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; nimble::VeloxWriterOptions writerOptions; + std::shared_ptr executor; + if (parallelismFactor > 0) { - auto executor = + executor = std::make_shared(parallelismFactor); - writerOptions.encodingExecutor = executor; - writerOptions.writeExecutor = executor; + writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor); + writerOptions.writeExecutor = folly::getKeepAliveToken(*executor); } for (auto i = 0; i < iterations; ++i) { @@ -2527,11 +2530,13 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; + std::shared_ptr executor; + if (parallelismFactor > 0) { - auto executor = + executor = std::make_shared(parallelismFactor); - writerOptions.encodingExecutor = executor; - writerOptions.writeExecutor = executor; + writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor); + writerOptions.writeExecutor = folly::getKeepAliveToken(*executor); } for (auto i = 0; i < iterations; ++i) {