From 583cf40984b91ec3700f242afd105385b89bc21d Mon Sep 17 00:00:00 2001
From: ChunelFeng
Date: Mon, 27 Nov 2023 23:38:00 +0800
Subject: [PATCH] [perf] optimize all parallel dag speed. origin 21.3s, now 12.4s

---
 .../GDynamicEngine/GDynamicEngine.cpp         | 29 +++++++++++++++++--
 .../_GEngine/GDynamicEngine/GDynamicEngine.h  |  8 ++++-
 src/GraphCtrl/GraphParam/GParamManager.inl    |  1 -
 tutorial/MyGNode/MyWriteParamNode.h           | 11 +++++--
 4 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp
index 6eb27233..b8d33760 100644
--- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp
+++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp
@@ -37,9 +37,13 @@ CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
 
 CStatus GDynamicEngine::run() {
     CGRAPH_FUNCTION_BEGIN
-    beforeRun();
 
-    asyncRunAndWait();
+    if (likely(total_end_size_ != total_element_arr_.size())) {
+        beforeRun();
+        asyncRunAndWait();
+    } else {
+        parallelRunAll();
+    }
     status = cur_status_;
     CGRAPH_FUNCTION_END
 }
@@ -154,4 +158,25 @@ CVoid GDynamicEngine::wait() {
     });
 }
 
+
+CVoid GDynamicEngine::parallelRunAll() {
+    /**
+     * 主要适用于dag是纯并发逻辑的情况
+     * 直接并发的执行所有的流程,从而减少调度损耗
+     * 实测效果,在32路纯并行的情况下,整体耗时从 21.5s降低到 12.5s
+     * 非纯并行逻辑,不走此函数
+     */
+    std::vector<std::future<CStatus>> futures;
+    futures.reserve(front_element_arr_.size());
+    for (auto* element : front_element_arr_) {
+        futures.emplace_back(thread_pool_->commit([element] {
+            return element->fatProcessor(CFunctionType::RUN);
+        }, calcIndex(element)));
+    }
+
+    for (auto& fut : futures) {
+        cur_status_ += fut.get();
+    }
+}
+
 CGRAPH_NAMESPACE_END
diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h
index 8c60f4b5..11489faf 100644
--- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h
+++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.h
@@ -62,12 +62,18 @@ class GDynamicEngine : public GEngine {
      */
     CVoid wait();
 
+    /**
+     * 并发的执行所有的节点
+     * @return
+     */
+    CVoid parallelRunAll();
+
 private:
     GElementPtrArr total_element_arr_;                 // pipeline中所有的元素信息集合
     GElementPtrArr front_element_arr_;                 // 没有依赖的元素信息
     CSize total_end_size_ = 0;                         // 图结束节点数量
     CSize finished_end_size_ = 0;                      // 执行结束节点数量
-    std::atomic<CSize> run_element_size_ { 0 };       // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
+    std::atomic<CSize> run_element_size_ { 0 };        // 执行元素的个数,用于后期校验。这里和静态不一样,需要加atomic
     CStatus cur_status_;                               // 当前全局的状态信息
 
     std::mutex lock_;
diff --git a/src/GraphCtrl/GraphParam/GParamManager.inl b/src/GraphCtrl/GraphParam/GParamManager.inl
index b26dab31..4c18ed45 100644
--- a/src/GraphCtrl/GraphParam/GParamManager.inl
+++ b/src/GraphCtrl/GraphParam/GParamManager.inl
@@ -37,7 +37,6 @@ CStatus GParamManager::create(const std::string& key, CBool backtrace) {
 template<typename T,
         c_enable_if_t<std::is_base_of<GParam, T>::value, int>>
 T* GParamManager::get(const std::string& key) {
-    CGRAPH_LOCK_GUARD lock(this->mutex_);
     auto result = params_map_.find(key);
     if (result == params_map_.end()) {
         return nullptr;
diff --git a/tutorial/MyGNode/MyWriteParamNode.h b/tutorial/MyGNode/MyWriteParamNode.h
index 131266bf..fb50088c 100644
--- a/tutorial/MyGNode/MyWriteParamNode.h
+++ b/tutorial/MyGNode/MyWriteParamNode.h
@@ -14,7 +14,7 @@
 
 class MyWriteParamNode : public CGraph::GNode {
 public:
-    CStatus init () override {
+    CStatus init() override {
         CStatus status;
         /**
          * 推荐在init()中,将可能用到的参数创建好。也支持在run的时候创建
@@ -24,7 +24,14 @@ class MyWriteParamNode : public CGraph::GNode {
         return status;
     }
 
-    CStatus run () override {
+    CStatus run() override {
+        /**
+         * 为了提高执行效率,
+         * 在【创建参数】的时候,【提供】锁保护机制
+         * 在【获取参数】的时候,【不提供】锁保护的机制
+         * 故无法通过在run()过程中,并发的通过 createGParam 和 getGParam 创建和获取参数
+         * 如果需要做此操作,请自行外部加锁
+         */
         auto* myParam = CGRAPH_GET_GPARAM_WITH_NO_EMPTY(MyParam, "param1")
         int val = 0;
         int cnt = 0;