-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
254 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
mod protocol; | ||
|
||
use std::{ | ||
collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use rust_ipfs::{ | ||
libp2p::{ | ||
core::Endpoint, | ||
swarm::{ | ||
derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionDenied, | ||
ConnectionId, FromSwarm, OneShotHandler, PollParameters, THandler, THandlerInEvent, | ||
THandlerOutEvent, ToSwarm, | ||
}, | ||
}, | ||
Keypair, Multiaddr, NetworkBehaviour, PeerId, | ||
}; | ||
|
||
use self::protocol::{IdentityProtocol, Message, WirePayload}; | ||
|
||
pub struct Behaviour { | ||
pending_events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>, | ||
|
||
connections: HashMap<PeerId, Vec<ConnectionId>>, | ||
|
||
responsive: HashSet<PeerId>, | ||
|
||
keypair: Keypair, | ||
|
||
share_platform: bool, | ||
|
||
command: futures::channel::mpsc::Receiver<()>, | ||
|
||
blocked: HashSet<PeerId>, | ||
blocked_by: HashSet<PeerId>, | ||
} | ||
|
||
impl NetworkBehaviour for Behaviour { | ||
type ConnectionHandler = OneShotHandler<IdentityProtocol, WirePayload, protocol::Message>; | ||
type ToSwarm = void::Void; | ||
|
||
fn handle_pending_inbound_connection( | ||
&mut self, | ||
_: ConnectionId, | ||
_: &Multiaddr, | ||
_: &Multiaddr, | ||
) -> Result<(), ConnectionDenied> { | ||
Ok(()) | ||
} | ||
|
||
fn handle_pending_outbound_connection( | ||
&mut self, | ||
_: ConnectionId, | ||
_: Option<PeerId>, | ||
_: &[Multiaddr], | ||
_: Endpoint, | ||
) -> Result<Vec<Multiaddr>, ConnectionDenied> { | ||
Ok(vec![]) | ||
} | ||
|
||
fn handle_established_inbound_connection( | ||
&mut self, | ||
_: ConnectionId, | ||
_: PeerId, | ||
_: &Multiaddr, | ||
_: &Multiaddr, | ||
) -> Result<THandler<Self>, ConnectionDenied> { | ||
Ok(OneShotHandler::default()) | ||
} | ||
|
||
fn handle_established_outbound_connection( | ||
&mut self, | ||
_: ConnectionId, | ||
_: PeerId, | ||
_: &Multiaddr, | ||
_: Endpoint, | ||
) -> Result<THandler<Self>, ConnectionDenied> { | ||
Ok(OneShotHandler::default()) | ||
} | ||
|
||
fn on_connection_handler_event( | ||
&mut self, | ||
peer_id: PeerId, | ||
_: ConnectionId, | ||
event: THandlerOutEvent<Self>, | ||
) { | ||
let event = match event { | ||
Message::Received { event } => event, | ||
Message::Sent => { | ||
//TODO: Await response before timing out oneshot handler | ||
return; | ||
} | ||
}; | ||
} | ||
|
||
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) { | ||
match event { | ||
FromSwarm::ConnectionEstablished(ConnectionEstablished { | ||
peer_id, | ||
connection_id, | ||
other_established, | ||
.. | ||
}) => match self.connections.entry(peer_id) { | ||
Entry::Occupied(mut entry) => { | ||
let connections = entry.get_mut(); | ||
if !connections.contains(&connection_id) { | ||
connections.push(connection_id); | ||
} | ||
} | ||
Entry::Vacant(entry) => { | ||
entry.insert(vec![connection_id]); | ||
} | ||
}, | ||
FromSwarm::ConnectionClosed(ConnectionClosed { | ||
peer_id, | ||
connection_id, | ||
.. | ||
}) => { | ||
if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) { | ||
let connections = entry.get_mut(); | ||
connections.retain(|conn| conn != &connection_id); | ||
if connections.is_empty() { | ||
entry.remove(); | ||
} | ||
} | ||
} | ||
_ => {} | ||
} | ||
} | ||
|
||
fn poll( | ||
&mut self, | ||
cx: &mut Context, | ||
_: &mut impl PollParameters, | ||
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> { | ||
if let Some(event) = self.pending_events.pop_front() { | ||
return Poll::Ready(event); | ||
} | ||
|
||
Poll::Pending | ||
} | ||
} |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
use std::{iter, marker::PhantomData}; | ||
|
||
use futures::{future::BoxFuture, AsyncRead, AsyncWrite, AsyncWriteExt}; | ||
use rust_ipfs::libp2p::{ | ||
core::{upgrade, UpgradeInfo}, | ||
InboundUpgrade, OutboundUpgrade, StreamProtocol, | ||
}; | ||
use serde::{de::DeserializeOwned, Deserialize, Serialize}; | ||
|
||
pub const PROTOCOL: StreamProtocol = StreamProtocol::new("/shuttle/identity"); | ||
|
||
const BUF_SIZE: usize = 3 * 1024 * 1024; | ||
|
||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)] | ||
#[serde(rename_all = "lowercase")] | ||
pub enum WireType { | ||
Request, | ||
Response, | ||
None, | ||
} | ||
|
||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)] | ||
#[serde(rename_all = "lowercase")] | ||
pub enum Payload {} | ||
|
||
#[derive(Serialize, Deserialize, Debug, Clone)] | ||
pub struct WirePayload { | ||
pub wire_type: WireType, | ||
pub payload: (), | ||
pub signature: Vec<u8>, | ||
} | ||
|
||
#[derive(Default, Copy, Clone, Debug)] | ||
pub struct IdentityProtocol; | ||
|
||
impl UpgradeInfo for IdentityProtocol { | ||
type Info = StreamProtocol; | ||
type InfoIter = iter::Once<Self::Info>; | ||
|
||
fn protocol_info(&self) -> Self::InfoIter { | ||
iter::once(PROTOCOL) | ||
} | ||
} | ||
|
||
impl<TSocket> InboundUpgrade<TSocket> for IdentityProtocol | ||
where | ||
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, | ||
{ | ||
type Output = WirePayload; | ||
type Error = warp::error::Error; | ||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; | ||
|
||
#[inline] | ||
fn upgrade_inbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { | ||
Box::pin(async move { | ||
let packet = upgrade::read_length_prefixed(&mut socket, BUF_SIZE).await?; | ||
let message = serde_json::from_slice(&packet)?; | ||
Ok(message) | ||
}) | ||
} | ||
} | ||
|
||
impl UpgradeInfo for WirePayload { | ||
type Info = StreamProtocol; | ||
type InfoIter = iter::Once<Self::Info>; | ||
|
||
fn protocol_info(&self) -> Self::InfoIter { | ||
iter::once(PROTOCOL) | ||
} | ||
} | ||
|
||
impl<TSocket> OutboundUpgrade<TSocket> for WirePayload | ||
where | ||
TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, | ||
{ | ||
type Output = (); | ||
type Error = std::io::Error; | ||
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; | ||
|
||
#[inline] | ||
fn upgrade_outbound(self, mut socket: TSocket, _info: Self::Info) -> Self::Future { | ||
Box::pin(async move { | ||
let bytes = serde_json::to_vec(&self) | ||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; | ||
upgrade::write_length_prefixed(&mut socket, bytes).await?; | ||
socket.close().await | ||
}) | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
#[allow(clippy::large_enum_variant)] | ||
pub enum Message { | ||
Sent, | ||
Received { event: WirePayload }, | ||
} | ||
|
||
impl From<WirePayload> for Message { | ||
#[inline] | ||
fn from(event: WirePayload) -> Self { | ||
Self::Received { event } | ||
} | ||
} | ||
|
||
impl From<()> for Message { | ||
#[inline] | ||
fn from(_: ()) -> Self { | ||
Self::Sent | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
pub mod gateway; | ||
pub mod identity; | ||
pub mod identity; |