Skip to content

Commit

Permalink
NITWOK: Refactor implementatin of Nitwork
Browse files Browse the repository at this point in the history
Add Interface and Abstract classes, with fonctionnal begin implementation of NitworkServer
(receive is working)

PATCH
  • Loading branch information
romainpanno committed Oct 1, 2023
1 parent ab5a328 commit 2fba378
Show file tree
Hide file tree
Showing 10 changed files with 573 additions and 507 deletions.
250 changes: 88 additions & 162 deletions src/Nitwork/Nitwork.cpp → src/Nitwork/ANitwork.cpp
Original file line number Diff line number Diff line change
@@ -1,90 +1,38 @@
/*
** EPITECH PROJECT, 2023
** R-Bus
** r-type
** File description:
** Implementation of nitwork
** ANitwork
*/

#include "Nitwork.hpp"

#include "ANitwork.hpp"
namespace Nitwork {
// NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables)
Nitwork Nitwork::_instance = Nitwork();
// NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables)

Nitwork &Nitwork::getInstance()
{
return _instance;
}
ANitwork::ANitwork()
: _socket(_context) {}

/* Start Section */
bool Nitwork::start(int port, int threadNb, int tick)
bool ANitwork::start(int port, int threadNb, int tick)
{
try {
startReceiveHandler();
startInputHandler();
startOutputHandler();
if (!startServerConfig(port)) {
std::cerr << "Error: server config failed" << std::endl;
if (!startNitworkConfig(port)) {
std::cerr << "Error: Nitwork config failed" << std::endl;
return false;
}
if (!startServerThreads(threadNb, tick)) {
std::cerr << "Error: server threads failed" << std::endl;
if (!startNitworkThreads(threadNb, tick)) {
std::cerr << "Error: Nitwork threads failed" << std::endl;
return false;
}
std::cout << "Server started on port " << port << " on "
<< boost::asio::ip::host_name() << " with ip "
<< _endpoint.address().to_string() << std::endl;
std::cout << "Nitwork started on port " << port << std::endl;
} catch (std::exception &e) {
std::cerr << "Nitwork Error : " << e.what() << std::endl;
return false;
}
return true;
}

bool Nitwork::startServerConfig(int port)
{
_endpoint =
boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port);
_socket.open(boost::asio::ip::udp::v4());
if (!_socket.is_open()) {
std::cerr << "Error: socket not open" << std::endl;
return false;
}
_socket.bind(_endpoint);
return true;
}

bool Nitwork::startClockThread(int tick)
{
_clockThread = std::thread([this, tick]() {
while (true) {
std::this_thread::sleep_for(
std::chrono::milliseconds(ONE_SECOND / tick));
_tickConvVar.notify_all();
}
});
if (!_clockThread.joinable()) {
std::cerr << "Error: clock thread not joinable" << std::endl;
return false;
}
return true;
}

bool Nitwork::startServerThreads(int threadNb, int tick)
{
if (!startContextThreads(threadNb)) {
std::cerr << "Error: context threads failed" << std::endl;
return false;
}
if (!startClockThread(tick)) {
std::cerr << "Error: clock thread failed" << std::endl;
return false;
}
return true;
}

bool Nitwork::startContextThreads(int threadNb)
bool ANitwork::startContextThreads(int threadNb)
{
std::cout << "Starting context threads" << std::endl;
for (int i = 0; i < threadNb; i++) {
Expand All @@ -104,67 +52,63 @@ namespace Nitwork {
return true;
}

void Nitwork::startInputHandler()
bool ANitwork::startClockThread(int tick)
{
boost::asio::post(_context, [this]() {
std::unique_lock<std::mutex> lockTick(_tickMutex);

std::cout << std::endl << "Starting input handler" << std::endl;
std::cout << "Starting clock thread" << std::endl;
_clockThread = std::thread([this, tick]() {
try {
while (true) {
_tickConvVar.wait(lockTick);
for (auto &action : _actions) {
action.second(action.first.data, action.first.endpoint);
}
_actions.clear();
std::this_thread::sleep_for(std::chrono::milliseconds(tick));
std::unique_lock<std::mutex> lockTick(_tickMutex);
_tickConvVar.notify_one();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
std::cerr << "Error: " << e.what() << std::endl;
}
});
if (!_clockThread.joinable()) {
std::cerr << "Error: clock thread not joinable" << std::endl;
return false;
}
return true;
}

void Nitwork::startOutputHandler()
bool ANitwork::startNitworkThreads(int threadNb, int tick)
{
boost::asio::post(_context, [this]() {
std::unique_lock<std::mutex> lockQueue(_outputQueueMutex);
std::unique_lock<std::mutex> lockTick(_tickMutex);
if (!startContextThreads(threadNb)) {
std::cerr << "Error: context threads failed" << std::endl;
return false;
}
if (!startClockThread(tick)) {
std::cerr << "Error: clock thread failed" << std::endl;
return false;
}
return true;
}

std::cout << std::endl << "Starting output handler" << std::endl;
try {
while (true) {
_tickConvVar.wait(lockTick);
for (auto &data : _outputQueue) {
lockQueue.lock();
_actionToSendHandlers[data.second.action](
data.first,
data.second.body);
lockQueue.unlock();
}
_outputQueue.clear();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
});
void ANitwork::stop()
{
_context.stop();
for (auto &thread : _pool)
thread.join();
_pool.clear();
_clockThread.join();
}
/* End Start Section */

/* Receive Section */
void Nitwork::startReceiveHandler()
void ANitwork::startReceiveHandler()
{
std::cout << "Starting receive handler" << std::endl;
_socket.async_receive_from(
boost::asio::buffer(_receiveBuffer),
_clientEndpoint,
_senderEndpoint,
boost::bind(
&Nitwork::headerHandler,
&ANitwork::headerHandler,
this,
boost::asio::placeholders::bytes_transferred,
boost::asio::placeholders::error));
}

void Nitwork::headerHandler(
void ANitwork::headerHandler(
std::size_t bytes_received,
const boost::system::error_code &error)
{
Expand All @@ -189,77 +133,59 @@ namespace Nitwork {
}
std::cout << "header received" << std::endl;
for (int i = 0; i < header->nb_action; i++) {
handleBodyAction(*header, _clientEndpoint);
handleBodyAction(*header, _senderEndpoint);
}
} catch (std::exception &e) {
std::cerr << "Error: " << e.what() << std::endl;
}
}

void Nitwork::handleBodyAction(
const struct header_s header,
const boost::asio::ip::udp::endpoint &endpoint)
void ANitwork::startInputHandler()
{
auto *action = reinterpret_cast<struct action_s *>(
_receiveBuffer.data() + sizeof(struct header_s));
std::cout << "action.magick: " << action->magick << std::endl;
auto endPointIt =
std::find(_endpoints.begin(), _endpoints.end(), endpoint);
if (endPointIt == _endpoints.end() && action->magick != INIT) {
std::cerr << "Error: endpoint not found" << std::endl;
startReceiveHandler();
return;
}
auto it = _actionsHandlers.find(action->magick);
if (it == _actionsHandlers.end()) {
std::cerr << "Error: action not found" << std::endl;
startReceiveHandler();
return;
}
it->second.first(header, it->second.second);
startReceiveHandler();
}
/* End Receive Section */
boost::asio::post(_context, [this]() {
std::unique_lock<std::mutex> lockTick(_tickMutex);

/* Stop Section */
void Nitwork::stop()
{
_inputThread.join();
_clockThread.join();
_outputThread.join();
_context.stop();
std::cout << "Server stopped" << std::endl;
std::cout << std::endl << "Starting input handler" << std::endl;
try {
while (true) {
_tickConvVar.wait(lockTick);
for (auto &action : _actions) {
action.second(action.first.data, action.first.endpoint);
}
_actions.clear();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
});
}
/* End Stop Section */

/* Handle packet (msg) Section */
void Nitwork::handleInitMsg(
const std::any &msg,
boost::asio::ip::udp::endpoint &endpoint)
void ANitwork::startOutputHandler()
{
const struct msgInit_s &initMsg = std::any_cast<struct msgInit_s>(msg);

std::cout << "init" << std::endl;
if (_endpoints.size() >= MAX_CLIENTS) {
std::cerr << "Error: too many clients" << std::endl;
return;
}
auto endPointIt =
std::find(_endpoints.begin(), _endpoints.end(), endpoint);
if (endPointIt != _endpoints.end()) {
std::cerr << "Error: endpoint already init" << std::endl;
return;
}
_endpoints.emplace_back(endpoint);
}
boost::asio::post(_context, [this]() {
std::unique_lock<std::mutex> lockQueue(_outputQueueMutex);
std::unique_lock<std::mutex> lockTick(_tickMutex);
const std::map<enum n_actionType_t, actionHandler> &actionToSendHandlers = getActionToSendHandlers();

void Nitwork::handleReadyMsg(
const std::any &msg,
boost::asio::ip::udp::endpoint &endpoint)
{
const struct msgReady_s &readyMsg =
std::any_cast<struct msgReady_s>(msg);
std::cout << "ready" << std::endl;
std::cout << std::endl << "Starting output handler" << std::endl;
try {
while (true) {
_tickConvVar.wait(lockTick);
for (auto &data : _outputQueue) {
lockQueue.lock();
auto it = actionToSendHandlers.find(data.second.action);
if (it == actionToSendHandlers.end()) {
std::cerr << "Error: action not found" << std::endl;
continue;
}
it->second(data.second.body, data.first);
lockQueue.unlock();
}
_outputQueue.clear();
}
} catch (std::exception &e) {
std::cerr << e.what() << std::endl;
}
});
}
/* End Handle packet (msg) Section */
} // namespace Nitwork
Loading

0 comments on commit 2fba378

Please sign in to comment.