Skip to content

Commit

Permalink
[Test] Support poller event
Browse files Browse the repository at this point in the history
Signed-off-by: Seaven <[email protected]>
  • Loading branch information
Seaven committed Aug 19, 2024
1 parent 3bad4e5 commit 7bcec27
Show file tree
Hide file tree
Showing 22 changed files with 250 additions and 4 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ CONF_Bool(pipeline_analytic_enable_streaming_process, "true");
CONF_Bool(pipeline_analytic_enable_removable_cumulative_process, "true");
CONF_Int32(pipline_limit_max_delivery, "4096");

CONF_mBool(enable_pipeline_event_poller, "true");

CONF_mBool(use_default_dop_when_shared_scan, "true");
/// For parallel scan on the single tablet.
// These three configs are used to calculate the minimum number of rows picked up from a segment at one time.
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/context_with_dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class ContextWithDependency {
}

// When the output operator is finished, the context can be finished regardless of other running operators.
Status set_finished() {
virtual Status set_finished() {
_is_finished.store(true, std::memory_order_release);
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));

_buffer->incr_sinker(state);
_buffer->bind_call_back([this]() { notify(); });

_be_number = state->be_number();
if (state->query_options().__isset.transmission_encode_level) {
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/exchange/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Status ExchangeSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_stream_recvr = static_cast<ExchangeSourceOperatorFactory*>(_factory)->create_stream_recvr(state);
_stream_recvr->bind_profile(_driver_sequence, _unique_metrics);
_stream_recvr->bind_call_back([&] { notify(); });
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ Status SinkBuffer::_send_rpc(DisposableClosure<PTransmitChunkResult, ClosureCont
closure->cntl.request_attachment().append(request.attachment);
request.brpc_stub->transmit_chunk(&closure->cntl, request.params.get(), &closure->result, closure);
}

if (_callback) {
_callback();
}
return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/exchange/sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class SinkBuffer {

void incr_sinker(RuntimeState* state);

void bind_call_back(const std::function<void()>& callback) { _callback = callback; }

private:
using Mutex = bthread::Mutex;

Expand Down Expand Up @@ -194,6 +196,8 @@ class SinkBuffer {
std::atomic<int64_t> _request_sequence = 0;
int64_t _sent_audit_stats_frequency = 1;
int64_t _sent_audit_stats_frequency_upper_limit = 64;

std::function<void()> _callback = nullptr;
};

} // namespace starrocks::pipeline
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/pipeline/pipeline_observer.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/spill/operator_mem_resource_manager.h"
#include "exprs/runtime_filter_bank.h"
Expand All @@ -39,7 +40,7 @@ using OperatorPtr = std::shared_ptr<Operator>;
using Operators = std::vector<OperatorPtr>;
using LocalRFWaitingSet = std::set<TPlanNodeId>;

class Operator {
class Operator : public PipelinePublisher {
friend class PipelineDriver;
friend class StreamPipelineDriver;

Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ void Pipeline::instantiate_drivers(RuntimeState* state) {
<< " fragment_instance_id=" << print_id(fragment_ctx->fragment_instance_id());

setup_pipeline_profile(state);
LOG(INFO) << "Pipeline chains: " << to_readable_string();
for (size_t i = 0; i < dop; ++i) {
auto&& operators = create_operators(dop, i);
DriverPtr driver = nullptr;
Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

#include <atomic>
#include <chrono>
#include <memory>

#include "column/vectorized_fwd.h"
#include "common/statusor.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/operator.h"
#include "exec/pipeline/operator_with_dependency.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/pipeline_observer.h"
#include "exec/pipeline/query_context.h"
#include "exec/pipeline/runtime_filter_types.h"
#include "exec/pipeline/scan/morsel.h"
Expand Down Expand Up @@ -455,6 +457,16 @@ class PipelineDriver {
return source_operator()->is_epoch_finishing() || sink_operator()->is_epoch_finishing();
}

void observe_operator(const std::function<void(PipelineDriver*)>& call_fn) {
auto observer = std::make_shared<PipelineObserver>([this, call_fn]() { call_fn(this); });
if (nullptr != source_operator()) {
source_operator()->attach(observer);
}
if (nullptr != sink_operator()) {
sink_operator()->attach(observer);
}
}

protected:
PipelineDriver()
: _operators(),
Expand Down
21 changes: 21 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <memory>

#include "common/config.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "gutil/strings/substitute.h"
Expand All @@ -42,6 +44,11 @@ GlobalDriverExecutor::GlobalDriverExecutor(const std::string& name, std::unique_
REGISTER_GAUGE_STARROCKS_METRIC(pipe_driver_queue_len, [this]() { return _driver_queue->size(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_block_queue_len,
[this]() { return _blocked_driver_poller->blocked_driver_queue_len(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_poll_count, [this]() { return _blocked_driver_poller->poll_count(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_ready_count,
[this]() { return _blocked_driver_poller->ready_count(); });
REGISTER_GAUGE_STARROCKS_METRIC(pipe_poller_backup_count,
[this]() { return _blocked_driver_poller->backup_count(); });
}

void GlobalDriverExecutor::close() {
Expand Down Expand Up @@ -251,6 +258,20 @@ StatusOr<DriverRawPtr> GlobalDriverExecutor::_get_next_driver(std::queue<DriverR
void GlobalDriverExecutor::submit(DriverRawPtr driver) {
driver->start_timers();

if (config::enable_pipeline_event_poller) {
driver->observe_operator([&](DriverRawPtr dr) {
if (dr->pending_finish() && !dr->is_still_pending_finish()) {
_blocked_driver_poller->upgrade_to_blocked_driver(dr);
return;
}
auto status_or_is_not_blocked = dr->is_not_blocked();
if (!status_or_is_not_blocked.ok() || status_or_is_not_blocked.value()) {
_blocked_driver_poller->upgrade_to_blocked_driver(dr);
return;
}
});
}

if (driver->is_precondition_block()) {
driver->set_driver_state(DriverState::PRECONDITION_BLOCK);
driver->mark_precondition_not_ready();
Expand Down
94 changes: 94 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include "pipeline_driver_poller.h"

#include <chrono>

#include "common/config.h"
namespace starrocks::pipeline {

void PipelineDriverPoller::start() {
Expand All @@ -24,6 +26,13 @@ void PipelineDriverPoller::start() {
if (!status.ok()) {
LOG(FATAL) << "Fail to create PipelineDriverPoller: error=" << status.to_string();
}

status = Thread::create(
"pipeline", "pipeline_backup_poller", [this]() { backup_run_internal(); }, &this->_backup_thread);
if (!status.ok()) {
LOG(FATAL) << "Fail to create PipelineDriverBackupPoller: error=" << status.to_string();
}

while (!this->_is_polling_thread_initialized.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
Expand All @@ -34,6 +43,7 @@ void PipelineDriverPoller::shutdown() {
this->_is_shutdown.store(true, std::memory_order_release);
_cond.notify_one();
_polling_thread->join();
_backup_thread->join();
}
}

Expand All @@ -43,6 +53,7 @@ void PipelineDriverPoller::run_internal() {
int spin_count = 0;
std::vector<DriverRawPtr> ready_drivers;
while (!_is_shutdown.load(std::memory_order_acquire)) {
_poll_count++;
{
std::unique_lock<std::mutex> lock(_global_mutex);
tmp_blocked_drivers.splice(tmp_blocked_drivers.end(), _blocked_drivers);
Expand Down Expand Up @@ -175,7 +186,72 @@ void PipelineDriverPoller::run_internal() {
}
}

void PipelineDriverPoller::backup_run_internal() {
DriverList tmp_drivers;
while (!_is_shutdown.load(std::memory_order_acquire)) {
_backup_count++;
{
std::unique_lock<std::mutex> lock(_backup_mutex);
tmp_drivers.assign(_backup_drivers.begin(), _backup_drivers.end());
}
auto driver_it = tmp_drivers.begin();
while (driver_it != tmp_drivers.end()) {
auto* driver = *driver_it;

if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
upgrade_to_blocked_driver(driver);
} else if (driver->fragment_ctx()->is_canceled() || driver->need_report_exec_state()) {
upgrade_to_blocked_driver(driver);
} else if ((driver->pending_finish() && driver->is_still_pending_finish()) ||
(driver->is_epoch_finishing() && driver->is_still_epoch_finishing()) ||
driver->is_epoch_finished() || driver->is_finished()) {
upgrade_to_blocked_driver(driver);
} else {
auto status_or_is_not_blocked = driver->is_not_blocked();
if (status_or_is_not_blocked.ok() && status_or_is_not_blocked.value()) {
upgrade_to_blocked_driver(driver);
}
}
}

tmp_drivers.clear();
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

// void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
// std::unique_lock<std::mutex> lock(_global_mutex);
// _blocked_drivers.push_back(driver);
// _blocked_driver_queue_len++;
// driver->_pending_timer_sw->reset();
// driver->driver_acct().clean_local_queue_infos();
// _cond.notify_one();
// }

void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
if (config::enable_pipeline_event_poller) {
if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
} else if (driver->fragment_ctx()->is_canceled()) {
} else if (driver->need_report_exec_state()) {
} else if (driver->pending_finish() && driver->is_still_pending_finish()) {
std::unique_lock<std::mutex> lock(_backup_mutex);
_backup_drivers.push_back(driver);
return;
} else if (driver->is_epoch_finishing() && driver->is_still_epoch_finishing()) {
std::unique_lock<std::mutex> lock(_backup_mutex);
_backup_drivers.push_back(driver);
return;
} else if (driver->is_epoch_finished()) {
} else if (driver->is_finished()) {
} else {
auto status_or_is_not_blocked = driver->is_not_blocked();
if (status_or_is_not_blocked.ok() && !status_or_is_not_blocked.value()) {
std::unique_lock<std::mutex> lock(_backup_mutex);
_backup_drivers.push_back(driver);
return;
}
}
}
std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
_blocked_driver_queue_len++;
Expand All @@ -184,6 +260,23 @@ void PipelineDriverPoller::add_blocked_driver(const DriverRawPtr driver) {
_cond.notify_one();
}

void PipelineDriverPoller::upgrade_to_blocked_driver(const DriverRawPtr driver) {
{
std::unique_lock<std::mutex> lock(_backup_mutex);
if (_backup_drivers.remove(driver) < 1) {
return;
}
}
{
std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
_blocked_driver_queue_len++;
driver->_pending_timer_sw->reset();
driver->driver_acct().clean_local_queue_infos();
_cond.notify_one();
}
}

void PipelineDriverPoller::park_driver(const DriverRawPtr driver) {
std::unique_lock<std::mutex> lock(_global_parked_mutex);
VLOG_ROW << "Add to parked driver:" << driver->to_readable_string();
Expand Down Expand Up @@ -227,6 +320,7 @@ size_t PipelineDriverPoller::calculate_parked_driver(const ImmutableDriverPredic
}

void PipelineDriverPoller::remove_blocked_driver(DriverList& local_blocked_drivers, DriverList::iterator& driver_it) {
_ready_count++;
auto& driver = *driver_it;
driver->_pending_timer->update(driver->_pending_timer_sw->elapsed_time());
local_blocked_drivers.erase(driver_it++);
Expand Down
18 changes: 18 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <vector>

#include "gutil/ref_counted.h"
#include "pipeline_driver.h"
#include "pipeline_driver_queue.h"
#include "util/thread.h"
Expand Down Expand Up @@ -61,8 +64,15 @@ class PipelineDriverPoller {

void iterate_immutable_driver(const IterateImmutableDriverFunc& call) const;

size_t poll_count() const { return _poll_count.load(); }
size_t ready_count() const { return _ready_count.load(); }
size_t backup_count() const { return _backup_count.load(); }

void upgrade_to_blocked_driver(const DriverRawPtr driver);

private:
void run_internal();
void backup_run_internal();
PipelineDriverPoller(const PipelineDriverPoller&) = delete;
PipelineDriverPoller& operator=(const PipelineDriverPoller&) = delete;

Expand All @@ -73,6 +83,11 @@ class PipelineDriverPoller {
mutable std::shared_mutex _local_mutex;
DriverList _local_blocked_drivers;

mutable std::mutex _backup_mutex;
DriverList _backup_drivers;
std::vector<uint64_t> _backup_drivers_wait_time;
scoped_refptr<Thread> _backup_thread;

DriverQueue* _driver_queue;
scoped_refptr<Thread> _polling_thread;
std::atomic<bool> _is_polling_thread_initialized;
Expand All @@ -84,5 +99,8 @@ class PipelineDriverPoller {
DriverList _parked_drivers;

std::atomic<size_t> _blocked_driver_queue_len;
std::atomic<size_t> _poll_count = 0;
std::atomic<size_t> _backup_count = 0;
std::atomic<size_t> _ready_count = 0;
};
} // namespace starrocks::pipeline
Loading

0 comments on commit 7bcec27

Please sign in to comment.