Skip to content

Commit

Permalink
NITWORK: Add main loop with input/output/clock threads
Browse files Browse the repository at this point in the history
and refactor of datas structures for actions and action's functions container

MINOR
  • Loading branch information
romainpanno committed Sep 27, 2023
1 parent b8fe0ff commit 95e5cc6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 59 deletions.
98 changes: 59 additions & 39 deletions src/Nitwork/Nitwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace Nitwork {
_socket.open(boost::asio::ip::udp::v4());
_socket.bind(_endpoint);
ContinuousReception();
ClockThread(TICKS_PER_SECOND);
ClientsDatasHandler();
return true;
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
Expand All @@ -33,41 +35,39 @@ namespace Nitwork {
_context.stop();
_inputThread.join();
_clockThread.join();
for (auto &thread : _outputThreads) {
thread.join();
}
_outputThread.join();
}

void Nitwork::ContinuousReception() {
_inputThread = std::thread([this]() {
_context.run();
while (true) {
readDataFromEndpoint();
}
});
}

void Nitwork::ClockThread(int tick) {
_clockThread = std::thread([this]() {
_clockThread = std::thread([this, tick]() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000 / TICKS_PER_SECOND));
std::this_thread::sleep_for(std::chrono::milliseconds(1000 / tick));
_queueCondVar.notify_all();
}
});
}

