Skip to content

Commit

Permalink
feat: Break the websocket wire protocol to avoid copies
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Jun 20, 2024
1 parent 58cc22a commit 4c80a04
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions iroh-net/src/relay/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,29 +264,36 @@ impl Frame {
}
}

fn from_ws_vec(vec: Vec<u8>) -> anyhow::Result<Self> {
if vec.is_empty() {
bail!("error parsing relay::codec::Frame: too few bytes (0)");
}
let bytes = Bytes::from(vec);
let typ = FrameType::try_from(bytes[0])?;
let frame = Self::from_bytes(typ, bytes.slice(1..))?;
Ok(frame)
}

fn to_ws_vec(self) -> Vec<u8> {
let mut bytes = BytesMut::new();
bytes.put_u8(self.typ().into());
self.write_to(&mut bytes);
bytes.to_vec()
}

pub fn into_ws_message(self) -> std::io::Result<tungstenite::Message> {
let mut bytes = bytes::BytesMut::new();
DerpCodec.encode(self, &mut bytes)?;
Ok(tungstenite::Message::binary(bytes))
Ok(tungstenite::Message::binary(self.to_ws_vec()))
}

pub fn into_wasm_ws_message(self) -> std::io::Result<tokio_tungstenite_wasm::Message> {
let mut bytes = bytes::BytesMut::new();
DerpCodec.encode(self, &mut bytes)?;
Ok(tokio_tungstenite_wasm::Message::binary(bytes))
Ok(tokio_tungstenite_wasm::Message::binary(self.to_ws_vec()))
}

pub fn from_ws_message(
msg: Option<tungstenite::Result<tungstenite::Message>>,
) -> Option<anyhow::Result<Self>> {
match msg {
Some(Ok(tungstenite::Message::Binary(vec))) => {
let mut bytes = BytesMut::new();
bytes.extend_from_slice(&vec); // TODO(matheus23) this is slow/weird
Some(DerpCodec.decode(&mut bytes).and_then(|option| {
option.ok_or_else(|| anyhow::anyhow!("incomplete frame in websocket message"))
}))
}
Some(Ok(tungstenite::Message::Binary(vec))) => Some(Self::from_ws_vec(vec)),
Some(Ok(msg)) => {
tracing::warn!(?msg, "Got msg of unsupported type, skipping.");
None
Expand All @@ -300,13 +307,7 @@ impl Frame {
msg: Option<tokio_tungstenite_wasm::Result<tokio_tungstenite_wasm::Message>>,
) -> Option<anyhow::Result<Self>> {
match msg {
Some(Ok(tokio_tungstenite_wasm::Message::Binary(vec))) => {
let mut bytes = BytesMut::new();
bytes.extend_from_slice(&vec); // TODO(matheus23) this is slow/weird
Some(DerpCodec.decode(&mut bytes).and_then(|option| {
option.ok_or_else(|| anyhow::anyhow!("incomplete frame in websocket message"))
}))
}
Some(Ok(tokio_tungstenite_wasm::Message::Binary(vec))) => Some(Self::from_ws_vec(vec)),
Some(Ok(msg)) => {
tracing::warn!(?msg, "Got msg of unsupported type, skipping.");
None
Expand Down

0 comments on commit 4c80a04

Please sign in to comment.