Skip to content

Commit

Permalink
[bugfix] fix GSome, make sure run finish before run finished.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Sep 4, 2023
1 parent 3f2c3d7 commit f005649
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 7 deletions.
5 changes: 3 additions & 2 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ GElementPtr GElement::setBindingIndex(CIndex index) {

GElementPtr GElement::setTimeout(CMSec timeout, CBool asError) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_THROW_EXCEPTION_BY_CONDITION((timeout < CGRAPH_DEFAULT_ELEMENT_TIMEOUT), "timeout value cannot smaller than 0")
CGRAPH_THROW_EXCEPTION_BY_CONDITION((timeout < CGRAPH_DEFAULT_ELEMENT_TIMEOUT), \
"timeout value cannot smaller than 0")
CGRAPH_THROW_EXCEPTION_BY_CONDITION((loop_ > 1 && CGRAPH_DEFAULT_ELEMENT_TIMEOUT != timeout), \
"cannot set timeout value when loop bigger than 1")

Expand Down Expand Up @@ -190,7 +191,7 @@ CStatus GElement::fatProcessor(const CFunctionType& type) {
status = doAspect(GAspectType::BEGIN_RUN);
CGRAPH_FUNCTION_CHECK_STATUS
do {
status = (!isAsync()) ? run() : asyncRun(); // 大概率是同步执行
status = (!isAsync()) ? run() : asyncRun();
/**
* 在实际run结束之后,首先需要判断一下是否进入yield状态了。
* 接下来,如果状态是ok的,并且被条件hold住,则循环执行
Expand Down
4 changes: 4 additions & 0 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@ class GSome : public GGroup {

CStatus addElement(GElementPtr element) override;

CStatus init() final;

CStatus run() final;

CBool isSerializable() final;

CVoid dump(std::ostream& oss) final;

CBool isHold() final;

CGRAPH_NO_ALLOWED_COPY(GSome)

private:
Expand Down
36 changes: 33 additions & 3 deletions src/GraphCtrl/GraphElement/GGroup/GSome/GSome.inl
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,42 @@ template<CInt TriggerNum>
CStatus GSome<TriggerNum>::addElement(GElementPtr element) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(element)

/**
* GSome中插入的元素,必须要有 timeout的设定,如果没有的话,强行写到max值。
* 用于确保在pipeline run执行完成之前,所有的 element 都会被回收
*/
if (CGRAPH_DEFAULT_ELEMENT_TIMEOUT == element->timeout_) {
element->setTimeout(CGRAPH_MAX_BLOCK_TTL, false);
}

group_elements_arr_.emplace_back(element);
CGRAPH_FUNCTION_END
}


template<CInt TriggerNum>
CStatus GSome<TriggerNum>::run() {
CStatus GSome<TriggerNum>::init() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(thread_pool_)
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((CGRAPH_DEFAULT_LOOP_TIMES != loop_), "GSome cannot set loop > 1.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((0 >= TriggerNum), "trigger num must bigger than 0.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION((group_elements_arr_.size() < TriggerNum), \
"this GSome need at least [" + std::to_string(TriggerNum) + "] element")
"this GSome need at least [" + std::to_string(TriggerNum) + "] element.")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(std::any_of(group_elements_arr_.begin(), group_elements_arr_.end(), [](GElementPtr ptr) {
return !ptr->isAsync();
}), "GSome contains async node only.")

status = GGroup::init();

CGRAPH_FUNCTION_END
}


template<CInt TriggerNum>
CStatus GSome<TriggerNum>::run() {
CGRAPH_FUNCTION_BEGIN

left_num_ = TriggerNum; // 还剩n个,就完成当前GSome的执行逻辑
cur_status_ = CStatus();
Expand All @@ -55,7 +78,7 @@ CStatus GSome<TriggerNum>::run() {

CGRAPH_UNIQUE_LOCK lock(lock_);
cv_.wait(lock, [this] {
return left_num_ == 0 || cur_status_.isErr();
return left_num_ <= 0 || cur_status_.isErr();
});

status = cur_status_;
Expand Down Expand Up @@ -87,6 +110,13 @@ CVoid GSome<TriggerNum>::dump(std::ostream& oss) {
}
}


template<CInt TriggerNum>
CBool GSome<TriggerNum>::isHold() {
// 这里固定是不可以 hold的
return false;
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSOME_INL
4 changes: 2 additions & 2 deletions src/GraphCtrl/GraphEvent/GEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ CStatus GEvent::process(GEventType type) {
break;
case GEventType::ASYNC: // 异步触发
CGRAPH_ASSERT_NOT_NULL(this->thread_pool_)
thread_pool_->commitWithPriority([this] {
thread_pool_->commit([this] {
this->trigger(this->param_);
}, CGRAPH_DEFAULT_PRIORITY);
}, CGRAPH_POOL_TASK_STRATEGY);
break;
default:
CGRAPH_RETURN_ERROR_STATUS("unknown event type")
Expand Down

0 comments on commit f005649

Please sign in to comment.