Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactoring NetworkServiceEvent and NetworkActorEvent #342

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::{
},
fiber::{
fee::{calculate_commitment_tx_fee, shutdown_tx_size},
network::{emit_service_event, sign_network_message},
network::sign_network_message,
types::{AnnouncementSignatures, Shutdown},
},
NetworkServiceEvent,
Expand Down Expand Up @@ -388,13 +388,11 @@ where
debug!("CommitmentSigned message received, but we haven't sent our commitment_signed message yet");
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::CommitmentSignaturePending(
state.get_remote_peer_id(),
state.get_id(),
state.get_current_commitment_number(false),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::CommitmentSignaturePending(
state.get_remote_peer_id(),
state.get_id(),
state.get_current_commitment_number(false),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -1067,13 +1065,13 @@ where
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::LocalCommitmentSigned(
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::LocalCommitmentSigned(
state.get_remote_peer_id(),
state.get_id(),
version,
commitment_tx,
)),
),
))
.expect("myself alive");

Expand Down Expand Up @@ -4499,13 +4497,11 @@ impl ChannelActorState {
if flags.contains(CollaboratingFundingTxFlags::COLLABRATION_COMPLETED) {
// Notify outside observers.
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::CommitmentSignaturePending(
self.get_remote_peer_id(),
self.get_id(),
self.get_current_commitment_number(false),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::CommitmentSignaturePending(
self.get_remote_peer_id(),
self.get_id(),
self.get_current_commitment_number(false),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -4592,14 +4588,12 @@ impl ChannelActorState {

// Notify outside observers.
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::RemoteCommitmentSigned(
self.get_remote_peer_id(),
self.get_id(),
num,
tx.clone(),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RemoteCommitmentSigned(
self.get_remote_peer_id(),
self.get_id(),
num,
tx.clone(),
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -4916,18 +4910,20 @@ impl ChannelActorState {
self.update_state_on_raa_msg(true);
self.append_remote_commitment_point(next_per_commitment_point);

emit_service_event(
network,
NetworkServiceEvent::RevokeAndAckReceived(
self.get_remote_peer_id(),
self.get_id(),
commitment_number,
x_only_aggregated_pubkey,
aggregate_signature,
output,
output_data,
),
);
network
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::RevokeAndAckReceived(
self.get_remote_peer_id(),
self.get_id(),
commitment_number,
x_only_aggregated_pubkey,
aggregate_signature,
output,
output_data,
),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);

Ok(())
}

Expand Down
136 changes: 35 additions & 101 deletions src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,14 @@ impl NetworkActorMessage {
pub fn new_command(command: NetworkActorCommand) -> Self {
Self::Command(command)
}

pub fn new_notification(service_event: NetworkServiceEvent) -> Self {
Self::Notification(service_event)
}
}

#[derive(Debug)]
pub enum NetworkServiceEvent {
ServiceError(ServiceError),
ServiceEvent(ServiceEvent),
NetworkStarted(PeerId, MultiAddr, Vec<Multiaddr>),
NetworkStopped(PeerId),
PeerConnected(PeerId, Multiaddr),
Expand Down Expand Up @@ -598,14 +600,6 @@ pub enum NetworkActorEvent {

// A tlc remove message is received. (payment_hash, remove_tlc)
TlcRemoveReceived(Hash256, RemoveTlcReason),

/// Network service events to be sent to outside observers.
/// These events may be both present at `NetworkActorEvent` and
/// this branch of `NetworkActorEvent`. This is because some events
/// (e.g. `ChannelClosed`)require some processing internally,
/// and they are also interesting to outside observers.
/// Once we processed these events, we will send them to outside observers.
NetworkServiceEvent(NetworkServiceEvent),
}

#[derive(Copy, Clone, Debug)]
Expand All @@ -624,6 +618,7 @@ impl Default for GraphSyncerExitStatus {
pub enum NetworkActorMessage {
Command(NetworkActorCommand),
Event(NetworkActorEvent),
Notification(NetworkServiceEvent),
}

#[derive(Debug)]
Expand Down Expand Up @@ -677,10 +672,6 @@ where
}
}

pub async fn on_service_event(&self, event: NetworkServiceEvent) {
let _ = self.event_sender.send(event).await;
}

pub async fn handle_peer_message(
&self,
state: &mut NetworkActorState<S>,
Expand Down Expand Up @@ -1106,39 +1097,21 @@ where
) -> crate::Result<()> {
debug!("Handling event: {:?}", event);
match event {
NetworkActorEvent::NetworkServiceEvent(e) => {
match &e {
NetworkServiceEvent::ServiceError(ServiceError::DialerError {
address,
error,
}) => {
error!("Dialer error: {:?} -> {:?}", address, error);
state.maybe_tell_syncer_peer_disconnected_multiaddr(address)
}
_ => {}
}
self.on_service_event(e).await;
}
NetworkActorEvent::PeerConnected(id, pubkey, session) => {
state.on_peer_connected(&id, pubkey, &session).await;
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::PeerConnected(
id,
session.address,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::PeerConnected(id, session.address),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
NetworkActorEvent::PeerDisconnected(id, session) => {
state.on_peer_disconnected(&id);
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::PeerDisConnected(id, session.address),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::PeerDisConnected(id, session.address),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -1200,12 +1173,8 @@ where

// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelReady(
peer_id,
channel_id,
channel_outpoint,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelReady(peer_id, channel_id, channel_outpoint),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -1260,11 +1229,9 @@ where
NetworkActorEvent::LocalCommitmentSigned(peer_id, channel_id, version, tx) => {
// Notify outside observers.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::LocalCommitmentSigned(
peer_id, channel_id, version, tx,
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::LocalCommitmentSigned(
peer_id, channel_id, version, tx,
),
))
.expect("myself alive");
Expand Down Expand Up @@ -1673,10 +1640,8 @@ where
}
// Send a service event that manifests the syncing is done.
myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::SyncingCompleted,
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::SyncingCompleted,
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3581,19 +3546,6 @@ where
}
}

fn maybe_tell_syncer_peer_disconnected_multiaddr(&self, multiaddr: &Multiaddr) {
if let NetworkSyncStatus::Running(ref state) = self.sync_status {
if let Some(peer_id) = state
.pinned_syncing_peers
.iter()
.find(|(_p, a)| a == multiaddr)
.map(|x| &x.0)
{
self.maybe_tell_syncer_peer_disconnected(peer_id);
}
}
}

fn maybe_finish_sync(&mut self) {
if let NetworkSyncStatus::Running(state) = &self.sync_status {
// TODO: It is better to sync with a few more peers to make sure we have the latest data.
Expand Down Expand Up @@ -3629,11 +3581,8 @@ where
debug!("Channel {:x} created", &id);
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelCreated(
peer_id.clone(),
id,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelCreated(peer_id.clone(), id),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3702,12 +3651,8 @@ where
.await;
// Notify outside observers.
self.network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed(
peer_id.clone(),
*channel_id,
tx_hash,
)),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelClosed(peer_id.clone(), *channel_id, tx_hash),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}
Expand Down Expand Up @@ -3748,10 +3693,8 @@ where
// Notify outside observers.
self.network
.clone()
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(
NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id),
),
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
Ok(())
Expand Down Expand Up @@ -4044,12 +3987,12 @@ where
let control = service.control().to_owned();

myself
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::NetworkStarted(
.send_message(NetworkActorMessage::new_notification(
NetworkServiceEvent::NetworkStarted(
my_peer_id.clone(),
listening_addr.clone(),
announced_addrs.clone(),
)),
),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);

Expand Down Expand Up @@ -4195,6 +4138,11 @@ where
error!("Failed to handle ckb network command: {}", err);
}
}
NetworkActorMessage::Notification(event) => {
if let Err(err) = self.event_sender.send(event).await {
error!("Failed to notify outside observers: {}", err);
}
}
}
Ok(())
}
Expand Down Expand Up @@ -4256,12 +4204,6 @@ impl Handle {
let _ = self.actor.send_message(message);
}

fn emit_event(&self, event: NetworkServiceEvent) {
self.send_actor_message(NetworkActorMessage::Event(
NetworkActorEvent::NetworkServiceEvent(event),
));
}

fn create_meta(self, id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
Expand Down Expand Up @@ -4338,24 +4280,16 @@ impl ServiceProtocol for Handle {
#[async_trait]
impl ServiceHandle for Handle {
async fn handle_error(&mut self, _context: &mut ServiceContext, error: ServiceError) {
self.emit_event(NetworkServiceEvent::ServiceError(error));
trace!("Service error: {:?}", error);
// TODO
// ServiceError::DialerError => remove address from peer store
// ServiceError::ProtocolError => ban peer
}
async fn handle_event(&mut self, _context: &mut ServiceContext, event: ServiceEvent) {
self.emit_event(NetworkServiceEvent::ServiceEvent(event));
trace!("Service event: {:?}", event);
}
}

pub(crate) fn emit_service_event(
network: &ActorRef<NetworkActorMessage>,
event: NetworkServiceEvent,
) {
network
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::NetworkServiceEvent(event),
))
.expect(ASSUME_NETWORK_MYSELF_ALIVE);
}

pub async fn start_network<
S: NetworkActorStateStore
+ ChannelActorStateStore
Expand Down