diff --git a/src/maidsafe/rudp/connection.cc b/src/maidsafe/rudp/connection.cc index ee3c2f46..ca19789b 100644 --- a/src/maidsafe/rudp/connection.cc +++ b/src/maidsafe/rudp/connection.cc @@ -71,7 +71,8 @@ Connection::Connection(const std::shared_ptr &transport, state_(State::kPending), state_mutex_(), timeout_state_(TimeoutState::kConnecting), - sending_(false) { + sending_(false), + failure_functor_() { static_assert((sizeof(DataSize)) == 4, "DataSize must be 4 bytes."); timer_.expires_from_now(bptime::pos_infin); } @@ -108,10 +109,11 @@ void Connection::StartConnecting(const NodeId& peer_node_id, const ip::udp::endpoint& peer_endpoint, const std::string& validation_data, const boost::posix_time::time_duration& connect_attempt_timeout, - const boost::posix_time::time_duration& lifespan) { + const boost::posix_time::time_duration& lifespan, + const std::function& failure_functor) { strand_.dispatch(std::bind(&Connection::DoStartConnecting, shared_from_this(), peer_node_id, peer_endpoint, validation_data, connect_attempt_timeout, lifespan, - PingFunctor())); + PingFunctor(), failure_functor)); } void Connection::Ping(const NodeId& peer_node_id, @@ -119,7 +121,7 @@ void Connection::Ping(const NodeId& peer_node_id, const PingFunctor& ping_functor) { strand_.dispatch(std::bind(&Connection::DoStartConnecting, shared_from_this(), peer_node_id, peer_endpoint, "", Parameters::ping_timeout, bptime::time_duration(), - ping_functor)); + ping_functor, std::function())); } void Connection::DoStartConnecting(const NodeId& peer_node_id, @@ -127,9 +129,11 @@ void Connection::DoStartConnecting(const NodeId& peer_node_id, const std::string& validation_data, const boost::posix_time::time_duration& connect_attempt_timeout, const boost::posix_time::time_duration& lifespan, - const PingFunctor& ping_functor) { + const PingFunctor& ping_functor, + const std::function& failure_functor) { peer_node_id_ = peer_node_id; peer_endpoint_ = peer_endpoint; + failure_functor_ = failure_functor; StartTick(); StartConnect(validation_data, connect_attempt_timeout, lifespan, ping_functor); bs::error_code ignored_ec; @@ -158,6 +162,17 @@ void Connection::MarkAsDuplicateAndClose() { strand_.dispatch(std::bind(&Connection::DoClose, shared_from_this(), false)); } +std::function Connection::GetAndClearFailureFunctor() { + if (std::shared_ptr transport = transport_.lock()) { + std::function failure_functor; + failure_functor.swap(failure_functor_); + return failure_functor; + } else { + return std::function(); + } +} + + ip::udp::endpoint Connection::RemoteNatDetectionEndpoint() const { return socket_.RemoteNatDetectionEndpoint(); } diff --git a/src/maidsafe/rudp/connection.h b/src/maidsafe/rudp/connection.h index 6f655063..485135a3 100644 --- a/src/maidsafe/rudp/connection.h +++ b/src/maidsafe/rudp/connection.h @@ -71,7 +71,8 @@ class Connection : public std::enable_shared_from_this { const boost::asio::ip::udp::endpoint& peer_endpoint, const std::string& validation_data, const boost::posix_time::time_duration& connect_attempt_timeout, - const boost::posix_time::time_duration& lifespan); + const boost::posix_time::time_duration& lifespan, + const std::function& failure_functor); void Ping(const NodeId& peer_node_id, const boost::asio::ip::udp::endpoint& peer_endpoint, const std::function &ping_functor); // NOLINT (Fraser) @@ -81,6 +82,8 @@ class Connection : public std::enable_shared_from_this { // pos_infin. void MakePermanent(bool validated); void MarkAsDuplicateAndClose(); + std::function GetAndClearFailureFunctor(); + // Get the remote endpoint offered for NAT detection. boost::asio::ip::udp::endpoint RemoteNatDetectionEndpoint() const; // Helpers for debugging @@ -97,7 +100,8 @@ class Connection : public std::enable_shared_from_this { const std::string& validation_data, const boost::posix_time::time_duration& connect_attempt_timeout, const boost::posix_time::time_duration& lifespan, - const std::function &ping_functor); // NOLINT (Fraser) + const std::function& ping_functor, // NOLINT (Fraser) + const std::function& failure_functor); void DoStartSending(const std::string& encrypted_data, const std::function &message_sent_functor); // NOLINT (Fraser) @@ -149,6 +153,7 @@ class Connection : public std::enable_shared_from_this { mutable std::mutex state_mutex_; enum class TimeoutState { kConnecting, kConnected, kClosing } timeout_state_; bool sending_; + std::function failure_functor_; }; diff --git a/src/maidsafe/rudp/connection_manager.cc b/src/maidsafe/rudp/connection_manager.cc index e905ac0e..ac2ab8a2 100644 --- a/src/maidsafe/rudp/connection_manager.cc +++ b/src/maidsafe/rudp/connection_manager.cc @@ -81,11 +81,12 @@ void ConnectionManager::Connect(const NodeId& peer_id, const Endpoint& peer_endpoint, const std::string& validation_data, const bptime::time_duration& connect_attempt_timeout, - const bptime::time_duration& lifespan) { + const bptime::time_duration& lifespan, + const std::function& failure_functor) { if (std::shared_ptr transport = transport_.lock()) { ConnectionPtr connection(std::make_shared(transport, strand_, multiplexer_)); connection->StartConnecting(peer_id, peer_endpoint, validation_data, connect_attempt_timeout, - lifespan); + lifespan, failure_functor); } } diff --git a/src/maidsafe/rudp/connection_manager.h b/src/maidsafe/rudp/connection_manager.h index 848e9e76..a3787c79 100644 --- a/src/maidsafe/rudp/connection_manager.h +++ b/src/maidsafe/rudp/connection_manager.h @@ -59,7 +59,8 @@ class ConnectionManager { const boost::asio::ip::udp::endpoint& peer_endpoint, const std::string& validation_data, const boost::posix_time::time_duration& connect_attempt_timeout, - const boost::posix_time::time_duration& lifespan); + const boost::posix_time::time_duration& lifespan, + const std::function& failure_functor = nullptr); bool AddConnection(std::shared_ptr connection); bool CloseConnection(const NodeId& peer_id); diff --git a/src/maidsafe/rudp/core/multiplexer.h b/src/maidsafe/rudp/core/multiplexer.h index 72c142ee..823f1a8b 100644 --- a/src/maidsafe/rudp/core/multiplexer.h +++ b/src/maidsafe/rudp/core/multiplexer.h @@ -57,8 +57,7 @@ class Multiplexer { DispatchOp op(handler, &socket_, boost::asio::buffer(receive_buffer_), &sender_endpoint_, &dispatcher_); - socket_.async_receive_from(boost::asio::buffer(receive_buffer_), - sender_endpoint_, 0, op); + socket_.async_receive_from(boost::asio::buffer(receive_buffer_), sender_endpoint_, 0, op); } // Called by the socket objects to send a packet. Returns kSuccess if the data was sent diff --git a/src/maidsafe/rudp/managed_connections.cc b/src/maidsafe/rudp/managed_connections.cc index c5653b49..91738cef 100644 --- a/src/maidsafe/rudp/managed_connections.cc +++ b/src/maidsafe/rudp/managed_connections.cc @@ -77,8 +77,6 @@ ManagedConnections::ManagedConnections() ManagedConnections::~ManagedConnections() { { std::lock_guard lock(mutex_); - message_received_functor_ = MessageReceivedFunctor(); - connection_lost_functor_ = ConnectionLostFunctor(); for (auto connection_details : connections_) connection_details.second->Close(); connections_.clear(); diff --git a/src/maidsafe/rudp/tests/managed_connections_test.cc b/src/maidsafe/rudp/tests/managed_connections_test.cc index 74471f36..9828bd69 100644 --- a/src/maidsafe/rudp/tests/managed_connections_test.cc +++ b/src/maidsafe/rudp/tests/managed_connections_test.cc @@ -705,25 +705,16 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) { EXPECT_EQ(node_.validation_data(), peer_messages[0]); EXPECT_EQ(nodes_[1]->validation_data(), this_node_messages[0]); - // Invalid node id - node_.ResetData(); - nodes_[1]->ResetData(); - node_.managed_connections()->Send(NodeId(), "message7", MessageSentFunctor()); - result_of_send = kSuccess; - result_arrived = false; - node_.managed_connections()->Send(NodeId(), "message8", message_sent_functor); - ASSERT_FALSE(wait_for_result()); - // Unavailable node id node_.ResetData(); nodes_[1]->ResetData(); node_.managed_connections()->Send(NodeId(NodeId::kRandomId), - "message9", + "message7", MessageSentFunctor()); result_of_send = kSuccess; result_arrived = false; node_.managed_connections()->Send(NodeId(NodeId::kRandomId), - "message10", + "message8", message_sent_functor); ASSERT_TRUE(wait_for_result()); EXPECT_EQ(kInvalidConnection, result_of_send); @@ -732,35 +723,35 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) { node_.ResetData(); nodes_[1]->ResetData(); future_messages_at_peer = nodes_[1]->GetFutureForMessages(2); - node_.managed_connections()->Send(nodes_[1]->node_id(), "message11", MessageSentFunctor()); + node_.managed_connections()->Send(nodes_[1]->node_id(), "message9", MessageSentFunctor()); result_of_send = kConnectError; result_arrived = false; - node_.managed_connections()->Send(nodes_[1]->node_id(), "message12", message_sent_functor); + node_.managed_connections()->Send(nodes_[1]->node_id(), "message10", message_sent_functor); ASSERT_TRUE(wait_for_result()); EXPECT_EQ(kSuccess, result_of_send); ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::milliseconds(200))); messages = future_messages_at_peer.get(); ASSERT_EQ(2U, messages.size()); - EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message11")); - EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message12")); + EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message9")); + EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message10")); // Valid Send from nodes_[1] to node_ node_.ResetData(); nodes_[1]->ResetData(); future_messages_at_peer = node_.GetFutureForMessages(2); - nodes_[1]->managed_connections()->Send(node_.node_id(), "message13", + nodes_[1]->managed_connections()->Send(node_.node_id(), "message11", MessageSentFunctor()); result_of_send = kConnectError; result_arrived = false; - nodes_[1]->managed_connections()->Send(node_.node_id(), "message14", + nodes_[1]->managed_connections()->Send(node_.node_id(), "message12", message_sent_functor); ASSERT_TRUE(wait_for_result()); EXPECT_EQ(kSuccess, result_of_send); ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::milliseconds(200))); messages = future_messages_at_peer.get(); ASSERT_EQ(2U, messages.size()); - EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message13")); - EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message14")); + EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message11")); + EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message12")); // After Remove node_.ResetData(); @@ -779,10 +770,10 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) { node_.ResetData(); nodes_[0]->ResetData(); - node_.managed_connections()->Send(nodes_[0]->node_id(), "message15", MessageSentFunctor()); + node_.managed_connections()->Send(nodes_[0]->node_id(), "message13", MessageSentFunctor()); result_of_send = kSuccess; result_arrived = false; - node_.managed_connections()->Send(nodes_[0]->node_id(), "message16", message_sent_functor); + node_.managed_connections()->Send(nodes_[0]->node_id(), "message14", message_sent_functor); ASSERT_TRUE(wait_for_result()); EXPECT_EQ(kInvalidConnection, result_of_send); @@ -797,7 +788,7 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) { sent_message, message_sent_functor); ASSERT_TRUE(cond_var.wait_for(lock, - std::chrono::seconds(10), + std::chrono::seconds(20), [&result_arrived]() { return result_arrived; })); // NOLINT (Fraser) EXPECT_EQ(kSuccess, result_of_send); ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::seconds(20))); diff --git a/src/maidsafe/rudp/transport.cc b/src/maidsafe/rudp/transport.cc index 32497bd8..ded211c5 100644 --- a/src/maidsafe/rudp/transport.cc +++ b/src/maidsafe/rudp/transport.cc @@ -230,54 +230,20 @@ void Transport::DoConnect(const NodeId& peer_id, if (!multiplexer_->IsOpen()) return; - // TODO(Fraser#5#): 2012-09-11 - This code block is largely copied from ConnectToBootstrapEndpoint - // - move to separate function. if (IsValid(peer_endpoint_pair.external)) { - // Temporarily connect to the signals until the connect attempt has succeeded or failed. - boost::mutex local_mutex; - boost::condition_variable local_cond_var; - bool slot_called(false); - bool timed_out_connecting(false); - auto slot_connection_added(on_connection_added_.connect( - [&](const NodeId& connected_peer_id, - TransportPtr /*transport*/, - bool /*temporary_connection*/, - bool& /*is_duplicate_normal_connection*/) { - boost::mutex::scoped_lock local_lock(local_mutex); - if (peer_id == connected_peer_id) { - slot_called = true; - local_cond_var.notify_one(); - } - }, boost::signals2::at_back)); - auto slot_connection_lost(on_connection_lost_.connect( - [&](const NodeId& connected_peer_id, - TransportPtr /*transport*/, - bool /*temporary_connection*/, - bool timed_out) { - boost::mutex::scoped_lock local_lock(local_mutex); - if (peer_id == connected_peer_id) { - slot_called = true; - timed_out_connecting = timed_out; - local_cond_var.notify_one(); - } - }, boost::signals2::at_back)); - - boost::mutex::scoped_lock lock(local_mutex); + std::function failure_functor; + if (peer_endpoint_pair.local != peer_endpoint_pair.external) { + failure_functor = [=] { + if (!multiplexer_->IsOpen()) + return; + connection_manager_->Connect(peer_id, peer_endpoint_pair.local, validation_data, + Parameters::rendezvous_connect_timeout, bptime::pos_infin); + }; + } connection_manager_->Connect(peer_id, peer_endpoint_pair.external, validation_data, - Parameters::rendezvous_connect_timeout, bptime::pos_infin); - - bool success(local_cond_var.timed_wait( - lock, - Parameters::bootstrap_connect_timeout + bptime::seconds(1), - [&] { return slot_called; })); // NOLINT (Fraser) - slot_connection_added.disconnect(); - slot_connection_lost.disconnect(); - - if (success && !timed_out_connecting) - return; - } - - if (peer_endpoint_pair.local != peer_endpoint_pair.external) { + Parameters::rendezvous_connect_timeout, bptime::pos_infin, + failure_functor); + } else { connection_manager_->Connect(peer_id, peer_endpoint_pair.local, validation_data, Parameters::rendezvous_connect_timeout, bptime::pos_infin); } @@ -374,6 +340,9 @@ void Transport::AddConnection(ConnectionPtr connection) { } void Transport::DoAddConnection(ConnectionPtr connection) { + // Discard failure_functor + connection->GetAndClearFailureFunctor(); + bool is_duplicate_normal_connection(false); on_connection_added_(connection->Socket().PeerNodeId(), shared_from_this(), @@ -417,6 +386,12 @@ void Transport::DoRemoveConnection(ConnectionPtr connection, bool timed_out) { // execution of the slot. if (connection->state() != Connection::State::kTemporary) connection_manager_->RemoveConnection(connection); + + // If the connection has a failure_functor, invoke that, otherwise invoke on_connection_lost_. + auto failure_functor(connection->GetAndClearFailureFunctor()); + if (failure_functor) + return failure_functor(); + if (connection->state() != Connection::State::kDuplicate) { on_connection_lost_(connection->Socket().PeerNodeId(), shared_from_this(),