Skip to content

Commit

Permalink
[bugfix] fix dynamic para run bug. change manager set function.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Dec 8, 2023
1 parent 31fab44 commit 10ffbc2
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 59 deletions.
14 changes: 14 additions & 0 deletions src/GraphCtrl/GraphAspect/GAspectManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ class GAspectManager : public GAspectObject,
CGRAPH_FUNCTION_END
}

void* setGParamManager(GParamManagerPtr pm) override {
for (auto* cur : aspect_arr_) {
cur->setGParamManager(pm);
}
return this;
}

void* setGEventManager(GEventManagerPtr em) override {
for (auto* cur : aspect_arr_) {
cur->setGEventManager(em);
}
return this;
}

CGRAPH_NO_ALLOWED_COPY(GAspectManager)

private:
Expand Down
18 changes: 5 additions & 13 deletions src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,12 @@ class GSingleton : public GAdapter {
CStatus run() override;
CStatus destroy() override;

/**
* 适配singleton信息
* @param dependElements
* @param name
* @param loop
* @param paramManager
* @param eventManager
* @return
*/
CStatus setElementInfo(const std::set<GElementPtr> &dependElements,
CStatus addElementInfo(const std::set<GElementPtr> &dependElements,
const std::string &name,
CSize loop,
GParamManagerPtr paramManager,
GEventManagerPtr eventManager) override;
CSize loop) override;

GElementPtr setManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) override;

