diff --git a/CMakeLists.txt b/CMakeLists.txt index 2955e806..04522185 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -42,9 +42,10 @@ set(CGRAPH_TUTORIAL_LIST T19-Cancel T20-YieldResume T21-MultiCondition + T22-Some # 以下为工具类tutorial - # TU01-ThreadPool + TU01-ThreadPool # TU02-Lru # TU03-Trie # TU04-Timer diff --git a/README.md b/README.md index 64aabf02..9ea4b8e8 100644 --- a/README.md +++ b/README.md @@ -320,9 +320,10 @@ int main() { [2023.07.12 - v2.4.3 - Chunel] * 优化了`CStatus`功能,添加了异常定位信息 -[2023.08.06 - v2.5.0 - Chunel] +[2023.08.20 - v2.5.0 - Chunel] * 提供了perf功能,用于做`pipeline`的性能分析 -* 提供了`GAsyncNode`(异步节点)功能 +* 提供了`AsyncNode`(异步节点)功能 +* 提供了`some`(部分)功能,优化`pipeline`的异步执行方式 diff --git a/src/CBasic/CFuncType.h b/src/CBasic/CFuncType.h index dba66705..b1624bb1 100644 --- a/src/CBasic/CFuncType.h +++ b/src/CBasic/CFuncType.h @@ -75,6 +75,12 @@ enum class CFunctionType { #define CGRAPH_THROW_EXCEPTION(info) \ throw CException(info, CGRAPH_GET_LOCATE); \ +/** 在异常状态的情况下,抛出异常 */ +#define CGRAPH_THROW_EXCEPTION_BY_STATUS(status) \ + if (unlikely(status.isErr())) { \ + CGRAPH_THROW_EXCEPTION(status.getInfo()); \ + } \ + CGRAPH_NAMESPACE_END #endif //CGRAPH_CFUNCTYPE_H diff --git a/src/GraphCtrl/GraphDefine.h b/src/GraphCtrl/GraphDefine.h index 02e253cd..8efa01cf 100644 --- a/src/GraphCtrl/GraphDefine.h +++ b/src/GraphCtrl/GraphDefine.h @@ -20,6 +20,7 @@ static const char* CGRAPH_STR_CLUSTER = "cluster"; static const char* CGRAPH_STR_REGION = "region"; static const char* CGRAPH_STR_CONDITION = "condition"; static const char* CGRAPH_STR_MULTI_CONDITION = "multi_condition"; +static const char* CGRAPH_STR_SOME = "some"; static const char* CGRAPH_STR_FUNCTION = "function"; static const char* CGRAPH_STR_SINGLETON = "singleton"; static const char* CGRAPH_STR_DAEMON = "daemon"; diff --git a/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl b/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl index 2d0a9eca..fa7e4255 100644 --- a/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl +++ b/src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl @@ -3,9 +3,12 @@ @Contact: chunel@foxmail.com @File: GSingleton.cpp @Time: 2021/10/30 10:24 下午 -@Desc: +@Desc: ***************************/ +#ifndef CGRAPH_GSINGLETON_INL +#define CGRAPH_GSINGLETON_INL + #include "GSingleton.h" CGRAPH_NAMESPACE_BEGIN @@ -97,3 +100,5 @@ CStatus GSingleton::setElementInfo(const std::set &dependElement } CGRAPH_NAMESPACE_END + +#endif //CGRAPH_GSINGLETON_INL diff --git a/src/GraphCtrl/GraphElement/GElement.cpp b/src/GraphCtrl/GraphElement/GElement.cpp index 2e17dd2a..54617f3a 100644 --- a/src/GraphCtrl/GraphElement/GElement.cpp +++ b/src/GraphCtrl/GraphElement/GElement.cpp @@ -308,7 +308,7 @@ CVoid GElement::checkYield() { } -CBool GElement::isGroup() { +CBool GElement::isGroup() const { // 按位与 GROUP有值,表示是 GROUP的逻辑 return (long(element_type_) & long(GElementType::GROUP)) > 0; } diff --git a/src/GraphCtrl/GraphElement/GElement.h b/src/GraphCtrl/GraphElement/GElement.h index f4574b92..e84066dc 100644 --- a/src/GraphCtrl/GraphElement/GElement.h +++ b/src/GraphCtrl/GraphElement/GElement.h @@ -108,7 +108,7 @@ class GElement : public GElementObject, * 当前element是否是一个 group逻辑 * @return */ - CBool isGroup(); + CBool isGroup() const; protected: /** @@ -314,6 +314,7 @@ class GElement : public GElementObject, friend class GCluster; friend class GRegion; friend class GCondition; + template friend class GSome; template friend class GMultiCondition; friend class GGroup; friend class GPipeline; diff --git a/src/GraphCtrl/GraphElement/GElementDefine.h b/src/GraphCtrl/GraphElement/GElementDefine.h index a97b4180..bcac88a3 100644 --- a/src/GraphCtrl/GraphElement/GElementDefine.h +++ b/src/GraphCtrl/GraphElement/GElementDefine.h @@ -26,6 +26,7 @@ enum class GElementType { CLUSTER = 0x00020001, // 簇 REGION = 0x00020002, // 区域 CONDITION = 0x00020004, // 条件 + SOME = 0x00020008, // 部分 MULTI_CONDITION = 0x00020014, // 多条件 ADAPTER = 0x00040000, // 适配器 FUNCTION = 0x00040001, // 函数 diff --git a/src/GraphCtrl/GraphElement/GElementRepository.cpp b/src/GraphCtrl/GraphElement/GElementRepository.cpp index 94c7c0d9..63cf008d 100644 --- a/src/GraphCtrl/GraphElement/GElementRepository.cpp +++ b/src/GraphCtrl/GraphElement/GElementRepository.cpp @@ -48,7 +48,7 @@ CStatus GElementRepository::setup() { CStatus GElementRepository::reset() { CGRAPH_FUNCTION_BEGIN for (auto& cur : async_nodes_) { - status += ((GAsyncNodePtr)cur)->async_result_.get(); + status += ((GAsyncNodePtr)cur)->getResult(); } CGRAPH_FUNCTION_END diff --git a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl index e51d17ef..e5d48379 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl +++ b/src/GraphCtrl/GraphElement/GGroup/GCondition/GMultiCondition.inl @@ -6,6 +6,9 @@ @Desc: ***************************/ +#ifndef CGRAPH_GMULTICONDITION_INL +#define CGRAPH_GMULTICONDITION_INL + #include "GMultiCondition.h" CGRAPH_NAMESPACE_BEGIN @@ -104,4 +107,6 @@ CBool GMultiCondition::isSerializable() { }); } -CGRAPH_NAMESPACE_END \ No newline at end of file +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_GMULTICONDITION_INL diff --git a/src/GraphCtrl/GraphElement/GGroup/GGroup.h b/src/GraphCtrl/GraphElement/GGroup/GGroup.h index 7ad9bc24..2acc427b 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GGroup.h +++ b/src/GraphCtrl/GraphElement/GGroup/GGroup.h @@ -13,7 +13,6 @@ #include "GGroupDefine.h" #include "../GElement.h" -#include "../../GraphParam/GParamInclude.h" CGRAPH_NAMESPACE_BEGIN @@ -56,7 +55,7 @@ class GGroup : public GElement { */ CVoid dumpGroupLabelEnd(std::ostream& oss); -protected: +private: GElementPtrArr group_elements_arr_; // 存放 element的数组 friend class GStaticEngine; @@ -65,6 +64,7 @@ class GGroup : public GElement { friend class GRegion; friend class GCondition; template friend class GMultiCondition; + template friend class GSome; }; using GGroupPtr = GGroup *; diff --git a/src/GraphCtrl/GraphElement/GGroup/GGroupInclude.h b/src/GraphCtrl/GraphElement/GGroup/GGroupInclude.h index e34cb00b..e786302a 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GGroupInclude.h +++ b/src/GraphCtrl/GraphElement/GGroup/GGroupInclude.h @@ -14,5 +14,6 @@ #include "GCluster/GCluster.h" #include "GRegion/GRegion.h" #include "GCondition/GConditionInclude.h" +#include "GSome/GSome.h" #endif //CGRAPH_GGROUPINCLUDE_H diff --git a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp index 71fa7edb..6bf2785b 100644 --- a/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp +++ b/src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp @@ -93,8 +93,8 @@ CVoid GRegion::dump(std::ostream& oss) { dumpGroupLabelEnd(oss); - for (const auto& node : run_before_) { - dumpEdge(oss, this, node); + for (const auto& element : run_before_) { + dumpEdge(oss, this, element); } } diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h new file mode 100644 index 00000000..b017b186 --- /dev/null +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h @@ -0,0 +1,59 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: GSome.h +@Time: 2023/8/20 11:20 +@Desc: +***************************/ + +#ifndef CGRAPH_GSOME_H +#define CGRAPH_GSOME_H + +#include +#include +#include + +#include "../GGroup.h" + +CGRAPH_NAMESPACE_BEGIN + +template +class GSome : public GGroup { +protected: + CStatus addElement(GElementPtr element) override; + + explicit GSome(); + + /** + * 异步处理 GSome中的所有信息 + * @param ptr + * @return + */ + CVoid process(GAsyncNodePtr ptr); + + CStatus run() override; + + CBool isSerializable() override; + + CVoid dump(std::ostream& oss) final; + + CGRAPH_NO_ALLOWED_COPY(GSome) + +private: + std::atomic left_num_ {0}; // 还剩的触发结束的个数 + CStatus cur_status_ ; // 记录异步时刻的当前状态信息 + + std::mutex lock_; + std::condition_variable cv_; + + friend class GPipeline; + friend class GStaticEngine; + friend class GDynamicEngine; + friend class UAllocator; +}; + +CGRAPH_NAMESPACE_END + +#include "GSome.inl" + +#endif //CGRAPH_GSOME_H diff --git a/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl new file mode 100644 index 00000000..30e669d1 --- /dev/null +++ b/src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl @@ -0,0 +1,106 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: GSome.inl +@Time: 2023/8/20 11:20 +@Desc: +***************************/ + +#ifndef CGRAPH_GSOME_INL +#define CGRAPH_GSOME_INL + +#include "GSome.h" + +CGRAPH_NAMESPACE_BEGIN + +template +CStatus GSome::addElement(GElementPtr element) { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_INIT(false) + + if (GElementType::ASYNC_NODE != element->element_type_) { + group_elements_arr_.clear(); // 如果有错误的情况,就直接清空本地的内容 + CGRAPH_RETURN_ERROR_STATUS("GSome can insert async node only current.") + } + + group_elements_arr_.template emplace_back(element); + CGRAPH_FUNCTION_END +} + + +template +GSome::GSome() { + element_type_ = GElementType::SOME; + session_ = URandom<>::generateSession(CGRAPH_STR_SOME); +} + + +template +CStatus GSome::run() { + CGRAPH_FUNCTION_BEGIN + CGRAPH_ASSERT_NOT_NULL(thread_pool_) + left_num_.store(TriggerNum); // 还剩n个,就完成当前GSome的执行逻辑 + cur_status_ = CStatus(); + + for (auto* element : group_elements_arr_) { + process((GAsyncNodePtr)element); + } + + CGRAPH_UNIQUE_LOCK lock(lock_); + cv_.wait(lock, [this] { + return left_num_ <= 0 || cur_status_.isErr(); + }); + + if (!cur_status_.isOK()) { + status = cur_status_; // 出错的话,赋值到外部去,让上游知道。 + } + CGRAPH_FUNCTION_END +} + + +template +CVoid GSome::process(GAsyncNodePtr ptr) { + // 这里的内容,仅可能为 GAsyncNode 的子类的信息 + const auto& exec = [this, ptr] { + { + CGRAPH_UNIQUE_LOCK lock(lock_); + cur_status_ += ptr->run(); // 开始异步执行起来了 + } + if (cur_status_.isOK()) { + cur_status_ += ptr->getResult(); + } + left_num_--; + cv_.notify_one(); + }; + + thread_pool_->commit(exec); +} + + +template +CBool GSome::isSerializable() { + return false; // 情况较为复杂,默认不可以 +} + + +template +CVoid GSome::dump(std::ostream& oss) { + dumpElement(oss); + dumpGroupLabelBegin(oss); + oss << 'p' << this << "[shape=point height=0];\n"; + oss << "color=blue;style=dashed;\n"; // 蓝色虚线 + + for (const auto& element : group_elements_arr_) { + element->dump(oss); + } + + dumpGroupLabelEnd(oss); + + for (const auto& element : run_before_) { + dumpEdge(oss, this, element); + } +} + +CGRAPH_NAMESPACE_END + +#endif //CGRAPH_GSOME_INL \ No newline at end of file diff --git a/src/GraphCtrl/GraphElement/GNode/GAsyncNode.cpp b/src/GraphCtrl/GraphElement/GNode/GAsyncNode.cpp index fd60969f..5f889a13 100644 --- a/src/GraphCtrl/GraphElement/GNode/GAsyncNode.cpp +++ b/src/GraphCtrl/GraphElement/GNode/GAsyncNode.cpp @@ -19,6 +19,9 @@ GAsyncNode::GAsyncNode() { CStatus GAsyncNode::run() { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_NOT_NULL(thread_pool_) + if (loop_ > 1) { + CGRAPH_RETURN_ERROR_STATUS("GAsyncNode can set loop=1 only") + } async_result_ = this->thread_pool_->commitWithPriority([this] { return this->asyncRun(); @@ -27,4 +30,14 @@ CStatus GAsyncNode::run() { CGRAPH_FUNCTION_END } + +CStatus GAsyncNode::getResult() { + CGRAPH_FUNCTION_BEGIN + if (async_result_.valid()) { + status = async_result_.get(); + } + + CGRAPH_FUNCTION_END +} + CGRAPH_NAMESPACE_END \ No newline at end of file diff --git a/src/GraphCtrl/GraphElement/GNode/GAsyncNode.h b/src/GraphCtrl/GraphElement/GNode/GAsyncNode.h index 8ad3041c..e1a94d16 100644 --- a/src/GraphCtrl/GraphElement/GNode/GAsyncNode.h +++ b/src/GraphCtrl/GraphElement/GNode/GAsyncNode.h @@ -23,6 +23,12 @@ class GAsyncNode : public GNode { */ virtual CStatus asyncRun() = 0; + /** + * 异步获取结果信息 + * @return + */ + CStatus getResult(); + explicit GAsyncNode(); private: @@ -32,6 +38,7 @@ class GAsyncNode : public GNode { std::future async_result_; // 用于记录当前节点的执行情况 friend class GElementRepository; + template friend class GSome; }; using GAsyncNodePtr = GAsyncNode *; diff --git a/src/GraphCtrl/GraphParam/GParamManager.h b/src/GraphCtrl/GraphParam/GParamManager.h index b7f13bb7..25bf0cab 100644 --- a/src/GraphCtrl/GraphParam/GParamManager.h +++ b/src/GraphCtrl/GraphParam/GParamManager.h @@ -93,5 +93,4 @@ CGRAPH_NAMESPACE_END #include "GParamManager.inl" - #endif //CGRAPH_GPARAMMANAGER_H diff --git a/src/GraphCtrl/GraphPipeline/GPipeline.inl b/src/GraphCtrl/GraphPipeline/GPipeline.inl index a18b9363..8cd3b20a 100644 --- a/src/GraphCtrl/GraphPipeline/GPipeline.inl +++ b/src/GraphCtrl/GraphPipeline/GPipeline.inl @@ -94,13 +94,10 @@ GNodePtr GPipeline::createGNode(const GNodeInfo &info) { CGRAPH_FUNCTION_BEGIN CGRAPH_ASSERT_INIT_THROW_ERROR(false) - GNodePtr node = CGRAPH_SAFE_MALLOC_COBJECT(T) + GNodePtr node = CGRAPH_SAFE_MALLOC_COBJECT(T); status = node->setElementInfo(info.dependence_, info.name_, info.loop_, this->param_manager_, this->event_manager_); - if (!status.isOK()) { - CGRAPH_DELETE_PTR(node); - return nullptr; - } + CGRAPH_THROW_EXCEPTION_BY_STATUS(status) repository_.insert(node); return node; @@ -119,26 +116,24 @@ GGroupPtr GPipeline::createGGroup(const GElementPtrArr &elements, // 如果不是所有的都非空,则创建失败 if (std::any_of(elements.begin(), elements.end(), [](GElementPtr element) { return (nullptr == element); })) { - return nullptr; + CGRAPH_THROW_EXCEPTION("createGGroup elements have nullptr.") } if (std::any_of(dependElements.begin(), dependElements.end(), [](GElementPtr element) { return (nullptr == element); })) { - return nullptr; + CGRAPH_THROW_EXCEPTION("createGGroup dependElements have nullptr.") } GGroupPtr group = CGRAPH_SAFE_MALLOC_COBJECT(T) for (GElementPtr element : elements) { - group->addElement(element); + status += group->addElement(element); element->belong_ = group; // 从属于这个group的信息 } + CGRAPH_THROW_EXCEPTION_BY_STATUS(status) status = group->setElementInfo(dependElements, name, loop, nullptr, nullptr); // 注册group信息的时候,不能注册paramManager信息 - if (unlikely(!status.isOK())) { - CGRAPH_DELETE_PTR(group) - return nullptr; - } + CGRAPH_THROW_EXCEPTION_BY_STATUS(status) this->repository_.insert(group); return group; diff --git a/src/UtilsCtrl/ThreadPool/UThreadPool.inl b/src/UtilsCtrl/ThreadPool/UThreadPool.inl index 7485df74..d3cbd46e 100644 --- a/src/UtilsCtrl/ThreadPool/UThreadPool.inl +++ b/src/UtilsCtrl/ThreadPool/UThreadPool.inl @@ -60,4 +60,4 @@ auto UThreadPool::commitWithPriority(const FunctionType& func, int priority) CGRAPH_NAMESPACE_END -#endif // CGRAPH_UTHREADPOOL_INL \ No newline at end of file +#endif // CGRAPH_UTHREADPOOL_INL diff --git a/tutorial/MyGNode/MyAsyncNode.h b/tutorial/MyGNode/MyAsyncNode.h new file mode 100644 index 00000000..3f2a8780 --- /dev/null +++ b/tutorial/MyGNode/MyAsyncNode.h @@ -0,0 +1,30 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: MyAsyncNode.h +@Time: 2023/8/20 13:41 +@Desc: +***************************/ + +#ifndef CGRAPH_MYASYNCNODE_H +#define CGRAPH_MYASYNCNODE_H + +#include "CGraph.h" + +/** + * 实现一个异步节点,主要是可以异步执行。 + * 注册到pipeline内部的所有的异步节点, + * 会在pipeline结束完成前,全部执行完毕 + * */ +template +class MyAsyncNode : public CGraph::GAsyncNode { +public: + CStatus asyncRun() override { + CGraph::CGRAPH_ECHO("[%s], enter MyAsyncNode run function. Sleep for [%d] second ... ", \ + this->getName().c_str(), Second); + CGRAPH_SLEEP_SECOND(Second) // sleep 时间由外部设定 + return CStatus(); + } +}; + +#endif //CGRAPH_MYASYNCNODE_H diff --git a/tutorial/T03-Region.cpp b/tutorial/T03-Region.cpp index 3ffdf0d6..4a6724e1 100644 --- a/tutorial/T03-Region.cpp +++ b/tutorial/T03-Region.cpp @@ -40,7 +40,8 @@ void tutorial_region() { /** * 如果想查看pipeline内部,各部分(element)的运行耗时情况, * 请调用 perf()方法,并且将输出的内容(不包含node内部的打印信息), - * 复制到 https://dreampuf.github.io/GraphvizOnline/ + * 复制到 https://dreampuf.github.io/GraphvizOnline/ 查看效果 + * 具体字段解释,请参考函数头文件备注信息 */ // pipeline->perf(); diff --git a/tutorial/T22-Some.cpp b/tutorial/T22-Some.cpp new file mode 100644 index 00000000..46c84e7a --- /dev/null +++ b/tutorial/T22-Some.cpp @@ -0,0 +1,53 @@ +/*************************** +@Author: Chunel +@Contact: chunel@foxmail.com +@File: T22-Some.cpp +@Time: 2023/8/20 13:39 +@Desc: 本例主要演示,GSome的使用方法,插入的元素并发执行n个完毕后,整体结束(本例为 beta版本) +***************************/ + +#include "MyGNode/MyNode1.h" +#include "MyGNode/MyAsyncNode.h" + +using namespace CGraph; + +void tutorial_some() { + GPipelinePtr pipeline = GPipelineFactory::create(); + GElementPtr a, b_some, c, d = nullptr; + + /** + * 创建一个类型为GSome的组,其中注入3个异步节点,其中有[1]个执行完成,则GSome执行结束 + * 需要注意的是,当前GSome执行完成,并且今后后续的element后, + * 内部的异步节点仍会继续执行,故如果在异步节点内,做参数处理,请注意考虑后续影响 + * 当前pipeline的run()方法,会等到内部的异步节点全部执行完成后,才结束 + */ + b_some = pipeline->createGGroup>({ + /** + * 注入三个异步节点,分别sleep 5/2/8s, + * 执行完 sleep(2s)的异步节点后,当前的 GSome结束。内部其他节点继续执行 + * 注意:异步节点只允许 loop=1,否则运行时会报错 + */ + pipeline->createGNode>(GNodeInfo("asyncNodeB1")), + pipeline->createGNode>(GNodeInfo("asyncNodeB2")), + pipeline->createGNode>(GNodeInfo("asyncNodeB3")) + }); + + CStatus status = pipeline->registerGElement(&a, {}, "nodeA"); + status += pipeline->registerGElement>(&b_some, {a}, "someB"); + status += pipeline->registerGElement(&c, {a}, "nodeC"); + status += pipeline->registerGElement(&d, {b_some, c}, "nodeD"); + if (!status.isOK()) { + return; + } + + status += pipeline->process(); + CGRAPH_ECHO("pipeline run finished, error code is [%d]", status.getCode()); + + GPipelineFactory::clear(); +} + + +int main() { + tutorial_some(); + return 0; +}