Skip to content

Commit

Permalink
Thread safe peer list access
Browse files Browse the repository at this point in the history
  • Loading branch information
Zitrax committed Dec 10, 2024
1 parent 4db6f32 commit 2ce0567
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
37 changes: 26 additions & 11 deletions src/torrent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ std::vector<std::shared_ptr<Peer>> Torrent::tracker_request(
}

void Torrent::start() {
scoped_lock lock(m_peers_mutex);
if (!m_peers.empty()) {
throw runtime_error("Local peer vector not empty");
}
Expand Down Expand Up @@ -940,8 +941,11 @@ void Torrent::run() {
const auto is_stopped = [](auto& p) { return p->io_service().stopped(); };
while (!m_stopped && (!done() || !ranges::all_of(m_peers, is_stopped))) {
std::size_t ran = 0;
for (auto& p : m_peers) {
ran += p->io_service().poll_one();
{
scoped_lock lock(m_peers_mutex);
for (auto& p : m_peers) {
ran += p->io_service().poll_one();
}
}
ran += m_io_context.poll_one();
// If no handlers ran, then sleep.
Expand All @@ -959,11 +963,12 @@ void Torrent::run() {
}

void Torrent::stop() {
scoped_lock lock(m_peers_mutex);
for (auto& peer : m_peers) {
peer->stop();
}
tracker_request(TrackerEvent::STOPPED);
m_stopped = true;
tracker_request(TrackerEvent::STOPPED);
}

Torrent* Torrent::get(const Sha1& info_hash) {
Expand Down Expand Up @@ -1046,6 +1051,7 @@ void Torrent::retry_pieces() {
// To hit different peers for each invocation - shuffle the list
std::random_device rd;
std::mt19937 g(rd());
scoped_lock lock(m_peers_mutex);
shuffle(m_peers.begin(), m_peers.end(), g);

auto it = m_peers.begin();
Expand All @@ -1068,7 +1074,9 @@ void Torrent::retry_pieces() {
}
}

bool Torrent::add_peer(shared_ptr<Peer> peer) {
bool Torrent::add_peer(
shared_ptr<Peer> peer,
optional<reference_wrapper<std::vector<std::shared_ptr<Peer>>>> peers) {
const bool in_use =
std::find_if(m_peers.begin(), m_peers.end(),
[&peer](auto& existing_peer) {
Expand All @@ -1078,9 +1086,14 @@ bool Torrent::add_peer(shared_ptr<Peer> peer) {
peer->url().value().str() ==
existing_peer->url().value().str();
}) != m_peers.end();
logger()->debug("Candidate {} was inactive: {}", peer->str(), in_use);
logger()->debug("Candidate {} in use: {}", peer->str(), in_use);
if (!in_use) {
peer->handshake();
if (peers) {
peers->get().push_back(peer);
} else {
m_peers.push_back(peer);
}
return true;
}
return false;
Expand All @@ -1091,6 +1104,7 @@ void Torrent::retry_peers() {
return;
}

scoped_lock lock(m_peers_mutex);
const ScopeGuard scope_guard([this]() { schedule_retry_peers(); });

logger()->debug("Checking peers for retry");
Expand All @@ -1112,9 +1126,7 @@ void Torrent::retry_peers() {
std::vector<std::shared_ptr<Peer>> new_peers;

for (const auto& tracker_peer : tracker_peers) {
if (add_peer(tracker_peer)) {
new_peers.push_back(tracker_peer);
}
add_peer(tracker_peer, new_peers);
}

if (!new_peers.empty()) {
Expand Down Expand Up @@ -1192,9 +1204,12 @@ std::tuple<FileInfo, int64_t, int64_t> Torrent::file_at_pos(int64_t pos) const {
void Torrent::last_piece_written() {
logger()->info("{} completed. Notifying peers and tracker.", m_name);

for (auto& peer : m_peers) {
if (!peer->is_listening()) {
peer->set_am_interested(false);
{
scoped_lock lock(m_peers_mutex);
for (auto& peer : m_peers) {
if (!peer->is_listening()) {
peer->set_am_interested(false);
}
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/torrent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,17 @@ class Torrent {
[[nodiscard]] auto& peers() { return m_peers; }

/**
* Add a peer to the list of connected peers.
* Add peer if there is no other peer handling the same url.
*
* @parm peer The peer to add
* @parm peers The vector to add the peer to, defaults to m_peers
*
* @return true if the peer was added
*/
bool add_peer(std::shared_ptr<Peer> peer);
bool add_peer(
std::shared_ptr<Peer> peer,
std::optional<std::reference_wrapper<std::vector<std::shared_ptr<Peer>>>>
peers = {});

/**
* Callback that will be called whenever a piece has finished downloading.
Expand Down Expand Up @@ -495,6 +501,7 @@ class Torrent {
std::string m_peer_id{};
ListeningPort m_listening_port;
ConnectionPort m_connection_port;
std::mutex m_peers_mutex{};
std::vector<std::shared_ptr<Peer>> m_peers{};
HttpGet m_http_get;
asio::steady_timer m_retry_pieces_timer;
Expand Down

0 comments on commit 2ce0567

Please sign in to comment.