Skip to content

Commit

Permalink
refactor: NetworkServiceEvent
Browse files Browse the repository at this point in the history
  • Loading branch information
quake committed Nov 26, 2024
1 parent 0cd2daa commit 5967779
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 106 deletions.
72 changes: 34 additions & 38 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::{
},
fiber::{
fee::{calculate_commitment_tx_fee, shutdown_tx_size},
network::{emit_service_event, sign_network_message},
network::sign_network_message,
types::{AnnouncementSignatures, Shutdown},
},
NetworkServiceEvent,
Expand Down Expand Up @@ -388,13 +388,11 @@ where
debug!("CommitmentSigned message received, but we haven't sent our commitment_signed message yet");
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::CommitmentSignaturePending(
state.get_remote_peer_id(),
state.get_id(),
state.get_current_commitment_number(false),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::CommitmentSignaturePending(
state.get_remote_peer_id(),
state.get_id(),
state.get_current_commitment_number(false),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -1067,13 +1065,13 @@ where
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::LocalCommitmentSigned(
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::LocalCommitmentSigned(
state.get_remote_peer_id(),
state.get_id(),
version,
commitment_tx,
)),
),
))
.expect("myself alive");

Expand Down Expand Up @@ -4499,13 +4497,11 @@ impl ChannelActorState {
if flags.contains(CollaboratingFundingTxFlags::COLLABRATION_COMPLETED) {
// Notify outside observers.
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::CommitmentSignaturePending(
self.get_remote_peer_id(),
self.get_id(),
self.get_current_commitment_number(false),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::CommitmentSignaturePending(
self.get_remote_peer_id(),
self.get_id(),
self.get_current_commitment_number(false),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -4592,14 +4588,12 @@ impl ChannelActorState {

// Notify outside observers.
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::RemoteCommitmentSigned(
self.get_remote_peer_id(),
self.get_id(),
num,
tx.clone(),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RemoteCommitmentSigned(
self.get_remote_peer_id(),
self.get_id(),
num,
tx.clone(),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -4916,18 +4910,20 @@ impl ChannelActorState {
self.update_state_on_raa_msg(true);
self.append_remote_commitment_point(next_per_commitment_point);

emit_service_event(
network,
NetworkServiceEvent::RevokeAndAckReceived(
self.get_remote_peer_id(),
self.get_id(),
commitment_number,
x_only_aggregated_pubkey,
aggregate_signature,
output,
output_data,
),
);
network
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RevokeAndAckReceived(
self.get_remote_peer_id(),
self.get_id(),
commitment_number,
x_only_aggregated_pubkey,
aggregate_signature,
output,
output_data,
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);

Ok(())
}

Expand Down
98 changes: 30 additions & 68 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ impl NetworkActorMessage {
pub fn new_command(command: NetworkActorCommand) -> Self {
Self::Command(command)
}

pub fn new_notification(service_event: NetworkServiceEvent) -> Self {
Self::Notification(service_event)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -596,14 +600,6 @@ pub enum NetworkActorEvent {

// A tlc remove message is received. (payment_hash, remove_tlc)
TlcRemoveReceived(Hash256, RemoveTlcReason),

/// Network service events to be sent to outside observers.
/// These events may be both present at `NetworkActorEvent` and
/// this branch of `NetworkActorEvent`. This is because some events
/// (e.g. `ChannelClosed`)require some processing internally,
/// and they are also interesting to outside observers.
/// Once we processed these events, we will send them to outside observers.
NetworkServiceEvent(NetworkServiceEvent),
}

#[derive(Copy, Clone, Debug)]
Expand All @@ -622,6 +618,7 @@ impl Default for GraphSyncerExitStatus {
pub enum NetworkActorMessage {
Command(NetworkActorCommand),
Event(NetworkActorEvent),
Notification(NetworkServiceEvent),
}

#[derive(Debug)]
Expand Down Expand Up @@ -675,10 +672,6 @@ where
}
}

pub async fn on_service_event(&self, event: NetworkServiceEvent) {
let _ = self.event_sender.send(event).await;
}

pub async fn handle_peer_message(
&self,
state: &mut NetworkActorState<S>,
Expand Down Expand Up @@ -1104,29 +1097,21 @@ where
) -> crate::Result<()> {
debug!("Handling event: {:?}", event);
match event {
NetworkActorEvent::NetworkServiceEvent(e) => {
self.on_service_event(e).await;
}
NetworkActorEvent::PeerConnected(id, pubkey, session) => {
state.on_peer_connected(&id, pubkey, &session).await;
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::PeerConnected(
id,
session.address,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::PeerConnected(id, session.address),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
NetworkActorEvent::PeerDisconnected(id, session) => {
state.on_peer_disconnected(&id);
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::PeerDisConnected(id, session.address),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::PeerDisConnected(id, session.address),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -1188,12 +1173,8 @@ where

// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelReady(
peer_id,
channel_id,
channel_outpoint,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelReady(peer_id, channel_id, channel_outpoint),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -1248,11 +1229,9 @@ where
NetworkActorEvent::LocalCommitmentSigned(peer_id, channel_id, version, tx) => {
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::LocalCommitmentSigned(
peer_id, channel_id, version, tx,
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::LocalCommitmentSigned(
peer_id, channel_id, version, tx,
),
))
.expect("myself alive");
Expand Down Expand Up @@ -1661,10 +1640,8 @@ where
}
// Send a service event that manifests the syncing is done.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::SyncingCompleted,
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::SyncingCompleted,
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3604,11 +3581,8 @@ where
debug!("Channel {:x} created", &id);
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelCreated(
peer_id.clone(),
id,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelCreated(peer_id.clone(), id),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3677,12 +3651,8 @@ where
.await;
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed(
peer_id.clone(),
*channel_id,
tx_hash,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelClosed(peer_id.clone(), *channel_id, tx_hash),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3723,10 +3693,8 @@ where
// Notify outside observers.
self.network
.clone()
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
Ok(())
Expand Down Expand Up @@ -4019,12 +3987,12 @@ where
let control = service.control().to_owned();

myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::NetworkStarted(
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::NetworkStarted(
my_peer_id.clone(),
listening_addr.clone(),
announced_addrs.clone(),
)),
),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);

Expand Down Expand Up @@ -4170,6 +4138,11 @@ where
error!("Failed to handle ckb network command: {}", err);
}
}
NetworkActorMessage::Notification(event) => {
if let Err(err) = self.event_sender.send(event).await {
error!("Failed to notify outside observers: {}", err);
}
}
}
Ok(())
}
Expand Down Expand Up @@ -4317,17 +4290,6 @@ impl ServiceHandle for Handle {
}
}

pub(crate) fn emit_service_event(
network: &ActorRef<NetworkActorMessage>,
event: NetworkServiceEvent,
) {
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(event),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}

pub async fn start_network<
S: NetworkActorStateStore
+ ChannelActorStateStore
Expand Down

0 comments on commit 5967779

Please sign in to comment.