Skip to content

Commit

Permalink
[example] optimize third flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Sep 22, 2023
1 parent 2ecc6c9 commit 5fa2b9c
Showing 1 changed file with 23 additions and 17 deletions.
40 changes: 23 additions & 17 deletions example/E03-ThirdFlow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ struct ResultMParam : public GMessageParam {
std::string eng_info_;
};

class InputGNode : public GNode {
public:
CStatus run() override {
for (int i = 0; i < 30; i++) {
std::unique_ptr<InputMParam> input(new InputMParam());
randomSleep(1, 5); // 间隔1~6ms,发送一次
input->num_ = std::abs((int)std::random_device{}()) % 5 + 1;
CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
}

CGRAPH_ECHO("InputGNode run finished");
return CStatus();
}
};

class ProcessGNode : public GNode {
public:
Expand All @@ -44,7 +58,7 @@ class ProcessGNode : public GNode {
break; // 一阵子收不到消息了,就自动停止好了
}

int ms = randomSleep(1, 200); // 模拟处理流程,随机休息不超过 200ms
int ms = randomSleep(1, 100); // 模拟处理流程,随机休息不超过 100ms
std::unique_ptr<ResultMParam> result(new ResultMParam);
switch (input->num_) {
case 1: result->eng_info_ = "one"; break;
Expand Down Expand Up @@ -89,25 +103,17 @@ void example_third_flow() {
CGRAPH_CREATE_MESSAGE_TOPIC(ResultMParam, RESULT_TOPIC_NAME, 16);

auto pipeline = GPipelineFactory::create();
GElementPtr a, b = nullptr;
pipeline->registerGElement<ProcessGNode>(&a);
pipeline->registerGElement<ResultGNode>(&b);
GElementPtr input, process, result = nullptr;
pipeline->registerGElement<InputGNode>(&input);
pipeline->registerGElement<ProcessGNode>(&process);
pipeline->registerGElement<ResultGNode>(&result);

UThreadPoolConfig config;
config.secondary_thread_size_ = 2;
pipeline->setUniqueThreadPoolConfig(config);

auto fut = pipeline->asyncProcess();

// 在这里,一直制造input信息,然后放到后面的两个流中去处理
for (int i = 0; i < 30; i++) {
std::unique_ptr<InputMParam> input(new InputMParam());
randomSleep(1, 5); // 间隔1~6ms,发送一次
input->num_ = std::abs((int)std::random_device{}()) % 5 + 1;
CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT);
}
config.default_thread_size_ = 3;
config.secondary_thread_size_ = 0;
pipeline->setUniqueThreadPoolConfig(config); // 设置3个线程执行

fut.wait(); // 等待pipeline执行结束
pipeline->process();
CGRAPH_CLEAR_MESSAGES()
GPipelineFactory::clear();
}
Expand Down

0 comments on commit 5fa2b9c

Please sign in to comment.