Skip to content

Commit

Permalink
[feat] add GSome for pipeline async run.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Aug 20, 2023
1 parent 6717f7d commit 5222701
Show file tree
Hide file tree
Showing 23 changed files with 312 additions and 27 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ set(CGRAPH_TUTORIAL_LIST
T19-Cancel
T20-YieldResume
T21-MultiCondition
T22-Some

# 以下为工具类tutorial
# TU01-ThreadPool
TU01-ThreadPool
# TU02-Lru
# TU03-Trie
# TU04-Timer
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,10 @@ int main() {
[2023.07.12 - v2.4.3 - Chunel]
* 优化了`CStatus`功能,添加了异常定位信息
[2023.08.06 - v2.5.0 - Chunel]
[2023.08.20 - v2.5.0 - Chunel]
* 提供了perf功能,用于做`pipeline`的性能分析
* 提供了`GAsyncNode`(异步节点)功能
* 提供了`AsyncNode`(异步节点)功能
* 提供了`some`(部分)功能,优化`pipeline`的异步执行方式
</details>
Expand Down
6 changes: 6 additions & 0 deletions src/CBasic/CFuncType.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ enum class CFunctionType {
#define CGRAPH_THROW_EXCEPTION(info) \
throw CException(info, CGRAPH_GET_LOCATE); \

/** 在异常状态的情况下,抛出异常 */
#define CGRAPH_THROW_EXCEPTION_BY_STATUS(status) \
if (unlikely(status.isErr())) { \
CGRAPH_THROW_EXCEPTION(status.getInfo()); \
} \

CGRAPH_NAMESPACE_END

#endif //CGRAPH_CFUNCTYPE_H
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ static const char* CGRAPH_STR_CLUSTER = "cluster";
static const char* CGRAPH_STR_REGION = "region";
static const char* CGRAPH_STR_CONDITION = "condition";
static const char* CGRAPH_STR_MULTI_CONDITION = "multi_condition";
static const char* CGRAPH_STR_SOME = "some";
static const char* CGRAPH_STR_FUNCTION = "function";
static const char* CGRAPH_STR_SINGLETON = "singleton";
static const char* CGRAPH_STR_DAEMON = "daemon";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
@Contact: [email protected]
@File: GSingleton.cpp
@Time: 2021/10/30 10:24 下午
@Desc:
@Desc:
***************************/

#ifndef CGRAPH_GSINGLETON_INL
#define CGRAPH_GSINGLETON_INL

#include "GSingleton.h"

CGRAPH_NAMESPACE_BEGIN
Expand Down Expand Up @@ -97,3 +100,5 @@ CStatus GSingleton<T>::setElementInfo(const std::set<GElementPtr> &dependElement
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSINGLETON_INL
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ CVoid GElement::checkYield() {
}


CBool GElement::isGroup() {
CBool GElement::isGroup() const {
// 按位与 GROUP有值,表示是 GROUP的逻辑
return (long(element_type_) & long(GElementType::GROUP)) > 0;
}
Expand Down
3 changes: 2 additions & 1 deletion src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class GElement : public GElementObject,
* 当前element是否是一个 group逻辑
* @return
*/
CBool isGroup();
CBool isGroup() const;

protected:
/**
Expand Down Expand Up @@ -314,6 +314,7 @@ class GElement : public GElementObject,
friend class GCluster;
friend class GRegion;
friend class GCondition;
template<CSize> friend class GSome;
template<GMultiConditionType> friend class GMultiCondition;
friend class GGroup;
friend class GPipeline;
Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GElementDefine.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum class GElementType {
CLUSTER = 0x00020001, // 簇
REGION = 0x00020002, // 区域
CONDITION = 0x00020004, // 条件
SOME = 0x00020008, // 部分
MULTI_CONDITION = 0x00020014, // 多条件
ADAPTER = 0x00040000, // 适配器
FUNCTION = 0x00040001, // 函数
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphElement/GElementRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ CStatus GElementRepository::setup() {
CStatus GElementRepository::reset() {
CGRAPH_FUNCTION_BEGIN
for (auto& cur : async_nodes_) {
status += ((GAsyncNodePtr)cur)->async_result_.get();
status += ((GAsyncNodePtr)cur)->getResult();
}

CGRAPH_FUNCTION_END
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
@Desc:
***************************/

#ifndef CGRAPH_GMULTICONDITION_INL
#define CGRAPH_GMULTICONDITION_INL

#include "GMultiCondition.h"

CGRAPH_NAMESPACE_BEGIN
Expand Down Expand Up @@ -104,4 +107,6 @@ CBool GMultiCondition<type>::isSerializable() {
});
}

CGRAPH_NAMESPACE_END
CGRAPH_NAMESPACE_END

#endif //CGRAPH_GMULTICONDITION_INL
4 changes: 2 additions & 2 deletions src/GraphCtrl/GraphElement/GGroup/GGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#include "GGroupDefine.h"
#include "../GElement.h"
#include "../../GraphParam/GParamInclude.h"

CGRAPH_NAMESPACE_BEGIN

Expand Down Expand Up @@ -56,7 +55,7 @@ class GGroup : public GElement {
*/
CVoid dumpGroupLabelEnd(std::ostream& oss);

protected:
private:
GElementPtrArr group_elements_arr_; // 存放 element的数组

friend class GStaticEngine;
Expand All @@ -65,6 +64,7 @@ class GGroup : public GElement {
friend class GRegion;
friend class GCondition;
template<GMultiConditionType> friend class GMultiCondition;
template<CSize> friend class GSome;
};

using GGroupPtr = GGroup *;
Expand Down
1 change: 1 addition & 0 deletions src/GraphCtrl/GraphElement/GGroup/GGroupInclude.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@
#include "GCluster/GCluster.h"
#include "GRegion/GRegion.h"
#include "GCondition/GConditionInclude.h"
#include "GSome/GSome.h"

#endif //CGRAPH_GGROUPINCLUDE_H
4 changes: 2 additions & 2 deletions src/GraphCtrl/GraphElement/GGroup/GRegion/GRegion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ CVoid GRegion::dump(std::ostream& oss) {

dumpGroupLabelEnd(oss);

for (const auto& node : run_before_) {
dumpEdge(oss, this, node);
for (const auto& element : run_before_) {
dumpEdge(oss, this, element);
}
}

Expand Down
59 changes: 59 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GSome.h
@Time: 2023/8/20 11:20
@Desc:
***************************/

#ifndef CGRAPH_GSOME_H
#define CGRAPH_GSOME_H

#include <memory>
#include <mutex>
#include <condition_variable>

#include "../GGroup.h"

CGRAPH_NAMESPACE_BEGIN

template<CSize TriggerNum = 1>
class GSome : public GGroup {
protected:
CStatus addElement(GElementPtr element) override;

explicit GSome();

/**
* 异步处理 GSome中的所有信息
* @param ptr
* @return
*/
CVoid process(GAsyncNodePtr ptr);

CStatus run() override;

CBool isSerializable() override;

CVoid dump(std::ostream& oss) final;

CGRAPH_NO_ALLOWED_COPY(GSome)

private:
std::atomic<CSize> left_num_ {0}; // 还剩的触发结束的个数
CStatus cur_status_ ; // 记录异步时刻的当前状态信息

std::mutex lock_;
std::condition_variable cv_;

friend class GPipeline;
friend class GStaticEngine;
friend class GDynamicEngine;
friend class UAllocator;
};

CGRAPH_NAMESPACE_END

#include "GSome.inl"

#endif //CGRAPH_GSOME_H
106 changes: 106 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/***************************
@Author: Chunel
@Contact: [email protected]
@File: GSome.inl
@Time: 2023/8/20 11:20
@Desc:
***************************/

#ifndef CGRAPH_GSOME_INL
#define CGRAPH_GSOME_INL

#include "GSome.h"

CGRAPH_NAMESPACE_BEGIN

template<CSize TriggerNum>
CStatus GSome<TriggerNum>::addElement(GElementPtr element) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)

if (GElementType::ASYNC_NODE != element->element_type_) {
group_elements_arr_.clear(); // 如果有错误的情况,就直接清空本地的内容
CGRAPH_RETURN_ERROR_STATUS("GSome can insert async node only current.")
}

group_elements_arr_.template emplace_back(element);
CGRAPH_FUNCTION_END
}


template<CSize TriggerNum>
GSome<TriggerNum>::GSome() {
element_type_ = GElementType::SOME;
session_ = URandom<>::generateSession(CGRAPH_STR_SOME);
}


template<CSize TriggerNum>
CStatus GSome<TriggerNum>::run() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(thread_pool_)
left_num_.store(TriggerNum); // 还剩n个,就完成当前GSome的执行逻辑
cur_status_ = CStatus();

for (auto* element : group_elements_arr_) {
process((GAsyncNodePtr)element);
}

CGRAPH_UNIQUE_LOCK lock(lock_);
cv_.wait(lock, [this] {
return left_num_ <= 0 || cur_status_.isErr();
});

if (!cur_status_.isOK()) {
status = cur_status_; // 出错的话,赋值到外部去,让上游知道。
}
CGRAPH_FUNCTION_END
}


template<CSize TriggerNum>
CVoid GSome<TriggerNum>::process(GAsyncNodePtr ptr) {
// 这里的内容,仅可能为 GAsyncNode 的子类的信息
const auto& exec = [this, ptr] {
{
CGRAPH_UNIQUE_LOCK lock(lock_);
cur_status_ += ptr->run(); // 开始异步执行起来了
}
if (cur_status_.isOK()) {
cur_status_ += ptr->getResult();
}
left_num_--;
cv_.notify_one();
};

thread_pool_->commit(exec);
}


template<CSize TriggerNum>
CBool GSome<TriggerNum>::isSerializable() {
return false; // 情况较为复杂,默认不可以
}


template<CSize TriggerNum>
CVoid GSome<TriggerNum>::dump(std::ostream& oss) {
dumpElement(oss);
dumpGroupLabelBegin(oss);
oss << 'p' << this << "[shape=point height=0];\n";
oss << "color=blue;style=dashed;\n"; // 蓝色虚线

for (const auto& element : group_elements_arr_) {
element->dump(oss);
}

dumpGroupLabelEnd(oss);

for (const auto& element : run_before_) {
dumpEdge(oss, this, element);
}
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSOME_INL
13 changes: 13 additions & 0 deletions src/GraphCtrl/GraphElement/GNode/GAsyncNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ GAsyncNode::GAsyncNode() {
CStatus GAsyncNode::run() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(thread_pool_)
if (loop_ > 1) {
CGRAPH_RETURN_ERROR_STATUS("GAsyncNode can set loop=1 only")
}

async_result_ = this->thread_pool_->commitWithPriority([this] {
return this->asyncRun();
Expand All @@ -27,4 +30,14 @@ CStatus GAsyncNode::run() {
CGRAPH_FUNCTION_END
}


CStatus GAsyncNode::getResult() {
CGRAPH_FUNCTION_BEGIN
if (async_result_.valid()) {
status = async_result_.get();
}

CGRAPH_FUNCTION_END
}

CGRAPH_NAMESPACE_END
7 changes: 7 additions & 0 deletions src/GraphCtrl/GraphElement/GNode/GAsyncNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ class GAsyncNode : public GNode {
*/
virtual CStatus asyncRun() = 0;

/**
* 异步获取结果信息
* @return
*/
CStatus getResult();

explicit GAsyncNode();

private:
Expand All @@ -32,6 +38,7 @@ class GAsyncNode : public GNode {
std::future<CStatus> async_result_; // 用于记录当前节点的执行情况

friend class GElementRepository;
template<CSize> friend class GSome;
};

using GAsyncNodePtr = GAsyncNode *;
Expand Down
1 change: 0 additions & 1 deletion src/GraphCtrl/GraphParam/GParamManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,4 @@ CGRAPH_NAMESPACE_END

#include "GParamManager.inl"


#endif //CGRAPH_GPARAMMANAGER_H
Loading

0 comments on commit 5222701

Please sign in to comment.