Skip to content

Commit

Permalink
NITWORK: Fix datas reading and parsing
Browse files Browse the repository at this point in the history
by removing multiple async_receive_from funcs

PATCH
  • Loading branch information
romainpanno committed Sep 29, 2023
1 parent 30b26e4 commit 3be7af5
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 72 deletions.
100 changes: 43 additions & 57 deletions src/Nitwork/Nitwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace Nitwork {
/* Start Section */
bool Nitwork::start(int port, int threadNb, int tick) {
try {
startReceiveHandler();
startInputHandler();
startOutputHandler();
if (!startServerConfig(port)) {
std::cerr << "Error: server config failed" << std::endl;
return false;
Expand All @@ -27,9 +30,6 @@ namespace Nitwork {
std::cerr << "Error: server threads failed" << std::endl;
return false;
}
startInputHandler();
startOutputHandler();
startReceiveHandler();
std::cout << "Server started on port " << port << " on " << boost::asio::ip::host_name() << " with ip " << _endpoint.address().to_string() << std::endl;
} catch (std::exception &e) {
std::cerr << "Nitwork Error : " << e.what() << std::endl;
Expand Down Expand Up @@ -81,7 +81,11 @@ namespace Nitwork {
std::cout << "Starting context threads" << std::endl;
for (int i = 0; i < threadNb; i++) {
_pool.emplace_back([this]() {
_context.run();
try {
_context.run();
} catch (std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
}
});
if (!_pool.back().joinable()) {
std::cerr << "Error: thread nb: " << i << " not joinable" << std::endl;
Expand All @@ -98,12 +102,11 @@ namespace Nitwork {
std::cout << std::endl << "Starting input handler" << std::endl;
try {
while (true) {
std::cout << "Waiting for input" << std::endl;
_tickConvVar.wait(lockTick);
std::cout << "Input received" << std::endl;
for (auto &action : _actions) {
action.second(action.first.data, action.first.endpoint);
}
_actions.clear();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
Expand All @@ -115,20 +118,17 @@ namespace Nitwork {
boost::asio::post(_context, [this]() {
std::unique_lock<std::mutex> lockQueue(_outputQueueMutex);
std::unique_lock<std::mutex> lockTick(_tickMutex);
std::size_t size = 0;

std::cout << std::endl << "Starting output handler" << std::endl;
try {
while (true) {
_tickConvVar.wait(lockTick);
size = _outputQueue.size();
for (std::size_t i = 0; i < size; i++) {
lockQueue.unlock();
auto &data = _outputQueue.front();
_outputQueue.pop_front();
_actionToSendHandlers[data.second.action](data.first, data.second.body);
for (auto &data : _outputQueue) {
lockQueue.lock();
_actionToSendHandlers[data.second.action](data.first, data.second.body);
lockQueue.unlock();
}
_outputQueue.clear();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
Expand All @@ -138,18 +138,18 @@ namespace Nitwork {
/* End Start Section */

/* Receive Section */
bool Nitwork::startReceiveHandler() {
void Nitwork::startReceiveHandler() {

std::cout << "Starting receive handler" << std::endl;
_socket.async_receive_from(
boost::asio::buffer(&_headerPacket, HEADER_SIZE),
boost::asio::buffer(_receiveBuffer),
_clientEndpoint,
boost::bind(
&Nitwork::headerHandler,
this,
boost::asio::placeholders::bytes_transferred,
boost::asio::placeholders::error
)
)
);
}

Expand All @@ -159,60 +159,45 @@ namespace Nitwork {
startReceiveHandler();
return;
}
if (bytes_received != sizeof(struct header_s)) {
std::cerr << "Error: header not received" << std::endl;
startReceiveHandler();
return;
}
if (_headerPacket.nb_action > MAX_NB_ACTION || _headerPacket.nb_action < 0) {
std::cerr << "Error: too many actions received or no action" << std::endl;
startReceiveHandler();
return;
}
std::cout << "header received" << std::endl;
for (int i = 0; i < _headerPacket.nb_action; i++) {
handleBodyAction(_clientEndpoint);
try {
auto *header =
reinterpret_cast<struct header_s *>(_receiveBuffer.data());
if (bytes_received < sizeof(struct header_s)) {
std::cerr << "Error: header not received" << std::endl;
startReceiveHandler();
return;
}
if (header->nb_action > MAX_NB_ACTION || header->nb_action < 0) {
std::cerr << "Error: too many actions received or no action"
<< std::endl;
startReceiveHandler();
return;
}
std::cout << "header received" << std::endl;
for (int i = 0; i < header->nb_action; i++) {
handleBodyAction(*header, _clientEndpoint);
}
} catch (std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}

void Nitwork::handleBodyAction(const boost::asio::ip::udp::endpoint &endpoint) {
_socket.async_receive_from(
boost::asio::buffer(&_actionPacket, sizeof(struct action_s)),
_endpoint,
boost::bind(
&Nitwork::handleBodyActionData,
this,
endpoint,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}

void Nitwork::handleBodyActionData(const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, const std::size_t bytes_received) {
if (error) {
std::cerr << "Error: " << error.message() << std::endl;
startReceiveHandler();
return;
}
if (bytes_received != sizeof(enum n_actionType_t)) {
std::cerr << "Error: body not received" << std::endl;
startReceiveHandler();
return;
}
void Nitwork::handleBodyAction(const struct header_s header, const boost::asio::ip::udp::endpoint &endpoint) {
auto *action = reinterpret_cast<struct action_s *>(_receiveBuffer.data() + sizeof(struct header_s));
std::cout << "action.magick: " << action->magick << std::endl;
auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint);
if (endPointIt == _endpoints.end() && _actionPacket.magick != INIT) {
if (endPointIt == _endpoints.end() && action->magick != INIT) {
std::cerr << "Error: endpoint not found" << std::endl;
startReceiveHandler();
return;
}
auto it = _actionsHandlers.find(_actionPacket.magick);
auto it = _actionsHandlers.find(action->magick);
if (it == _actionsHandlers.end()) {
std::cerr << "Error: action not found" << std::endl;
startReceiveHandler();
return;
}
it->second.first(_headerPacket, it->second.second);
it->second.first(header, it->second.second);
startReceiveHandler();
}
/* End Receive Section */
Expand Down Expand Up @@ -242,6 +227,7 @@ namespace Nitwork {
return;
}
_endpoints.emplace_back(endpoint);

}

void Nitwork::handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) {
Expand Down
27 changes: 12 additions & 15 deletions src/Nitwork/Nitwork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <condition_variable>
#include <mutex>
#include <boost/bind/bind.hpp>
#include <boost/array.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio.hpp>
#include "Nitwork.h"
Expand Down Expand Up @@ -53,13 +54,11 @@ namespace Nitwork {
void startOutputHandler();

// start receive handler
bool startReceiveHandler();
void startReceiveHandler();
// handler func for receive handler which handle the header
void headerHandler(std::size_t bytes_received, const boost::system::error_code& error);
// handler func for headerHandler which handle the action
void handleBodyAction(const boost::asio::ip::udp::endpoint &endpoint);
// handler func for handleBodyAction which handle the body
void handleBodyActionData(const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, const std::size_t bytes_received);
void handleBodyAction(const struct header_s header, const boost::asio::ip::udp::endpoint &endpoint);
public:

void stop();
Expand All @@ -68,9 +67,6 @@ namespace Nitwork {


// Method which handle clock and unlock client threads each n ticks
void clockThread(int tick);

void clientsDatasHandler();

template<typename T>
void sendDatasToEndpoint(boost::asio::ip::udp::endpoint &endpoint, T &datas) {
Expand All @@ -88,24 +84,26 @@ namespace Nitwork {
void handleBodyDatas(const struct header_s &header, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &handler, B body, const boost::system::error_code& error, const std::size_t bytes_received) {
if (error) {
std::cerr << "Error: " << error.message() << std::endl;
startReceiveHandler();
return;
}
if (bytes_received != sizeof(B)) {
std::cerr << "Error: body not received" << std::endl;
startReceiveHandler();
return;
}
std::unique_lock<std::mutex> lock(_inputQueueMutex);
std::lock_guard<std::mutex> lock(_inputQueueMutex);
clientData_s clientData = { _endpoint, std::any(body) };
lock.unlock();
std::cout << "adding action to queue" << std::endl;
_actions.emplace_back(clientData, handler);
lock.lock();
std::cout << "action added to queue" << std::endl;
startReceiveHandler();
}

template<typename B>
void handleBody(const struct header_s &header, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &handler) {
B body;

_socket.async_receive_from(boost::asio::buffer(&body, sizeof(B)), _endpoint, boost::bind(&Nitwork::handleBodyDatas<B>, this, header, handler, body, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
B *body = reinterpret_cast<B *>(_receiveBuffer.data() + sizeof(struct header_s) + sizeof(struct action_s));
handleBodyDatas<B>(header, handler, *body, boost::system::error_code(), sizeof(B));
}


Expand All @@ -127,8 +125,6 @@ namespace Nitwork {
});
}

void outputHandler();

void handleInitMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint);

void handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint);
Expand Down Expand Up @@ -167,6 +163,7 @@ namespace Nitwork {
struct msgInit_s _initPacket = { 'N' }; // A packet which will be used to receive the init message
struct msgReady_s _readyPacket = { 'N' }; // A packet which will be used to receive the ready message
boost::asio::ip::udp::endpoint _clientEndpoint; // An endpoint which will be used to receive the actions
boost::array<char, 1024> _receiveBuffer; // A buffer which will be used to receive the actions

// Actions ids
std::array<id_t, MAX_NB_ACTION> _ids{}; // An array of ids which will be used to identify the actions
Expand Down

0 comments on commit 3be7af5

Please sign in to comment.