Skip to content

Commit

Permalink
Fix for hanging connection attempt.
Browse files Browse the repository at this point in the history
Fixed ManagedConnectionsTest.FUNC_API_Send test.
Fixes #1 maidsafe-archive#5
  • Loading branch information
Fraser999 committed Oct 2, 2012
1 parent 560e6d5 commit 067e04a
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 82 deletions.
25 changes: 20 additions & 5 deletions src/maidsafe/rudp/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Connection::Connection(const std::shared_ptr<Transport> &transport,
state_(State::kPending),
state_mutex_(),
timeout_state_(TimeoutState::kConnecting),
sending_(false) {
sending_(false),
failure_functor_() {
static_assert((sizeof(DataSize)) == 4, "DataSize must be 4 bytes.");
timer_.expires_from_now(bptime::pos_infin);
}
Expand Down Expand Up @@ -108,28 +109,31 @@ void Connection::StartConnecting(const NodeId& peer_node_id,
const ip::udp::endpoint& peer_endpoint,
const std::string& validation_data,
const boost::posix_time::time_duration& connect_attempt_timeout,
const boost::posix_time::time_duration& lifespan) {
const boost::posix_time::time_duration& lifespan,
const std::function<void()>& failure_functor) {
strand_.dispatch(std::bind(&Connection::DoStartConnecting, shared_from_this(), peer_node_id,
peer_endpoint, validation_data, connect_attempt_timeout, lifespan,
PingFunctor()));
PingFunctor(), failure_functor));
}

void Connection::Ping(const NodeId& peer_node_id,
const ip::udp::endpoint& peer_endpoint,
const PingFunctor& ping_functor) {
strand_.dispatch(std::bind(&Connection::DoStartConnecting, shared_from_this(), peer_node_id,
peer_endpoint, "", Parameters::ping_timeout, bptime::time_duration(),
ping_functor));
ping_functor, std::function<void()>()));
}

void Connection::DoStartConnecting(const NodeId& peer_node_id,
const ip::udp::endpoint& peer_endpoint,
const std::string& validation_data,
const boost::posix_time::time_duration& connect_attempt_timeout,
const boost::posix_time::time_duration& lifespan,
const PingFunctor& ping_functor) {
const PingFunctor& ping_functor,
const std::function<void()>& failure_functor) {
peer_node_id_ = peer_node_id;
peer_endpoint_ = peer_endpoint;
failure_functor_ = failure_functor;
StartTick();
StartConnect(validation_data, connect_attempt_timeout, lifespan, ping_functor);
bs::error_code ignored_ec;
Expand Down Expand Up @@ -158,6 +162,17 @@ void Connection::MarkAsDuplicateAndClose() {
strand_.dispatch(std::bind(&Connection::DoClose, shared_from_this(), false));
}

std::function<void()> Connection::GetAndClearFailureFunctor() {
if (std::shared_ptr<Transport> transport = transport_.lock()) {
std::function<void()> failure_functor;
failure_functor.swap(failure_functor_);
return failure_functor;
} else {
return std::function<void()>();
}
}


ip::udp::endpoint Connection::RemoteNatDetectionEndpoint() const {
return socket_.RemoteNatDetectionEndpoint();
}
Expand Down
9 changes: 7 additions & 2 deletions src/maidsafe/rudp/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class Connection : public std::enable_shared_from_this<Connection> {
const boost::asio::ip::udp::endpoint& peer_endpoint,
const std::string& validation_data,
const boost::posix_time::time_duration& connect_attempt_timeout,
const boost::posix_time::time_duration& lifespan);
const boost::posix_time::time_duration& lifespan,
const std::function<void()>& failure_functor);
void Ping(const NodeId& peer_node_id,
const boost::asio::ip::udp::endpoint& peer_endpoint,
const std::function<void(int)> &ping_functor); // NOLINT (Fraser)
Expand All @@ -81,6 +82,8 @@ class Connection : public std::enable_shared_from_this<Connection> {
// pos_infin.
void MakePermanent(bool validated);
void MarkAsDuplicateAndClose();
std::function<void()> GetAndClearFailureFunctor();

// Get the remote endpoint offered for NAT detection.
boost::asio::ip::udp::endpoint RemoteNatDetectionEndpoint() const;
// Helpers for debugging
Expand All @@ -97,7 +100,8 @@ class Connection : public std::enable_shared_from_this<Connection> {
const std::string& validation_data,
const boost::posix_time::time_duration& connect_attempt_timeout,
const boost::posix_time::time_duration& lifespan,
const std::function<void(int)> &ping_functor); // NOLINT (Fraser)
const std::function<void(int)>& ping_functor, // NOLINT (Fraser)
const std::function<void()>& failure_functor);
void DoStartSending(const std::string& encrypted_data,
const std::function<void(int)> &message_sent_functor); // NOLINT (Fraser)

