diff --git a/example/E03-ThirdFlow.cpp b/example/E03-ThirdFlow.cpp index f8888502..fd2a636b 100644 --- a/example/E03-ThirdFlow.cpp +++ b/example/E03-ThirdFlow.cpp @@ -33,6 +33,20 @@ struct ResultMParam : public GMessageParam { std::string eng_info_; }; +class InputGNode : public GNode { +public: + CStatus run() override { + for (int i = 0; i < 30; i++) { + std::unique_ptr input(new InputMParam()); + randomSleep(1, 5); // 间隔1~6ms,发送一次 + input->num_ = std::abs((int)std::random_device{}()) % 5 + 1; + CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT); + } + + CGRAPH_ECHO("InputGNode run finished"); + return CStatus(); + } +}; class ProcessGNode : public GNode { public: @@ -44,7 +58,7 @@ class ProcessGNode : public GNode { break; // 一阵子收不到消息了,就自动停止好了 } - int ms = randomSleep(1, 200); // 模拟处理流程,随机休息不超过 200ms + int ms = randomSleep(1, 100); // 模拟处理流程,随机休息不超过 100ms std::unique_ptr result(new ResultMParam); switch (input->num_) { case 1: result->eng_info_ = "one"; break; @@ -89,25 +103,17 @@ void example_third_flow() { CGRAPH_CREATE_MESSAGE_TOPIC(ResultMParam, RESULT_TOPIC_NAME, 16); auto pipeline = GPipelineFactory::create(); - GElementPtr a, b = nullptr; - pipeline->registerGElement(&a); - pipeline->registerGElement(&b); + GElementPtr input, process, result = nullptr; + pipeline->registerGElement(&input); + pipeline->registerGElement(&process); + pipeline->registerGElement(&result); UThreadPoolConfig config; - config.secondary_thread_size_ = 2; - pipeline->setUniqueThreadPoolConfig(config); - - auto fut = pipeline->asyncProcess(); - - // 在这里,一直制造input信息,然后放到后面的两个流中去处理 - for (int i = 0; i < 30; i++) { - std::unique_ptr input(new InputMParam()); - randomSleep(1, 5); // 间隔1~6ms,发送一次 - input->num_ = std::abs((int)std::random_device{}()) % 5 + 1; - CGRAPH_SEND_MPARAM(InputMParam, INPUT_TOPIC_NAME, input, GMessagePushStrategy::WAIT); - } + config.default_thread_size_ = 3; + config.secondary_thread_size_ = 0; + pipeline->setUniqueThreadPoolConfig(config); // 设置3个线程执行 - fut.wait(); // 等待pipeline执行结束 + pipeline->process(); CGRAPH_CLEAR_MESSAGES() GPipelineFactory::clear(); }