void Nitwork::ClientsDatasHandler() {
_outputThreads = std::array<std::thread, MAX_PLAYERS>();
for (int i = 0; i < MAX_PLAYERS; i++) {
_outputThreads.at(i) = std::thread([this]() {
std::unique_lock<std::mutex> lock(_queueMutex);
while (true) {
_queueCondVar.wait(lock);
// handle actions and send it to the client
_outputThread = std::thread([this]() {
std::unique_lock<std::mutex> lock(_queueMutex);
while (true) {
_queueCondVar.wait(lock);
for (auto &action : _actions) {
action.second(action.first);
}
});
}
}
});
}


void Nitwork::handleBodyActionDatas(const struct action_s &action, const struct header_s &header, const boost::system::error_code& error, std::size_t bytes_received) {
void Nitwork::handleBodyActionData(const struct action_s &action, const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, std::size_t bytes_received) {
if (error) {
std::cerr << "Error: " << error.message() << std::endl;
return;
Expand All @@ -76,6 +76,11 @@ namespace Nitwork {
std::cerr << "Error: body not received" << std::endl;
return;
}
auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint);
if (endPointIt == _endpoints.end() && action.magick != INIT) {
std::cerr << "Error: endpoint not found" << std::endl;
return;
}
auto it = _actionsHandlers.find(action.magick);
if (it == _actionsHandlers.end()) {
std::cerr << "Error: action not found" << std::endl;
Expand All @@ -84,51 +89,66 @@ namespace Nitwork {
it->second.first(header, it->second.second);
}

void Nitwork::handleBodyAction(const struct header_s &header) {
void Nitwork::handleBodyAction(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint) {
struct action_s action = {NO_ACTION};

_socket.async_receive_from(
boost::asio::buffer(&action, sizeof(struct action_s)),
_endpoint,
boost::bind(
&Nitwork::handleBodyActionDatas,
&Nitwork::handleBodyActionData,
this,
action,
endpoint,
header,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
}

void Nitwork::readDataFromEndpoint(boost::asio::ip::udp::endpoint &endpoint) {
size_t header_bytes_received = 0;
struct header_s header = {0, 0, 0, 0, 0, 0};
void Nitwork::readDataFromEndpointHandler(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, std::size_t bytes_received, const boost::system::error_code& error) {
if (error) {
std::cerr << "Error: " << error.message() << std::endl;
return;
}
if (bytes_received != sizeof(struct header_s)) {
std::cerr << "Error: header not received" << std::endl;
return;
}
if (header.nb_action > MAX_NB_ACTION || header.nb_action < 0) {
std::cerr << "Error: too many actions received or no action" << std::endl;
return;
}
for (int i = 0; i < header.nb_action; i++) {
handleBodyAction(header, endpoint);
}
}

while (true) {
header_bytes_received = _socket.receive_from(boost::asio::buffer(&header, HEADER_SIZE), endpoint);
void Nitwork::readDataFromEndpoint() {
struct header_s header = {0, 0, 0, 0, 0, 0};
boost::asio::ip::udp::endpoint endpoint;

if (header_bytes_received != sizeof(struct header_s)) {
std::cerr << "Error: header not received" << std::endl;
continue;
}
if (header.nb_action > MAX_NB_ACTION || header.nb_action < 0) {
std::cerr << "Error: too many actions received or no action" << std::endl;
continue;
}
for (int i = 0; i < header.nb_action; i++) {
handleBodyAction(header);
}
break;
}
_socket.async_receive_from(boost::asio::buffer(&header, HEADER_SIZE), endpoint, boost::bind(&Nitwork::readDataFromEndpointHandler, this, endpoint, header, boost::asio::placeholders::bytes_transferred, boost::asio::placeholders::error));
}

void Nitwork::handleInitMsg(const std::any &msg) {
void Nitwork::handleInitMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) {
const struct msgInit_s &initMsg = std::any_cast<struct msgInit_s>(msg);

std::cout << "init" << std::endl;
if (_endpoints.size() >= MAX_CLIENTS) {
std::cerr << "Error: too many clients" << std::endl;
return;
}
auto endPointIt = std::find(_endpoints.begin(), _endpoints.end(), endpoint);
if (endPointIt != _endpoints.end()) {
std::cerr << "Error: endpoint already init" << std::endl;
return;
}
_endpoints.emplace_back(endpoint);
}

void Nitwork::handleReadyMsg(const std::any &msg) {
void Nitwork::handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint) {
const struct msgReady_s &readyMsg = std::any_cast<struct msgReady_s>(msg);
std::cout << "ready" << std::endl;
}
Expand Down
40 changes: 20 additions & 20 deletions src/Nitwork/Nitwork.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <boost/asio.hpp>
#include "Nitwork.h"

#define MAX_PLAYERS 4
#define MAX_CLIENTS 4

namespace Nitwork {
class Nitwork {
Expand Down Expand Up @@ -62,26 +62,27 @@ namespace Nitwork {
std::cerr << "Error: body not received" << std::endl;
return;
}
_actions.emplace_back(body);
_actionsHandler.emplace_back(handler);
_actions.emplace_back(std::make_pair(std::any(body), handler));
}

template<typename B>
void handleBody(const struct header_s &header, const std::function<void(const std::any &)> &handler) {
void handleBody(const struct header_s &header, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &handler) {
B body;

_socket.async_receive_from(boost::asio::buffer(&body, sizeof(B)), _endpoint, boost::bind(&Nitwork::handleBodyDatas<B>, this, header, handler, body, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

void handleBodyActionDatas(const struct action_s &action, const struct header_s &header, const boost::system::error_code& error, std::size_t bytes_received);
void handleBodyActionData(const struct action_s &action, const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, const boost::system::error_code& error, std::size_t bytes_received);

void handleBodyAction(const struct header_s &header);
void handleBodyAction(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint);

void readDataFromEndpoint(boost::asio::ip::udp::endpoint &endpoint);
void readDataFromEndpointHandler(const struct header_s &header, const boost::asio::ip::udp::endpoint &endpoint, std::size_t bytes_received, const boost::system::error_code& error);

void handleInitMsg(const std::any &msg);
void readDataFromEndpoint();

void handleReadyMsg(const std::any &msg);
void handleInitMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint);

void handleReadyMsg(const std::any &msg, boost::asio::ip::udp::endpoint &endpoint);

protected:
private:
Expand All @@ -91,36 +92,35 @@ namespace Nitwork {
boost::asio::io_context _context; // second context which will handle the outputs (handle actions and send it, each n ticks
std::thread _inputThread; // A thread for the first context
std::thread _clockThread; // A thread for the clock which is in the second context
std::array<std::thread, MAX_PLAYERS> _outputThreads; // A thread for each player which is in the second context
std::thread _outputThread; // A thread for each player which is in the second context

// Body handler vars
std::list<std::any> _actions; // A list of actions which will be handled by the second context
std::list<std::function<void(const std::any &)>> _actionsHandler; // A list of functions which will be used to handle the actions
// Body handler var
std::list<std::pair<std::any, std::function<void(const std::any &)>>> _actions; // A list of actions which will be handled by the second context

std::mutex _queueMutex; // A mutex to lock the queue which will be used by both contexts
std::condition_variable _queueCondVar; // A condition variable to wait for the queue to be used by the second context
std::vector<boost::asio::ip::udp::endpoint> _endpoints; // A vector of endpoints which will be used to send the actions to the clients and identify them
std::list<boost::asio::ip::udp::endpoint> _endpoints; // A vector of endpoints which will be used to send the actions to the clients and identify them
std::array<id_t, MAX_NB_ACTION> _ids{}; // An array of ids which will be used to identify the actions
boost::asio::ip::udp::socket _socket; // The socket which will be used to send and receive the actions
boost::asio::ip::udp::endpoint _endpoint; // The endpoint which will be used to send and receive the actions

const std::map<
enum n_actionType_t,
std::pair<
std::function<void(const struct header_s &, const std::function<void(const std::any &)> &)>,
std::function<void(const std::any &)>
std::function<void(const struct header_s &, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &)>,
std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)>
>
> _actionsHandlers = {
{INIT,
std::make_pair(
std::function<void(const struct header_s &, const std::function<void(const std::any &)> &)>(std::bind(&Nitwork::handleBody<struct msgInit_s>, this, std::placeholders::_1, std::placeholders::_2)),
std::function<void(const std::any &)>(std::bind(&Nitwork::handleInitMsg, this, std::placeholders::_1))
std::function<void(const struct header_s &, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &)>(std::bind(&Nitwork::handleBody<struct msgInit_s>, this, std::placeholders::_1, std::placeholders::_2)),
std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)>(std::bind(&Nitwork::handleInitMsg, this, std::placeholders::_1, std::placeholders::_2))
)
},
{READY,
std::make_pair(
std::function<void(const struct header_s &, const std::function<void(const std::any &)> &)>(std::bind(&Nitwork::handleBody<struct msgReady_s>, this, std::placeholders::_1, std::placeholders::_2)),
std::function<void(const std::any &)>(std::bind(&Nitwork::handleReadyMsg, this, std::placeholders::_1))
std::function<void(const struct header_s &, const std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)> &)>(std::bind(&Nitwork::handleBody<struct msgReady_s>, this, std::placeholders::_1, std::placeholders::_2)),
std::function<void(const std::any &, boost::asio::ip::udp::endpoint &)>(std::bind(&Nitwork::handleReadyMsg, this, std::placeholders::_1, std::placeholders::_2))
)
}
};
Expand Down

0 comments on commit 95e5cc6

Please sign in to comment.