Skip to content

Commit

Permalink
[perf] optimize all parallel situation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Mar 1, 2024
1 parent 237ae15 commit c374681
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 30 deletions.
6 changes: 3 additions & 3 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,13 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
try {
switch (type) {
case CFunctionType::RUN: {
if (0 == trigger_times_) {
if (!is_prepared_) {
/** 第一次执行的时候,预先执行一下 prepareRun方法 */
status = prepareRun();
CGRAPH_FUNCTION_CHECK_STATUS
is_prepared_ = true;
}

trigger_times_++; // 记录实际上触发了多少次,而不是正式执行了多少次
for (CSize i = 0; i < this->loop_ && status.isOK() && GElementState::NORMAL == this->getCurState(); i++) {
/** 执行带切面的run方法 */
status += doAspect(GAspectType::BEGIN_RUN);
Expand All @@ -266,7 +266,7 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
}
case CFunctionType::INIT: {
concerned_params_.clear(); // 仅需要记录这一轮使用到的 GParam 信息
trigger_times_ = 0;
is_prepared_ = false;
status = doAspect(GAspectType::BEGIN_INIT);
CGRAPH_FUNCTION_CHECK_STATUS
status = init();
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class GElement : public GElementObject,
GAspectManagerPtr aspect_manager_ { nullptr }; // 整体流程的切面管理类
UThreadPoolPtr thread_pool_ { nullptr }; // 用于执行的线程池信息
GPerfInfo* perf_info_ { nullptr }; // 用于perf的信息
CULong trigger_times_ { 0 }; // 被触发的次数信息(loop执行n次,算触发1次)
CBool is_prepared_ { false }; // 判断是否已经执行过 prepareRun() 方法

/** 图相关信息 */
std::atomic<CSize> left_depend_ { 0 }; // 当 left_depend_ 值为0的时候,即可以执行该element信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,12 @@ CGRAPH_NAMESPACE_BEGIN

CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN

// 给所有的值清空
total_element_arr_.clear();
front_element_arr_.clear();
total_end_size_ = 0;

// 确定所有的信息
for (GElementPtr element : elements) {
CGRAPH_ASSERT_NOT_NULL(element)
if (element->run_before_.empty()) {
total_end_size_++;
}

if (element->dependence_.empty()) {
front_element_arr_.emplace_back(element);
}
total_element_arr_.emplace_back(element);
}

/**
* 1. 标记数据,比如有多少个结束element等
* 2. 标记哪些数据,是linkable 的
* 3. 分析当前dag类型信息
*/
mark(elements);
GEngine::link(elements);
analysisDagType(elements);
CGRAPH_FUNCTION_END
Expand All @@ -56,6 +43,8 @@ CStatus GDynamicEngine::run() {
parallelRunAll();
break;
}
default:
CGRAPH_RETURN_ERROR_STATUS("unknown engine dag type")
}
status = cur_status_;
CGRAPH_FUNCTION_END
Expand Down Expand Up @@ -103,6 +92,24 @@ CVoid GDynamicEngine::beforeRun() {
}


CVoid GDynamicEngine::mark(const GSortedGElementPtrSet& elements) {
total_element_arr_.clear();
front_element_arr_.clear();
total_end_size_ = 0;

for (GElementPtr element : elements) {
if (element->run_before_.empty()) {
total_end_size_++;
}

if (element->dependence_.empty()) {
front_element_arr_.emplace_back(element);
}
total_element_arr_.emplace_back(element);
}
}


CVoid GDynamicEngine::analysisDagType(const GSortedGElementPtrSet& elements) {
CSize linkedSize = std::count_if(elements.begin(), elements.end(), [](GElementPtr element) {
return element->linkable_;
Expand Down Expand Up @@ -210,20 +217,22 @@ CVoid GDynamicEngine::parallelRunAll() {
* 非纯并行逻辑,不走此函数
*/
std::vector<std::future<CStatus>> futures;
futures.reserve(total_end_size_);
for (auto* element : front_element_arr_) {
futures.emplace_back(thread_pool_->commit([element] {
return element->fatProcessor(CFunctionType::RUN);
}, calcIndex(element)));
futures.reserve(total_end_size_ - 1);
for (int i = 1; i < total_end_size_; i++) {
futures.emplace_back(std::move(thread_pool_->commit([this, i] {
return total_element_arr_[i]->fatProcessor(CFunctionType::RUN);
}, calcIndex(total_element_arr_[i]))));
}

// 将 1~n 的数据,放入线程池。第0个,本地直接执行即可,类似亲和性处理
cur_status_ += (*front_element_arr_.begin())->fatProcessor(CFunctionType::RUN);
for (auto& fut : futures) {
cur_status_ += fut.get();
}
}


void GDynamicEngine::serialRunAll() {
CVoid GDynamicEngine::serialRunAll() {
/**
* 如果分析出来 dag是一个链式的,则直接依次执行element
* 直到所有element都执行完成,或者有出现错误的返回值
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ class GDynamicEngine : public GEngine {

CStatus afterRunCheck() override;

/**
* 记录当前 elements 数据信息
* @param elements
* @return
*/
CVoid mark(const GSortedGElementPtrSet& elements);

/**
* 分析当前的信息,主要用于区分dag的类型
* @return
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphParam/GParamManagerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ CGRAPH_NAMESPACE_BEGIN
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(param_manager_) \
auto param = param_manager_->get<TGParam>(key); \
if (nullptr != param) { \
concerned_params_.insert(param); \
concerned_params_.insert(param); \
param->addBacktrace(name_.empty() ? session_ : name_); \
} \
return param; \
Expand Down

0 comments on commit c374681

Please sign in to comment.