Skip to content

Commit

Permalink
dhtrunner: handle exception during initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Mar 9, 2022
1 parent 714af68 commit de68a47
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 60 deletions.
133 changes: 74 additions & 59 deletions src/dhtrunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,81 +105,96 @@ DhtRunner::run(const Config& config, Context&& context)
auto expected = State::Idle;
if (not running.compare_exchange_strong(expected, State::Running)) {
if (context.logger)
context.logger->w("[runner %p] Node is already running. Call join() first before calling run() again.");
context.logger->w("[runner %p] Node is already running. Call join() first before calling run() again.", this);
return;
}

auto local4 = config.bind4;
auto local6 = config.bind6;
if (not local4 and not local6) {
if (context.logger)
context.logger->w("[runner %p] No address to bind specified in the configuration, using default addresses");
local4.setFamily(AF_INET);
local6.setFamily(AF_INET6);
}
auto state_path = config.dht_config.node_config.persist_path;
if (not state_path.empty() && (local4.getPort() == 0 || local6.getPort() == 0)) {
state_path += "_port.txt";
std::ifstream inConfig(state_path);
if (inConfig.is_open()) {
in_port_t port;
if (inConfig >> port) {
if (local4.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv4 port %hu from saved configuration", this, port);
local4.setPort(port);
try {
auto local4 = config.bind4;
auto local6 = config.bind6;
if (not local4 and not local6) {
if (context.logger)
context.logger->w("[runner %p] No address to bind specified in the configuration, using default addresses", this);
local4.setFamily(AF_INET);
local6.setFamily(AF_INET6);
}
auto state_path = config.dht_config.node_config.persist_path;
if (not state_path.empty() && (local4.getPort() == 0 || local6.getPort() == 0)) {
state_path += "_port.txt";
std::ifstream inConfig(state_path);
if (inConfig.is_open()) {
in_port_t port;
if (inConfig >> port) {
if (local4.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv4 port %hu from saved configuration", this, port);
local4.setPort(port);
}
}
}
if (inConfig >> port) {
if (local6.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv6 port %hu from saved configuration", this, port);
local6.setPort(port);
if (inConfig >> port) {
if (local6.getPort() == 0) {
if (context.logger)
context.logger->d("[runner %p] Using IPv6 port %hu from saved configuration", this, port);
local6.setPort(port);
}
}
}
}
}

if (not context.sock)
context.sock.reset(new net::UdpSocket(local4, local6, context.logger));

if (not state_path.empty()) {
std::ofstream outConfig(state_path);
outConfig << context.sock->getBoundRef(AF_INET).getPort() << std::endl;
outConfig << context.sock->getBoundRef(AF_INET6).getPort() << std::endl;
}
if (context.logger) {
logger_ = context.logger;
}

if (context.logger) {
logger_ = context.logger;
logger_->d("[runner %p] state changed to Running", this);
}
if (not context.sock) {
context.sock.reset(new net::UdpSocket(local4, local6, context.logger));
}

context.sock->setOnReceive([&] (net::PacketList&& pkts) {
net::PacketList ret;
{
std::lock_guard<std::mutex> lck(sock_mtx);
auto maxSize = net::RX_QUEUE_MAX_SIZE - pkts.size();
while (rcv.size() > maxSize) {
if (logger_)
logger_->e("Dropping packet: queue is full!");
rcv.pop_front();
}
if (not state_path.empty()) {
std::ofstream outConfig(state_path);
outConfig << context.sock->getBoundRef(AF_INET).getPort() << std::endl;
outConfig << context.sock->getBoundRef(AF_INET6).getPort() << std::endl;
}

rcv.splice(rcv.end(), std::move(pkts));
ret = std::move(rcv_free);
if (context.logger) {
logger_->d("[runner %p] state changed to Running", this);
}
cv.notify_all();
return ret;
});

context.sock->setOnReceive([&] (net::PacketList&& pkts) {
net::PacketList ret;
{
std::lock_guard<std::mutex> lck(sock_mtx);
auto maxSize = net::RX_QUEUE_MAX_SIZE - pkts.size();
while (rcv.size() > maxSize) {
if (logger_)
logger_->e("Dropping packet: queue is full!");
rcv.pop_front();
}

rcv.splice(rcv.end(), std::move(pkts));
ret = std::move(rcv_free);
}
cv.notify_all();
return ret;
});

#ifdef OPENDHT_PROXY_CLIENT
config_ = config;
identityAnnouncedCb_ = context.identityAnnouncedCb;
config_ = config;
identityAnnouncedCb_ = context.identityAnnouncedCb;
#endif
auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger));
dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config, std::move(context.identityAnnouncedCb), context.logger));
auto dht = std::unique_ptr<DhtInterface>(new Dht(std::move(context.sock), SecureDht::getConfig(config.dht_config), context.logger));
dht_ = std::unique_ptr<SecureDht>(new SecureDht(std::move(dht), config.dht_config, std::move(context.identityAnnouncedCb), context.logger));
enableProxy(not config.proxy_server.empty());
} catch(const std::exception& e) {
config_ = {};
identityAnnouncedCb_ = {};
dht_.reset();
#ifdef OPENDHT_PROXY_CLIENT
dht_via_proxy_.reset();
#endif
running = State::Idle;
throw std::runtime_error(std::string("Error starting the node: ") + e.what());
}

enableProxy(not config.proxy_server.empty());
if (context.logger and dht_via_proxy_) {
dht_via_proxy_->setLogger(context.logger);
}
Expand Down
2 changes: 1 addition & 1 deletion tools/dhtnode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void cmd_loop(std::shared_ptr<DhtRunner>& node, dht_params& params
if (auto stats = proxy.second->stats())
std::cout << " " << stats->toString() << std::endl;
else
std::cout << " (stats not available yet)" << std::endl;
std::cout << " (stats not available yet)" << std::endl;
}
#endif
});
Expand Down

0 comments on commit de68a47

Please sign in to comment.