Skip to content

Commit

Permalink
Buffer received AMQP data
Browse files Browse the repository at this point in the history
  • Loading branch information
ksmit799 committed Dec 4, 2023
1 parent bc8dd53 commit 9c9bf48
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
26 changes: 22 additions & 4 deletions src/messagedirector/message_director.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,24 @@ MessageDirector::MessageDirector() {
_connectHandle->read();
});

_connectHandle->on<uvw::data_event>(
[this](const uvw::data_event &event, uvw::tcp_handle &) {
_connection->parse(event.data.get(), event.length);
});
_connectHandle->on<uvw::data_event>([this](const uvw::data_event &event,
uvw::tcp_handle &) {
// We've received a frame from RabbitMQ.
// It may be a partial frame, so we need to do buffering ourselves.
// See:
// https://github.com/CopernicaMarketingSoftware/AMQP-CPP#parsing-incoming-data
_frameBuffer.insert(_frameBuffer.end(), event.data.get(),
event.data.get() + event.length);

auto processed = _connection->parse(&_frameBuffer[0], _frameBuffer.size());

// If we have processed at least one complete frame, we can clear the buffer
// ready for new data. In the event no bytes were processed (an in-complete
// frame), AMQP expects both the old data and any new data in the buffer.
if (processed != 0) {
_frameBuffer.clear();
}
});

// Initialize metrics.
InitMetrics();
Expand Down Expand Up @@ -140,6 +154,10 @@ void MessageDirector::onData(AMQP::Connection *connection, const char *buffer,
* @param connection The connection that can now be used
*/
void MessageDirector::onReady(AMQP::Connection *connection) {
// Resize our frame buffer to the max frame length.
// This prevents buffer re-sizing at runtime.
_frameBuffer.reserve(connection->maxFrame());

// Create our "global" exchange.
_globalChannel = new AMQP::Channel(_connection);
_globalChannel->declareExchange(kGlobalExchange, AMQP::fanout)
Expand Down
1 change: 1 addition & 0 deletions src/messagedirector/message_director.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class MessageDirector : public AMQP::ConnectionHandler {
AMQP::Channel *_globalChannel{};
std::string _localQueue;
std::string _consumeTag;
std::vector<char> _frameBuffer;

std::string _host = "127.0.0.1";
int _port = 7100;
Expand Down

0 comments on commit 9c9bf48

Please sign in to comment.