Expand Down Expand Up @@ -149,6 +153,7 @@ class Connection : public std::enable_shared_from_this<Connection> {
mutable std::mutex state_mutex_;
enum class TimeoutState { kConnecting, kConnected, kClosing } timeout_state_;
bool sending_;
std::function<void()> failure_functor_;
};


Expand Down
5 changes: 3 additions & 2 deletions src/maidsafe/rudp/connection_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ void ConnectionManager::Connect(const NodeId& peer_id,
const Endpoint& peer_endpoint,
const std::string& validation_data,
const bptime::time_duration& connect_attempt_timeout,
const bptime::time_duration& lifespan) {
const bptime::time_duration& lifespan,
const std::function<void()>& failure_functor) {
if (std::shared_ptr<Transport> transport = transport_.lock()) {
ConnectionPtr connection(std::make_shared<Connection>(transport, strand_, multiplexer_));
connection->StartConnecting(peer_id, peer_endpoint, validation_data, connect_attempt_timeout,
lifespan);
lifespan, failure_functor);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/maidsafe/rudp/connection_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class ConnectionManager {
const boost::asio::ip::udp::endpoint& peer_endpoint,
const std::string& validation_data,
const boost::posix_time::time_duration& connect_attempt_timeout,
const boost::posix_time::time_duration& lifespan);
const boost::posix_time::time_duration& lifespan,
const std::function<void()>& failure_functor = nullptr);

bool AddConnection(std::shared_ptr<Connection> connection);
bool CloseConnection(const NodeId& peer_id);
Expand Down
3 changes: 1 addition & 2 deletions src/maidsafe/rudp/core/multiplexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ class Multiplexer {
DispatchOp<DispatchHandler> op(handler, &socket_,
boost::asio::buffer(receive_buffer_),
&sender_endpoint_, &dispatcher_);
socket_.async_receive_from(boost::asio::buffer(receive_buffer_),
sender_endpoint_, 0, op);
socket_.async_receive_from(boost::asio::buffer(receive_buffer_), sender_endpoint_, 0, op);
}

