Skip to content

Commit

Permalink
wip: sync improvments
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Apr 8, 2024
1 parent b2de059 commit 5956e48
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
3 changes: 2 additions & 1 deletion iroh-sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + store::DownloadPolicyStore
validate_entry(system_time_now(), store, namespace, &entry, &origin)?;

let outcome = self.peer.put(entry.clone()).map_err(InsertError::Store)?;
tracing::debug!(?origin, hash = %entry.content_hash(), ?outcome, "insert");

let removed_count = match outcome {
InsertOutcome::Inserted { removed } => removed,
Expand Down Expand Up @@ -450,7 +451,7 @@ impl<S: ranger::Store<SignedEntry> + PublicKeyStore + store::DownloadPolicyStore
let download_policy = self
.peer
.store
.get_download_policy(&self.capability().id())
.get_download_policy(&self.id())
.unwrap_or_default();
let should_download = download_policy.matches(entry.entry());
Event::RemoteInsert {
Expand Down
1 change: 1 addition & 0 deletions iroh/src/sync_engine/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl GossipActor {
return Ok(false);
}
ToGossipActor::Join { namespace, peers } => {
debug!(?namespace, peers = peers.len(), "join gossip");
// join gossip for the topic to receive and send message
let fut = self
.gossip
Expand Down
28 changes: 24 additions & 4 deletions iroh/src/sync_engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
}

async fn start_sync(&mut self, namespace: NamespaceId, mut peers: Vec<NodeAddr>) -> Result<()> {
debug!(?namespace, peers = peers.len(), "start sync");
// update state to allow sync
if !self.state.is_syncing(&namespace) {
let opts = OpenOpts::default()
Expand Down Expand Up @@ -531,7 +532,7 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
};

let result_for_event = match &result {
Ok(_) => Ok(()),
Ok(details) => Ok(details.into()),
Err(err) => Err(err.to_string()),
};

Expand Down Expand Up @@ -686,8 +687,10 @@ impl<B: iroh_bytes::store::Store> LiveActor<B> {
};
debug!("incoming connection");
let sync = self.sync.clone();
self.running_sync_accept
.spawn(async move { handle_connection(sync, conn, accept_request_cb).await });
self.running_sync_accept.spawn(
async move { handle_connection(sync, conn, accept_request_cb).await }
.instrument(Span::current()),
);
}

pub fn accept_sync_request(
Expand All @@ -712,7 +715,24 @@ pub struct SyncEvent {
/// Timestamp when the sync finished
pub started: SystemTime,
/// Result of the sync operation
pub result: std::result::Result<(), String>,
pub result: std::result::Result<SyncDetails, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct SyncDetails {
/// Number of entries received
pub entries_received: usize,
/// Number of entries sent
pub entries_sent: usize,
}

impl From<&SyncFinished> for SyncDetails {
fn from(value: &SyncFinished) -> Self {
Self {
entries_received: value.outcome.num_recv,
entries_sent: value.outcome.num_sent,
}
}
}

#[derive(Debug, Default)]
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/sync_engine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ impl NamespaceStates {
if state.start_connect(reason) {
true
} else {
debug!("abort connect: already syncing");
false
}
}
Expand Down Expand Up @@ -170,6 +169,7 @@ impl PeerState {
}

fn start_connect(&mut self, reason: SyncReason) -> bool {
debug!(?reason, "start connect");
match self.state {
// never run two syncs at the same time
SyncState::Running { .. } => {
Expand Down

0 comments on commit 5956e48

Please sign in to comment.