From 6c16ef83a64c93244b4793e299afdcd5eb3f0302 Mon Sep 17 00:00:00 2001 From: ChunelFeng Date: Sat, 2 Mar 2024 11:22:47 +0800 Subject: [PATCH] [feat] add some notice for threadpool. --- .../GDynamicEngine/GDynamicEngine.cpp | 16 +++++-------- src/GraphCtrl/GraphElement/_GEngine/GEngine.h | 5 +++- .../_GEngine/GStaticEngine/GStaticEngine.cpp | 2 +- src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h | 23 ++++++++----------- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp index 7c183c3a..32747f40 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GDynamicEngine/GDynamicEngine.cpp @@ -20,7 +20,7 @@ CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) { * 3. 分析当前dag类型信息 */ mark(elements); - GEngine::link(elements); + link(elements); analysisDagType(elements); CGRAPH_FUNCTION_END } @@ -111,18 +111,14 @@ CVoid GDynamicEngine::mark(const GSortedGElementPtrSet& elements) { CVoid GDynamicEngine::analysisDagType(const GSortedGElementPtrSet& elements) { - CSize linkedSize = std::count_if(elements.begin(), elements.end(), [](GElementPtr element) { - return element->linkable_; - }); - - if (1 == front_element_arr_.size() && linkedSize == total_element_arr_.size() - 1) { + if (front_element_arr_.size() == 1 && total_element_arr_.size() - 1 == linked_size_) { /** * 如果所有的信息中,只有一个是非linkable。则说明只有开头的那个是的,且只有一个开头 * 故,这里将其认定为一条 lineal 的情况 * ps: 只有一element的情况,也会被算到 ALL_SERIAL 中去 */ dag_type_ = internal::GEngineDagType::ALL_SERIAL; - } else if (total_end_size_ == total_element_arr_.size()) { + } else if (total_element_arr_.size() == total_end_size_ && front_element_arr_.size() == total_end_size_) { dag_type_ = internal::GEngineDagType::ALL_PARALLEL; } } @@ -138,7 +134,7 @@ CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) { return; } - const auto& exec = [this, element] { + const auto& execute = [this, element] { const CStatus& curStatus = element->fatProcessor(CFunctionType::RUN); if (unlikely(curStatus.isErr())) { // 当且仅当整体状正常,且当前状态异常的时候,进入赋值逻辑。确保不重复赋值 @@ -150,9 +146,9 @@ CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) { if (affinity && CGRAPH_DEFAULT_BINDING_INDEX == element->getBindingIndex()) { // 如果 affinity=true,表示用当前的线程,执行这个逻辑。以便增加亲和性 - exec(); + execute(); } else { - thread_pool_->commit(exec, calcIndex(element)); + thread_pool_->commit(execute, calcIndex(element)); } } diff --git a/src/GraphCtrl/GraphElement/_GEngine/GEngine.h b/src/GraphCtrl/GraphElement/_GEngine/GEngine.h index 192ae96a..c002dd12 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GEngine.h +++ b/src/GraphCtrl/GraphElement/_GEngine/GEngine.h @@ -53,7 +53,7 @@ class GEngine : public GEngineObject { * @param elements * @return */ - static CVoid link(const GSortedGElementPtrSet& elements) { + CVoid link(const GSortedGElementPtrSet& elements) { /** * 认定图可以连通的判定条件: * 1,当前元素仅有一个依赖 @@ -61,6 +61,7 @@ class GEngine : public GEngineObject { * 3,当前元素的依赖的后继,仍是当前节点 * 4,前后元素绑定机制是一样的 */ + linked_size_ = 0; for (GElementPtr element : elements) { element->linkable_ = false; // 防止出现之前的留存逻辑。确保只有当前链接关系下,需要设置 linkable的,才会设置为 true if (1 == element->dependence_.size() @@ -68,6 +69,7 @@ class GEngine : public GEngineObject { && (*(element->dependence_.begin()))->run_before_.find(element) != (*(element->dependence_.begin()))->run_before_.end() && element->getBindingIndex() == (*(element->dependence_.begin()))->getBindingIndex()) { element->linkable_ = true; + linked_size_++; } } } @@ -76,6 +78,7 @@ class GEngine : public GEngineObject { protected: UThreadPoolPtr thread_pool_ { nullptr }; // 内部执行的线程池 int schedule_strategy_ = CGRAPH_DEFAULT_TASK_STRATEGY; // 调度策略 + CSize linked_size_ = 0; // 标记有多少个element,是 linkable 的数据 friend class GElementManager; friend class GPipeline; diff --git a/src/GraphCtrl/GraphElement/_GEngine/GStaticEngine/GStaticEngine.cpp b/src/GraphCtrl/GraphElement/_GEngine/GStaticEngine/GStaticEngine.cpp index 7fd93956..2b2cacd6 100644 --- a/src/GraphCtrl/GraphElement/_GEngine/GStaticEngine/GStaticEngine.cpp +++ b/src/GraphCtrl/GraphElement/_GEngine/GStaticEngine/GStaticEngine.cpp @@ -12,7 +12,7 @@ CGRAPH_NAMESPACE_BEGIN CStatus GStaticEngine::setup(const GSortedGElementPtrSet& elements) { CGRAPH_FUNCTION_BEGIN - GEngine::link(elements); + link(elements); status = analyse(elements); CGRAPH_FUNCTION_END } diff --git a/src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h b/src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h index d378bb1b..3a4fcff4 100644 --- a/src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h +++ b/src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h @@ -106,9 +106,6 @@ class UMetrics : public CStruct { * @return */ CVoid reset() { -#ifndef _CGRAPH_SHOW_THREAD_METRICS_ - return; -#endif local_pop_real_num_ = 0; local_pop_times_ = 0; pool_pop_real_num_ = 0; @@ -122,16 +119,16 @@ class UMetrics : public CStruct { } private: - CSize local_pop_real_num_ = 0; - CSize local_pop_times_ = 0; - CSize pool_pop_real_num_ = 0; - CSize pool_pop_times_ = 0; - CSize steal_pop_real_num_ = 0; - CSize steal_pop_times_ = 0; - CSize local_push_real_num_ = 0; - CSize local_push_yield_times_ = 0; - CSize fleet_wait_times_ = 0; - CSize deep_wait_times_ = 0; + CSize local_pop_real_num_ = 0; // 本地pop出来数据个数 + CSize local_pop_times_ = 0; // 本地尝试pop的次数 + CSize pool_pop_real_num_ = 0; // 从pool中pop出来的数据个数 + CSize pool_pop_times_ = 0; // 从pool中尝试pop的次数 + CSize steal_pop_real_num_ = 0; // 偷盗获取的数据的个数 + CSize steal_pop_times_ = 0; // 偷盗的次数 + CSize local_push_real_num_ = 0; // 写入的真实次数 + CSize local_push_yield_times_ = 0; // 写入冲突导致yield的次数 + CSize fleet_wait_times_ = 0; // 轻量级等待的次数 + CSize deep_wait_times_ = 0; // 深度等待的次数(触发了cv的wait机制) friend class UThreadBase; friend class UThreadPrimary;