// Called by the socket objects to send a packet. Returns kSuccess if the data was sent
Expand Down
2 changes: 0 additions & 2 deletions src/maidsafe/rudp/managed_connections.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ ManagedConnections::ManagedConnections()
ManagedConnections::~ManagedConnections() {
{
std::lock_guard<std::mutex> lock(mutex_);
message_received_functor_ = MessageReceivedFunctor();
connection_lost_functor_ = ConnectionLostFunctor();
for (auto connection_details : connections_)
connection_details.second->Close();
connections_.clear();
Expand Down
35 changes: 13 additions & 22 deletions src/maidsafe/rudp/tests/managed_connections_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -705,25 +705,16 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) {
EXPECT_EQ(node_.validation_data(), peer_messages[0]);
EXPECT_EQ(nodes_[1]->validation_data(), this_node_messages[0]);

// Invalid node id
node_.ResetData();
nodes_[1]->ResetData();
node_.managed_connections()->Send(NodeId(), "message7", MessageSentFunctor());
result_of_send = kSuccess;
result_arrived = false;
node_.managed_connections()->Send(NodeId(), "message8", message_sent_functor);
ASSERT_FALSE(wait_for_result());

// Unavailable node id
node_.ResetData();
nodes_[1]->ResetData();
node_.managed_connections()->Send(NodeId(NodeId::kRandomId),
"message9",
"message7",
MessageSentFunctor());
result_of_send = kSuccess;
result_arrived = false;
node_.managed_connections()->Send(NodeId(NodeId::kRandomId),
"message10",
"message8",
message_sent_functor);
ASSERT_TRUE(wait_for_result());
EXPECT_EQ(kInvalidConnection, result_of_send);
Expand All @@ -732,35 +723,35 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) {
node_.ResetData();
nodes_[1]->ResetData();
future_messages_at_peer = nodes_[1]->GetFutureForMessages(2);
node_.managed_connections()->Send(nodes_[1]->node_id(), "message11", MessageSentFunctor());
node_.managed_connections()->Send(nodes_[1]->node_id(), "message9", MessageSentFunctor());
result_of_send = kConnectError;
result_arrived = false;
node_.managed_connections()->Send(nodes_[1]->node_id(), "message12", message_sent_functor);
node_.managed_connections()->Send(nodes_[1]->node_id(), "message10", message_sent_functor);
ASSERT_TRUE(wait_for_result());
EXPECT_EQ(kSuccess, result_of_send);
ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::milliseconds(200)));
messages = future_messages_at_peer.get();
ASSERT_EQ(2U, messages.size());
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message11"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message12"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message9"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message10"));

// Valid Send from nodes_[1] to node_
node_.ResetData();
nodes_[1]->ResetData();
future_messages_at_peer = node_.GetFutureForMessages(2);
nodes_[1]->managed_connections()->Send(node_.node_id(), "message13",
nodes_[1]->managed_connections()->Send(node_.node_id(), "message11",
MessageSentFunctor());
result_of_send = kConnectError;
result_arrived = false;
nodes_[1]->managed_connections()->Send(node_.node_id(), "message14",
nodes_[1]->managed_connections()->Send(node_.node_id(), "message12",
message_sent_functor);
ASSERT_TRUE(wait_for_result());
EXPECT_EQ(kSuccess, result_of_send);
ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::milliseconds(200)));
messages = future_messages_at_peer.get();
ASSERT_EQ(2U, messages.size());
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message13"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message14"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message11"));
EXPECT_NE(messages.end(), std::find(messages.begin(), messages.end(), "message12"));

