diff --git a/.changes/fix-check-sockets.md b/.changes/fix-check-sockets.md new file mode 100644 index 0000000..5e214aa --- /dev/null +++ b/.changes/fix-check-sockets.md @@ -0,0 +1,6 @@ +--- +"qubit": patch:fix +--- + +Make sure that the subscription and channel are both still active before attempting to send data +down them. diff --git a/src/builder/rpc_builder.rs b/src/builder/rpc_builder.rs index 8703262..11b94fa 100644 --- a/src/builder/rpc_builder.rs +++ b/src/builder/rpc_builder.rs @@ -119,6 +119,12 @@ where // Recieve values on a new thread, sending them onwards to the subscription tokio::spawn(async move { while let Some(value) = rx.recv().await { + if subscription.is_closed() { + // Don't continue processing items once the web socket is + // closed + break; + } + subscription .send(SubscriptionMessage::from_json(&value).unwrap()) .await @@ -142,13 +148,15 @@ where // Run the handler, capturing each of the values sand forwarding it onwards // to the channel - handler(ctx, params) - .await - .for_each(|value| { + let mut stream = Box::pin(handler(ctx, params).await); + + while let Some(value) = stream.next().await { + if tx.send(value).await.is_ok() { count += 1; - tx.send(value).map(|result| result.unwrap()) - }) - .await; + } else { + break; + } + } // Notify that stream is closing SubscriptionCloseResponse::Notif(