Skip to content

Commit

Permalink
fix: check that websocket and channel are active before sending data (#6
Browse files Browse the repository at this point in the history
)
  • Loading branch information
andogq authored May 24, 2024
2 parents 20d436c + 43eb9c4 commit cadba74
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
6 changes: 6 additions & 0 deletions .changes/fix-check-sockets.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"qubit": patch:fix
---

Make sure that the subscription and channel are both still active before attempting to send data
down them.
20 changes: 14 additions & 6 deletions src/builder/rpc_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ where
// Recieve values on a new thread, sending them onwards to the subscription
tokio::spawn(async move {
while let Some(value) = rx.recv().await {
if subscription.is_closed() {
// Don't continue processing items once the web socket is
// closed
break;
}

subscription
.send(SubscriptionMessage::from_json(&value).unwrap())
.await
Expand All @@ -142,13 +148,15 @@ where

// Run the handler, capturing each of the values sand forwarding it onwards
// to the channel
handler(ctx, params)
.await
.for_each(|value| {
let mut stream = Box::pin(handler(ctx, params).await);

while let Some(value) = stream.next().await {
if tx.send(value).await.is_ok() {
count += 1;
tx.send(value).map(|result| result.unwrap())
})
.await;
} else {
break;
}
}

// Notify that stream is closing
SubscriptionCloseResponse::Notif(
Expand Down

0 comments on commit cadba74

Please sign in to comment.