diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index ff93f181..ab1573df 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -57,7 +57,7 @@ use crate::{ }, fiber::{ fee::{calculate_commitment_tx_fee, shutdown_tx_size}, - network::{emit_service_event, sign_network_message}, + network::sign_network_message, types::{AnnouncementSignatures, Shutdown}, }, NetworkServiceEvent, @@ -388,13 +388,11 @@ where debug!("CommitmentSigned message received, but we haven't sent our commitment_signed message yet"); // Notify outside observers. self.network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::CommitmentSignaturePending( - state.get_remote_peer_id(), - state.get_id(), - state.get_current_commitment_number(false), - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::CommitmentSignaturePending( + state.get_remote_peer_id(), + state.get_id(), + state.get_current_commitment_number(false), ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); @@ -1067,13 +1065,13 @@ where )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::LocalCommitmentSigned( + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::LocalCommitmentSigned( state.get_remote_peer_id(), state.get_id(), version, commitment_tx, - )), + ), )) .expect("myself alive"); @@ -4499,13 +4497,11 @@ impl ChannelActorState { if flags.contains(CollaboratingFundingTxFlags::COLLABRATION_COMPLETED) { // Notify outside observers. network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::CommitmentSignaturePending( - self.get_remote_peer_id(), - self.get_id(), - self.get_current_commitment_number(false), - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::CommitmentSignaturePending( + self.get_remote_peer_id(), + self.get_id(), + self.get_current_commitment_number(false), ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); @@ -4592,14 +4588,12 @@ impl ChannelActorState { // Notify outside observers. network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::RemoteCommitmentSigned( - self.get_remote_peer_id(), - self.get_id(), - num, - tx.clone(), - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::RemoteCommitmentSigned( + self.get_remote_peer_id(), + self.get_id(), + num, + tx.clone(), ), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); @@ -4916,18 +4910,20 @@ impl ChannelActorState { self.update_state_on_raa_msg(true); self.append_remote_commitment_point(next_per_commitment_point); - emit_service_event( - network, - NetworkServiceEvent::RevokeAndAckReceived( - self.get_remote_peer_id(), - self.get_id(), - commitment_number, - x_only_aggregated_pubkey, - aggregate_signature, - output, - output_data, - ), - ); + network + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::RevokeAndAckReceived( + self.get_remote_peer_id(), + self.get_id(), + commitment_number, + x_only_aggregated_pubkey, + aggregate_signature, + output, + output_data, + ), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + Ok(()) } diff --git a/src/fiber/network.rs b/src/fiber/network.rs index 1d43c8fb..5e565249 100644 --- a/src/fiber/network.rs +++ b/src/fiber/network.rs @@ -487,12 +487,14 @@ impl NetworkActorMessage { pub fn new_command(command: NetworkActorCommand) -> Self { Self::Command(command) } + + pub fn new_notification(service_event: NetworkServiceEvent) -> Self { + Self::Notification(service_event) + } } #[derive(Debug)] pub enum NetworkServiceEvent { - ServiceError(ServiceError), - ServiceEvent(ServiceEvent), NetworkStarted(PeerId, MultiAddr, Vec), NetworkStopped(PeerId), PeerConnected(PeerId, Multiaddr), @@ -598,14 +600,6 @@ pub enum NetworkActorEvent { // A tlc remove message is received. (payment_hash, remove_tlc) TlcRemoveReceived(Hash256, RemoveTlcReason), - - /// Network service events to be sent to outside observers. - /// These events may be both present at `NetworkActorEvent` and - /// this branch of `NetworkActorEvent`. This is because some events - /// (e.g. `ChannelClosed`)require some processing internally, - /// and they are also interesting to outside observers. - /// Once we processed these events, we will send them to outside observers. - NetworkServiceEvent(NetworkServiceEvent), } #[derive(Copy, Clone, Debug)] @@ -624,6 +618,7 @@ impl Default for GraphSyncerExitStatus { pub enum NetworkActorMessage { Command(NetworkActorCommand), Event(NetworkActorEvent), + Notification(NetworkServiceEvent), } #[derive(Debug)] @@ -677,10 +672,6 @@ where } } - pub async fn on_service_event(&self, event: NetworkServiceEvent) { - let _ = self.event_sender.send(event).await; - } - pub async fn handle_peer_message( &self, state: &mut NetworkActorState, @@ -1106,28 +1097,12 @@ where ) -> crate::Result<()> { debug!("Handling event: {:?}", event); match event { - NetworkActorEvent::NetworkServiceEvent(e) => { - match &e { - NetworkServiceEvent::ServiceError(ServiceError::DialerError { - address, - error, - }) => { - error!("Dialer error: {:?} -> {:?}", address, error); - state.maybe_tell_syncer_peer_disconnected_multiaddr(address) - } - _ => {} - } - self.on_service_event(e).await; - } NetworkActorEvent::PeerConnected(id, pubkey, session) => { state.on_peer_connected(&id, pubkey, &session).await; // Notify outside observers. myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::PeerConnected( - id, - session.address, - )), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::PeerConnected(id, session.address), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -1135,10 +1110,8 @@ where state.on_peer_disconnected(&id); // Notify outside observers. myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::PeerDisConnected(id, session.address), - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::PeerDisConnected(id, session.address), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -1200,12 +1173,8 @@ where // Notify outside observers. myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelReady( - peer_id, - channel_id, - channel_outpoint, - )), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::ChannelReady(peer_id, channel_id, channel_outpoint), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -1260,11 +1229,9 @@ where NetworkActorEvent::LocalCommitmentSigned(peer_id, channel_id, version, tx) => { // Notify outside observers. myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::LocalCommitmentSigned( - peer_id, channel_id, version, tx, - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::LocalCommitmentSigned( + peer_id, channel_id, version, tx, ), )) .expect("myself alive"); @@ -1673,10 +1640,8 @@ where } // Send a service event that manifests the syncing is done. myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::SyncingCompleted, - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::SyncingCompleted, )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -3581,19 +3546,6 @@ where } } - fn maybe_tell_syncer_peer_disconnected_multiaddr(&self, multiaddr: &Multiaddr) { - if let NetworkSyncStatus::Running(ref state) = self.sync_status { - if let Some(peer_id) = state - .pinned_syncing_peers - .iter() - .find(|(_p, a)| a == multiaddr) - .map(|x| &x.0) - { - self.maybe_tell_syncer_peer_disconnected(peer_id); - } - } - } - fn maybe_finish_sync(&mut self) { if let NetworkSyncStatus::Running(state) = &self.sync_status { // TODO: It is better to sync with a few more peers to make sure we have the latest data. @@ -3629,11 +3581,8 @@ where debug!("Channel {:x} created", &id); // Notify outside observers. self.network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelCreated( - peer_id.clone(), - id, - )), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::ChannelCreated(peer_id.clone(), id), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -3702,12 +3651,8 @@ where .await; // Notify outside observers. self.network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( - peer_id.clone(), - *channel_id, - tx_hash, - )), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::ChannelClosed(peer_id.clone(), *channel_id, tx_hash), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); } @@ -3748,10 +3693,8 @@ where // Notify outside observers. self.network .clone() - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id), - ), + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); Ok(()) @@ -4044,12 +3987,12 @@ where let control = service.control().to_owned(); myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::NetworkStarted( + .send_message(NetworkActorMessage::new_notification( + NetworkServiceEvent::NetworkStarted( my_peer_id.clone(), listening_addr.clone(), announced_addrs.clone(), - )), + ), )) .expect(ASSUME_NETWORK_MYSELF_ALIVE); @@ -4195,6 +4138,11 @@ where error!("Failed to handle ckb network command: {}", err); } } + NetworkActorMessage::Notification(event) => { + if let Err(err) = self.event_sender.send(event).await { + error!("Failed to notify outside observers: {}", err); + } + } } Ok(()) } @@ -4256,12 +4204,6 @@ impl Handle { let _ = self.actor.send_message(message); } - fn emit_event(&self, event: NetworkServiceEvent) { - self.send_actor_message(NetworkActorMessage::Event( - NetworkActorEvent::NetworkServiceEvent(event), - )); - } - fn create_meta(self, id: ProtocolId) -> ProtocolMeta { MetaBuilder::new() .id(id) @@ -4338,24 +4280,16 @@ impl ServiceProtocol for Handle { #[async_trait] impl ServiceHandle for Handle { async fn handle_error(&mut self, _context: &mut ServiceContext, error: ServiceError) { - self.emit_event(NetworkServiceEvent::ServiceError(error)); + trace!("Service error: {:?}", error); + // TODO + // ServiceError::DialerError => remove address from peer store + // ServiceError::ProtocolError => ban peer } async fn handle_event(&mut self, _context: &mut ServiceContext, event: ServiceEvent) { - self.emit_event(NetworkServiceEvent::ServiceEvent(event)); + trace!("Service event: {:?}", event); } } -pub(crate) fn emit_service_event( - network: &ActorRef, - event: NetworkServiceEvent, -) { - network - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent(event), - )) - .expect(ASSUME_NETWORK_MYSELF_ALIVE); -} - pub async fn start_network< S: NetworkActorStateStore + ChannelActorStateStore