From 4c80a04efc0191457e1f0a7b208b03cc476800d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= <philipp.krueger1@gmail.com> Date: Thu, 20 Jun 2024 18:00:04 +0200 Subject: [PATCH] feat: Break the websocket wire protocol to avoid copies --- iroh-net/src/relay/codec.rs | 41 +++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/iroh-net/src/relay/codec.rs b/iroh-net/src/relay/codec.rs index 5a91db6027b..e7c81bfdde2 100644 --- a/iroh-net/src/relay/codec.rs +++ b/iroh-net/src/relay/codec.rs @@ -264,29 +264,36 @@ impl Frame { } } + fn from_ws_vec(vec: Vec<u8>) -> anyhow::Result<Self> { + if vec.is_empty() { + bail!("error parsing relay::codec::Frame: too few bytes (0)"); + } + let bytes = Bytes::from(vec); + let typ = FrameType::try_from(bytes[0])?; + let frame = Self::from_bytes(typ, bytes.slice(1..))?; + Ok(frame) + } + + fn to_ws_vec(self) -> Vec<u8> { + let mut bytes = BytesMut::new(); + bytes.put_u8(self.typ().into()); + self.write_to(&mut bytes); + bytes.to_vec() + } + pub fn into_ws_message(self) -> std::io::Result<tungstenite::Message> { - let mut bytes = bytes::BytesMut::new(); - DerpCodec.encode(self, &mut bytes)?; - Ok(tungstenite::Message::binary(bytes)) + Ok(tungstenite::Message::binary(self.to_ws_vec())) } pub fn into_wasm_ws_message(self) -> std::io::Result<tokio_tungstenite_wasm::Message> { - let mut bytes = bytes::BytesMut::new(); - DerpCodec.encode(self, &mut bytes)?; - Ok(tokio_tungstenite_wasm::Message::binary(bytes)) + Ok(tokio_tungstenite_wasm::Message::binary(self.to_ws_vec())) } pub fn from_ws_message( msg: Option<tungstenite::Result<tungstenite::Message>>, ) -> Option<anyhow::Result<Self>> { match msg { - Some(Ok(tungstenite::Message::Binary(vec))) => { - let mut bytes = BytesMut::new(); - bytes.extend_from_slice(&vec); // TODO(matheus23) this is slow/weird - Some(DerpCodec.decode(&mut bytes).and_then(|option| { - option.ok_or_else(|| anyhow::anyhow!("incomplete frame in websocket message")) - })) - } + Some(Ok(tungstenite::Message::Binary(vec))) => Some(Self::from_ws_vec(vec)), Some(Ok(msg)) => { tracing::warn!(?msg, "Got msg of unsupported type, skipping."); None @@ -300,13 +307,7 @@ impl Frame { msg: Option<tokio_tungstenite_wasm::Result<tokio_tungstenite_wasm::Message>>, ) -> Option<anyhow::Result<Self>> { match msg { - Some(Ok(tokio_tungstenite_wasm::Message::Binary(vec))) => { - let mut bytes = BytesMut::new(); - bytes.extend_from_slice(&vec); // TODO(matheus23) this is slow/weird - Some(DerpCodec.decode(&mut bytes).and_then(|option| { - option.ok_or_else(|| anyhow::anyhow!("incomplete frame in websocket message")) - })) - } + Some(Ok(tokio_tungstenite_wasm::Message::Binary(vec))) => Some(Self::from_ws_vec(vec)), Some(Ok(msg)) => { tracing::warn!(?msg, "Got msg of unsupported type, skipping."); None