Skip to content

Commit

Permalink
[perf] fix CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL bigger, so always wai…
Browse files Browse the repository at this point in the history
…t problem.
  • Loading branch information
ChunelFeng committed Jan 6, 2024
1 parent d51d60e commit 6a05469
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 55 deletions.
4 changes: 4 additions & 0 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class UThreadBase : public UThreadObject {
*/
CVoid reset() {
done_ = false;
cv_.notify_one(); // 防止主线程 wait时间过长,导致的结束缓慢问题
if (thread_.joinable()) {
thread_.join(); // 等待线程结束
}
Expand Down Expand Up @@ -237,7 +238,10 @@ class UThreadBase : public UThreadObject {
UAtomicQueue<UTask>* pool_task_queue_; // 用于存放线程池中的普通任务
UAtomicPriorityQueue<UTask>* pool_priority_task_queue_; // 用于存放线程池中的包含优先级任务的队列,仅辅助线程可以执行
UThreadPoolConfigPtr config_ = nullptr; // 配置参数信息

std::thread thread_; // 线程类
std::mutex mutex_;
std::condition_variable cv_;
};

CGRAPH_NAMESPACE_END
Expand Down
4 changes: 1 addition & 3 deletions src/UtilsCtrl/ThreadPool/Thread/UThreadPrimary.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class UThreadPrimary : public UThreadBase {
return false;
}


/**
* 窃取的时候,仅从相邻的primary线程中窃取
* 待窃取相邻的数量,不能超过默认primary线程数
Expand Down Expand Up @@ -255,9 +256,6 @@ class UThreadPrimary : public UThreadBase {
std::vector<UThreadPrimary *>* pool_threads_; // 用于存放线程池中的线程信息
std::vector<int> steal_targets_; // 被偷的目标信息

std::mutex mutex_;
std::condition_variable cv_;

friend class UThreadPool;
friend class UAllocator;
};
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/Thread/UThreadSecondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class UThreadSecondary : public UThreadBase {
if (popPoolTask(task)) {
runTask(task);
} else {
// 如果单词无法获取,则稍加等待
// 如果单次无法获取,则稍加等待
waitRunTask(config_->queue_emtpy_interval_);
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPoolConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ struct UThreadPoolConfig : public CStruct {
}

if (default_thread_size_ + secondary_thread_size_ > max_thread_size_) {
CGRAPH_RETURN_ERROR_STATUS("max thread size is less than default + secondary thread")
CGRAPH_RETURN_ERROR_STATUS("max thread size is less than default + secondary thread. [" \
+ std::to_string(max_thread_size_) + "<" + std::to_string(default_thread_size_) + "+" \
+ std::to_string(secondary_thread_size_) + "]");
}

if (monitor_enable_ && monitor_span_ <= 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/UtilsCtrl/ThreadPool/UThreadPoolDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static const int CGRAPH_MAX_LOCAL_BATCH_SIZE = 2;
static const int CGRAPH_MAX_POOL_BATCH_SIZE = 2; // 批量执行通用任务最大值
static const int CGRAPH_MAX_STEAL_BATCH_SIZE = 2; // 批量盗取任务最大值
static const int CGRAPH_PRIMARY_THREAD_BUSY_EPOCH = 10; // 主线程进入wait状态的轮数,数值越大,理论性能越高,但空转可能性也越大
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 3; // 主线程进入休眠状态的默认时间
static const CMSec CGRAPH_PRIMARY_THREAD_EMPTY_INTERVAL = 50; // 主线程进入休眠状态的默认时间
static const CSec CGRAPH_SECONDARY_THREAD_TTL = 10; // 辅助线程ttl,单位为s
static const bool CGRAPH_MONITOR_ENABLE = false; // 是否开启监控程序
static const CSec CGRAPH_MONITOR_SPAN = 5; // 监控线程执行间隔,单位为s
Expand Down
1 change: 1 addition & 0 deletions test/Functional/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ set(CGRAPH_FUNCTIONAL_LIST
test-functional-02
test-functional-03
test-functional-04
test-functional-05
)

foreach(func ${CGRAPH_FUNCTIONAL_LIST})
Expand Down
22 changes: 11 additions & 11 deletions test/Functional/test-functional-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ void test_functional_01() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr a, b, c, d, e, f, g, h, i, j = nullptr;
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&a, {});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&b, {});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&c, {a});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&d, {b});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&e, {b, c});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&f, {c});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&g, {d, e, f});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&h, {f});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&i, {g, h});
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&j, {h});
status += pipeline->registerGElement<TestAdd1GNode>(&a, {});
status += pipeline->registerGElement<TestAdd1GNode>(&b, {});
status += pipeline->registerGElement<TestAdd1GNode>(&c, {a});
status += pipeline->registerGElement<TestAdd1GNode>(&d, {b});
status += pipeline->registerGElement<TestAdd1GNode>(&e, {b, c});
status += pipeline->registerGElement<TestAdd1GNode>(&f, {c});
status += pipeline->registerGElement<TestAdd1GNode>(&g, {d, e, f});
status += pipeline->registerGElement<TestAdd1GNode>(&h, {f});
status += pipeline->registerGElement<TestAdd1GNode>(&i, {g, h});
status += pipeline->registerGElement<TestAdd1GNode>(&j, {h});

{
UTimeCounter counter("test_functional_01");
status = pipeline->process(500000);
status = pipeline->process(100000);
}

if (status.isErr()) {
Expand Down
30 changes: 15 additions & 15 deletions test/Functional/test-functional-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,37 @@ void test_functional_02() {
GElementPtr a,b,c,d,e,f,g,h,i,j,k,l,m,n = nullptr;
GElementPtr region1, region2, cluster1, cluster2 = nullptr;

status += pipeline->registerGElement<TestMaterialAdd1GNode>(&a, {}, "a");
status += pipeline->registerGElement<TestAdd1GNode>(&a, {}, "a");

b = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({}, "b"));
c = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({}, "c"));
b = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({}, "b"));
c = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({}, "c"));

d = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({}, "d"));
e = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("e", 3));
f = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("f"));
d = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({}, "d"));
e = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("e", 3));
f = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("f"));
cluster1 = pipeline->createGGroup<GCluster>({e, f}, {d}, "cluster1");

g = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({d}, "g"));
g = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({d}, "g"));
region2 = pipeline->createGGroup<GRegion>({d, cluster1, g}, {}, "region2", 2);

region1 = pipeline->createGGroup<GRegion>({b, c, region2});

i = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("i"));
j = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("j"));
k = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("k"));
i = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("i"));
j = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("j"));
k = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("k"));
cluster2 = pipeline->createGGroup<GCluster>({i, j, k}, {a, region1}, "cluster2");

status += pipeline->registerGElement<GRegion>(&region1, {}, "region1", 3);
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&h, {region1}, "h");
status += pipeline->registerGElement<TestAdd1GNode>(&h, {region1}, "h");
status += pipeline->registerGElement<GCluster>(&cluster2, {a, region1}, "cluster2");
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&l, {a}, "l");
status += pipeline->registerGElement<TestAdd1GNode>(&l, {a}, "l");

status += pipeline->registerGElement<TestMaterialAdd1GNode>(&m, {h, cluster2}, "m");
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&n, {l, cluster2}, "n");
status += pipeline->registerGElement<TestAdd1GNode>(&m, {h, cluster2}, "m");
status += pipeline->registerGElement<TestAdd1GNode>(&n, {l, cluster2}, "n");

{
UTimeCounter counter("test_functional_02");
status = pipeline->process(10000);
status += pipeline->process(1000);
}

if (status.isErr()) {
Expand Down
22 changes: 11 additions & 11 deletions test/Functional/test-functional-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,32 @@ void test_functional_03() {
GElementPtr a, b_cluster, c, d_region, e = nullptr;

b_cluster = pipeline->createGGroup<GCluster>({
pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("nodeB1", 1)), // 创建名为nodeB1的node信息,并将其放入b_cluster中
pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("nodeB2", 3)), // 创建名为nodeB2且自循环3次的node信息,并将其放入b_cluster中
pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("nodeB3", 1))
pipeline->createGNode<TestAdd1GNode>(GNodeInfo("nodeB1", 1)), // 创建名为nodeB1的node信息,并将其放入b_cluster中
pipeline->createGNode<TestAdd1GNode>(GNodeInfo("nodeB2", 3)), // 创建名为nodeB2且自循环3次的node信息,并将其放入b_cluster中
pipeline->createGNode<TestAdd1GNode>(GNodeInfo("nodeB3", 1))
});

GElementPtr d1, d2, d3, d4, d23_cluster = nullptr;
d1 = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({}, "nodeD1", 1));
d2 = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("nodeD2", 1)); // 创建node,稍后放入cluster中
d3 = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo("nodeD3", 1));
d1 = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({}, "nodeD1", 1));
d2 = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("nodeD2", 1)); // 创建node,稍后放入cluster中
d3 = pipeline->createGNode<TestAdd1GNode>(GNodeInfo("nodeD3", 1));
d23_cluster = pipeline->createGGroup<GCluster>({d2, d3}, {d1}, "clusterD23", 1);
d4 = pipeline->createGNode<TestMaterialAdd1GNode>(GNodeInfo({d1}, "nodeD4", 1));
d4 = pipeline->createGNode<TestAdd1GNode>(GNodeInfo({d1}, "nodeD4", 1));
d_region = pipeline->createGGroup<GRegion>({d1, d23_cluster, d4}); // 创建名为d_region的region信息,并将{d1,d23_cluster,d4}放入其中

status += pipeline->registerGElement<TestMaterialAdd1GNode>(&a, {}, "nodeA", 1);
status += pipeline->registerGElement<TestAdd1GNode>(&a, {}, "nodeA", 1);
status += pipeline->registerGGroup(&b_cluster, {}, "clusterB", 1);
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&c, {a, b_cluster}, "nodeC", 1);
status += pipeline->registerGElement<TestAdd1GNode>(&c, {a, b_cluster}, "nodeC", 1);
status += pipeline->registerGGroup(&d_region, {a, b_cluster}, "regionD", 2); // 将名为regionD,依赖{a,b_cluster}执行且自循环2次的region信息,注册入pipeline中
status += pipeline->registerGElement<TestMaterialAdd1GNode>(&e, {c, d_region}, "nodeE", 1);
status += pipeline->registerGElement<TestAdd1GNode>(&e, {c, d_region}, "nodeE", 1);
if (!status.isOK()) {
return;
}

{
UTimeCounter counter("test_functional_03");
pipeline->addGAspect<TestMaterialAdd1GAspect>();
status = pipeline->process(50000);
status = pipeline->process(2000);
}

if (g_test_node_cnt % 58 != 0) {
Expand Down
2 changes: 1 addition & 1 deletion test/Functional/test-functional-04.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ using namespace CGraph;

void test_functional_04() {
const int HALF_ARR_SIZE = 32;
const int RUN_TIMES = 200000;
const int RUN_TIMES = 100000;
CGRAPH_CREATE_MESSAGE_TOPIC(TestGMessageParam, g_test_message_key, 100)
std::unique_ptr<TestGMessageParam> mp(new TestGMessageParam());
CGRAPH_SEND_MPARAM(TestGMessageParam, g_test_message_key, mp, GMessagePushStrategy::WAIT)
Expand Down
39 changes: 39 additions & 0 deletions test/Functional/test-functional-05.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: test-functional-05.cpp
@Time: 2024/1/6 16:45
@Desc:
***************************/


#include "../_Materials/TestInclude.h"

using namespace CGraph;

void test_functional_05() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr a, b, c, d = nullptr;
status += pipeline->registerGElement<TestReturnErrorGNode>(&a, {});
status += pipeline->registerGElement<TestReturnErrorGNode>(&b, {});
status += pipeline->registerGElement<TestReturnErrorGNode>(&c, {});
status += pipeline->registerGElement<TestReturnErrorGNode>(&d, {});

pipeline->init();
{
UTimeCounter counter("test_functional_05");
for (int i = 0; i < 30; i++) {
pipeline->run();
}
}

pipeline->destroy();
GPipelineFactory::remove(pipeline);
}


int main() {
test_functional_05();
return 0;
}
2 changes: 1 addition & 1 deletion test/Performance/test-performance-01.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ void test_performance_01() {
config.monitor_enable_ = false; // 关闭扩缩容机制
pipeline->setUniqueThreadPoolConfig(config);
for (auto& i : arr) {
pipeline->registerGElement<TestMaterialAdd1GNode>(&i);
pipeline->registerGElement<TestAdd1GNode>(&i);
}
pipeline->setAutoCheck(false);
status += pipeline->init();
Expand Down
4 changes: 2 additions & 2 deletions test/Performance/test-performance-02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ void test_performance_02() {
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr arr[32];
pipeline->registerGElement<TestMaterialAdd1GNode>(&arr[0]);
pipeline->registerGElement<TestAdd1GNode>(&arr[0]);
for (int i = 1; i < 32; i++) {
pipeline->registerGElement<TestMaterialAdd1GNode>(&arr[i], {arr[i - 1]});
pipeline->registerGElement<TestAdd1GNode>(&arr[i], {arr[i - 1]});
}
pipeline->makeSerial();
pipeline->setAutoCheck(false);
Expand Down
12 changes: 6 additions & 6 deletions test/Performance/test-performance-03.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ void test_performance_03() {
config.primary_thread_priority_ = 10;
pipeline->setUniqueThreadPoolConfig(config);
pipeline->setAutoCheck(false);
pipeline->registerGElement<TestMaterialAdd1GNode>(&a);
pipeline->registerGElement<TestMaterialAdd1GNode>(&b1, {a});
pipeline->registerGElement<TestMaterialAdd1GNode>(&b2, {b1});
pipeline->registerGElement<TestMaterialAdd1GNode>(&c1, {a});
pipeline->registerGElement<TestMaterialAdd1GNode>(&c2, {c1});
pipeline->registerGElement<TestMaterialAdd1GNode>(&d, {b2, c2});
pipeline->registerGElement<TestAdd1GNode>(&a);
pipeline->registerGElement<TestAdd1GNode>(&b1, {a});
pipeline->registerGElement<TestAdd1GNode>(&b2, {b1});
pipeline->registerGElement<TestAdd1GNode>(&c1, {a});
pipeline->registerGElement<TestAdd1GNode>(&c2, {c1});
pipeline->registerGElement<TestAdd1GNode>(&d, {b2, c2});
status += pipeline->init();

{
Expand Down
12 changes: 10 additions & 2 deletions test/_Materials/TestGNodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include "TestCommonDefine.h"
#include "TestGParams.h"

class TestMaterialAdd1GNode : public CGraph::GNode {
class TestAdd1GNode : public CGraph::GNode {
public:
CStatus init() override {
g_test_node_cnt = 0;
Expand All @@ -28,14 +28,22 @@ class TestMaterialAdd1GNode : public CGraph::GNode {

CStatus destroy() override {
CStatus status;
if (0 != g_test_node_cnt % 10000) {
if (0 != g_test_node_cnt % 1000) {
status.setErrorInfo("test node count is " + std::to_string(g_test_node_cnt.load()));
}
return status;
}
};


class TestReturnErrorGNode : public CGraph::GNode {
public:
CStatus run() override {
return CStatus("test error return, no real problem");
}
};


class TestRecvMessageGNode : public CGraph::GNode {
public:
CStatus run() override {
Expand Down

0 comments on commit 6a05469

Please sign in to comment.