diff --git a/src/Nitwork/Nitwork.cpp b/src/Nitwork/Nitwork.cpp index 5b418584..9d31b5a8 100644 --- a/src/Nitwork/Nitwork.cpp +++ b/src/Nitwork/Nitwork.cpp @@ -19,6 +19,9 @@ namespace Nitwork { /* Start Section */ bool Nitwork::start(int port, int threadNb, int tick) { try { + startReceiveHandler(); + startInputHandler(); + startOutputHandler(); if (!startServerConfig(port)) { std::cerr << "Error: server config failed" << std::endl; return false; @@ -27,9 +30,6 @@ namespace Nitwork { std::cerr << "Error: server threads failed" << std::endl; return false; } - startInputHandler(); - startOutputHandler(); - startReceiveHandler(); std::cout << "Server started on port " << port << " on " << boost::asio::ip::host_name() << " with ip " << _endpoint.address().to_string() << std::endl; } catch (std::exception &e) { std::cerr << "Nitwork Error : " << e.what() << std::endl; @@ -81,7 +81,11 @@ namespace Nitwork { std::cout << "Starting context threads" << std::endl; for (int i = 0; i < threadNb; i++) { _pool.emplace_back([this]() { - _context.run(); + try { + _context.run(); + } catch (std::exception &e) { + std::cerr << "Error: " << e.what() << std::endl; + } }); if (!_pool.back().joinable()) { std::cerr << "Error: thread nb: " << i << " not joinable" << std::endl; @@ -98,12 +102,11 @@ namespace Nitwork { std::cout << std::endl << "Starting input handler" << std::endl; try { while (true) { - std::cout << "Waiting for input" << std::endl; _tickConvVar.wait(lockTick); - std::cout << "Input received" << std::endl; for (auto &action : _actions) { action.second(action.first.data, action.first.endpoint); } + _actions.clear(); } } catch (std::exception &e) { std::cerr << e.what() << std::endl; @@ -115,20 +118,17 @@ namespace Nitwork { boost::asio::post(_context, [this]() { std::unique_lock lockQueue(_outputQueueMutex); std::unique_lock lockTick(_tickMutex); - std::size_t size = 0; std::cout << std::endl << "Starting output handler" << std::endl; try { while (true) { _tickConvVar.wait(lockTick); - size = _outputQueue.size(); - for (std::size_t i = 0; i < size; i++) { - lockQueue.unlock(); - auto &data = _outputQueue.front(); - _outputQueue.pop_front(); - _actionToSendHandlers[data.second.action](data.first, data.second.body); + for (auto &data : _outputQueue) { lockQueue.lock(); + _actionToSendHandlers[data.second.action](data.first, data.second.body); + lockQueue.unlock(); } + _outputQueue.clear(); } } catch (std::exception &e) { std::cerr << e.what() << std::endl; @@ -138,18 +138,18 @@ namespace Nitwork { /* End Start Section */ /* Receive Section */ - bool Nitwork::startReceiveHandler() { + void Nitwork::startReceiveHandler() { std::cout << "Starting receive handler" << std::endl; _socket.async_receive_from( - boost::asio::buffer(&_headerPacket, HEADER_SIZE), + boost::asio::buffer(_receiveBuffer), _clientEndpoint, boost::bind( &Nitwork::headerHandler, this, boost::asio::placeholders::bytes_transferred, boost::asio::placeholders::error - ) + ) ); } @@ -159,60 +159,45 @@ namespace Nitwork { startReceiveHandler(); return; } - if (bytes_received != sizeof(struct header_s)) { - std::cerr << "Error: header not received" << std::endl; - startReceiveHandler(); - return; - } - if (_headerPacket.nb_action > MAX_NB_ACTION || _headerPacket.nb_action < 0) { - std::cerr << "Error: too many actions received or no action" << std::endl; - startReceiveHandler(); - return; - } - std::cout << "header received" << std::endl; - for (int i = 0; i < _headerPacket.nb_action; i++) { - handleBodyAction(_clientEndpoint); + try { + auto *header = + reinterpret_cast(_receiveBuffer.data()); + if (bytes_received < sizeof(struct header_s)) { + std::cerr << "Error: header not received" << std::endl; + startReceiveHandler(); + return; + } + if (header->nb_action > MAX_NB_ACTION || header->nb_action < 0) { + std::cerr << "Error: too many actions received or no action" + << std::endl; + startReceiveHandler(); + return; + } + std::cout << "header received" << std::endl; + for (int i = 0; i < header->nb_action; i++) { + handleBodyAction(*header, _clientEndpoint); + } + } catch (std::exception &e) { + std::cerr << "Error: " << e.what() << std::endl; } } - void Nitwork::handleBodyAction(const boost::asio::ip::udp::endpoint &endpoint) { - _socket.async_receive_from( - boost::asio::buffer(&_actionPacket, sizeof(struct action_s)), - _endpoint, - boost::bind( - &Nitwork::handleBodyActionData, - this, - endpoint, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred - ) - ); - } - - void Nitwork::handleBodyActionData(const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, const std::size_t bytes_received) { - if (error) { - std::cerr << "Error: " << error.message() << std::endl; - startReceiveHandler(); - return; - } - if (bytes_received != sizeof(enum n_actionType_t)) { - std::cerr << "Error: body not received" << std::endl; - startReceiveHandler(); - return; - } + void Nitwork::handleBodyAction(const struct header_s header, const boost::asio::ip::udp::endpoint &endpoint) { + auto *action = reinterpret_cast(_receiveBuffer.data() + sizeof(struct header_s)); + std::cout << "action.magick: " << action->magick << std::endl; auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint); - if (endPointIt == _endpoints.end() && _actionPacket.magick != INIT) { + if (endPointIt == _endpoints.end() && action->magick != INIT) { std::cerr << "Error: endpoint not found" << std::endl; startReceiveHandler(); return; } - auto it = _actionsHandlers.find(_actionPacket.magick); + auto it = _actionsHandlers.find(action->magick); if (it == _actionsHandlers.end()) { std::cerr << "Error: action not found" << std::endl; startReceiveHandler(); return; } - it->second.first(_headerPacket, it->second.second); + it->second.first(header, it->second.second); startReceiveHandler(); } /* End Receive Section */ @@ -242,6 +227,7 @@ namespace Nitwork { return; } _endpoints.emplace_back(endpoint); + } void Nitwork::handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) { diff --git a/src/Nitwork/Nitwork.hpp b/src/Nitwork/Nitwork.hpp index 7350c98a..cd5ef2f7 100644 --- a/src/Nitwork/Nitwork.hpp +++ b/src/Nitwork/Nitwork.hpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include #include "Nitwork.h" @@ -53,13 +54,11 @@ namespace Nitwork { void startOutputHandler(); // start receive handler - bool startReceiveHandler(); + void startReceiveHandler(); // handler func for receive handler which handle the header void headerHandler(std::size_t bytes_received, const boost::system::error_code& error); // handler func for headerHandler which handle the action - void handleBodyAction(const boost::asio::ip::udp::endpoint &endpoint); - // handler func for handleBodyAction which handle the body - void handleBodyActionData(const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, const std::size_t bytes_received); + void handleBodyAction(const struct header_s header, const boost::asio::ip::udp::endpoint &endpoint); public: void stop(); @@ -68,9 +67,6 @@ namespace Nitwork { // Method which handle clock and unlock client threads each n ticks - void clockThread(int tick); - - void clientsDatasHandler(); template void sendDatasToEndpoint(boost::asio::ip::udp::endpoint &endpoint, T &datas) { @@ -88,24 +84,26 @@ namespace Nitwork { void handleBodyDatas(const struct header_s &header, const std::function &handler, B body, const boost::system::error_code& error, const std::size_t bytes_received) { if (error) { std::cerr << "Error: " << error.message() << std::endl; + startReceiveHandler(); return; } if (bytes_received != sizeof(B)) { std::cerr << "Error: body not received" << std::endl; + startReceiveHandler(); return; } - std::unique_lock lock(_inputQueueMutex); + std::lock_guard lock(_inputQueueMutex); clientData_s clientData = { _endpoint, std::any(body) }; - lock.unlock(); + std::cout << "adding action to queue" << std::endl; _actions.emplace_back(clientData, handler); - lock.lock(); + std::cout << "action added to queue" << std::endl; + startReceiveHandler(); } template void handleBody(const struct header_s &header, const std::function &handler) { - B body; - - _socket.async_receive_from(boost::asio::buffer(&body, sizeof(B)), _endpoint, boost::bind(&Nitwork::handleBodyDatas, this, header, handler, body, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + B *body = reinterpret_cast(_receiveBuffer.data() + sizeof(struct header_s) + sizeof(struct action_s)); + handleBodyDatas(header, handler, *body, boost::system::error_code(), sizeof(B)); } @@ -127,8 +125,6 @@ namespace Nitwork { }); } - void outputHandler(); - void handleInitMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint); void handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint); @@ -167,6 +163,7 @@ namespace Nitwork { struct msgInit_s _initPacket = { 'N' }; // A packet which will be used to receive the init message struct msgReady_s _readyPacket = { 'N' }; // A packet which will be used to receive the ready message boost::asio::ip::udp::endpoint _clientEndpoint; // An endpoint which will be used to receive the actions + boost::array _receiveBuffer; // A buffer which will be used to receive the actions // Actions ids std::array _ids{}; // An array of ids which will be used to identify the actions