From 43eb9c4ff8d1894cfc4256e8cd1d10a112bb6275 Mon Sep 17 00:00:00 2001 From: Tom Anderson <tom@ando.gq> Date: Fri, 24 May 2024 21:15:58 +1000 Subject: [PATCH] check that websocket and channel are active before sending data --- .changes/fix-check-sockets.md | 6 ++++++ src/builder/rpc_builder.rs | 20 ++++++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) create mode 100644 .changes/fix-check-sockets.md diff --git a/.changes/fix-check-sockets.md b/.changes/fix-check-sockets.md new file mode 100644 index 0000000..5e214aa --- /dev/null +++ b/.changes/fix-check-sockets.md @@ -0,0 +1,6 @@ +--- +"qubit": patch:fix +--- + +Make sure that the subscription and channel are both still active before attempting to send data +down them. diff --git a/src/builder/rpc_builder.rs b/src/builder/rpc_builder.rs index 8703262..11b94fa 100644 --- a/src/builder/rpc_builder.rs +++ b/src/builder/rpc_builder.rs @@ -119,6 +119,12 @@ where // Recieve values on a new thread, sending them onwards to the subscription tokio::spawn(async move { while let Some(value) = rx.recv().await { + if subscription.is_closed() { + // Don't continue processing items once the web socket is + // closed + break; + } + subscription .send(SubscriptionMessage::from_json(&value).unwrap()) .await @@ -142,13 +148,15 @@ where // Run the handler, capturing each of the values sand forwarding it onwards // to the channel - handler(ctx, params) - .await - .for_each(|value| { + let mut stream = Box::pin(handler(ctx, params).await); + + while let Some(value) = stream.next().await { + if tx.send(value).await.is_ok() { count += 1; - tx.send(value).map(|result| result.unwrap()) - }) - .await; + } else { + break; + } + } // Notify that stream is closing SubscriptionCloseResponse::Notif(