Skip to content

Commit

Permalink
[feat] add some notice for threadpool.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Mar 2, 2024
1 parent c374681 commit 6c16ef8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
* 3. 分析当前dag类型信息
*/
mark(elements);
GEngine::link(elements);
link(elements);
analysisDagType(elements);
CGRAPH_FUNCTION_END
}
Expand Down Expand Up @@ -111,18 +111,14 @@ CVoid GDynamicEngine::mark(const GSortedGElementPtrSet& elements) {


CVoid GDynamicEngine::analysisDagType(const GSortedGElementPtrSet& elements) {
CSize linkedSize = std::count_if(elements.begin(), elements.end(), [](GElementPtr element) {
return element->linkable_;
});

if (1 == front_element_arr_.size() && linkedSize == total_element_arr_.size() - 1) {
if (front_element_arr_.size() == 1 && total_element_arr_.size() - 1 == linked_size_) {
/**
* 如果所有的信息中,只有一个是非linkable。则说明只有开头的那个是的,且只有一个开头
* 故,这里将其认定为一条 lineal 的情况
* ps: 只有一element的情况,也会被算到 ALL_SERIAL 中去
*/
dag_type_ = internal::GEngineDagType::ALL_SERIAL;
} else if (total_end_size_ == total_element_arr_.size()) {
} else if (total_element_arr_.size() == total_end_size_ && front_element_arr_.size() == total_end_size_) {
dag_type_ = internal::GEngineDagType::ALL_PARALLEL;
}
}
Expand All @@ -138,7 +134,7 @@ CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
return;
}

const auto& exec = [this, element] {
const auto& execute = [this, element] {
const CStatus& curStatus = element->fatProcessor(CFunctionType::RUN);
if (unlikely(curStatus.isErr())) {
// 当且仅当整体状正常,且当前状态异常的时候,进入赋值逻辑。确保不重复赋值
Expand All @@ -150,9 +146,9 @@ CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
if (affinity
&& CGRAPH_DEFAULT_BINDING_INDEX == element->getBindingIndex()) {
// 如果 affinity=true,表示用当前的线程,执行这个逻辑。以便增加亲和性
exec();
execute();
} else {
thread_pool_->commit(exec, calcIndex(element));
thread_pool_->commit(execute, calcIndex(element));
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/GraphCtrl/GraphElement/_GEngine/GEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,23 @@ class GEngine : public GEngineObject {
* @param elements
* @return
*/
static CVoid link(const GSortedGElementPtrSet& elements) {
CVoid link(const GSortedGElementPtrSet& elements) {
/**
* 认定图可以连通的判定条件:
* 1,当前元素仅有一个依赖
* 2,当前元素依赖的节点,只有一个后继
* 3,当前元素的依赖的后继,仍是当前节点
* 4,前后元素绑定机制是一样的
*/
linked_size_ = 0;
for (GElementPtr element : elements) {
element->linkable_ = false; // 防止出现之前的留存逻辑。确保只有当前链接关系下,需要设置 linkable的,才会设置为 true
if (1 == element->dependence_.size()
&& 1 == (*element->dependence_.begin())->run_before_.size()
&& (*(element->dependence_.begin()))->run_before_.find(element) != (*(element->dependence_.begin()))->run_before_.end()
&& element->getBindingIndex() == (*(element->dependence_.begin()))->getBindingIndex()) {
element->linkable_ = true;
linked_size_++;
}
}
}
Expand All @@ -76,6 +78,7 @@ class GEngine : public GEngineObject {
protected:
UThreadPoolPtr thread_pool_ { nullptr }; // 内部执行的线程池
int schedule_strategy_ = CGRAPH_DEFAULT_TASK_STRATEGY; // 调度策略
CSize linked_size_ = 0; // 标记有多少个element,是 linkable 的数据

friend class GElementManager;
friend class GPipeline;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CGRAPH_NAMESPACE_BEGIN

CStatus GStaticEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN
GEngine::link(elements);
link(elements);
status = analyse(elements);
CGRAPH_FUNCTION_END
}
Expand Down
23 changes: 10 additions & 13 deletions src/UtilsCtrl/ThreadPool/Metrics/UMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ class UMetrics : public CStruct {
* @return
*/
CVoid reset() {
#ifndef _CGRAPH_SHOW_THREAD_METRICS_
return;
#endif
local_pop_real_num_ = 0;
local_pop_times_ = 0;
pool_pop_real_num_ = 0;
Expand All @@ -122,16 +119,16 @@ class UMetrics : public CStruct {
}

private:
CSize local_pop_real_num_ = 0;
CSize local_pop_times_ = 0;
CSize pool_pop_real_num_ = 0;
CSize pool_pop_times_ = 0;
CSize steal_pop_real_num_ = 0;
CSize steal_pop_times_ = 0;
CSize local_push_real_num_ = 0;
CSize local_push_yield_times_ = 0;
CSize fleet_wait_times_ = 0;
CSize deep_wait_times_ = 0;
CSize local_pop_real_num_ = 0; // 本地pop出来数据个数
CSize local_pop_times_ = 0; // 本地尝试pop的次数
CSize pool_pop_real_num_ = 0; // 从pool中pop出来的数据个数
CSize pool_pop_times_ = 0; // 从pool中尝试pop的次数
CSize steal_pop_real_num_ = 0; // 偷盗获取的数据的个数
CSize steal_pop_times_ = 0; // 偷盗的次数
CSize local_push_real_num_ = 0; // 写入的真实次数
CSize local_push_yield_times_ = 0; // 写入冲突导致yield的次数
CSize fleet_wait_times_ = 0; // 轻量级等待的次数
CSize deep_wait_times_ = 0; // 深度等待的次数(触发了cv的wait机制)

friend class UThreadBase;
friend class UThreadPrimary;
Expand Down

0 comments on commit 6c16ef8

Please sign in to comment.