Skip to content

Commit

Permalink
[perf] optimize all parallel dag speed. origin 21.3s, now 12.4s
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Nov 27, 2023
1 parent 5399db3 commit 583cf40
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@ CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {

CStatus GDynamicEngine::run() {
CGRAPH_FUNCTION_BEGIN
beforeRun();
asyncRunAndWait();

if (likely(total_end_size_ != total_element_arr_.size())) {
beforeRun();
asyncRunAndWait();
} else {
parallelRunAll();
}
status = cur_status_;
CGRAPH_FUNCTION_END
}
Expand Down Expand Up @@ -154,4 +158,25 @@ CVoid GDynamicEngine::wait() {
});
}


CVoid GDynamicEngine::parallelRunAll() {
/**
* 主要适用于dag是纯并发逻辑的情况
* 直接并发的执行所有的流程,从而减少调度损耗
* 实测效果,在32路纯并行的情况下,整体耗时从 21.5s降低到 12.5s
* 非纯并行逻辑,不走此函数
*/
std::vector<std::future<CStatus>> futures;
futures.reserve(front_element_arr_.size());
for (auto* element : front_element_arr_) {
futures.emplace_back(thread_pool_->commit([element] {
return element->fatProcessor(CFunctionType::RUN);
}, calcIndex(element)));
}

for (auto& fut : futures) {
cur_status_ += fut.get();
}
}

CGRAPH_NAMESPACE_END
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,18 @@ class GDynamicEngine : public GEngine {
*/
CVoid wait();

/**
* 并发的执行所有的节点
* @return
*/
CVoid parallelRunAll();

private:
GElementPtrArr total_element_arr_; // pipeline中所有的元素信息集合
GElementPtrArr front_element_arr_; // 没有依赖的元素信息
CSize total_end_size_ = 0; // 图结束节点数量
CSize finished_end_size_ = 0; // 执行结束节点数量
std::atomic<CSize> run_element_size_ { 0 }; // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
std::atomic<CSize> run_element_size_ { 0 }; // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
CStatus cur_status_; // 当前全局的状态信息

std::mutex lock_;
Expand Down
1 change: 0 additions & 1 deletion src/GraphCtrl/GraphParam/GParamManager.inl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ CStatus GParamManager::create(const std::string& key, CBool backtrace) {
template<typename T,
c_enable_if_t<std::is_base_of<GParam, T>::value, int>>
T* GParamManager::get(const std::string& key) {
CGRAPH_LOCK_GUARD lock(this->mutex_);
auto result = params_map_.find(key);
if (result == params_map_.end()) {
return nullptr;
Expand Down
11 changes: 9 additions & 2 deletions tutorial/MyGNode/MyWriteParamNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class MyWriteParamNode : public CGraph::GNode {
public:
CStatus init () override {
CStatus init() override {
CStatus status;
/**
* 推荐在init()中,将可能用到的参数创建好。也支持在run的时候创建
Expand All @@ -24,7 +24,14 @@ class MyWriteParamNode : public CGraph::GNode {
return status;
}

CStatus run () override {
CStatus run() override {
/**
* 为了提高执行效率,
* 在【创建参数】的时候,【提供】锁保护机制
* 在【获取参数】的时候,【不提供】锁保护的机制
* 故无法通过在run()过程中,并发的通过 createGParam 和 getGParam 创建和获取参数
* 如果需要做此操作,请自行外部加锁
*/
auto* myParam = CGRAPH_GET_GPARAM_WITH_NO_EMPTY(MyParam, "param1")
int val = 0;
int cnt = 0;
Expand Down

0 comments on commit 583cf40

Please sign in to comment.