Skip to content

Commit

Permalink
[Feature] support event scheduler for scan/exchange/agg (part 2) (#54338
Browse files Browse the repository at this point in the history
)

Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain authored Dec 31, 2024
1 parent 102b6d1 commit a0b2a63
Show file tree
Hide file tree
Showing 71 changed files with 822 additions and 59 deletions.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ set(EXEC_FILES
pipeline/group_execution/execution_group.cpp
pipeline/group_execution/execution_group_builder.cpp
pipeline/group_execution/group_operator.cpp
pipeline/schedule/timeout_tasks.cpp
pipeline/schedule/event_scheduler.cpp
pipeline/schedule/observer.cpp
pipeline/schedule/pipeline_timer.cpp
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -729,10 +729,12 @@ bool Aggregator::is_chunk_buffer_empty() {
}

ChunkPtr Aggregator::poll_chunk_buffer() {
auto notify = defer_notify_sink();
return _limited_buffer->pull();
}

void Aggregator::offer_chunk_to_buffer(const ChunkPtr& chunk) {
auto notify = defer_notify_source();
_limited_buffer->push(chunk);
}

Expand Down
12 changes: 12 additions & 0 deletions be/src/exec/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "exec/chunk_buffer_memory_manager.h"
#include "exec/limited_pipeline_chunk_buffer.h"
#include "exec/pipeline/context_with_dependency.h"
#include "exec/pipeline/schedule/observer.h"
#include "exec/pipeline/spill_process_channel.h"
#include "exprs/agg/aggregate_factory.h"
#include "exprs/expr.h"
Expand Down Expand Up @@ -402,6 +403,15 @@ class Aggregator : public pipeline::ContextWithDependency {

HashTableKeyAllocator _state_allocator;

void attach_sink_observer(RuntimeState* state, pipeline::PipelineObserver* observer) {
_pip_observable.attach_sink_observer(state, observer);
}
void attach_source_observer(RuntimeState* state, pipeline::PipelineObserver* observer) {
_pip_observable.attach_source_observer(state, observer);
}
auto defer_notify_source() { return _pip_observable.defer_notify_source(); }
auto defer_notify_sink() { return _pip_observable.defer_notify_sink(); }

protected:
AggregatorParamsPtr _params;

Expand Down Expand Up @@ -510,6 +520,8 @@ class Aggregator : public pipeline::ContextWithDependency {
// aggregate combinator functions since they are not persisted in agg hash map
std::vector<AggregateFunctionPtr> _combinator_function;

pipeline::PipeObservable _pip_observable;

public:
void build_hash_map(size_t chunk_size, bool agg_group_by_with_limit = false);
void build_hash_map(size_t chunk_size, std::atomic<int64_t>& shared_limit_countdown, bool agg_group_by_with_limit);
Expand Down
22 changes: 20 additions & 2 deletions be/src/exec/chunk_buffer_memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,17 @@ class ChunkBufferMemoryManager {
}

void update_memory_usage(int64_t memory_usage, int64_t num_rows) {
_memory_usage += memory_usage;
_buffered_num_rows += num_rows;
bool prev_full = is_full();
size_t prev_memusage = _memory_usage.fetch_add(memory_usage);
size_t prev_num_rows = _buffered_num_rows.fetch_add(num_rows);
bool is_full =
prev_memusage + memory_usage >= _max_memory_usage || prev_num_rows + num_rows >= _max_buffered_rows;
bool expect = false;
bool full_changed = prev_full != is_full;
if (!full_changed) {
return;
}
_full_events_changed.compare_exchange_strong(expect, full_changed);
}

size_t get_memory_limit_per_driver() const { return _max_memory_usage_per_driver; }
Expand All @@ -65,12 +74,21 @@ class ChunkBufferMemoryManager {
_buffered_num_rows = 0;
}

bool full_events_changed() {
if (!_full_events_changed.load(std::memory_order_acquire)) {
return false;
}
bool val = true;
return _full_events_changed.compare_exchange_strong(val, false);
}

private:
std::atomic<size_t> _max_memory_usage{128UL * 1024 * 1024 * 1024}; // 128GB
size_t _max_memory_usage_per_driver = 128 * 1024 * 1024UL; // 128MB
size_t _max_buffered_rows{};
std::atomic<int64_t> _memory_usage{};
std::atomic<int64_t> _buffered_num_rows{};
size_t _max_input_dop;
std::atomic<bool> _full_events_changed{};
};
} // namespace starrocks::pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Status AggregateBlockingSinkOperator::prepare(RuntimeState* state) {
_aggregator->limit() != -1 && // has limit
_aggregator->conjunct_ctxs().empty() && // no 'having' clause
_aggregator->get_aggr_phase() == AggrPhase2); // phase 2, keep it to make things safe
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

