fix: Fix the driver block hanging issue in serialized execution mode (#11647)

Summary:
Pull Request resolved: #11647

Fix the driver blocking hang issues in serialized execution mode, such as for
Gluten. When the caller invokes Task::next(), the task keeps iterating through the drivers
of all pipelines. If no driver is runnable, it builds a future by collecting all the blocked
drivers' futures and returns it to the caller. If there is no dependency among the blocked
drivers, the caller can simply wait for all of them to resume and then continue; otherwise
the task hangs. Gluten hit this in the following two cases, which could also occur for Meta
internal use cases with more complex pipelines:
(1) hash join: the probe operator initially waits for the build to finish, while the hash build
(or its associated pipeline) might wait for an external event to supply input. The whole task
then hangs;
(2) local exchange: the producer pipeline might wait for the consumer pipeline to consume
data before it can proceed. The whole task then hangs.

This PR fixes the issue by using collectAny instead of collectAll. Using collectAny also
improves performance, since the task can proceed as soon as the first blocked driver's future
becomes ready instead of waiting for all blocked drivers, e.g. the Gluten union case for Spark
under serialized execution mode.
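
As an illustration, here is a minimal, self-contained folly sketch (illustration only, not
Velox code); the producerUnblocked/consumerUnblocked promises are invented stand-ins for the
futures of two blocked drivers in the local-exchange case:

```cpp
#include <folly/futures/Future.h>
#include <folly/futures/Promise.h>

#include <vector>

int main() {
  // Two invented driver futures mirroring the local-exchange case: the
  // "producer" driver can resume on its own, while the "consumer" driver
  // cannot resume until the producer has made progress.
  folly::Promise<folly::Unit> producerUnblocked;
  folly::Promise<folly::Unit> consumerUnblocked;

  std::vector<folly::SemiFuture<folly::Unit>> blockedDriverFutures;
  blockedDriverFutures.push_back(producerUnblocked.getSemiFuture());
  blockedDriverFutures.push_back(consumerUnblocked.getSemiFuture());

  // Old behavior: only realized after *every* blocked driver resumes, so a
  // cross-driver dependency (consumer waits on producer) hangs the caller.
  // auto waitAll = folly::collectAll(std::move(blockedDriverFutures)).unit();

  // New behavior: realized as soon as *any* blocked driver resumes, so the
  // caller can call Task::next() again and let that driver run.
  auto waitAny = folly::collectAny(std::move(blockedDriverFutures)).unit();

  producerUnblocked.setValue();
  std::move(waitAny).get();  // returns once the first driver future is ready
  return 0;
}
```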

Since a future is no longer usable after being passed to collectAny, and we must not call into
a driver that still has a pending future or blocking event, we introduce a DriverBlockingState
in the task to capture each driver's blocking state under serialized execution mode:
When a driver returns a future, we install it into the DriverBlockingState, which sets the
blocking flag (a bool indicating whether the driver is blocked) and attaches a continuation
that clears the flag and fulfills the derived futures handed out for the collectAny operation;
When the task collects futures from the blocked drivers, it obtains a derived future from each
DriverBlockingState, which creates a promise contract and keeps the promise to signal when the
corresponding driver future becomes ready.
Since the driver future is fulfilled on an async code path, each DriverBlockingState holds a
lock to guard against concurrent access.

Unit tests added for repro and verification.
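
For context, a serialized-execution caller (such as the added unit tests or Gluten) is
expected to drive the task roughly as in the hypothetical sketch below; driveSerially and
consume are placeholder names, not part of this change:

```cpp
#include "velox/exec/Task.h"

using facebook::velox::ContinueFuture;
using facebook::velox::RowVectorPtr;
using facebook::velox::exec::Task;

// Placeholder for whatever the caller does with each output batch.
void consume(const RowVectorPtr& /*batch*/) {}

void driveSerially(const std::shared_ptr<Task>& task) {
  for (;;) {
    ContinueFuture future = ContinueFuture::makeEmpty();
    RowVectorPtr batch = task->next(&future);
    if (batch != nullptr) {
      consume(batch);
      continue;
    }
    if (!future.valid()) {
      // No output and no future: the task has finished processing.
      break;
    }
    // With collectAny, this wakes up as soon as one blocked driver becomes
    // unblocked; the next call to Task::next() then runs that driver.
    future.wait();
  }
}
```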

#11442

Reviewed By: Yuhta, bikramSingh91, oerling

Differential Revision: D66438632

fbshipit-source-id: c052fa3de2f4a48b1261382368d7af9fdce9fbef
xiaoxmeng authored and facebook-github-bot committed Nov 26, 2024
1 parent ea3656e commit e80bf12
Showing 5 changed files with 427 additions and 16 deletions.
5 changes: 4 additions & 1 deletion velox/exec/Driver.cpp
@@ -1146,8 +1146,11 @@ std::string blockingReasonToString(BlockingReason reason) {
return "kYield";
case BlockingReason::kWaitForArbitration:
return "kWaitForArbitration";
default:
break;
}
VELOX_UNREACHABLE();
VELOX_UNREACHABLE(
fmt::format("Unknown blocking reason {}", static_cast<int>(reason)));
}

DriverThreadContext* driverThreadContext() {
105 changes: 95 additions & 10 deletions velox/exec/Task.cpp
@@ -647,9 +647,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

VELOX_CHECK_EQ(
static_cast<int>(state_),
static_cast<int>(kRunning),
"Task has already finished processing.");
state_, TaskState::kRunning, "Task has already finished processing.");

// On first call, create the drivers.
if (driverFactories_.empty()) {
@@ -684,6 +682,11 @@ RowVectorPtr Task::next(ContinueFuture* future) {
}

drivers_ = std::move(drivers);
driverBlockingStates_.reserve(drivers_.size());
for (auto i = 0; i < drivers_.size(); ++i) {
driverBlockingStates_.emplace_back(
std::make_unique<DriverBlockingState>(drivers_[i].get()));
}
}

// Run drivers one at a time. If a driver blocks, continue running the other
@@ -698,7 +701,10 @@ RowVectorPtr Task::next(ContinueFuture* future) {
int runnableDrivers = 0;
int blockedDrivers = 0;
for (auto i = 0; i < numDrivers; ++i) {
if (drivers_[i] == nullptr) {
// Holds a reference to driver for access as async task terminate might
// remove drivers from 'drivers_' slot.
auto driver = getDriver(i);
if (driver == nullptr) {
// This driver has finished processing.
continue;
}
@@ -709,16 +715,25 @@ RowVectorPtr Task::next(ContinueFuture* future) {
continue;
}

ContinueFuture blockFuture = ContinueFuture::makeEmpty();
if (driverBlockingStates_[i]->blocked(&blockFuture)) {
VELOX_CHECK(blockFuture.valid());
futures[i] = std::move(blockFuture);
// This driver is still blocked.
++blockedDrivers;
continue;
}
++runnableDrivers;

ContinueFuture driverFuture = ContinueFuture::makeEmpty();
auto result = drivers_[i]->next(&driverFuture);
if (result) {
auto result = driver->next(&driverFuture);
if (result != nullptr) {
VELOX_CHECK(!driverFuture.valid());
return result;
}

if (driverFuture.valid()) {
futures[i] = std::move(driverFuture);
driverBlockingStates_[i]->setDriverFuture(driverFuture);
}

if (error()) {
@@ -728,7 +743,7 @@ RowVectorPtr Task::next(ContinueFuture* future) {

if (runnableDrivers == 0) {
if (blockedDrivers > 0) {
if (!future) {
if (future == nullptr) {
VELOX_FAIL(
"Cannot make progress as all remaining drivers are blocked and user are not expected to wait.");
} else {
@@ -738,14 +753,20 @@ RowVectorPtr Task::next(ContinueFuture* future) {
notReadyFutures.emplace_back(std::move(continueFuture));
}
}
*future = folly::collectAll(std::move(notReadyFutures)).unit();
*future = folly::collectAny(std::move(notReadyFutures)).unit();
}
}
return nullptr;
}
}
}

std::shared_ptr<Driver> Task::getDriver(uint32_t driverId) const {
VELOX_CHECK_LT(driverId, drivers_.size());
std::unique_lock<std::timed_mutex> l(mutex_);
return drivers_[driverId];
}

void Task::start(uint32_t maxDrivers, uint32_t concurrentSplitGroups) {
facebook::velox::process::ThreadDebugInfo threadDebugInfo{
queryCtx()->queryId(), taskId_, nullptr};
@@ -1480,7 +1501,7 @@ void Task::noMoreSplits(const core::PlanNodeId& planNodeId) {
}

if (allFinished) {
terminate(kFinished);
terminate(TaskState::kFinished);
}
}

@@ -3102,4 +3123,68 @@ void Task::MemoryReclaimer::abort(
<< "Timeout waiting for task to complete during query memory aborting.";
}
}

void Task::DriverBlockingState::setDriverFuture(ContinueFuture& driverFuture) {
VELOX_CHECK(!blocked_);
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(promises_.empty());
VELOX_CHECK_NULL(error_);
blocked_ = true;
}
std::move(driverFuture)
.via(&folly::InlineExecutor::instance())
.thenValue(
[&, driverHolder = driver_->shared_from_this()](auto&& /* unused */) {
std::vector<std::unique_ptr<ContinuePromise>> promises;
{
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(blocked_);
VELOX_CHECK_NULL(error_);
promises = std::move(promises_);
blocked_ = false;
}
for (auto& promise : promises) {
promise->setValue();
}
})
.thenError(
folly::tag_t<std::exception>{},
[&, driverHolder = driver_->shared_from_this()](
std::exception const& e) {
std::lock_guard<std::mutex> l(mutex_);
VELOX_CHECK(blocked_);
VELOX_CHECK_NULL(error_);
try {
VELOX_FAIL(
"A driver future from task {} was realized with error: {}",
driver_->task()->taskId(),
e.what());
} catch (const VeloxException&) {
error_ = std::current_exception();
}
blocked_ = false;
});
}

bool Task::DriverBlockingState::blocked(ContinueFuture* future) {
VELOX_CHECK_NOT_NULL(future);
std::lock_guard<std::mutex> l(mutex_);
if (error_ != nullptr) {
std::rethrow_exception(error_);
}
if (!blocked_) {
VELOX_CHECK(promises_.empty());
return false;
}
auto [blockPromise, blockFuture] =
makeVeloxContinuePromiseContract(fmt::format(
"DriverBlockingState {} from task {}",
driver_->driverCtx()->driverId,
driver_->task()->taskId()));
*future = std::move(blockFuture);
promises_.emplace_back(
std::make_unique<ContinuePromise>(std::move(blockPromise)));
return true;
}
} // namespace facebook::velox::exec
39 changes: 37 additions & 2 deletions velox/exec/Task.h
@@ -613,13 +613,13 @@ class Task : public std::enable_shared_from_this<Task> {
/// realized when the last thread stops running for 'this'. This is used to
/// mark cancellation by the user.
ContinueFuture requestCancel() {
return terminate(kCanceled);
return terminate(TaskState::kCanceled);
}

/// Like requestCancel but sets end state to kAborted. This is for stopping
/// Tasks due to failures of other parts of the query.
ContinueFuture requestAbort() {
return terminate(kAborted);
return terminate(TaskState::kAborted);
}

void requestYield() {
@@ -996,6 +996,8 @@ class Task : public std::enable_shared_from_this<Task> {
// trace enabled.
void maybeInitTrace();

std::shared_ptr<Driver> getDriver(uint32_t driverId) const;

// Universally unique identifier of the task. Used to identify the task when
// calling TaskListener.
const std::string uuid_;
@@ -1067,6 +1069,39 @@ class Task : public std::enable_shared_from_this<Task> {

std::vector<std::unique_ptr<DriverFactory>> driverFactories_;
std::vector<std::shared_ptr<Driver>> drivers_;

// Tracks the blocking state for each driver under serialized execution mode.
class DriverBlockingState {
public:
explicit DriverBlockingState(const Driver* driver) : driver_(driver) {
VELOX_CHECK_NOT_NULL(driver_);
}

/// Sets driver future by setting the continuation callback via inline
/// executor.
void setDriverFuture(ContinueFuture& driverFuture);

/// Indicates if the associated driver is blocked or not. If blocked,
/// 'future' is set which becomes realized when the driver is unblocked.
///
/// NOTE: the function throws if the driver has encountered error.
bool blocked(ContinueFuture* future);

private:
const Driver* const driver_;

mutable std::mutex mutex_;
// Indicates if the associated driver is blocked or not.
bool blocked_{false};
// Sets the driver future error if not null.
std::exception_ptr error_{nullptr};
// Promises to fulfill when the driver is unblocked.
std::vector<std::unique_ptr<ContinuePromise>> promises_;
};

// Tracks the driver blocking state under serialized execution mode.
std::vector<std::unique_ptr<DriverBlockingState>> driverBlockingStates_;

// When Drivers are closed by the Task, there is a chance that race and/or
// bugs can cause such Drivers to be held forever, in turn holding a pointer
// to the Task making it a zombie Tasks. This vector is used to keep track of
28 changes: 27 additions & 1 deletion velox/exec/TaskStructs.h
@@ -27,8 +27,24 @@ class MergeSource;
class MergeJoinSource;
struct Split;

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
enum TaskState {
kRunning = 0,
kFinished = 1,
kCanceled = 2,
kAborted = 3,
kFailed = 4
};
#else
/// Corresponds to Presto TaskState, needed for reporting query completion.
enum TaskState { kRunning, kFinished, kCanceled, kAborted, kFailed };
enum class TaskState : int {
kRunning = 0,
kFinished = 1,
kCanceled = 2,
kAborted = 3,
kFailed = 4
};
#endif

std::string taskStateString(TaskState state);

@@ -139,3 +155,13 @@ struct SplitGroupState {
};

} // namespace facebook::velox::exec

template <>
struct fmt::formatter<facebook::velox::exec::TaskState>
: formatter<std::string> {
auto format(facebook::velox::exec::TaskState state, format_context& ctx)
const {
return formatter<std::string>::format(
facebook::velox::exec::taskStateString(state), ctx);
}
};