Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Seaven <[email protected]>
  • Loading branch information
Seaven committed Aug 21, 2024
1 parent 54c2568 commit c2bb005
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 11 deletions.
6 changes: 4 additions & 2 deletions be/src/exec/pipeline/pipeline_driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,10 @@ class PipelineDriver {
size_t _driver_queue_level = 0;
std::atomic<bool> _in_ready_queue{false};

// metrics
RuntimeProfile::Counter* _total_timer = nullptr;
PipelineObserverPtr _observer = nullptr;

// metrics
RuntimeProfile::Counter* _total_timer = nullptr;
RuntimeProfile::Counter* _active_timer = nullptr;
RuntimeProfile::Counter* _overhead_timer = nullptr;
RuntimeProfile::Counter* _schedule_timer = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void GlobalDriverExecutor::submit(DriverRawPtr driver) {
// << ", sink finished: " << driver->sink_operator()->is_finished()
// << ", source finished: " << driver->source_operator()->is_finished()
// << ", source output: " << driver->source_operator()->has_output();
_blocked_driver_poller->upgrade_to_blocked_driver(dr);
_blocked_driver_poller->upgrade_to_blocked_driver2(dr);
});
}

Expand Down
47 changes: 47 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_poller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,53 @@ void PipelineDriverPoller::upgrade_to_blocked_driver(const DriverRawPtr driver)
}
}

void PipelineDriverPoller::upgrade_to_blocked_driver2(const DriverRawPtr driver) {
{
std::unique_lock<std::mutex> lock(_backup_mutex);
if (_backup_drivers.remove(driver) < 1) {
return;
}
}
bool is_ready = false;
bool is_upgrade = false;
if (!driver->is_query_never_expired() && driver->query_ctx()->is_query_expired()) {
is_upgrade = true;
} else if (driver->fragment_ctx()->is_canceled() || driver->need_report_exec_state()) {
is_upgrade = true;
} else if (driver->pending_finish() && !driver->is_still_pending_finish()) {
driver->set_driver_state(driver->fragment_ctx()->is_canceled() ? DriverState::CANCELED : DriverState::FINISH);
is_ready = true;
} else if (driver->is_epoch_finishing() && !driver->is_still_epoch_finishing()) {
driver->set_driver_state(driver->fragment_ctx()->is_canceled() ? DriverState::CANCELED
: DriverState::EPOCH_FINISH);
is_ready = true;
} else if (driver->is_epoch_finished() || driver->is_finished()) {
is_ready = true;
} else {
auto status_or_is_not_blocked = driver->is_not_blocked();
if (!status_or_is_not_blocked.ok()) {
is_upgrade = true;
} else if (status_or_is_not_blocked.value()) {
is_ready = true;
}
}
if (is_upgrade) {
std::unique_lock<std::mutex> lock(_global_mutex);
_blocked_drivers.push_back(driver);
_blocked_driver_queue_len++;
driver->_pending_timer_sw->reset();
driver->driver_acct().clean_local_queue_infos();
_cond.notify_one();
return;
}
if (is_ready) {
_driver_queue->put_back(driver);
} else {
std::unique_lock<std::mutex> lock(_backup_mutex);
_backup_drivers.emplace_back(driver);
}
}

void PipelineDriverPoller::park_driver(const DriverRawPtr driver) {
std::unique_lock<std::mutex> lock(_global_parked_mutex);
VLOG_ROW << "Add to parked driver:" << driver->to_readable_string();
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/pipeline_driver_poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class PipelineDriverPoller {
size_t backup_count() const { return _backup_count.load(); }

void upgrade_to_blocked_driver(const DriverRawPtr driver);
void upgrade_to_blocked_driver2(const DriverRawPtr driver);

private:
void run_internal();
Expand Down
76 changes: 76 additions & 0 deletions be/src/exec/pipeline/pipeline_observer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

#include "common/logging.h"

namespace starrocks::pipeline {
class PipelineDriver;
using ObserverFunc = std::function<void()>;

class PipelineObserver {
public:
static std::shared_ptr<PipelineObserver> create(ObserverFunc fn) { return std::make_shared<PipelineObserver>(fn); }

PipelineObserver(const ObserverFunc& fn) {
_observer_fn = std::move(fn);
DCHECK(_observer_fn != nullptr);
}

void update() {
DCHECK(_observer_fn != nullptr);
_observer_fn();
}

private:
friend class PipelinePublisher;
ObserverFunc _observer_fn = nullptr;
};

using PipelineObserverPtr = std::shared_ptr<PipelineObserver>;

class PipelinePublisher {
public:
void attach(PipelineObserverPtr& observer) {
std::lock_guard<std::mutex> l(_mutex);
_observers.emplace_back(observer);
}

void detach(PipelineObserverPtr& observer) {
std::lock_guard<std::mutex> l(_mutex);
_observers.erase(std::remove(_observers.begin(), _observers.end(), observer), _observers.end());
}

void notify() {
std::lock_guard<std::mutex> l(_mutex);
for (auto& ob : _observers) {
ob->update();
}
}

private:
std::mutex _mutex;
std::vector<std::shared_ptr<PipelineObserver>> _observers;
};

using PipelinePublisherPtr = std::shared_ptr<PipelinePublisher>;

} // namespace starrocks::pipeline
4 changes: 0 additions & 4 deletions be/src/exec/pipeline/scan/olap_scan_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ void OlapScanContext::detach_shared_input(int32_t operator_seq, int32_t source_i
VLOG_ROW << fmt::format("detach_shared_input ({}, {}), remain {}", operator_seq, source_index,
_active_inputs.size());
_active_inputs.erase(key);

if (_active_inputs.empty()) {
_publisher.notify();
}
}

bool OlapScanContext::has_active_input() const {
Expand Down
10 changes: 6 additions & 4 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ Status ScanOperator::set_finishing(RuntimeState* state) {
: "");
}
DeferOp op = DeferOp([this] { notify(); });
std::lock_guard guard(_task_mutex);
_detach_chunk_sources();
set_buffer_finished();
_is_finished = true;
{
std::lock_guard guard(_task_mutex);
_detach_chunk_sources();
set_buffer_finished();
_is_finished = true;
}
return Status::OK();
}

Expand Down

0 comments on commit c2bb005

Please sign in to comment.