From 04465f2ddb41914225a5d0fbc3864cf23ebabbba Mon Sep 17 00:00:00 2001 From: Kylie Smith Date: Mon, 4 Dec 2023 12:52:25 +1000 Subject: [PATCH] Buffer received AMQP data (#19) --- src/messagedirector/message_director.cpp | 26 ++++++++++++++++++++---- src/messagedirector/message_director.h | 1 + 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/messagedirector/message_director.cpp b/src/messagedirector/message_director.cpp index ea810fa..3abf4a0 100644 --- a/src/messagedirector/message_director.cpp +++ b/src/messagedirector/message_director.cpp @@ -88,10 +88,24 @@ MessageDirector::MessageDirector() { _connectHandle->read(); }); - _connectHandle->on( - [this](const uvw::data_event &event, uvw::tcp_handle &) { - _connection->parse(event.data.get(), event.length); - }); + _connectHandle->on([this](const uvw::data_event &event, + uvw::tcp_handle &) { + // We've received a frame from RabbitMQ. + // It may be a partial frame, so we need to do buffering ourselves. + // See: + // https://github.com/CopernicaMarketingSoftware/AMQP-CPP#parsing-incoming-data + _frameBuffer.insert(_frameBuffer.end(), event.data.get(), + event.data.get() + event.length); + + auto processed = _connection->parse(&_frameBuffer[0], _frameBuffer.size()); + + // If we have processed at least one complete frame, we can clear the buffer + // ready for new data. In the event no bytes were processed (an in-complete + // frame), AMQP expects both the old data and any new data in the buffer. + if (processed != 0) { + _frameBuffer.clear(); + } + }); // Initialize metrics. InitMetrics(); @@ -140,6 +154,10 @@ void MessageDirector::onData(AMQP::Connection *connection, const char *buffer, * @param connection The connection that can now be used */ void MessageDirector::onReady(AMQP::Connection *connection) { + // Resize our frame buffer to the max frame length. + // This prevents buffer re-sizing at runtime. + _frameBuffer.reserve(connection->maxFrame()); + // Create our "global" exchange. _globalChannel = new AMQP::Channel(_connection); _globalChannel->declareExchange(kGlobalExchange, AMQP::fanout) diff --git a/src/messagedirector/message_director.h b/src/messagedirector/message_director.h index 26f71ec..3976304 100644 --- a/src/messagedirector/message_director.h +++ b/src/messagedirector/message_director.h @@ -54,6 +54,7 @@ class MessageDirector : public AMQP::ConnectionHandler { AMQP::Channel *_globalChannel{}; std::string _localQueue; std::string _consumeTag; + std::vector _frameBuffer; std::string _host = "127.0.0.1"; int _port = 7100;