// After Remove
node_.ResetData();
Expand All @@ -779,10 +770,10 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) {

node_.ResetData();
nodes_[0]->ResetData();
node_.managed_connections()->Send(nodes_[0]->node_id(), "message15", MessageSentFunctor());
node_.managed_connections()->Send(nodes_[0]->node_id(), "message13", MessageSentFunctor());
result_of_send = kSuccess;
result_arrived = false;
node_.managed_connections()->Send(nodes_[0]->node_id(), "message16", message_sent_functor);
node_.managed_connections()->Send(nodes_[0]->node_id(), "message14", message_sent_functor);
ASSERT_TRUE(wait_for_result());
EXPECT_EQ(kInvalidConnection, result_of_send);

Expand All @@ -797,7 +788,7 @@ TEST_F(ManagedConnectionsTest, FUNC_API_Send) {
sent_message,
message_sent_functor);
ASSERT_TRUE(cond_var.wait_for(lock,
std::chrono::seconds(10),
std::chrono::seconds(20),
[&result_arrived]() { return result_arrived; })); // NOLINT (Fraser)
EXPECT_EQ(kSuccess, result_of_send);
ASSERT_TRUE(future_messages_at_peer.timed_wait(bptime::seconds(20)));
Expand Down
67 changes: 21 additions & 46 deletions src/maidsafe/rudp/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,54 +230,20 @@ void Transport::DoConnect(const NodeId& peer_id,
if (!multiplexer_->IsOpen())
return;

// TODO(Fraser#5#): 2012-09-11 - This code block is largely copied from ConnectToBootstrapEndpoint
// - move to separate function.
if (IsValid(peer_endpoint_pair.external)) {
// Temporarily connect to the signals until the connect attempt has succeeded or failed.
boost::mutex local_mutex;
boost::condition_variable local_cond_var;
bool slot_called(false);
bool timed_out_connecting(false);
auto slot_connection_added(on_connection_added_.connect(
[&](const NodeId& connected_peer_id,
TransportPtr /*transport*/,
bool /*temporary_connection*/,
bool& /*is_duplicate_normal_connection*/) {
boost::mutex::scoped_lock local_lock(local_mutex);
if (peer_id == connected_peer_id) {
slot_called = true;
local_cond_var.notify_one();
}
}, boost::signals2::at_back));
auto slot_connection_lost(on_connection_lost_.connect(
[&](const NodeId& connected_peer_id,
TransportPtr /*transport*/,
bool /*temporary_connection*/,
bool timed_out) {
boost::mutex::scoped_lock local_lock(local_mutex);
if (peer_id == connected_peer_id) {
slot_called = true;
timed_out_connecting = timed_out;
local_cond_var.notify_one();
}
}, boost::signals2::at_back));

boost::mutex::scoped_lock lock(local_mutex);
std::function<void()> failure_functor;
if (peer_endpoint_pair.local != peer_endpoint_pair.external) {
failure_functor = [=] {
if (!multiplexer_->IsOpen())
return;
connection_manager_->Connect(peer_id, peer_endpoint_pair.local, validation_data,
Parameters::rendezvous_connect_timeout, bptime::pos_infin);
};
}
connection_manager_->Connect(peer_id, peer_endpoint_pair.external, validation_data,
Parameters::rendezvous_connect_timeout, bptime::pos_infin);

bool success(local_cond_var.timed_wait(
lock,
Parameters::bootstrap_connect_timeout + bptime::seconds(1),
[&] { return slot_called; })); // NOLINT (Fraser)
slot_connection_added.disconnect();
slot_connection_lost.disconnect();

if (success && !timed_out_connecting)
return;
}

if (peer_endpoint_pair.local != peer_endpoint_pair.external) {
Parameters::rendezvous_connect_timeout, bptime::pos_infin,
failure_functor);
} else {
connection_manager_->Connect(peer_id, peer_endpoint_pair.local, validation_data,
Parameters::rendezvous_connect_timeout, bptime::pos_infin);
}
Expand Down Expand Up @@ -374,6 +340,9 @@ void Transport::AddConnection(ConnectionPtr connection) {
}

void Transport::DoAddConnection(ConnectionPtr connection) {
// Discard failure_functor
connection->GetAndClearFailureFunctor();

bool is_duplicate_normal_connection(false);
on_connection_added_(connection->Socket().PeerNodeId(),
shared_from_this(),
Expand Down Expand Up @@ -417,6 +386,12 @@ void Transport::DoRemoveConnection(ConnectionPtr connection, bool timed_out) {
// execution of the slot.
if (connection->state() != Connection::State::kTemporary)
connection_manager_->RemoveConnection(connection);

// If the connection has a failure_functor, invoke that, otherwise invoke on_connection_lost_.
auto failure_functor(connection->GetAndClearFailureFunctor());
if (failure_functor)
return failure_functor();

if (connection->state() != Connection::State::kDuplicate) {
on_connection_lost_(connection->Socket().PeerNodeId(),
shared_from_this(),
Expand Down

0 comments on commit 067e04a

Please sign in to comment.