private:
static USingleton<T> s_singleton_; // 单例
Expand Down
29 changes: 13 additions & 16 deletions src/GraphCtrl/GraphElement/GAdapter/GSingleton/GSingleton.inl
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,11 @@ CStatus GSingleton<T>::destroy() {


template <typename T>
CStatus GSingleton<T>::setElementInfo(const std::set<GElementPtr> &dependElements,
CStatus GSingleton<T>::addElementInfo(const std::set<GElementPtr> &dependElements,
const std::string &name,
CSize loop,
GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
CSize loop) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_NOT_NULL(paramManager, eventManager)

// 这里,内部和外部均需要设定name信息
this->setName(name)->setLoop(loop);
Expand All @@ -84,21 +81,21 @@ CStatus GSingleton<T>::setElementInfo(const std::set<GElementPtr> &dependElement

// 获取单例信息,然后将信息node中信息
auto element = dynamic_cast<GElementPtr>(s_singleton_.get());
if (element->param_manager_) {
// 设置一次即可,不支持多次设置
CGRAPH_FUNCTION_END
}

/**
* 内部不需要设置loop信息了,因为是根据adapter的loop次数循环的。
* 依赖关系也注册在adapter上
*/
element->param_manager_ = paramManager;
element->event_manager_ = eventManager;
element->name_ = name;
CGRAPH_FUNCTION_END
}


template <typename T>
GElementPtr GSingleton<T>::setManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(paramManager, eventManager)
auto element = dynamic_cast<GElementPtr>(s_singleton_.get());
element->setManagers(paramManager, eventManager);
return this;
}

CGRAPH_NAMESPACE_END

#endif //CGRAPH_GSINGLETON_INL
29 changes: 21 additions & 8 deletions src/GraphCtrl/GraphElement/GElement.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,35 @@ CStatus GElement::addDependGElements(const GElementPtrSet& elements) {
}


CStatus GElement::setElementInfo(const GElementPtrSet& dependElements,
CStatus GElement::addElementInfo(const GElementPtrSet& dependElements,
const std::string& name,
CSize loop,
GParamManagerPtr paramManager,
GEventManagerPtr eventManager) {
CSize loop) {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_INIT(false)
CGRAPH_ASSERT_INIT_THROW_ERROR(false)

this->setName(name)->setLoop(loop);
param_manager_ = paramManager;
event_manager_ = eventManager;
status = this->addDependGElements(dependElements);
CGRAPH_FUNCTION_CHECK_STATUS

this->setName(name)->setLoop(loop);
CGRAPH_FUNCTION_END
}


GElementPtr GElement::setManagers(GParamManagerPtr paramManager, GEventManagerPtr eventManager) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(paramManager, eventManager)

this->param_manager_ = paramManager;
this->event_manager_ = eventManager;
if (aspect_manager_) {
aspect_manager_->setGParamManager(paramManager);
aspect_manager_->setGEventManager(eventManager);
}

return this;
}


CStatus GElement::doAspect(const GAspectType& aspectType, const CStatus& curStatus) {
CGRAPH_FUNCTION_BEGIN

Expand Down
17 changes: 11 additions & 6 deletions src/GraphCtrl/GraphElement/GElement.h
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,20 @@ class GElement : public GElementObject,
* @param dependElements
* @param name
* @param loop
* @param paramManager
* @paarm eventManager
* @return
*/
virtual CStatus setElementInfo(const std::set<GElement *>& dependElements,
virtual CStatus addElementInfo(const std::set<GElement *>& dependElements,
const std::string& name,
CSize loop,
GParamManagerPtr paramManager,
GEventManagerPtr eventManager);
CSize loop);

/**
* 设置manager信息
* @param paramManager
* @param eventManager
* @return
*/
virtual GElement* setManagers(GParamManagerPtr paramManager,
GEventManagerPtr eventManager);

/**
* 包含切面相关功能的函数,fat取自fatjar的意思
Expand Down
9 changes: 9 additions & 0 deletions src/GraphCtrl/GraphElement/GElementRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ GElementRepositoryPtr GElementRepository::setThreadPool(UThreadPoolPtr ptr) {
}


GElementRepositoryPtr GElementRepository::setManagers(GParamManagerPtr paramManager, GEventManagerPtr eventManager) {
CGRAPH_ASSERT_NOT_NULL_THROW_ERROR(paramManager, eventManager);
for (auto* cur : this->elements_) {
cur->setManagers(paramManager, eventManager);
}
return this;
}


CStatus GElementRepository::setup() {
CGRAPH_FUNCTION_BEGIN
// 一旦执行,全部设置为 normal状态
Expand Down
8 changes: 8 additions & 0 deletions src/GraphCtrl/GraphElement/GElementRepository.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class GElementRepository : public GElementObject {
*/
GElementRepository* setThreadPool(UThreadPoolPtr ptr);

/**
* 给所有的element,设定manager信息
* @param paramManager
* @param eventManager
* @return
*/
GElementRepository* setManagers(GParamManagerPtr paramManager, GEventManagerPtr eventManager);

/**
* 准备执行流程
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ CStatus GDynamicEngine::run() {

CStatus GDynamicEngine::afterRunCheck() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(run_element_size_.load(std::memory_order_consume) != total_element_arr_.size(), \
if (likely(total_end_size_ != total_element_arr_.size())) {
/**
* 非纯并行的逻辑下,才需要判断。
* 纯并行度逻辑,肯定会所有都跑一遍,并且等待全部结束,
* 故,不需要判断。
*/
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(run_element_size_.load(std::memory_order_consume) != total_element_arr_.size(), \
"dynamic engine run element size not match...")
for (GElementPtr element : total_element_arr_) {
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, "dynamic engine run check failed...")
for (GElementPtr element : total_element_arr_) {
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(!element->done_, "dynamic engine run check failed...")
}
}

CGRAPH_FUNCTION_END
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphEvent/GEventManagerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ CGRAPH_NAMESPACE_BEGIN
* @param em
* @return
*/ \
void* setGEventManager(GEventManagerPtr em) { \
virtual void* setGEventManager(GEventManagerPtr em) { \
this->event_manager_ = em; \
return this; \
} \
Expand Down
2 changes: 1 addition & 1 deletion src/GraphCtrl/GraphParam/GParamManagerWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private:
* @param pm
* @return
*/ \
void* setGParamManager(GParamManagerPtr pm) { \
virtual void* setGParamManager(GParamManagerPtr pm) { \
this->param_manager_ = pm; \
return this; \
} \
Expand Down
5 changes: 3 additions & 2 deletions src/GraphCtrl/GraphPipeline/GPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ CStatus GPipeline::registerGGroup(GElementPPtr groupRef, const GElementPtrSet &d
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(nullptr == group, "input is not based on GGroup")
CGRAPH_RETURN_ERROR_STATUS_BY_CONDITION(group->isRegistered(), "this group register duplicate")

status = group->setElementInfo(dependElements, name, loop, this->param_manager_, this->event_manager_);
status = group->addElementInfo(dependElements, name, loop);
CGRAPH_FUNCTION_CHECK_STATUS
status = element_manager_->add(group);
CGRAPH_FUNCTION_CHECK_STATUS
Expand Down Expand Up @@ -278,7 +278,7 @@ GPipelineState GPipeline::getCurState() const {

CStatus GPipeline::initEnv() {
CGRAPH_FUNCTION_BEGIN
CGRAPH_ASSERT_NOT_NULL(event_manager_, element_manager_)
CGRAPH_ASSERT_NOT_NULL(event_manager_, element_manager_, param_manager_)

status = schedule_.init();
CGRAPH_FUNCTION_CHECK_STATUS
Expand All @@ -290,6 +290,7 @@ CStatus GPipeline::initEnv() {

// 设置所有的element 中的thread_pool
repository_.setThreadPool(tp);
repository_.setManagers(param_manager_, event_manager_);

status += repository_.init();
CGRAPH_FUNCTION_END
Expand Down
13 changes: 4 additions & 9 deletions src/GraphCtrl/GraphPipeline/GPipeline.inl
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ CStatus GPipeline::registerGElement(GElementPPtr elementRef,
}

CGRAPH_ASSERT_NOT_NULL(*elementRef)
status = (*elementRef)->setElementInfo(dependElements, name, loop,
this->param_manager_,
this->event_manager_);
status = (*elementRef)->addElementInfo(dependElements, name, loop);
CGRAPH_FUNCTION_CHECK_STATUS

status = element_manager_->add(dynamic_cast<GElementPtr>(*elementRef));
Expand Down Expand Up @@ -95,8 +93,7 @@ CStatus GPipeline::registerGElement(GTemplateNodePtr<Args ...> *elementRef,
(*elementRef) = new(std::nothrow) TNode(std::forward<Args &&>(args)...);
CGRAPH_ASSERT_NOT_NULL(*elementRef)
// 以下 name,loop 信息,可以由外部设置
status = (*elementRef)->setElementInfo(dependElements, CGRAPH_EMPTY, CGRAPH_DEFAULT_LOOP_TIMES,
this->param_manager_, this->event_manager_);
status = (*elementRef)->addElementInfo(dependElements, CGRAPH_EMPTY, CGRAPH_DEFAULT_LOOP_TIMES);
CGRAPH_FUNCTION_CHECK_STATUS

status = element_manager_->add(dynamic_cast<GElementPtr>(*elementRef));
Expand All @@ -113,8 +110,7 @@ GNodePtr GPipeline::createGNode(const GNodeInfo &info) {
CGRAPH_ASSERT_INIT_THROW_ERROR(false)

GNodePtr node = CGRAPH_SAFE_MALLOC_COBJECT(T);
status = node->setElementInfo(info.dependence_, info.name_, info.loop_,
this->param_manager_, this->event_manager_);
status = node->addElementInfo(info.dependence_, info.name_, info.loop_);
CGRAPH_THROW_EXCEPTION_BY_STATUS(status)

repository_.insert(node);
Expand Down Expand Up @@ -149,8 +145,7 @@ GGroupPtr GPipeline::createGGroup(const GElementPtrArr &elements,
}
CGRAPH_THROW_EXCEPTION_BY_STATUS(status)

status = group->setElementInfo(dependElements, name, loop,
nullptr, nullptr); // 注册group信息的时候,不能注册paramManager信息
status = group->addElementInfo(dependElements, name, loop);
CGRAPH_THROW_EXCEPTION_BY_STATUS(status)

this->repository_.insert(group);
Expand Down

0 comments on commit 10ffbc2

Please sign in to comment.