Expand All @@ -46,6 +47,7 @@ void AggregateBlockingSinkOperator::close(RuntimeState* state) {
Status AggregateBlockingSinkOperator::set_finishing(RuntimeState* state) {
if (_is_finished) return Status::OK();
ONCE_DETECT(_set_finishing_once);
auto notify = _aggregator->defer_notify_source();
auto defer = DeferOp([this]() {
COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows());
_aggregator->sink_complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class AggregateBlockingSinkOperatorFactory final : public OperatorFactory {

~AggregateBlockingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

Status prepare(RuntimeState* state) override;

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ bool AggregateBlockingSourceOperator::is_finished() const {
}

Status AggregateBlockingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Expand All @@ -37,6 +38,12 @@ void AggregateBlockingSourceOperator::close(RuntimeState* state) {
SourceOperator::close(state);
}

Status AggregateBlockingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

StatusOr<ChunkPtr> AggregateBlockingSourceOperator::pull_chunk(RuntimeState* state) {
RETURN_IF_CANCELLED(state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class AggregateBlockingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;

Status set_finished(RuntimeState* state) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ namespace starrocks::pipeline {
Status AggregateDistinctBlockingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), _unique_metrics.get()));
return _aggregator->open(state);
RETURN_IF_ERROR(_aggregator->open(state));
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) {
Expand All @@ -35,6 +37,7 @@ void AggregateDistinctBlockingSinkOperator::close(RuntimeState* state) {
Status AggregateDistinctBlockingSinkOperator::set_finishing(RuntimeState* state) {
if (_is_finished) return Status::OK();
ONCE_DETECT(_set_finishing_once);
auto notify = _aggregator->defer_notify_source();
auto defer = DeferOp([this]() {
COUNTER_UPDATE(_aggregator->input_row_count(), _aggregator->num_input_rows());
_aggregator->sink_complete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class AggregateDistinctBlockingSinkOperatorFactory final : public OperatorFactor

~AggregateDistinctBlockingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

Status prepare(RuntimeState* state) override {
RETURN_IF_ERROR(OperatorFactory::prepare(state));
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ void AggregateDistinctBlockingSourceOperator::close(RuntimeState* state) {
SourceOperator::close(state);
}

Status AggregateDistinctBlockingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

StatusOr<ChunkPtr> AggregateDistinctBlockingSourceOperator::pull_chunk(RuntimeState* state) {
RETURN_IF_CANCELLED(state);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class AggregateDistinctBlockingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;

Status set_finished(RuntimeState* state) override;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) {
if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) {
_limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size;
}
_aggregator->attach_sink_observer(state, this->_observer);
return _aggregator->open(state);
}

Expand All @@ -37,6 +38,7 @@ void AggregateDistinctStreamingSinkOperator::close(RuntimeState* state) {
}

Status AggregateDistinctStreamingSinkOperator::set_finishing(RuntimeState* state) {
auto notify = _aggregator->defer_notify_source();
_is_finished = true;

// skip processing if cancelled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class AggregateDistinctStreamingSinkOperatorFactory final : public OperatorFacto

~AggregateDistinctStreamingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateDistinctStreamingSinkOperator>(
this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,18 @@

#include "aggregate_distinct_streaming_source_operator.h"

#include "common/status.h"
#include "exec/pipeline/source_operator.h"
#include "runtime/runtime_state.h"

namespace starrocks::pipeline {

Status AggregateDistinctStreamingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

bool AggregateDistinctStreamingSourceOperator::has_output() const {
// There are two cases where chunk buffer is not null
// case1:streaming mode is 'FORCE_STREAMING'
Expand Down Expand Up @@ -46,6 +56,7 @@ bool AggregateDistinctStreamingSourceOperator::is_finished() const {
}

Status AggregateDistinctStreamingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class AggregateDistinctStreamingSourceOperator : public SourceOperator {

~AggregateDistinctStreamingSourceOperator() override = default;

Status prepare(RuntimeState* state) override;
bool has_output() const override;
bool is_finished() const override;

Expand Down Expand Up @@ -60,6 +61,8 @@ class AggregateDistinctStreamingSourceOperatorFactory final : public SourceOpera

~AggregateDistinctStreamingSourceOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateDistinctStreamingSourceOperator>(
this, _id, _plan_node_id, driver_sequence, _aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ Status AggregateStreamingSinkOperator::prepare(RuntimeState* state) {
if (_aggregator->streaming_preaggregation_mode() == TStreamingPreaggregationMode::LIMITED_MEM) {
_limited_mem_state.limited_memory_size = config::streaming_agg_limited_memory_size;
}
return _aggregator->open(state);
RETURN_IF_ERROR(_aggregator->open(state));
_aggregator->attach_sink_observer(state, this->_observer);
return Status::OK();
}

void AggregateStreamingSinkOperator::close(RuntimeState* state) {
Expand All @@ -40,8 +42,8 @@ void AggregateStreamingSinkOperator::close(RuntimeState* state) {
}

Status AggregateStreamingSinkOperator::set_finishing(RuntimeState* state) {
auto notify = _aggregator->defer_notify_source();
_is_finished = true;

// skip processing if cancelled
if (state->is_cancelled()) {
return Status::OK();
Expand Down Expand Up @@ -314,4 +316,12 @@ Status AggregateStreamingSinkOperator::reset_state(RuntimeState* state, const st
_is_finished = false;
return _aggregator->reset_state(state, refill_chunks, this);
}

std::string AggregateStreamingSinkOperator::get_name() const {
std::string finished = is_finished() ? "X" : "O";
auto full = _aggregator->is_chunk_buffer_full();
return fmt::format("{}_{}_{}({}) {{ full:{} has_output:{}}}", _name, _plan_node_id, (void*)this, finished, full,
has_output());
}

} // namespace starrocks::pipeline
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class AggregateStreamingSinkOperator : public Operator {
bool releaseable() const override { return true; }
void set_execute_mode(int performance_level) override;

std::string get_name() const override;

private:
// Invoked by push_chunk if current mode is TStreamingPreaggregationMode::FORCE_STREAMING
Status _push_chunk_by_force_streaming(const ChunkPtr& chunk);
Expand Down Expand Up @@ -85,6 +87,8 @@ class AggregateStreamingSinkOperatorFactory final : public OperatorFactory {

~AggregateStreamingSinkOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateStreamingSinkOperator>(this, _id, _plan_node_id, driver_sequence,
_aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ bool AggregateStreamingSourceOperator::is_finished() const {
}

Status AggregateStreamingSourceOperator::set_finished(RuntimeState* state) {
auto notify = _aggregator->defer_notify_sink();
return _aggregator->set_finished();
}

Status AggregateStreamingSourceOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_aggregator->attach_source_observer(state, this->_observer);
return Status::OK();
}

void AggregateStreamingSourceOperator::close(RuntimeState* state) {
_aggregator->unref(state);
SourceOperator::close(state);
Expand Down Expand Up @@ -97,6 +104,7 @@ Status AggregateStreamingSourceOperator::_output_chunk_from_hash_map(ChunkPtr* c
}
});

// TODO: notify sink here
if (need_reset_aggregator) {
if (!_aggregator->is_sink_complete()) {
RETURN_IF_ERROR(_aggregator->reset_state(state, {}, nullptr, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class AggregateStreamingSourceOperator : public SourceOperator {

bool has_output() const override;
bool is_finished() const override;
Status prepare(RuntimeState* state) override;
Status set_finished(RuntimeState* state) override;

void close(RuntimeState* state) override;
Expand All @@ -58,6 +59,8 @@ class AggregateStreamingSourceOperatorFactory final : public SourceOperatorFacto

~AggregateStreamingSourceOperatorFactory() override = default;

bool support_event_scheduler() const override { return true; }

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<AggregateStreamingSourceOperator>(this, _id, _plan_node_id, driver_sequence,
_aggregator_factory->get_or_create(driver_sequence));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ class SpillableAggregateBlockingSinkOperatorFactory : public OperatorFactory {

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
ObjectPool _pool;
SortExecExprs _sort_exprs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class SpillableAggregateBlockingSourceOperatorFactory final : public SourceOpera

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
AggregatorFactoryPtr _hash_aggregator_factory;
// _stream_aggregatory_factory is only used when spilling happens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ class SpillableAggregateDistinctBlockingSinkOperatorFactory final : public Opera

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
ObjectPool _pool;
SortExecExprs _sort_exprs;
Expand Down Expand Up @@ -144,6 +146,8 @@ class SpillableAggregateDistinctBlockingSourceOperatorFactory final : public Sou

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;

bool support_event_scheduler() const override { return false; }

private:
AggregatorFactoryPtr _hash_aggregator_factory;
// _stream_aggregatory_factory is only used when spilling happens
Expand Down
Loading

0 comments on commit a0b2a63

Please sign in to comment.