Skip to content

Commit

Permalink
[perf] optimize message, with only one copy
Browse files Browse the repository at this point in the history
  • Loading branch information
ChunelFeng committed Sep 22, 2023
1 parent 1b5fff3 commit d2bb6a3
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 11 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ int main() {
* 提供`coordinator`(协调)功能
[2023.09.16 - v2.5.2 - Chunel]
* 优化`message`(消息)功能,可以设定写入阻塞时的处理方式
* 优化`message`(消息)功能,可以设定写入阻塞时的处理方式,减少内存copy次数
* 添加`example`相关内容,针对不同行业,提供一些简单实现
</details>
Expand Down
13 changes: 13 additions & 0 deletions src/GraphCtrl/GraphMessage/GMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ class GMessage : public GMessageObject {
queue_.push(value, strategy);
}

/**
* 写入智能指针类型的参数
* @tparam TImpl
* @param value
* @param strategy
* @return
*/
template<class TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CVoid send(std::unique_ptr<TImpl>& value, GMessagePushStrategy strategy) {
queue_.push(value, strategy);
}

/**
* 获取参数
* @param value
Expand Down
29 changes: 28 additions & 1 deletion src/GraphCtrl/GraphMessage/GMessageManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,33 @@ class GMessageManager : public GMessageObject,
CGRAPH_FUNCTION_END
}

/**
* 根据传入的topic,输入智能指针类型的信息
* @tparam TImpl
* @param topic
* @param value
* @param strategy
* @return
*/
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CStatus sendTopicValue(const std::string& topic,
std::unique_ptr<TImpl>& value,
GMessagePushStrategy strategy) {
CGRAPH_FUNCTION_BEGIN
auto innerTopic = SEND_RECV_PREFIX + topic;
auto result = send_recv_message_map_.find(innerTopic);
if (result == send_recv_message_map_.end()) {
CGRAPH_RETURN_ERROR_STATUS("no find [" + topic + "] topic");
}

auto message = static_cast<GMessagePtr<T> >(result->second);
CGRAPH_ASSERT_NOT_NULL(message);

message->send(value, strategy);
CGRAPH_FUNCTION_END
}

/**
* 绑定对应的topic信息,并且获取 conn_id 信息
* @tparam TImpl
Expand All @@ -165,10 +192,10 @@ class GMessageManager : public GMessageObject,
template<typename TImpl,
c_enable_if_t<std::is_base_of<T, TImpl>::value, int> = 0>
CIndex bindTopic(const std::string& topic, CUint size) {
CGRAPH_LOCK_GUARD lock(bind_mutex_);
auto innerTopic = PUB_SUB_PREFIX + topic;
auto message = UAllocator::safeMallocTemplateCObject<GMessage<TImpl>, CUint>(size);

CGRAPH_LOCK_GUARD lock(bind_mutex_);
CIndex connId = (++cur_conn_id_);
auto result = pub_sub_message_map_.find(innerTopic);
if (result != pub_sub_message_map_.end()) {
Expand Down
30 changes: 30 additions & 0 deletions src/UtilsCtrl/ThreadPool/Queue/UAtomicRingBufferQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,36 @@ class UAtomicRingBufferQueue : public UQueueObject {
pop_cv_.notify_one();
}

/**
* 写入智能指针类型的信息
* @tparam TImpl
* @param value
* @param strategy
* @return
*/
template<class TImpl = T>
CVoid push(std::unique_ptr<TImpl>& value, URingBufferPushStrategy strategy) {
{
CGRAPH_UNIQUE_LOCK lk(mutex_);
if (isFull()) {
switch (strategy) {
case URingBufferPushStrategy::WAIT:
push_cv_.wait(lk, [this] { return !isFull(); });
break;
case URingBufferPushStrategy::REPLACE:
head_ = (head_ + 1) % capacity_;
break;
case URingBufferPushStrategy::DROP:
return; // 直接返回,不写入即可
}
}

ring_buffer_queue_[tail_] = std::move(value);
tail_ = (tail_ + 1) % capacity_;
}
pop_cv_.notify_one();
}

/**
* 等待弹出信息
* @param value
Expand Down
4 changes: 2 additions & 2 deletions tutorial/MyGNode/MyRecvMessageNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
class MyRecvMessageNode : public CGraph::GNode {
public:
CStatus run() override {
MyMessageParam mp; // 接收一个消息
std::unique_ptr<MyMessageParam> mp = nullptr; // 接收一个消息
CStatus status = CGRAPH_RECV_MPARAM(MyMessageParam, "send-recv", mp);
if (!status.isOK()) {
CGraph::CGRAPH_ECHO("MySubMessageNode sub message error");
return status;
}

CGraph::CGRAPH_ECHO("num = [%d], info = [%s]", mp.num, mp.info.c_str());
CGraph::CGRAPH_ECHO("num = [%d], info = [%s]", mp->num, mp->info.c_str());
return status;
}
};
Expand Down
12 changes: 9 additions & 3 deletions tutorial/MyGNode/MySendMessageNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
class MySendMessageNode : public CGraph::GNode {
public:
CStatus run() override {
MyMessageParam mp; // 创建一个消息,并且发送出去
mp.num = (num_++) * 10;
mp.info = "this is a test send info, num = " + std::to_string(mp.num);
/**
* 可以使用 MyMessageParam mp; 构建值的方式传递
* 推荐使用 unique_ptr 的方式,进行 send 和 recv
* 可以减少内存copy次数
*/
std::unique_ptr<MyMessageParam> mp(new MyMessageParam());

mp->num = (num_++) * 10;
mp->info = "this is a test send info, num = " + std::to_string(mp->num);
/**
* 在v2.5.1版本,增加了 GMessagePushStrategy 策略,不兼容之前版本
* 如果需要跟之前逻辑保持一致,直接设定 CGraph::GMessagePushStrategy::WAIT 即可
Expand Down
5 changes: 1 addition & 4 deletions tutorial/MyParams/MyMessageParam.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ struct MyMessageParam : public CGraph::GMessageParam {
int num = 0;
std::string info;

explicit MyMessageParam() {
num = 0;
info = "";
}
explicit MyMessageParam() = default;

/**
* 注意:
Expand Down

0 comments on commit d2bb6a3

Please sign in to comment.