Skip to content

Commit

Permalink
[perf] optimize serial situation, from 7s->4.5s, in test-perf-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Feb 7, 2024
1 parent fcef1a9 commit 183e63a
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

CGRAPH_NAMESPACE_BEGIN

#define CGRAPH_SMALL_VECTOR_MAX_SIZE 16

CStatus GDynamicEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN
link(elements);

// 给所有的值清空
total_element_arr_.clear();
Expand Down Expand Up @@ -59,7 +62,7 @@ CStatus GDynamicEngine::afterRunCheck() {
*/
for (GElementCPtr element : total_element_arr_) {
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, \
element->getName() + ": dynamic engine run, check not finished...")
element->getName() + ": dynamic engine, check not run it...")
}
}

Expand Down Expand Up @@ -91,8 +94,13 @@ CVoid GDynamicEngine::beforeRun() {


CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {
if (unlikely(cur_status_.isErr())) {
return; // 如果已经有异常逻辑,则直接停止当前流程
if (unlikely(cur_status_.isErr() || element->done_)) {
/**
* 如果已经有异常逻辑,
* 或者传入的element,是已经执行过的了(理论上不会出现这种情况,由于提升性能的原因,取消了atomic计数的逻辑,故添加这一处判定,防止意外情况)
* 则直接停止当前流程
*/
return;
}

const auto& exec = [this, element] {
Expand All @@ -116,44 +124,49 @@ CVoid GDynamicEngine::process(GElementPtr element, CBool affinity) {

CVoid GDynamicEngine::afterElementRun(GElementPtr element) {
element->done_ = true;

if (!element->run_before_.empty() && cur_status_.isOK()) {
#ifndef _WIN32
/**
* 使用原来 std::vector<GElementPtr> ready 的分配方式,
* 在多次(例子为 32次)反复递归调用这里的时候,会造成较多的上下文切换,从而影响整体效率
* 故 在 mac 和 linux 环境上,使用 GElementPtr ready[maxSize]; 的方式进行分配
* 具体参考 https://github.com/ChunelFeng/CGraph/issues/343
*/
const CSize maxSize = element->run_before_.size();
GElementPtr ready[maxSize];
CSize realSize = 0;
for (auto* cur : element->run_before_) {
if (--cur->left_depend_ <= 0) {
ready[realSize] = cur;
realSize++;
auto curSize = element->run_before_.size();
if (1 == curSize && (*element->run_before_.begin())->linkable_) {
// 针对只有唯一后继的情况,做特殊判定
process(*(element->run_before_.begin()), true);
} else if (curSize < CGRAPH_SMALL_VECTOR_MAX_SIZE) {
/**
* 使用原来 std::vector<GElementPtr> ready 的分配方式,
* 在多次(自行执行的测例为 32 次)反复递归调用这里的时候,会造成较多的上下文切换,从而影响整体效率
* 故在有少量依赖的情况下,直接使用 本地数组来实现这个功能。
* 理论上大部分逻辑,均会走这个分支逻辑
* 后期考虑替换为 small-vector的逻辑实现
* 具体参考 https://github.com/ChunelFeng/CGraph/issues/343
*/
GElementPtr ready[CGRAPH_SMALL_VECTOR_MAX_SIZE];
CSize realSize = 0;
for (auto* cur : element->run_before_) {
if (--cur->left_depend_ <= 0) {
ready[realSize] = cur;
realSize++;
}
}
}

for (CSize i = 0; i < realSize; i++) {
process(ready[i], i == (realSize - 1));
}
#else
/**
* 在 windows 环境下,无法直接使用 GElementPtr ready[maxSize]; 的定义方式
* 故在 windows下,保留使用 std::vector<GElementPtr> ready; 的定义方式
*/
std::vector<GElementPtr> ready; // 表示可以执行的列表信息
for (auto* cur : element->run_before_) {
if (--cur->left_depend_ <= 0) {
ready.emplace_back(cur);
for (CSize i = 0; i < realSize; i++) {
process(ready[i], i == (realSize - 1));
}
} else {
/**
* 同上面的执行逻辑,完全一致
* 仅在后继节点较多的情况下,做兜底逻辑处理使用
*/
std::vector<GElementPtr> ready; // 表示可以执行的列表信息
ready.reserve(element->run_before_.size());
for (auto* cur : element->run_before_) {
if (--cur->left_depend_ <= 0) {
ready.emplace_back(cur);
}
}
}

for (auto& cur : ready) {
process(cur, cur == ready.back());
for (auto& cur : ready) {
process(cur, cur == ready.back());
}
}
#endif
} else {
CGRAPH_LOCK_GUARD lock(lock_);
/**
Expand Down
25 changes: 24 additions & 1 deletion src/GraphCtrl/GraphElement/_GEngine/GEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,33 @@ class GEngine : public GEngineObject {
? schedule_strategy_ : bindingIndex;
}

/**
 * Recompute the linkable_ flag of every element in the given set.
 * @param elements elements whose linkable_ flag is refreshed
 * @return
 */
CVoid link(const GSortedGElementPtrSet& elements) {
    /**
     * An element is marked linkable only when all of the following hold:
     * 1. it has exactly one dependence
     * 2. that dependence has exactly one successor
     * 3. that single successor is the element itself
     * 4. the element and its dependence report the same binding index
     */
    for (GElementPtr cur : elements) {
        // Always reset first, so a flag left over from an earlier link relation
        // never survives; only the current relation may set it back to true.
        cur->linkable_ = false;

        if (1 != cur->dependence_.size()) {
            continue;
        }

        // The one and only dependence of the current element
        const auto& prev = *(cur->dependence_.begin());
        if (1 != prev->run_before_.size()) {
            continue;
        }
        if (prev->run_before_.end() == prev->run_before_.find(cur)) {
            continue;
        }

        if (cur->getBindingIndex() == prev->getBindingIndex()) {
            cur->linkable_ = true;
        }
    }
}


protected:
UThreadPoolPtr thread_pool_ { nullptr }; // 内部执行的线程池
CUint total_element_size_ = 0; // 总的element的数量
int schedule_strategy_ = CGRAPH_DEFAULT_TASK_STRATEGY; // 调度策略

friend class GElementManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,17 @@ CGRAPH_NAMESPACE_BEGIN

/**
 * Prepare the static engine for the given element set.
 * As written here, it calls mark(), then link(), then analyse() on the same elements.
 * @param elements element set handed on unchanged to mark / link / analyse
 * @return NOTE(review): presumably the `status` managed by the CGRAPH_FUNCTION_* macros — confirm against their definitions
 */
CStatus GStaticEngine::setup(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN
status = mark(elements);
// NOTE(review): presumably bails out here when mark() reported an error — confirm macro definition
CGRAPH_FUNCTION_CHECK_STATUS

// Refresh the linkable_ flags before the elements are analysed
link(elements);
status = analyse(elements);
CGRAPH_FUNCTION_END
}


/**
 * Record the number of elements and flag every element that can be linked
 * to its single predecessor. This function only ever sets linkable_ to true;
 * it never clears a flag.
 * @param elements elements to inspect
 * @return NOTE(review): presumably the `status` managed by the CGRAPH_FUNCTION_* macros — confirm
 */
CStatus GStaticEngine::mark(const GSortedGElementPtrSet& elements) {
    CGRAPH_FUNCTION_BEGIN
    // Original author's note: assigned only once, not changed afterwards
    total_element_size_ = static_cast<CUint>(elements.size());

    /**
     * An element is considered linkable when all of the following hold:
     * 1. it has exactly one dependence
     * 2. that dependence has exactly one successor
     * 3. that single successor is the element itself
     * 4. the element and its dependence report the same binding index
     */
    for (GElementPtr cur : elements) {
        if (1 != cur->dependence_.size()) {
            continue;
        }

        // The one and only dependence of the current element
        const auto& prev = *(cur->dependence_.begin());
        if (1 != prev->run_before_.size()) {
            continue;
        }
        if (prev->run_before_.end() == prev->run_before_.find(cur)) {
            continue;
        }

        if (cur->getBindingIndex() == prev->getBindingIndex()) {
            cur->linkable_ = true;
        }
    }

    CGRAPH_FUNCTION_END
}


CStatus GStaticEngine::analyse(const GSortedGElementPtrSet& elements) {
CGRAPH_FUNCTION_BEGIN
run_element_size_ = 0;
para_cluster_arrs_.clear();
total_element_size_ = (CUint)elements.size(); // 仅赋值一次,不会改变了

GClusterArr curClusterArr; // 记录每一层,可以并行的逻辑
for (GElementPtr element : elements) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ class GStaticEngine : public GEngine {

CStatus afterRunCheck() override;

/**
* 对数据进行标记和整理
* @param elements
* @return
*/
CStatus mark(const GSortedGElementPtrSet& elements);

/**
* 将所有注册到 pipeline 中的信息,解析到 para_cluster_arrs_ 中
* @param elements
Expand All @@ -41,6 +34,7 @@ class GStaticEngine : public GEngine {
private:
ParaWorkedClusterArrs para_cluster_arrs_; // 可以并行的cluster数组
CUint run_element_size_ = 0; // 当前已经执行的element的数量
CUint total_element_size_ = 0; // 总的element的数量

friend class UAllocator;
};
Expand Down
2 changes: 0 additions & 2 deletions test/Performance/test-performance-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ void test_performance_03() {
config.primary_thread_empty_interval_ = 0;
config.primary_thread_busy_epoch_ = 500;
config.monitor_enable_ = false; // 关闭扩缩容机制
config.primary_thread_policy_ = CGRAPH_THREAD_SCHED_RR;
config.primary_thread_priority_ = 10;
pipeline->setUniqueThreadPoolConfig(config);
pipeline->setAutoCheck(false);
pipeline->registerGElement<TestAdd1GNode>(&a);
Expand Down

0 comments on commit 183e63a

Please sign in to comment.