NITWORK: Fix architecture in order to match with asio boost logic
romainpanno committed Sep 29, 2023
1 parent 1beee34 commit d7f8b4c
Showing 5 changed files with 225 additions and 104 deletions.
234 changes: 153 additions & 81 deletions src/Nitwork/Nitwork.cpp
@@ -16,143 +16,217 @@ namespace Nitwork {
         return _instance;
     }
 
-    bool Nitwork::isRunning() const {
-        return _isRunning;
-    }
-
-    bool Nitwork::start(int port) {
+    /* Start Section */
+    bool Nitwork::start(int port, int threadNb, int tick) {
         try {
-            _endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port);
-            _socket.open(boost::asio::ip::udp::v4());
-            _socket.bind(_endpoint);
-            inputHandler();
-            clockThread(TICKS_PER_SECOND);
-            outputHandler();
-            _isRunning = true;
-            std::cout << "Server started" << std::endl;
-            return true;
+            if (!startServerConfig(port)) {
+                std::cerr << "Error: server config failed" << std::endl;
+                return false;
+            }
+            startInputHandler();
+            startOutputHandler();
+            if (!startServerThreads(threadNb, tick)) {
+                std::cerr << "Error: server threads failed" << std::endl;
+                return false;
+            }
+            std::cout << "Server started on port " << port << " on " << boost::asio::ip::host_name() << " with ip " << _endpoint.address().to_string() << std::endl;
         } catch (std::exception &e) {
-            std::cerr << e.what() << std::endl;
+            std::cerr << "Nitwork Error : " << e.what() << std::endl;
             return false;
         }
+        return true;
     }
 
-    void Nitwork::stop() {
-        _inputThread.join();
-        _clockThread.join();
-        _outputThread.join();
-        _isRunning = false;
-        std::cout << "Server stopped" << std::endl;
-    }
+    bool Nitwork::startServerConfig(int port)
+    {
+        _endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port);
+        _socket.open(boost::asio::ip::udp::v4());
+        if (!_socket.is_open()) {
+            std::cerr << "Error: socket not open" << std::endl;
+            return false;
+        }
+        _socket.bind(_endpoint);
+        return true;
+    }
 
-    void Nitwork::inputHandler() {
-        _inputThread = std::thread([this]() {
-            std::unique_lock<std::mutex> lock(_queueMutex);
-            while (true) {
-                _queueCondVar.wait(lock);
-                // std::cout << "input handler" << std::endl;
-                readDataFromEndpoint();
-                for (auto &action : _actions) {
-                    action.second(action.first.data, action.first.endpoint);
-                }
-            }
-        });
-    }
+    bool Nitwork::startClockThread(int tick) {
+        _clockThread = std::thread([this, tick]() {
+            while (true) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(ONE_SECOND / tick));
+                _tickConvVar.notify_all();
+            }
+        });
+        if (!_clockThread.joinable()) {
+            std::cerr << "Error: clock thread not joinable" << std::endl;
+            return false;
+        }
+        return true;
+    }
 
-    void Nitwork::clockThread(int tick) {
-        _clockThread = std::thread([this, tick]() {
-            while (true) {
-                std::this_thread::sleep_for(std::chrono::milliseconds(1000 / tick));
-                _queueCondVar.notify_all();
-            }
-        });
-    }
+    bool Nitwork::startServerThreads(int threadNb, int tick)
+    {
+        if (!startContextThreads(threadNb)) {
+            std::cerr << "Error: context threads failed" << std::endl;
+            return false;
+        }
+        if (!startClockThread(tick)) {
+            std::cerr << "Error: clock thread failed" << std::endl;
+            return false;
+        }
+        return true;
+    }
+
+    bool Nitwork::startContextThreads(int threadNb) {
+        std::cout << "Starting context threads" << std::endl;
+        for (int i = 0; i < threadNb; i++) {
+            _pool.emplace_back([this]() {
+                _context.run();
+            });
+            if (!_pool.back().joinable()) {
+                std::cerr << "Error: thread nb: " << i << " not joinable" << std::endl;
+                return false;
+            }
+        }
+        return true;
+    }
+
+    void Nitwork::startInputHandler() {
+        boost::asio::post(_context, [this]() {
+            std::unique_lock<std::mutex> lockTick(_tickMutex);
+
+            std::cout << std::endl << "Starting input handler" << std::endl;
+            try {
+                while (true) {
+                    _tickConvVar.wait(lockTick);
+                    for (auto &action : _actions) {
+                        action.second(action.first.data, action.first.endpoint);
+                    }
+                }
+            } catch (std::exception &e) {
+                std::cerr << e.what() << std::endl;
+            }
+        });
+    }
 
-    void Nitwork::outputHandler() {
-        _outputThread = std::thread([this]() {
-            std::unique_lock<std::mutex> lock(_queueMutex);
-            std::size_t size = 0;
-            while (true) {
-                _queueCondVar.wait(lock);
-                // std::cout << "output handler" << std::endl;
-                size = _outputQueue.size();
-                for (std::size_t i = 0; i < size; i++) {
-                    lock.unlock();
-                    auto &data = _outputQueue.front();
-                    _outputQueue.pop_front();
-                    _actionToSendHandlers[data.second.action](data.first, data.second.body);
-                    lock.lock();
-                }
-            }
-        });
-    }
+    void Nitwork::startOutputHandler() {
+        boost::asio::post([this]() {
+            std::unique_lock<std::mutex> lockQueue(_outputQueueMutex);
+            std::unique_lock<std::mutex> lockTick(_tickMutex);
+            std::size_t size = 0;
+
+            std::cout << std::endl << "Starting output handler" << std::endl;
+            try {
+                while (true) {
+                    _tickConvVar.wait(lockTick);
+                    size = _outputQueue.size();
+                    for (std::size_t i = 0; i < size; i++) {
+                        lockQueue.unlock();
+                        auto &data = _outputQueue.front();
+                        _outputQueue.pop_front();
+                        _actionToSendHandlers[data.second.action](data.first, data.second.body);
+                        lockQueue.lock();
+                    }
+                }
+            } catch (std::exception &e) {
+                std::cerr << e.what() << std::endl;
+            }
+        });
+    }
+    /* End Start Section */
 
-    void Nitwork::handleBodyActionData(const struct action_s &action, const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, std::size_t bytes_received) {
-        if (error) {
-            std::cerr << "Error: " << error.message() << std::endl;
-            return;
-        }
-        if (bytes_received != sizeof(enum n_actionType_t)) {
-            std::cerr << "Error: body not received" << std::endl;
-            return;
-        }
-        auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint);
-        if (endPointIt == _endpoints.end() && action.magick != INIT) {
-            std::cerr << "Error: endpoint not found" << std::endl;
-            return;
-        }
-        auto it = _actionsHandlers.find(action.magick);
-        if (it == _actionsHandlers.end()) {
-            std::cerr << "Error: action not found" << std::endl;
-            return;
-        }
-        it->second.first(header, it->second.second);
-    }
+    /* Receive Section */
+    bool Nitwork::startReceiveHandler() {
+        boost::asio::ip::udp::endpoint endpoint;
+
+        std::cout << "Starting receive handler" << std::endl;
+        _socket.async_receive_from(
+            boost::asio::buffer(&_headerPacket, HEADER_SIZE),
+            endpoint,
+            boost::bind(
+                &Nitwork::headerHandler,
+                this,
+                endpoint,
+                boost::asio::placeholders::bytes_transferred,
+                boost::asio::placeholders::error
+            )
+        );
+    }
+
+    void Nitwork::headerHandler(const boost::asio::ip::udp::endpoint &endpoint, const std::size_t bytes_received, const boost::system::error_code& error) {
+        if (error) {
+            std::cerr << "Error: " << error.message() << std::endl;
+            startReceiveHandler();
+            return;
+        }
+        if (bytes_received != sizeof(struct header_s)) {
+            std::cerr << "Error: header not received" << std::endl;
+            startReceiveHandler();
+            return;
+        }
+        if (_headerPacket.nb_action > MAX_NB_ACTION || _headerPacket.nb_action < 0) {
+            std::cerr << "Error: too many actions received or no action" << std::endl;
+            startReceiveHandler();
+            return;
+        }
+        std::cout << "header received" << std::endl;
+        for (int i = 0; i < _headerPacket.nb_action; i++) {
+            handleBodyAction(endpoint);
+        }
+    }
 
-    void Nitwork::handleBodyAction(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint) {
-        struct action_s action = {NO_ACTION};
-
+    void Nitwork::handleBodyAction(const boost::asio::ip::udp::endpoint &endpoint) {
         _socket.async_receive_from(
-            boost::asio::buffer(&action, sizeof(struct action_s)),
+            boost::asio::buffer(&_actionPacket, sizeof(struct action_s)),
             _endpoint,
             boost::bind(
                 &Nitwork::handleBodyActionData,
                 this,
-                action,
-                header,
                 endpoint,
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred
             )
         );
     }
 
-    void Nitwork::readDataFromEndpointHandler(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, std::size_t bytes_received, const boost::system::error_code& error) {
+    void Nitwork::handleBodyActionData(const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, const std::size_t bytes_received) {
         if (error) {
             std::cerr << "Error: " << error.message() << std::endl;
+            startReceiveHandler();
             return;
         }
-        if (bytes_received != sizeof(struct header_s)) {
-            std::cerr << "Error: header not received" << std::endl;
+        if (bytes_received != sizeof(enum n_actionType_t)) {
+            std::cerr << "Error: body not received" << std::endl;
+            startReceiveHandler();
             return;
         }
-        if (header.nb_action > MAX_NB_ACTION || header.nb_action < 0) {
-            std::cerr << "Error: too many actions received or no action" << std::endl;
+        auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint);
+        if (endPointIt == _endpoints.end() && _actionPacket.magick != INIT) {
+            std::cerr << "Error: endpoint not found" << std::endl;
+            startReceiveHandler();
             return;
         }
-        std::cout << "header received" << std::endl;
-        for (int i = 0; i < header.nb_action; i++) {
-            handleBodyAction(header, endpoint);
-        }
-    }
+        auto it = _actionsHandlers.find(_actionPacket.magick);
+        if (it == _actionsHandlers.end()) {
+            std::cerr << "Error: action not found" << std::endl;
+            startReceiveHandler();
+            return;
+        }
+        it->second.first(_headerPacket, it->second.second);
+        startReceiveHandler();
+    }
+    /* End Receive Section */
 
-    void Nitwork::readDataFromEndpoint() {
-        struct header_s header = {0, 0, 0, 0, 0, 0};
-        boost::asio::ip::udp::endpoint endpoint;
-
-        _socket.async_receive_from(boost::asio::buffer(&header, HEADER_SIZE), endpoint, boost::bind(&Nitwork::readDataFromEndpointHandler, this, header, endpoint, boost::asio::placeholders::bytes_transferred, boost::asio::placeholders::error));
-    }
+    /* Stop Section */
+    void Nitwork::stop() {
+        _inputThread.join();
+        _clockThread.join();
+        _outputThread.join();
+        _context.stop();
+        std::cout << "Server stopped" << std::endl;
+    }
+    /* End Stop Section */
 
+    /* Handle packet (msg) Section */
     void Nitwork::handleInitMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) {
         const struct msgInit_s &initMsg = std::any_cast<struct msgInit_s>(msg);
 
@@ -167,13 +241,11 @@ namespace Nitwork {
             return;
         }
         _endpoints.emplace_back(endpoint);
-
-        /// TO DELETE
-        stop();
     }
 
     void Nitwork::handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) {
         const struct msgReady_s &readyMsg = std::any_cast<struct msgReady_s>(msg);
         std::cout << "ready" << std::endl;
     }
+    /* End Handle packet (msg) Section */
 }
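The core of the change is visible above: instead of dedicated input/output threads woken through a shared queue condition variable, the handlers are now posted to a boost::asio::io_context that a pool of threads runs, while a clock thread broadcasts ticks. The sketch below shows that pattern in isolation; it is illustrative only (hypothetical names, not the repository's code) and assumes Boost.Asio 1.66+ for make_work_guard.

#include <boost/asio.hpp>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    boost::asio::io_context context;
    // The work guard keeps context.run() from returning before work is posted.
    auto guard = boost::asio::make_work_guard(context);

    std::mutex tickMutex;
    std::condition_variable tickCondVar;
    int ticks = 0; // protected by tickMutex

    // Long-lived handler posted to the context, like startInputHandler().
    // Note that it occupies one pool thread for its whole lifetime.
    boost::asio::post(context, [&]() {
        std::unique_lock<std::mutex> lock(tickMutex);
        int seen = 0;
        while (seen < 3) {
            // Predicate form avoids lost wakeups if a notify races the wait.
            tickCondVar.wait(lock, [&]() { return ticks > seen; });
            seen = ticks;
            std::cout << "handled tick " << seen << std::endl;
        }
    });

    // Thread pool running the context, like startContextThreads().
    std::vector<std::thread> pool;
    for (int i = 0; i < 2; i++)
        pool.emplace_back([&]() { context.run(); });

    // Clock thread, like startClockThread(): sleep one tick, then notify.
    std::thread clock([&]() {
        for (int i = 0; i < 3; i++) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000 / 20));
            {
                std::lock_guard<std::mutex> lock(tickMutex);
                ++ticks;
            }
            tickCondVar.notify_all();
        }
    });

    clock.join();
    guard.reset(); // let run() return once the posted handler finishes
    for (auto &t : pool)
        t.join();
    return 0;
}

Because each posted handler parks a pool thread inside the condition wait, the thread count has to stay above the number of long-lived handlers, which is presumably why start() now takes threadNb as a parameter; a caller would bring the server up with something like start(4242, 4, TICKS_PER_SECOND) (values hypothetical).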
1 change: 1 addition & 0 deletions src/Nitwork/Nitwork.h
@@ -10,6 +10,7 @@
 
 #define HEADER_SIZE sizeof(struct header_s)
 #define TICKS_PER_SECOND 20
+#define ONE_SECOND 1000
 #define MAX_NB_ACTION 16
 
 typedef char n_magick_t;
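One note on the Nitwork.h change: ONE_SECOND is in milliseconds, so at the existing TICKS_PER_SECOND of 20 the clock thread sleeps ONE_SECOND / tick = 1000 / 20 = 50 ms per iteration. A throwaway compile-time check (illustrative, not part of the commit) makes the arithmetic concrete:

// Illustrative only: the sleep interval used by startClockThread() at the
// default tick rate, given ONE_SECOND = 1000 ms and TICKS_PER_SECOND = 20.
static_assert(1000 / 20 == 50, "20 ticks per second means 50 ms per tick");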
