diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f11024980..37f11917a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,6 +101,8 @@ jobs: - { feature: time, crate: juniper } - { feature: url, crate: juniper } - { feature: uuid, crate: juniper } + - { feature: graphql-transport-ws, crate: juniper_graphql_ws } + - { feature: graphql-ws, crate: juniper_graphql_ws } - { feature: , crate: juniper_actix } - { feature: subscriptions, crate: juniper_actix } - { feature: , crate: juniper_warp } @@ -134,7 +136,6 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions - - juniper_graphql_transport_ws - juniper_graphql_ws #- juniper_actix - juniper_hyper @@ -189,7 +190,6 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions - - juniper_graphql_transport_ws - juniper_graphql_ws - juniper_integration_tests - juniper_codegen_tests @@ -318,7 +318,6 @@ jobs: - juniper_codegen - juniper - juniper_subscriptions - - juniper_graphql_transport_ws - juniper_graphql_ws - juniper_actix - juniper_hyper diff --git a/Cargo.toml b/Cargo.toml index d505b21a0..212c896d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ members = [ "juniper_iron", "juniper_rocket", "juniper_subscriptions", - "juniper_graphql_transport_ws", "juniper_graphql_ws", "juniper_warp", "juniper_actix", diff --git a/Makefile b/Makefile index 2624f9a79..50741cf76 100644 --- a/Makefile +++ b/Makefile @@ -94,7 +94,7 @@ ifeq ($(clean),yes) cargo clean endif $(eval target := $(strip $(shell cargo -vV | sed -n 's/host: //p'))) - cargo build + cargo build --all-features mdbook test book -L target/debug/deps $(strip \ $(if $(call eq,$(findstring windows,$(target)),),,\ $(shell cargo metadata -q \ diff --git a/juniper_actix/Cargo.toml b/juniper_actix/Cargo.toml index fd93a87e6..7b5d02466 100644 --- a/juniper_actix/Cargo.toml +++ b/juniper_actix/Cargo.toml @@ -22,7 +22,6 @@ rustdoc-args = ["--cfg", "docsrs"] subscriptions = [ "dep:actix", "dep:actix-web-actors", - "dep:juniper_graphql_transport_ws", "dep:juniper_graphql_ws", "dep:tokio", ] @@ -35,8 +34,7 @@ actix-web-actors = { version = "4.1", optional = true } anyhow = "1.0.47" futures = "0.3.22" juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } -juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true } -juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true } +juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", features = ["graphql-transport-ws", "graphql-ws"], optional = true } http = "0.2.4" serde = { version = "1.0.122", features = ["derive"] } serde_json = "1.0.18" diff --git a/juniper_actix/src/lib.rs b/juniper_actix/src/lib.rs index 6d69951b6..2a476d331 100644 --- a/juniper_actix/src/lib.rs +++ b/juniper_actix/src/lib.rs @@ -183,7 +183,7 @@ pub mod subscriptions { SinkExt as _, Stream, StreamExt as _, }; use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; - use juniper_graphql_transport_ws::{ArcSchema, Init}; + use juniper_graphql_ws::{graphql_transport_ws, graphql_ws, ArcSchema, Init}; use tokio::sync::Mutex; /// Serves by auto-selecting between the @@ -194,11 +194,10 @@ pub mod subscriptions { /// The `schema` argument is your [`juniper`] schema. /// /// The `init` argument is used to provide the custom [`juniper::Context`] and additional - /// configuration for connections. This can be a - /// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are - /// already known, or it can be a closure that gets executed asynchronously whenever a client - /// sends the subscription initialization message. Using a closure allows to perform an - /// authentication based on the parameters provided by a client. + /// configuration for connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the + /// context and configuration are already known, or it can be a closure that gets executed + /// asynchronously whenever a client sends the subscription initialization message. Using a + /// closure allows to perform an authentication based on the parameters provided by a client. /// /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md /// [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md @@ -262,8 +261,7 @@ pub mod subscriptions { S: ScalarValue + Send + Sync + 'static, I: Init + Send, { - let (s_tx, s_rx) = - juniper_graphql_ws::Connection::new(ArcSchema(schema), init).split::(); + let (s_tx, s_rx) = graphql_ws::Connection::new(ArcSchema(schema), init).split::(); let mut resp = ws::start( Actor { @@ -285,10 +283,10 @@ pub mod subscriptions { /// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new]. /// /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context - /// and configuration are already known, or it can be a closure that gets executed - /// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to - /// perform an authentication based on the parameters provided by a client. + /// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and + /// configuration are already known, or it can be a closure that gets executed asynchronously + /// when the client sends the `ConnectionInit` message. Using a closure allows to perform an + /// authentication based on the parameters provided by a client. /// /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md pub async fn graphql_transport_ws_handler( @@ -308,8 +306,8 @@ pub mod subscriptions { S: ScalarValue + Send + Sync + 'static, I: Init + Send, { - let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new(ArcSchema(schema), init) - .split::(); + let (s_tx, s_rx) = + graphql_transport_ws::Connection::new(ArcSchema(schema), init).split::(); let mut resp = ws::start( Actor { @@ -429,7 +427,7 @@ pub mod subscriptions { fn into_ws_response(self) -> Result; } - impl IntoWsResponse for juniper_graphql_transport_ws::Output { + impl IntoWsResponse for graphql_transport_ws::Output { fn into_ws_response(self) -> Result { match self { Self::Message(msg) => serde_json::to_string(&msg).map_err(|e| ws::CloseReason { @@ -444,7 +442,7 @@ pub mod subscriptions { } } - impl IntoWsResponse for juniper_graphql_ws::ServerMessage { + impl IntoWsResponse for graphql_ws::ServerMessage { fn into_ws_response(self) -> Result { serde_json::to_string(&self).map_err(|e| ws::CloseReason { code: ws::CloseCode::Error, @@ -456,7 +454,7 @@ pub mod subscriptions { #[derive(Debug)] struct Message(ws::Message); - impl TryFrom for juniper_graphql_transport_ws::Input { + impl TryFrom for graphql_transport_ws::Input { type Error = Error; fn try_from(msg: Message) -> Result { @@ -470,7 +468,7 @@ pub mod subscriptions { } } - impl TryFrom for juniper_graphql_ws::ClientMessage { + impl TryFrom for graphql_ws::ClientMessage { type Error = Error; fn try_from(msg: Message) -> Result { diff --git a/juniper_graphql_transport_ws/CHANGELOG.md b/juniper_graphql_transport_ws/CHANGELOG.md deleted file mode 100644 index f0cc1799b..000000000 --- a/juniper_graphql_transport_ws/CHANGELOG.md +++ /dev/null @@ -1,27 +0,0 @@ -`juniper_graphql_transport_ws` changelog -======================================== - -All user visible changes to `juniper_graphql_transport_ws` crate will be documented in this file. This project uses [Semantic Versioning 2.0.0]. - - - - -## master - -### Implemented - -- [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package]. ([#1158], [#1191], [#1022]) -- On top of 0.16 version of [`juniper` crate] and 0.17 version of [`juniper_subscriptions` crate]. - -[#1022]: /../../issues/1022 -[#1158]: /../../pull/1158 -[#1191]: /../../pull/1191 -[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md - - - - -[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws -[`juniper` crate]: https://docs.rs/juniper -[`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions -[Semantic Versioning 2.0.0]: https://semver.org diff --git a/juniper_graphql_transport_ws/Cargo.toml b/juniper_graphql_transport_ws/Cargo.toml deleted file mode 100644 index 8bd0db340..000000000 --- a/juniper_graphql_transport_ws/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "juniper_graphql_transport_ws" -version = "0.4.0-dev" -edition = "2021" -rust-version = "1.65" -description = "`graphql-transport-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate." -license = "BSD-2-Clause" -authors = ["Christopher Brown "] -documentation = "https://docs.rs/juniper_graphql_transport_ws" -homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_transport_ws" -repository = "https://github.com/graphql-rust/juniper" -readme = "README.md" -categories = ["asynchronous", "web-programming", "web-programming::http-server"] -keywords = ["apollo", "graphql", "graphql-transport-ws", "subscription", "websocket"] -exclude = ["/release.toml"] - -[dependencies] -juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } -juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws" } -juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" } -serde = { version = "1.0.122", features = ["derive"], default-features = false } -tokio = { version = "1.0", features = ["macros", "rt", "time"], default-features = false } - -[dev-dependencies] -serde_json = "1.0.18" diff --git a/juniper_graphql_transport_ws/LICENSE b/juniper_graphql_transport_ws/LICENSE deleted file mode 100644 index c64e2be75..000000000 --- a/juniper_graphql_transport_ws/LICENSE +++ /dev/null @@ -1,25 +0,0 @@ -BSD 2-Clause License - -Copyright (c) 2018-2023, Christopher Brown -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/juniper_graphql_transport_ws/README.md b/juniper_graphql_transport_ws/README.md deleted file mode 100644 index 11946a513..000000000 --- a/juniper_graphql_transport_ws/README.md +++ /dev/null @@ -1,29 +0,0 @@ -`juniper_graphql_transport_ws` crate -==================================== - -[![Crates.io](https://img.shields.io/crates/v/juniper_graphql_transport_ws.svg?maxAge=2592000)](https://crates.io/crates/juniper_graphql_transport_ws) -[![Documentation](https://docs.rs/juniper_graphql_transport_ws/badge.svg)](https://docs.rs/juniper_graphql_transport_ws) -[![CI](https://github.com/graphql-rust/juniper/workflows/CI/badge.svg?branch=master "CI")](https://github.com/graphql-rust/juniper/actions?query=workflow%3ACI+branch%3Amaster) -[![Rust 1.65+](https://img.shields.io/badge/rustc-1.65+-lightgray.svg "Rust 1.65+")](https://blog.rust-lang.org/2022/11/03/Rust-1.65.0.html) - -- [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/CHANGELOG.md) - -This crate contains an implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package]. - -Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in the [`juniper_graphql_ws` crate]. - - - - -## License - -This project is licensed under [BSD 2-Clause License](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_transport_ws/LICENSE). - - - - -[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws -[`juniper_graphql_ws` crate]: https://docs.rs/juniper_graphql_ws -[Apollo]: https://www.apollographql.com -[new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md -[old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md diff --git a/juniper_graphql_transport_ws/release.toml b/juniper_graphql_transport_ws/release.toml deleted file mode 100644 index ecf64dd55..000000000 --- a/juniper_graphql_transport_ws/release.toml +++ /dev/null @@ -1,24 +0,0 @@ -[[pre-release-replacements]] -file = "../juniper_actix/Cargo.toml" -exactly = 1 -search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\"" -replace = "juniper_graphql_transport_ws = { version = \"{{version}}\"" - -[[pre-release-replacements]] -file = "../juniper_warp/Cargo.toml" -exactly = 1 -search = "juniper_graphql_transport_ws = \\{ version = \"[^\"]+\"" -replace = "juniper_graphql_transport_ws = { version = \"{{version}}\"" - -[[pre-release-replacements]] -file = "CHANGELOG.md" -max = 1 -min = 0 -search = "## master" -replace = "## [{{version}}] · {{date}}\n[{{version}}]: /../../tree/{{crate_name}}-v{{version}}/{{crate_name}}" - -[[pre-release-replacements]] -file = "README.md" -exactly = 2 -search = "graphql-rust/juniper/blob/[^/]+/" -replace = "graphql-rust/juniper/blob/{{crate_name}}-v{{version}}/" diff --git a/juniper_graphql_transport_ws/src/schema.rs b/juniper_graphql_transport_ws/src/schema.rs deleted file mode 100644 index e0eb47960..000000000 --- a/juniper_graphql_transport_ws/src/schema.rs +++ /dev/null @@ -1,2 +0,0 @@ -#[doc(inline)] -pub use juniper_graphql_ws::{ArcSchema, Schema}; diff --git a/juniper_graphql_ws/CHANGELOG.md b/juniper_graphql_ws/CHANGELOG.md index e809bdb83..5c84f57f1 100644 --- a/juniper_graphql_ws/CHANGELOG.md +++ b/juniper_graphql_ws/CHANGELOG.md @@ -10,14 +10,24 @@ All user visible changes to `juniper_graphql_ws` crate will be documented in thi ### BC Breaks +- Moved existing implementation to `graphql_ws` module implementing [legacy `graphql-ws` GraphQL over WebSocket Protocol][proto-legacy] behind `graphql-ws` Cargo feature. ([#1196]) - Switched to 0.16 version of [`juniper` crate]. - Switched to 0.17 version of [`juniper_subscriptions` crate]. +### Added + +- `graphql_transport_ws` module implementing [`graphql-transport-ws` GraphQL over WebSocket Protocol][proto-5.14.0] as of 5.14.0 version of [`graphql-ws` npm package] behind `graphql-transport-ws` Cargo feature. ([#1158], [#1191], [#1196], [#1022]) + ### Changed -- Made fields of `ConnectionConfig` public to reuse in [`juniper_graphql_transport_ws` crate]. ([#1191]) +- Made fields of `ConnectionConfig` public. ([#1191]) +[#1022]: /../../issues/1022 +[#1158]: /../../pull/1158 [#1191]: /../../pull/1191 +[#1196]: /../../pull/1196 +[proto-5.14.0]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +[proto-legacy]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md @@ -29,7 +39,7 @@ See [old CHANGELOG](/../../blob/juniper_graphql_ws-v0.3.0/juniper_graphql_ws/CHA +[`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws [`juniper` crate]: https://docs.rs/juniper -[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws [`juniper_subscriptions` crate]: https://docs.rs/juniper_subscriptions [Semantic Versioning 2.0.0]: https://semver.org diff --git a/juniper_graphql_ws/Cargo.toml b/juniper_graphql_ws/Cargo.toml index 51e39e5c8..385e95121 100644 --- a/juniper_graphql_ws/Cargo.toml +++ b/juniper_graphql_ws/Cargo.toml @@ -3,17 +3,28 @@ name = "juniper_graphql_ws" version = "0.4.0-dev" edition = "2021" rust-version = "1.65" -description = "Legacy `graphql-ws` GraphQL over WebSocket Protocol implementation for `juniper` crate." +description = "GraphQL over WebSocket Protocol implementations for `juniper` crate." license = "BSD-2-Clause" -authors = ["Christopher Brown "] +authors = [ + "Christopher Brown ", + "Kai Ren ", +] documentation = "https://docs.rs/juniper_graphql_ws" homepage = "https://github.com/graphql-rust/juniper/tree/master/juniper_graphql_ws" repository = "https://github.com/graphql-rust/juniper" readme = "README.md" categories = ["asynchronous", "web-programming", "web-programming::http-server"] -keywords = ["apollo", "graphql", "graphql-ws", "subscription", "websocket"] +keywords = ["apollo", "graphql-transport-ws", "graphql-ws", "subscription", "websocket"] exclude = ["/release.toml"] +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +graphql-transport-ws = [] +graphql-ws = [] + [dependencies] juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } juniper_subscriptions = { version = "0.17.0-dev", path = "../juniper_subscriptions" } diff --git a/juniper_graphql_ws/README.md b/juniper_graphql_ws/README.md index b3a304110..5710b8cf6 100644 --- a/juniper_graphql_ws/README.md +++ b/juniper_graphql_ws/README.md @@ -8,7 +8,11 @@ - [Changelog](https://github.com/graphql-rust/juniper/blob/master/juniper_graphql_ws/CHANGELOG.md) -This crate contains an implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package]. It has now been deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the new [`juniper_graphql_transport_ws` crate] and new [`graphql-ws` npm package]. +This crate contains implementations of 2 protocols: + +1. (`graphql-transport-ws` feature) The [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now used by [Apollo] and [`graphql-ws` npm package]. + +2. (`graphql-ws` feature) The [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly used by [Apollo] and [`subscriptions-transport-ws` npm package] (deprecated in favor of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new] mentioned above). @@ -21,7 +25,6 @@ This project is licensed under [BSD 2-Clause License](https://github.com/graphql [`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws -[`juniper_graphql_transport_ws` crate]: https://docs.rs/juniper_graphql_transport_ws [`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws [Apollo]: https://www.apollographql.com [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md diff --git a/juniper_graphql_transport_ws/src/client_message.rs b/juniper_graphql_ws/src/graphql_transport_ws/client_message.rs similarity index 99% rename from juniper_graphql_transport_ws/src/client_message.rs rename to juniper_graphql_ws/src/graphql_transport_ws/client_message.rs index 1ac4fc108..62fd34d14 100644 --- a/juniper_graphql_transport_ws/src/client_message.rs +++ b/juniper_graphql_ws/src/graphql_transport_ws/client_message.rs @@ -1,7 +1,7 @@ use juniper::Variables; use serde::Deserialize; -use crate::utils::default_for_null; +use crate::util::default_for_null; /// The payload for a client's "start" message. This triggers execution of a query, mutation, or /// subscription. diff --git a/juniper_graphql_transport_ws/src/lib.rs b/juniper_graphql_ws/src/graphql_transport_ws/mod.rs similarity index 97% rename from juniper_graphql_transport_ws/src/lib.rs rename to juniper_graphql_ws/src/graphql_transport_ws/mod.rs index 4dafe7951..4d22c6a22 100644 --- a/juniper_graphql_transport_ws/src/lib.rs +++ b/juniper_graphql_ws/src/graphql_transport_ws/mod.rs @@ -1,16 +1,17 @@ -#![doc = include_str!("../README.md")] -#![deny(missing_docs, warnings)] +//! Implementation of the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], as now +//! used by [Apollo] and [`graphql-ws` npm package]. +//! +//! Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old] may be found in +//! the [`graphql_ws` module]. +//! +//! [`graphql_ws` module]: crate::graphql_ws +//! [`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws +//! [Apollo]: https://www.apollographql.com +//! [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +//! [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md mod client_message; -pub use client_message::*; - mod server_message; -pub use server_message::*; - -mod schema; -pub use schema::*; - -mod utils; use std::{ collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin, @@ -28,8 +29,12 @@ use juniper::{ GraphQLError, RuleError, ScalarValue, }; -#[doc(inline)] -pub use juniper_graphql_ws::{ConnectionConfig, Init}; +use super::{ConnectionConfig, Init, Schema}; + +pub use self::{ + client_message::{ClientMessage, SubscribePayload}, + server_message::{ErrorPayload, NextPayload, ServerMessage}, +}; struct ExecutionParams { subscribe_payload: SubscribePayload, diff --git a/juniper_graphql_transport_ws/src/server_message.rs b/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs similarity index 60% rename from juniper_graphql_transport_ws/src/server_message.rs rename to juniper_graphql_ws/src/graphql_transport_ws/server_message.rs index dc8a4f11d..6b786c158 100644 --- a/juniper_graphql_transport_ws/src/server_message.rs +++ b/juniper_graphql_ws/src/graphql_transport_ws/server_message.rs @@ -1,11 +1,11 @@ -use std::{any::Any, fmt, marker::PhantomPinned}; +use juniper::{ExecutionError, Value}; +use serde::Serialize; -use juniper::{ExecutionError, GraphQLError, Value}; -use serde::{Serialize, Serializer}; +pub use crate::server_message::ErrorPayload; /// Sent after execution of an operation. For queries and mutations, this is sent to the client /// once. For subscriptions, this is sent for every event in the event stream. -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, PartialEq, Serialize)] #[serde(rename_all = "camelCase")] pub struct NextPayload { /// The result data. @@ -17,67 +17,8 @@ pub struct NextPayload { pub errors: Vec>, } -/// A payload for errors that can happen before execution. Errors that happen during execution are -/// instead sent to the client via `NextPayload`. `ErrorPayload` is a wrapper for an owned -/// `GraphQLError`. -// XXX: Think carefully before deriving traits. This is self-referential (error references -// _execution_params). -pub struct ErrorPayload { - _execution_params: Option>, - error: GraphQLError, - _marker: PhantomPinned, -} - -impl ErrorPayload { - /// Creates a new [`ErrorPayload`] out of the provide `execution_params` and - /// [`GraphQLError`]. - pub(crate) fn new(execution_params: Box, error: GraphQLError) -> Self { - Self { - _execution_params: Some(execution_params), - error, - _marker: PhantomPinned, - } - } - - /// Returns the contained GraphQLError. - pub fn graphql_error(&self) -> &GraphQLError { - &self.error - } -} - -impl fmt::Debug for ErrorPayload { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.error.fmt(f) - } -} - -impl PartialEq for ErrorPayload { - fn eq(&self, other: &Self) -> bool { - self.error.eq(&other.error) - } -} - -impl Serialize for ErrorPayload { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - self.error.serialize(serializer) - } -} - -impl From for ErrorPayload { - fn from(error: GraphQLError) -> Self { - Self { - _execution_params: None, - error, - _marker: PhantomPinned, - } - } -} - /// ServerMessage defines the message types that servers can send. -#[derive(Debug, Serialize, PartialEq)] +#[derive(Debug, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] #[serde(tag = "type")] pub enum ServerMessage { @@ -111,7 +52,7 @@ pub enum ServerMessage { #[cfg(test)] mod test { - use juniper::{graphql_value, DefaultScalarValue}; + use juniper::{graphql_value, DefaultScalarValue, GraphQLError}; use super::*; diff --git a/juniper_graphql_ws/src/client_message.rs b/juniper_graphql_ws/src/graphql_ws/client_message.rs similarity index 99% rename from juniper_graphql_ws/src/client_message.rs rename to juniper_graphql_ws/src/graphql_ws/client_message.rs index b1ccd595a..bd9550551 100644 --- a/juniper_graphql_ws/src/client_message.rs +++ b/juniper_graphql_ws/src/graphql_ws/client_message.rs @@ -1,7 +1,7 @@ use juniper::Variables; use serde::Deserialize; -use crate::utils::default_for_null; +use crate::util::default_for_null; /// The payload for a client's "start" message. This triggers execution of a query, mutation, or /// subscription. diff --git a/juniper_graphql_ws/src/graphql_ws/mod.rs b/juniper_graphql_ws/src/graphql_ws/mod.rs new file mode 100644 index 000000000..2c99a324e --- /dev/null +++ b/juniper_graphql_ws/src/graphql_ws/mod.rs @@ -0,0 +1,959 @@ +//! Implementation of the [legacy `graphql-ws` GraphQL over WebSocket Protocol][old], as formerly +//! used by [Apollo] and [`subscriptions-transport-ws` npm package]. +//! +//! It has now been deprecated in favor of the +//! [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new], implemented by the +//! [`graphql_transport_ws` module] and new [`graphql-ws` npm package]. +//! +//! [`graphql_transport_ws` module]: crate::graphql_transport_ws +//! [`graphql-ws` npm package]: https://npmjs.com/package/graphql-ws +//! [`subscriptions-transport-ws` npm package]: https://npmjs.com/package/subscriptions-transport-ws +//! [Apollo]: https://www.apollographql.com +//! [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md +//! [old]: https://github.com/apollographql/subscriptions-transport-ws/blob/v0.11.0/PROTOCOL.md + +mod client_message; +mod server_message; + +use std::{ + collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin, + sync::Arc, time::Duration, +}; + +use juniper::{ + futures::{ + channel::oneshot, + future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt}, + stream::{self, BoxStream, SelectAll, StreamExt}, + task::{Context, Poll, Waker}, + Sink, Stream, + }, + GraphQLError, RuleError, +}; + +use super::{ConnectionConfig, Init, Schema}; + +pub use self::{ + client_message::{ClientMessage, StartPayload}, + server_message::{ConnectionErrorPayload, DataPayload, ErrorPayload, ServerMessage}, +}; + +struct ExecutionParams { + start_payload: StartPayload, + config: Arc>, + schema: S, +} + +enum Reaction { + ServerMessage(ServerMessage), + EndStream, +} + +impl Reaction { + /// Converts the reaction into a one-item stream. + fn into_stream(self) -> BoxStream<'static, Self> { + stream::once(future::ready(self)).boxed() + } +} + +enum ConnectionState> { + /// PreInit is the state before a ConnectionInit message has been accepted. + PreInit { init: I, schema: S }, + /// Active is the state after a ConnectionInit message has been accepted. + Active { + config: Arc>, + stoppers: HashMap>, + schema: S, + }, + /// Terminated is the state after a ConnectionInit message has been rejected. + Terminated, +} + +impl> ConnectionState { + // Each message we receive results in a stream of zero or more reactions. For example, a + // ConnectionTerminate message results in a one-item stream with the EndStream reaction. + async fn handle_message( + self, + msg: ClientMessage, + ) -> (Self, BoxStream<'static, Reaction>) { + if let ClientMessage::ConnectionTerminate = msg { + return (self, Reaction::EndStream.into_stream()); + } + + match self { + Self::PreInit { init, schema } => match msg { + ClientMessage::ConnectionInit { payload } => match init.init(payload).await { + Ok(config) => { + let keep_alive_interval = config.keep_alive_interval; + + let mut s = stream::iter(vec![Reaction::ServerMessage( + ServerMessage::ConnectionAck, + )]) + .boxed(); + + if keep_alive_interval > Duration::from_secs(0) { + s = s + .chain( + Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive) + .into_stream(), + ) + .boxed(); + s = s + .chain(stream::unfold((), move |_| async move { + tokio::time::sleep(keep_alive_interval).await; + Some(( + Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive), + (), + )) + })) + .boxed(); + } + + ( + Self::Active { + config: Arc::new(config), + stoppers: HashMap::new(), + schema, + }, + s, + ) + } + Err(e) => ( + Self::Terminated, + stream::iter(vec![ + Reaction::ServerMessage(ServerMessage::ConnectionError { + payload: ConnectionErrorPayload { + message: e.to_string(), + }, + }), + Reaction::EndStream, + ]) + .boxed(), + ), + }, + _ => (Self::PreInit { init, schema }, stream::empty().boxed()), + }, + Self::Active { + config, + mut stoppers, + schema, + } => { + let reactions = match msg { + ClientMessage::Start { id, payload } => { + if stoppers.contains_key(&id) { + // We already have an operation with this id, so we can't start a new + // one. + stream::empty().boxed() + } else { + // Go ahead and prune canceled stoppers before adding a new one. + stoppers.retain(|_, tx| !tx.is_canceled()); + + if config.max_in_flight_operations > 0 + && stoppers.len() >= config.max_in_flight_operations + { + // Too many in-flight operations. Just send back a validation error. + stream::iter(vec![ + Reaction::ServerMessage(ServerMessage::Error { + id: id.clone(), + payload: GraphQLError::ValidationError(vec![ + RuleError::new("Too many in-flight operations.", &[]), + ]) + .into(), + }), + Reaction::ServerMessage(ServerMessage::Complete { id }), + ]) + .boxed() + } else { + // Create a channel that we can use to cancel the operation. + let (tx, rx) = oneshot::channel::<()>(); + stoppers.insert(id.clone(), tx); + + // Create the operation stream. This stream will emit Data and Error + // messages, but will not emit Complete – that part is up to us. + let s = Self::start( + id.clone(), + ExecutionParams { + start_payload: payload, + config: config.clone(), + schema: schema.clone(), + }, + ) + .into_stream() + .flatten(); + + // Combine this with our oneshot channel so that the stream ends if the + // oneshot is ever fired. + let s = stream::unfold((rx, s.boxed()), |(rx, mut s)| async move { + let next = match future::select(rx, s.next()).await { + Either::Left(_) => None, + Either::Right((r, rx)) => r.map(|r| (r, rx)), + }; + next.map(|(r, rx)| (r, (rx, s))) + }); + + // Once the stream ends, send the Complete message. + let s = s.chain( + Reaction::ServerMessage(ServerMessage::Complete { id }) + .into_stream(), + ); + + s.boxed() + } + } + } + ClientMessage::Stop { id } => { + stoppers.remove(&id); + stream::empty().boxed() + } + _ => stream::empty().boxed(), + }; + ( + Self::Active { + config, + stoppers, + schema, + }, + reactions, + ) + } + Self::Terminated => (self, stream::empty().boxed()), + } + } + + async fn start(id: String, params: ExecutionParams) -> BoxStream<'static, Reaction> { + // TODO: This could be made more efficient if `juniper` exposed + // functionality to allow us to parse and validate the query, + // determine whether it's a subscription, and then execute it. + // For now, the query gets parsed and validated twice. + + let params = Arc::new(params); + + // Try to execute this as a query or mutation. + match juniper::execute( + ¶ms.start_payload.query, + params.start_payload.operation_name.as_deref(), + params.schema.root_node(), + ¶ms.start_payload.variables, + ¶ms.config.context, + ) + .await + { + Ok((data, errors)) => { + return Reaction::ServerMessage(ServerMessage::Data { + id: id.clone(), + payload: DataPayload { data, errors }, + }) + .into_stream(); + } + Err(GraphQLError::IsSubscription) => {} + Err(e) => { + return Reaction::ServerMessage(ServerMessage::Error { + id: id.clone(), + payload: ErrorPayload::new(Box::new(params.clone()), e), + }) + .into_stream(); + } + } + + // Try to execute as a subscription. + SubscriptionStart::new(id, params.clone()).boxed() + } +} + +struct InterruptableStream { + stream: S, + rx: oneshot::Receiver<()>, +} + +impl Stream for InterruptableStream { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Ready(_) => return Poll::Ready(None), + Poll::Pending => {} + } + Pin::new(&mut self.stream).poll_next(cx) + } +} + +/// SubscriptionStartState is the state for a subscription operation. +enum SubscriptionStartState { + /// Init is the start before being polled for the first time. + Init { id: String }, + /// ResolvingIntoStream is the state after being polled for the first time. In this state, + /// we're parsing, validating, and getting the actual event stream. + ResolvingIntoStream { + id: String, + future: BoxFuture< + 'static, + Result, GraphQLError>, + >, + }, + /// Streaming is the state after we've successfully obtained the event stream for the + /// subscription. In this state, we're just forwarding events back to the client. + Streaming { + id: String, + stream: juniper_subscriptions::Connection<'static, S::ScalarValue>, + }, + /// Terminated is the state once we're all done. + Terminated, +} + +/// SubscriptionStart is the stream for a subscription operation. +struct SubscriptionStart { + params: Arc>, + state: SubscriptionStartState, + _marker: PhantomPinned, +} + +impl SubscriptionStart { + fn new(id: String, params: Arc>) -> Pin> { + Box::pin(Self { + params, + state: SubscriptionStartState::Init { id }, + _marker: PhantomPinned, + }) + } +} + +impl Stream for SubscriptionStart { + type Item = Reaction; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let (params, state) = unsafe { + // XXX: The execution parameters are referenced by state and must not be modified. + // Modifying state is fine though. + let inner = self.get_unchecked_mut(); + (&inner.params, &mut inner.state) + }; + + loop { + match state { + SubscriptionStartState::Init { id } => { + // XXX: resolve_into_stream returns a Future that references the execution + // parameters, and the returned stream also references them. We can guarantee + // that everything has the same lifetime in this self-referential struct. + let params = Arc::as_ptr(params); + *state = SubscriptionStartState::ResolvingIntoStream { + id: id.clone(), + future: unsafe { + juniper::resolve_into_stream( + &(*params).start_payload.query, + (*params).start_payload.operation_name.as_deref(), + (*params).schema.root_node(), + &(*params).start_payload.variables, + &(*params).config.context, + ) + } + .map_ok(|(stream, errors)| { + juniper_subscriptions::Connection::from_stream(stream, errors) + }) + .boxed(), + }; + } + SubscriptionStartState::ResolvingIntoStream { + ref id, + ref mut future, + } => match future.as_mut().poll(cx) { + Poll::Ready(r) => match r { + Ok(stream) => { + *state = SubscriptionStartState::Streaming { + id: id.clone(), + stream, + } + } + Err(e) => { + return Poll::Ready(Some(Reaction::ServerMessage( + ServerMessage::Error { + id: id.clone(), + payload: ErrorPayload::new(Box::new(params.clone()), e), + }, + ))); + } + }, + Poll::Pending => return Poll::Pending, + }, + SubscriptionStartState::Streaming { + ref id, + ref mut stream, + } => match Pin::new(stream).poll_next(cx) { + Poll::Ready(Some(output)) => { + return Poll::Ready(Some(Reaction::ServerMessage(ServerMessage::Data { + id: id.clone(), + payload: DataPayload { + data: output.data, + errors: output.errors, + }, + }))); + } + Poll::Ready(None) => { + *state = SubscriptionStartState::Terminated; + return Poll::Ready(None); + } + Poll::Pending => return Poll::Pending, + }, + SubscriptionStartState::Terminated => return Poll::Ready(None), + } + } + } +} + +enum ConnectionSinkState> { + Ready { + state: ConnectionState, + }, + HandlingMessage { + #[allow(clippy::type_complexity)] + result: BoxFuture<'static, (ConnectionState, BoxStream<'static, Reaction>)>, + }, + Closed, +} + +/// Implements the graphql-ws protocol. This is a sink for `TryInto` and a stream of +/// `ServerMessage`. +pub struct Connection> { + reactions: SelectAll>>, + stream_waker: Option, + sink_state: ConnectionSinkState, +} + +impl Connection +where + S: Schema, + I: Init, +{ + /// Creates a new connection, which is a sink for `TryInto` and a stream of `ServerMessage`. + /// + /// The `schema` argument should typically be an `Arc>`. + /// + /// The `init` argument is used to provide the context and additional configuration for + /// connections. This can be a `ConnectionConfig` if the context and configuration are already + /// known, or it can be a closure that gets executed asynchronously when the client sends the + /// ConnectionInit message. Using a closure allows you to perform authentication based on the + /// parameters provided by the client. + pub fn new(schema: S, init: I) -> Self { + Self { + reactions: SelectAll::new(), + stream_waker: None, + sink_state: ConnectionSinkState::Ready { + state: ConnectionState::PreInit { init, schema }, + }, + } + } +} + +impl Sink for Connection +where + T: TryInto>, + T::Error: Error, + S: Schema, + I: Init + Send, +{ + type Error = Infallible; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + match &mut self.sink_state { + ConnectionSinkState::Ready { .. } => Poll::Ready(Ok(())), + ConnectionSinkState::HandlingMessage { ref mut result } => { + match Pin::new(result).poll(cx) { + Poll::Ready((state, reactions)) => { + self.reactions.push(reactions); + self.sink_state = ConnectionSinkState::Ready { state }; + Poll::Ready(Ok(())) + } + Poll::Pending => Poll::Pending, + } + } + ConnectionSinkState::Closed => panic!("poll_ready called after close"), + } + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + let s = self.get_mut(); + let state = &mut s.sink_state; + *state = match std::mem::replace(state, ConnectionSinkState::Closed) { + ConnectionSinkState::Ready { state } => { + match item.try_into() { + Ok(msg) => ConnectionSinkState::HandlingMessage { + result: state.handle_message(msg).boxed(), + }, + Err(e) => { + // If we weren't able to parse the message, send back an error. + s.reactions.push( + Reaction::ServerMessage(ServerMessage::ConnectionError { + payload: ConnectionErrorPayload { + message: e.to_string(), + }, + }) + .into_stream(), + ); + ConnectionSinkState::Ready { state } + } + } + } + _ => panic!("start_send called when not ready"), + }; + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + >::poll_ready(self, cx) + } + + fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + self.sink_state = ConnectionSinkState::Closed; + if let Some(waker) = self.stream_waker.take() { + // Wake up the stream so it can close too. + waker.wake(); + } + Poll::Ready(Ok(())) + } +} + +impl Stream for Connection +where + S: Schema, + I: Init, +{ + type Item = ServerMessage; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.stream_waker = Some(cx.waker().clone()); + + if let ConnectionSinkState::Closed = self.sink_state { + return Poll::Ready(None); + } + + // Poll the reactions for new outgoing messages. + if !self.reactions.is_empty() { + match Pin::new(&mut self.reactions).poll_next(cx) { + Poll::Ready(Some(reaction)) => match reaction { + Reaction::ServerMessage(msg) => return Poll::Ready(Some(msg)), + Reaction::EndStream => return Poll::Ready(None), + }, + Poll::Ready(None) => { + // In rare cases, the reaction stream may terminate. For example, this will + // happen if the first message we receive does not require any reaction. Just + // recreate it in that case. + self.reactions = SelectAll::new(); + } + _ => (), + } + } + Poll::Pending + } +} + +#[cfg(test)] +mod test { + use std::{convert::Infallible, io}; + + use juniper::{ + futures::sink::SinkExt, + graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars, + parser::{ParseError, Spanning}, + DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, Variables, + }; + + use super::*; + + struct Context(i32); + + impl juniper::Context for Context {} + + struct Query; + + #[graphql_object(context = Context)] + impl Query { + /// context just resolves to the current context. + async fn context(context: &Context) -> i32 { + context.0 + } + } + + struct Subscription; + + #[graphql_subscription(context = Context)] + impl Subscription { + /// never never emits anything. + async fn never(_context: &Context) -> BoxStream<'static, FieldResult> { + tokio::time::sleep(Duration::from_secs(10000)) + .map(|_| unreachable!()) + .into_stream() + .boxed() + } + + /// context emits the current context once, then never emits anything else. + async fn context(context: &Context) -> BoxStream<'static, FieldResult> { + stream::once(future::ready(Ok(context.0))) + .chain( + tokio::time::sleep(Duration::from_secs(10000)) + .map(|_| unreachable!()) + .into_stream(), + ) + .boxed() + } + + /// error emits an error once, then never emits anything else. + async fn error(_context: &Context) -> BoxStream<'static, FieldResult> { + stream::once(future::ready(Err(FieldError::new( + "field error", + graphql_value!(null), + )))) + .chain( + tokio::time::sleep(Duration::from_secs(10000)) + .map(|_| unreachable!()) + .into_stream(), + ) + .boxed() + } + } + + type ClientMessage = super::ClientMessage; + type ServerMessage = super::ServerMessage; + + fn new_test_schema() -> Arc, Subscription>> { + Arc::new(RootNode::new(Query, EmptyMutation::new(), Subscription)) + } + + #[tokio::test] + async fn test_query() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "{context}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + assert_eq!( + ServerMessage::Data { + id: "foo".into(), + payload: DataPayload { + data: graphql_value!({"context": 1}), + errors: vec![], + }, + }, + conn.next().await.unwrap() + ); + + assert_eq!( + ServerMessage::Complete { id: "foo".into() }, + conn.next().await.unwrap() + ); + } + + #[tokio::test] + async fn test_subscriptions() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "subscription Foo {context}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + assert_eq!( + ServerMessage::Data { + id: "foo".into(), + payload: DataPayload { + data: graphql_value!({"context": 1}), + errors: vec![], + }, + }, + conn.next().await.unwrap() + ); + + conn.send(ClientMessage::Start { + id: "bar".into(), + payload: StartPayload { + query: "subscription Bar {context}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + assert_eq!( + ServerMessage::Data { + id: "bar".into(), + payload: DataPayload { + data: graphql_value!({"context": 1}), + errors: vec![], + }, + }, + conn.next().await.unwrap() + ); + + conn.send(ClientMessage::Stop { id: "foo".into() }) + .await + .unwrap(); + + assert_eq!( + ServerMessage::Complete { id: "foo".into() }, + conn.next().await.unwrap() + ); + } + + #[tokio::test] + async fn test_init_params_ok() { + let mut conn = Connection::new(new_test_schema(), |params: Variables| async move { + assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar"))); + Ok(ConnectionConfig::new(Context(1))) as Result<_, Infallible> + }); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {"foo": "bar"}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + } + + #[tokio::test] + async fn test_init_params_error() { + let mut conn = Connection::new(new_test_schema(), |params: Variables| async move { + assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar"))); + Err(io::Error::new(io::ErrorKind::Other, "init error")) + }); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {"foo": "bar"}, + }) + .await + .unwrap(); + + assert_eq!( + ServerMessage::ConnectionError { + payload: ConnectionErrorPayload { + message: "init error".into(), + }, + }, + conn.next().await.unwrap() + ); + } + + #[tokio::test] + async fn test_max_in_flight_operations() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)) + .with_keep_alive_interval(Duration::from_secs(0)) + .with_max_in_flight_operations(1), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "subscription Foo {never}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + conn.send(ClientMessage::Start { + id: "bar".into(), + payload: StartPayload { + query: "subscription Bar {never}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + match conn.next().await.unwrap() { + ServerMessage::Error { id, .. } => { + assert_eq!(id, "bar"); + } + msg => panic!("expected error, got: {msg:?}"), + } + } + + #[tokio::test] + async fn test_parse_error() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "asd".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + match conn.next().await.unwrap() { + ServerMessage::Error { id, payload } => { + assert_eq!(id, "foo"); + match payload.graphql_error() { + GraphQLError::ParseError(Spanning { + item: ParseError::UnexpectedToken(token), + .. + }) => assert_eq!(token, "asd"), + p => panic!("expected graphql parse error, got: {p:?}"), + } + } + msg => panic!("expected error, got: {msg:?}"), + } + } + + #[tokio::test] + async fn test_keep_alives() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_millis(20)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + for _ in 0..10 { + assert_eq!( + ServerMessage::ConnectionKeepAlive, + conn.next().await.unwrap() + ); + } + } + + #[tokio::test] + async fn test_slow_init() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + // If we send the start message before the init is handled, we should still get results. + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "{context}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + assert_eq!( + ServerMessage::Data { + id: "foo".into(), + payload: DataPayload { + data: graphql_value!({"context": 1}), + errors: vec![], + }, + }, + conn.next().await.unwrap() + ); + } + + #[tokio::test] + async fn test_subscription_field_error() { + let mut conn = Connection::new( + new_test_schema(), + ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), + ); + + conn.send(ClientMessage::ConnectionInit { + payload: graphql_vars! {}, + }) + .await + .unwrap(); + + assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); + + conn.send(ClientMessage::Start { + id: "foo".into(), + payload: StartPayload { + query: "subscription Foo {error}".into(), + variables: graphql_vars! {}, + operation_name: None, + }, + }) + .await + .unwrap(); + + match conn.next().await.unwrap() { + ServerMessage::Data { + id, + payload: DataPayload { data, errors }, + } => { + assert_eq!(id, "foo"); + assert_eq!(data, graphql_value!({ "error": null })); + assert_eq!(errors.len(), 1); + } + msg => panic!("expected data, got: {msg:?}"), + } + } +} diff --git a/juniper_graphql_ws/src/graphql_ws/server_message.rs b/juniper_graphql_ws/src/graphql_ws/server_message.rs new file mode 100644 index 000000000..6bd47460d --- /dev/null +++ b/juniper_graphql_ws/src/graphql_ws/server_message.rs @@ -0,0 +1,127 @@ +use juniper::{ExecutionError, Value}; +use serde::Serialize; + +pub use crate::server_message::ErrorPayload; + +/// The payload for errors that are not associated with a GraphQL operation. +#[derive(Debug, Eq, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ConnectionErrorPayload { + /// The error message. + pub message: String, +} + +/// Sent after execution of an operation. For queries and mutations, this is sent to the client +/// once. For subscriptions, this is sent for every event in the event stream. +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct DataPayload { + /// The result data. + pub data: Value, + + /// The errors that have occurred during execution. Note that parse and validation errors are + /// not included here. They are sent via Error messages. + #[serde(skip_serializing_if = "Vec::is_empty")] + pub errors: Vec>, +} + +/// ServerMessage defines the message types that servers can send. +#[derive(Debug, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "type")] +pub enum ServerMessage { + /// ConnectionError is used for errors that are not associated with a GraphQL operation. For + /// example, this will be used when: + /// + /// * The server is unable to parse a client's message. + /// * The client's initialization parameters are rejected. + ConnectionError { + /// The error that occurred. + payload: ConnectionErrorPayload, + }, + /// ConnectionAck is sent in response to a client's ConnectionInit message if the server accepted a + /// connection. + ConnectionAck, + /// Data contains the result of a query, mutation, or subscription event. + Data { + /// The id of the operation that the data is for. + id: String, + + /// The data and errors that occurred during execution. + payload: DataPayload, + }, + /// Error contains an error that occurs before execution, such as validation errors. + Error { + /// The id of the operation that triggered this error. + id: String, + + /// The error(s). + payload: ErrorPayload, + }, + /// Complete indicates that no more data will be sent for the given operation. + Complete { + /// The id of the operation that has completed. + id: String, + }, + /// ConnectionKeepAlive is sent periodically after accepting a connection. + #[serde(rename = "ka")] + ConnectionKeepAlive, +} + +#[cfg(test)] +mod test { + use juniper::{graphql_value, DefaultScalarValue, GraphQLError}; + + use super::*; + + #[test] + fn test_serialization() { + type ServerMessage = super::ServerMessage; + + assert_eq!( + serde_json::to_string(&ServerMessage::ConnectionError { + payload: ConnectionErrorPayload { + message: "foo".into(), + }, + }) + .unwrap(), + r#"{"type":"connection_error","payload":{"message":"foo"}}"#, + ); + + assert_eq!( + serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(), + r#"{"type":"connection_ack"}"#, + ); + + assert_eq!( + serde_json::to_string(&ServerMessage::Data { + id: "foo".into(), + payload: DataPayload { + data: graphql_value!(null), + errors: vec![], + }, + }) + .unwrap(), + r#"{"type":"data","id":"foo","payload":{"data":null}}"#, + ); + + assert_eq!( + serde_json::to_string(&ServerMessage::Error { + id: "foo".into(), + payload: GraphQLError::UnknownOperationName.into(), + }) + .unwrap(), + r#"{"type":"error","id":"foo","payload":[{"message":"Unknown operation"}]}"#, + ); + + assert_eq!( + serde_json::to_string(&ServerMessage::Complete { id: "foo".into() }).unwrap(), + r#"{"type":"complete","id":"foo"}"#, + ); + + assert_eq!( + serde_json::to_string(&ServerMessage::ConnectionKeepAlive).unwrap(), + r#"{"type":"ka"}"#, + ); + } +} diff --git a/juniper_graphql_ws/src/lib.rs b/juniper_graphql_ws/src/lib.rs index 977413118..b8aceed4b 100644 --- a/juniper_graphql_ws/src/lib.rs +++ b/juniper_graphql_ws/src/lib.rs @@ -1,38 +1,30 @@ #![doc = include_str!("../README.md")] +#![cfg_attr(docsrs, feature(doc_auto_cfg))] #![deny(missing_docs, warnings)] -mod client_message; -pub use client_message::*; - -mod server_message; -pub use server_message::*; +#[cfg(not(any(feature = "graphql-transport-ws", feature = "graphql-ws")))] +compile_error!( + r#"at least one feature must be enabled (either "graphql-transport-ws" or "graphql-ws")"# +); +#[cfg(feature = "graphql-transport-ws")] +pub mod graphql_transport_ws; +#[cfg(feature = "graphql-ws")] +pub mod graphql_ws; mod schema; -pub use schema::*; - -mod utils; +mod server_message; +mod util; use std::{ - collections::HashMap, convert::Infallible, error::Error, marker::PhantomPinned, pin::Pin, - sync::Arc, time::Duration, + convert::Infallible, + error::Error, + future::{self, Future}, + time::Duration, }; -use juniper::{ - futures::{ - channel::oneshot, - future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt}, - stream::{self, BoxStream, SelectAll, StreamExt}, - task::{Context, Poll, Waker}, - Sink, Stream, - }, - GraphQLError, RuleError, ScalarValue, Variables, -}; +use juniper::{ScalarValue, Variables}; -struct ExecutionParams { - start_payload: StartPayload, - config: Arc>, - schema: S, -} +pub use self::schema::{ArcSchema, Schema}; /// ConnectionConfig is used to configure the connection once the client sends the ConnectionInit /// message. @@ -96,18 +88,6 @@ impl Init for ConnectionC } } -enum Reaction { - ServerMessage(ServerMessage), - EndStream, -} - -impl Reaction { - /// Converts the reaction into a one-item stream. - fn into_stream(self) -> BoxStream<'static, Self> { - stream::once(future::ready(self)).boxed() - } -} - /// Init defines the requirements for types that can provide connection configurations when /// ConnectionInit messages are received. Implementations are provided for `ConnectionConfig` and /// closures that meet the requirements. @@ -137,905 +117,3 @@ where self(params) } } - -enum ConnectionState> { - /// PreInit is the state before a ConnectionInit message has been accepted. - PreInit { init: I, schema: S }, - /// Active is the state after a ConnectionInit message has been accepted. - Active { - config: Arc>, - stoppers: HashMap>, - schema: S, - }, - /// Terminated is the state after a ConnectionInit message has been rejected. - Terminated, -} - -impl> ConnectionState { - // Each message we receive results in a stream of zero or more reactions. For example, a - // ConnectionTerminate message results in a one-item stream with the EndStream reaction. - async fn handle_message( - self, - msg: ClientMessage, - ) -> (Self, BoxStream<'static, Reaction>) { - if let ClientMessage::ConnectionTerminate = msg { - return (self, Reaction::EndStream.into_stream()); - } - - match self { - Self::PreInit { init, schema } => match msg { - ClientMessage::ConnectionInit { payload } => match init.init(payload).await { - Ok(config) => { - let keep_alive_interval = config.keep_alive_interval; - - let mut s = stream::iter(vec![Reaction::ServerMessage( - ServerMessage::ConnectionAck, - )]) - .boxed(); - - if keep_alive_interval > Duration::from_secs(0) { - s = s - .chain( - Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive) - .into_stream(), - ) - .boxed(); - s = s - .chain(stream::unfold((), move |_| async move { - tokio::time::sleep(keep_alive_interval).await; - Some(( - Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive), - (), - )) - })) - .boxed(); - } - - ( - Self::Active { - config: Arc::new(config), - stoppers: HashMap::new(), - schema, - }, - s, - ) - } - Err(e) => ( - Self::Terminated, - stream::iter(vec![ - Reaction::ServerMessage(ServerMessage::ConnectionError { - payload: ConnectionErrorPayload { - message: e.to_string(), - }, - }), - Reaction::EndStream, - ]) - .boxed(), - ), - }, - _ => (Self::PreInit { init, schema }, stream::empty().boxed()), - }, - Self::Active { - config, - mut stoppers, - schema, - } => { - let reactions = match msg { - ClientMessage::Start { id, payload } => { - if stoppers.contains_key(&id) { - // We already have an operation with this id, so we can't start a new - // one. - stream::empty().boxed() - } else { - // Go ahead and prune canceled stoppers before adding a new one. - stoppers.retain(|_, tx| !tx.is_canceled()); - - if config.max_in_flight_operations > 0 - && stoppers.len() >= config.max_in_flight_operations - { - // Too many in-flight operations. Just send back a validation error. - stream::iter(vec![ - Reaction::ServerMessage(ServerMessage::Error { - id: id.clone(), - payload: GraphQLError::ValidationError(vec![ - RuleError::new("Too many in-flight operations.", &[]), - ]) - .into(), - }), - Reaction::ServerMessage(ServerMessage::Complete { id }), - ]) - .boxed() - } else { - // Create a channel that we can use to cancel the operation. - let (tx, rx) = oneshot::channel::<()>(); - stoppers.insert(id.clone(), tx); - - // Create the operation stream. This stream will emit Data and Error - // messages, but will not emit Complete – that part is up to us. - let s = Self::start( - id.clone(), - ExecutionParams { - start_payload: payload, - config: config.clone(), - schema: schema.clone(), - }, - ) - .into_stream() - .flatten(); - - // Combine this with our oneshot channel so that the stream ends if the - // oneshot is ever fired. - let s = stream::unfold((rx, s.boxed()), |(rx, mut s)| async move { - let next = match future::select(rx, s.next()).await { - Either::Left(_) => None, - Either::Right((r, rx)) => r.map(|r| (r, rx)), - }; - next.map(|(r, rx)| (r, (rx, s))) - }); - - // Once the stream ends, send the Complete message. - let s = s.chain( - Reaction::ServerMessage(ServerMessage::Complete { id }) - .into_stream(), - ); - - s.boxed() - } - } - } - ClientMessage::Stop { id } => { - stoppers.remove(&id); - stream::empty().boxed() - } - _ => stream::empty().boxed(), - }; - ( - Self::Active { - config, - stoppers, - schema, - }, - reactions, - ) - } - Self::Terminated => (self, stream::empty().boxed()), - } - } - - async fn start(id: String, params: ExecutionParams) -> BoxStream<'static, Reaction> { - // TODO: This could be made more efficient if `juniper` exposed - // functionality to allow us to parse and validate the query, - // determine whether it's a subscription, and then execute it. - // For now, the query gets parsed and validated twice. - - let params = Arc::new(params); - - // Try to execute this as a query or mutation. - match juniper::execute( - ¶ms.start_payload.query, - params.start_payload.operation_name.as_deref(), - params.schema.root_node(), - ¶ms.start_payload.variables, - ¶ms.config.context, - ) - .await - { - Ok((data, errors)) => { - return Reaction::ServerMessage(ServerMessage::Data { - id: id.clone(), - payload: DataPayload { data, errors }, - }) - .into_stream(); - } - Err(GraphQLError::IsSubscription) => {} - Err(e) => { - return Reaction::ServerMessage(ServerMessage::Error { - id: id.clone(), - payload: ErrorPayload::new(Box::new(params.clone()), e), - }) - .into_stream(); - } - } - - // Try to execute as a subscription. - SubscriptionStart::new(id, params.clone()).boxed() - } -} - -struct InterruptableStream { - stream: S, - rx: oneshot::Receiver<()>, -} - -impl Stream for InterruptableStream { - type Item = S::Item; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match Pin::new(&mut self.rx).poll(cx) { - Poll::Ready(_) => return Poll::Ready(None), - Poll::Pending => {} - } - Pin::new(&mut self.stream).poll_next(cx) - } -} - -/// SubscriptionStartState is the state for a subscription operation. -enum SubscriptionStartState { - /// Init is the start before being polled for the first time. - Init { id: String }, - /// ResolvingIntoStream is the state after being polled for the first time. In this state, - /// we're parsing, validating, and getting the actual event stream. - ResolvingIntoStream { - id: String, - future: BoxFuture< - 'static, - Result, GraphQLError>, - >, - }, - /// Streaming is the state after we've successfully obtained the event stream for the - /// subscription. In this state, we're just forwarding events back to the client. - Streaming { - id: String, - stream: juniper_subscriptions::Connection<'static, S::ScalarValue>, - }, - /// Terminated is the state once we're all done. - Terminated, -} - -/// SubscriptionStart is the stream for a subscription operation. -struct SubscriptionStart { - params: Arc>, - state: SubscriptionStartState, - _marker: PhantomPinned, -} - -impl SubscriptionStart { - fn new(id: String, params: Arc>) -> Pin> { - Box::pin(Self { - params, - state: SubscriptionStartState::Init { id }, - _marker: PhantomPinned, - }) - } -} - -impl Stream for SubscriptionStart { - type Item = Reaction; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let (params, state) = unsafe { - // XXX: The execution parameters are referenced by state and must not be modified. - // Modifying state is fine though. - let inner = self.get_unchecked_mut(); - (&inner.params, &mut inner.state) - }; - - loop { - match state { - SubscriptionStartState::Init { id } => { - // XXX: resolve_into_stream returns a Future that references the execution - // parameters, and the returned stream also references them. We can guarantee - // that everything has the same lifetime in this self-referential struct. - let params = Arc::as_ptr(params); - *state = SubscriptionStartState::ResolvingIntoStream { - id: id.clone(), - future: unsafe { - juniper::resolve_into_stream( - &(*params).start_payload.query, - (*params).start_payload.operation_name.as_deref(), - (*params).schema.root_node(), - &(*params).start_payload.variables, - &(*params).config.context, - ) - } - .map_ok(|(stream, errors)| { - juniper_subscriptions::Connection::from_stream(stream, errors) - }) - .boxed(), - }; - } - SubscriptionStartState::ResolvingIntoStream { - ref id, - ref mut future, - } => match future.as_mut().poll(cx) { - Poll::Ready(r) => match r { - Ok(stream) => { - *state = SubscriptionStartState::Streaming { - id: id.clone(), - stream, - } - } - Err(e) => { - return Poll::Ready(Some(Reaction::ServerMessage( - ServerMessage::Error { - id: id.clone(), - payload: ErrorPayload::new(Box::new(params.clone()), e), - }, - ))); - } - }, - Poll::Pending => return Poll::Pending, - }, - SubscriptionStartState::Streaming { - ref id, - ref mut stream, - } => match Pin::new(stream).poll_next(cx) { - Poll::Ready(Some(output)) => { - return Poll::Ready(Some(Reaction::ServerMessage(ServerMessage::Data { - id: id.clone(), - payload: DataPayload { - data: output.data, - errors: output.errors, - }, - }))); - } - Poll::Ready(None) => { - *state = SubscriptionStartState::Terminated; - return Poll::Ready(None); - } - Poll::Pending => return Poll::Pending, - }, - SubscriptionStartState::Terminated => return Poll::Ready(None), - } - } - } -} - -enum ConnectionSinkState> { - Ready { - state: ConnectionState, - }, - HandlingMessage { - #[allow(clippy::type_complexity)] - result: BoxFuture<'static, (ConnectionState, BoxStream<'static, Reaction>)>, - }, - Closed, -} - -/// Implements the graphql-ws protocol. This is a sink for `TryInto` and a stream of -/// `ServerMessage`. -pub struct Connection> { - reactions: SelectAll>>, - stream_waker: Option, - sink_state: ConnectionSinkState, -} - -impl Connection -where - S: Schema, - I: Init, -{ - /// Creates a new connection, which is a sink for `TryInto` and a stream of `ServerMessage`. - /// - /// The `schema` argument should typically be an `Arc>`. - /// - /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a `ConnectionConfig` if the context and configuration are already - /// known, or it can be a closure that gets executed asynchronously when the client sends the - /// ConnectionInit message. Using a closure allows you to perform authentication based on the - /// parameters provided by the client. - pub fn new(schema: S, init: I) -> Self { - Self { - reactions: SelectAll::new(), - stream_waker: None, - sink_state: ConnectionSinkState::Ready { - state: ConnectionState::PreInit { init, schema }, - }, - } - } -} - -impl Sink for Connection -where - T: TryInto>, - T::Error: Error, - S: Schema, - I: Init + Send, -{ - type Error = Infallible; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - match &mut self.sink_state { - ConnectionSinkState::Ready { .. } => Poll::Ready(Ok(())), - ConnectionSinkState::HandlingMessage { ref mut result } => { - match Pin::new(result).poll(cx) { - Poll::Ready((state, reactions)) => { - self.reactions.push(reactions); - self.sink_state = ConnectionSinkState::Ready { state }; - Poll::Ready(Ok(())) - } - Poll::Pending => Poll::Pending, - } - } - ConnectionSinkState::Closed => panic!("poll_ready called after close"), - } - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - let s = self.get_mut(); - let state = &mut s.sink_state; - *state = match std::mem::replace(state, ConnectionSinkState::Closed) { - ConnectionSinkState::Ready { state } => { - match item.try_into() { - Ok(msg) => ConnectionSinkState::HandlingMessage { - result: state.handle_message(msg).boxed(), - }, - Err(e) => { - // If we weren't able to parse the message, send back an error. - s.reactions.push( - Reaction::ServerMessage(ServerMessage::ConnectionError { - payload: ConnectionErrorPayload { - message: e.to_string(), - }, - }) - .into_stream(), - ); - ConnectionSinkState::Ready { state } - } - } - } - _ => panic!("start_send called when not ready"), - }; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - >::poll_ready(self, cx) - } - - fn poll_close(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - self.sink_state = ConnectionSinkState::Closed; - if let Some(waker) = self.stream_waker.take() { - // Wake up the stream so it can close too. - waker.wake(); - } - Poll::Ready(Ok(())) - } -} - -impl Stream for Connection -where - S: Schema, - I: Init, -{ - type Item = ServerMessage; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.stream_waker = Some(cx.waker().clone()); - - if let ConnectionSinkState::Closed = self.sink_state { - return Poll::Ready(None); - } - - // Poll the reactions for new outgoing messages. - if !self.reactions.is_empty() { - match Pin::new(&mut self.reactions).poll_next(cx) { - Poll::Ready(Some(reaction)) => match reaction { - Reaction::ServerMessage(msg) => return Poll::Ready(Some(msg)), - Reaction::EndStream => return Poll::Ready(None), - }, - Poll::Ready(None) => { - // In rare cases, the reaction stream may terminate. For example, this will - // happen if the first message we receive does not require any reaction. Just - // recreate it in that case. - self.reactions = SelectAll::new(); - } - _ => (), - } - } - Poll::Pending - } -} - -#[cfg(test)] -mod test { - use std::{convert::Infallible, io}; - - use juniper::{ - futures::sink::SinkExt, - graphql_input_value, graphql_object, graphql_subscription, graphql_value, graphql_vars, - parser::{ParseError, Spanning}, - DefaultScalarValue, EmptyMutation, FieldError, FieldResult, RootNode, - }; - - use super::*; - - struct Context(i32); - - impl juniper::Context for Context {} - - struct Query; - - #[graphql_object(context = Context)] - impl Query { - /// context just resolves to the current context. - async fn context(context: &Context) -> i32 { - context.0 - } - } - - struct Subscription; - - #[graphql_subscription(context = Context)] - impl Subscription { - /// never never emits anything. - async fn never(_context: &Context) -> BoxStream<'static, FieldResult> { - tokio::time::sleep(Duration::from_secs(10000)) - .map(|_| unreachable!()) - .into_stream() - .boxed() - } - - /// context emits the current context once, then never emits anything else. - async fn context(context: &Context) -> BoxStream<'static, FieldResult> { - stream::once(future::ready(Ok(context.0))) - .chain( - tokio::time::sleep(Duration::from_secs(10000)) - .map(|_| unreachable!()) - .into_stream(), - ) - .boxed() - } - - /// error emits an error once, then never emits anything else. - async fn error(_context: &Context) -> BoxStream<'static, FieldResult> { - stream::once(future::ready(Err(FieldError::new( - "field error", - graphql_value!(null), - )))) - .chain( - tokio::time::sleep(Duration::from_secs(10000)) - .map(|_| unreachable!()) - .into_stream(), - ) - .boxed() - } - } - - type ClientMessage = super::ClientMessage; - type ServerMessage = super::ServerMessage; - - fn new_test_schema() -> Arc, Subscription>> { - Arc::new(RootNode::new(Query, EmptyMutation::new(), Subscription)) - } - - #[tokio::test] - async fn test_query() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "{context}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - assert_eq!( - ServerMessage::Data { - id: "foo".into(), - payload: DataPayload { - data: graphql_value!({"context": 1}), - errors: vec![], - }, - }, - conn.next().await.unwrap() - ); - - assert_eq!( - ServerMessage::Complete { id: "foo".into() }, - conn.next().await.unwrap() - ); - } - - #[tokio::test] - async fn test_subscriptions() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "subscription Foo {context}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - assert_eq!( - ServerMessage::Data { - id: "foo".into(), - payload: DataPayload { - data: graphql_value!({"context": 1}), - errors: vec![], - }, - }, - conn.next().await.unwrap() - ); - - conn.send(ClientMessage::Start { - id: "bar".into(), - payload: StartPayload { - query: "subscription Bar {context}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - assert_eq!( - ServerMessage::Data { - id: "bar".into(), - payload: DataPayload { - data: graphql_value!({"context": 1}), - errors: vec![], - }, - }, - conn.next().await.unwrap() - ); - - conn.send(ClientMessage::Stop { id: "foo".into() }) - .await - .unwrap(); - - assert_eq!( - ServerMessage::Complete { id: "foo".into() }, - conn.next().await.unwrap() - ); - } - - #[tokio::test] - async fn test_init_params_ok() { - let mut conn = Connection::new(new_test_schema(), |params: Variables| async move { - assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar"))); - Ok(ConnectionConfig::new(Context(1))) as Result<_, Infallible> - }); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {"foo": "bar"}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - } - - #[tokio::test] - async fn test_init_params_error() { - let mut conn = Connection::new(new_test_schema(), |params: Variables| async move { - assert_eq!(params.get("foo"), Some(&graphql_input_value!("bar"))); - Err(io::Error::new(io::ErrorKind::Other, "init error")) - }); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {"foo": "bar"}, - }) - .await - .unwrap(); - - assert_eq!( - ServerMessage::ConnectionError { - payload: ConnectionErrorPayload { - message: "init error".into(), - }, - }, - conn.next().await.unwrap() - ); - } - - #[tokio::test] - async fn test_max_in_flight_operations() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)) - .with_keep_alive_interval(Duration::from_secs(0)) - .with_max_in_flight_operations(1), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "subscription Foo {never}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - conn.send(ClientMessage::Start { - id: "bar".into(), - payload: StartPayload { - query: "subscription Bar {never}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - match conn.next().await.unwrap() { - ServerMessage::Error { id, .. } => { - assert_eq!(id, "bar"); - } - msg => panic!("expected error, got: {msg:?}"), - } - } - - #[tokio::test] - async fn test_parse_error() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "asd".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - match conn.next().await.unwrap() { - ServerMessage::Error { id, payload } => { - assert_eq!(id, "foo"); - match payload.graphql_error() { - GraphQLError::ParseError(Spanning { - item: ParseError::UnexpectedToken(token), - .. - }) => assert_eq!(token, "asd"), - p => panic!("expected graphql parse error, got: {p:?}"), - } - } - msg => panic!("expected error, got: {msg:?}"), - } - } - - #[tokio::test] - async fn test_keep_alives() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_millis(20)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - for _ in 0..10 { - assert_eq!( - ServerMessage::ConnectionKeepAlive, - conn.next().await.unwrap() - ); - } - } - - #[tokio::test] - async fn test_slow_init() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - // If we send the start message before the init is handled, we should still get results. - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "{context}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - assert_eq!( - ServerMessage::Data { - id: "foo".into(), - payload: DataPayload { - data: graphql_value!({"context": 1}), - errors: vec![], - }, - }, - conn.next().await.unwrap() - ); - } - - #[tokio::test] - async fn test_subscription_field_error() { - let mut conn = Connection::new( - new_test_schema(), - ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)), - ); - - conn.send(ClientMessage::ConnectionInit { - payload: graphql_vars! {}, - }) - .await - .unwrap(); - - assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap()); - - conn.send(ClientMessage::Start { - id: "foo".into(), - payload: StartPayload { - query: "subscription Foo {error}".into(), - variables: graphql_vars! {}, - operation_name: None, - }, - }) - .await - .unwrap(); - - match conn.next().await.unwrap() { - ServerMessage::Data { - id, - payload: DataPayload { data, errors }, - } => { - assert_eq!(id, "foo"); - assert_eq!(data, graphql_value!({ "error": null })); - assert_eq!(errors.len(), 1); - } - msg => panic!("expected data, got: {msg:?}"), - } - } -} diff --git a/juniper_graphql_ws/src/schema.rs b/juniper_graphql_ws/src/schema.rs index 68d282f0b..43b7f94b4 100644 --- a/juniper_graphql_ws/src/schema.rs +++ b/juniper_graphql_ws/src/schema.rs @@ -1,6 +1,7 @@ -use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; use std::sync::Arc; +use juniper::{GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue}; + /// Schema defines the requirements for schemas that can be used for operations. Typically this is /// just an `Arc>` and you should not have to implement it yourself. pub trait Schema: Unpin + Clone + Send + Sync + 'static { diff --git a/juniper_graphql_ws/src/server_message.rs b/juniper_graphql_ws/src/server_message.rs index 5cbd54843..b542cdbb4 100644 --- a/juniper_graphql_ws/src/server_message.rs +++ b/juniper_graphql_ws/src/server_message.rs @@ -1,53 +1,37 @@ +//! Common definitions regarding server messages. + use std::{any::Any, fmt, marker::PhantomPinned}; -use juniper::{ExecutionError, GraphQLError, Value}; +use juniper::GraphQLError; use serde::{Serialize, Serializer}; -/// The payload for errors that are not associated with a GraphQL operation. -#[derive(Debug, Eq, PartialEq, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct ConnectionErrorPayload { - /// The error message. - pub message: String, -} - -/// Sent after execution of an operation. For queries and mutations, this is sent to the client -/// once. For subscriptions, this is sent for every event in the event stream. -#[derive(Debug, Serialize, PartialEq)] -#[serde(rename_all = "camelCase")] -pub struct DataPayload { - /// The result data. - pub data: Value, - - /// The errors that have occurred during execution. Note that parse and validation errors are - /// not included here. They are sent via Error messages. - #[serde(skip_serializing_if = "Vec::is_empty")] - pub errors: Vec>, -} - -/// A payload for errors that can happen before execution. Errors that happen during execution are -/// instead sent to the client via `DataPayload`. `ErrorPayload` is a wrapper for an owned -/// `GraphQLError`. +/// Payload for errors that can happen before execution. +/// +/// Errors that happen during execution are instead sent to the client via +/// [`graphql_ws::DataPayload`] or [`graphql_transport_ws::NextPayload`]. [`ErrorPayload`] is a +/// wrapper for an owned [`GraphQLError`]. +/// +/// [`graphql_transport_ws::NextPayload`]: crate::graphql_transport_ws::NextPayload +/// [`graphql_ws::DataPayload`]: crate::graphql_ws::DataPayload // XXX: Think carefully before deriving traits. This is self-referential (error references // _execution_params). pub struct ErrorPayload { _execution_params: Option>, error: GraphQLError, - _marker: PhantomPinned, + _pinned: PhantomPinned, } impl ErrorPayload { - /// Creates a new [`ErrorPayload`] out of the provide `execution_params` and - /// [`GraphQLError`]. + /// Creates a new [`ErrorPayload`] out of the provide `execution_params` and [`GraphQLError`]. pub(crate) fn new(execution_params: Box, error: GraphQLError) -> Self { Self { _execution_params: Some(execution_params), error, - _marker: PhantomPinned, + _pinned: PhantomPinned, } } - /// Returns the contained GraphQLError. + /// Returns the contained [`GraphQLError`]. pub fn graphql_error(&self) -> &GraphQLError { &self.error } @@ -79,108 +63,7 @@ impl From for ErrorPayload { Self { _execution_params: None, error, - _marker: PhantomPinned, + _pinned: PhantomPinned, } } } - -/// ServerMessage defines the message types that servers can send. -#[derive(Debug, Serialize, PartialEq)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "type")] -pub enum ServerMessage { - /// ConnectionError is used for errors that are not associated with a GraphQL operation. For - /// example, this will be used when: - /// - /// * The server is unable to parse a client's message. - /// * The client's initialization parameters are rejected. - ConnectionError { - /// The error that occurred. - payload: ConnectionErrorPayload, - }, - /// ConnectionAck is sent in response to a client's ConnectionInit message if the server accepted a - /// connection. - ConnectionAck, - /// Data contains the result of a query, mutation, or subscription event. - Data { - /// The id of the operation that the data is for. - id: String, - - /// The data and errors that occurred during execution. - payload: DataPayload, - }, - /// Error contains an error that occurs before execution, such as validation errors. - Error { - /// The id of the operation that triggered this error. - id: String, - - /// The error(s). - payload: ErrorPayload, - }, - /// Complete indicates that no more data will be sent for the given operation. - Complete { - /// The id of the operation that has completed. - id: String, - }, - /// ConnectionKeepAlive is sent periodically after accepting a connection. - #[serde(rename = "ka")] - ConnectionKeepAlive, -} - -#[cfg(test)] -mod test { - use juniper::{graphql_value, DefaultScalarValue}; - - use super::*; - - #[test] - fn test_serialization() { - type ServerMessage = super::ServerMessage; - - assert_eq!( - serde_json::to_string(&ServerMessage::ConnectionError { - payload: ConnectionErrorPayload { - message: "foo".into(), - }, - }) - .unwrap(), - r#"{"type":"connection_error","payload":{"message":"foo"}}"#, - ); - - assert_eq!( - serde_json::to_string(&ServerMessage::ConnectionAck).unwrap(), - r#"{"type":"connection_ack"}"#, - ); - - assert_eq!( - serde_json::to_string(&ServerMessage::Data { - id: "foo".into(), - payload: DataPayload { - data: graphql_value!(null), - errors: vec![], - }, - }) - .unwrap(), - r#"{"type":"data","id":"foo","payload":{"data":null}}"#, - ); - - assert_eq!( - serde_json::to_string(&ServerMessage::Error { - id: "foo".into(), - payload: GraphQLError::UnknownOperationName.into(), - }) - .unwrap(), - r#"{"type":"error","id":"foo","payload":[{"message":"Unknown operation"}]}"#, - ); - - assert_eq!( - serde_json::to_string(&ServerMessage::Complete { id: "foo".into() }).unwrap(), - r#"{"type":"complete","id":"foo"}"#, - ); - - assert_eq!( - serde_json::to_string(&ServerMessage::ConnectionKeepAlive).unwrap(), - r#"{"type":"ka"}"#, - ); - } -} diff --git a/juniper_graphql_transport_ws/src/utils.rs b/juniper_graphql_ws/src/util.rs similarity index 75% rename from juniper_graphql_transport_ws/src/utils.rs rename to juniper_graphql_ws/src/util.rs index 75106a4cf..b6fc4b74d 100644 --- a/juniper_graphql_transport_ws/src/utils.rs +++ b/juniper_graphql_ws/src/util.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Deserializer}; +/// Deserializes `null`able value by placing the [`Default`] value instead of `null`. pub(crate) fn default_for_null<'de, D, T>(deserializer: D) -> Result where D: Deserializer<'de>, diff --git a/juniper_graphql_ws/src/utils.rs b/juniper_graphql_ws/src/utils.rs deleted file mode 100644 index 75106a4cf..000000000 --- a/juniper_graphql_ws/src/utils.rs +++ /dev/null @@ -1,9 +0,0 @@ -use serde::{Deserialize, Deserializer}; - -pub(crate) fn default_for_null<'de, D, T>(deserializer: D) -> Result -where - D: Deserializer<'de>, - T: Deserialize<'de> + Default, -{ - Ok(Option::::deserialize(deserializer)?.unwrap_or_default()) -} diff --git a/juniper_subscriptions/release.toml b/juniper_subscriptions/release.toml index e7e23359b..a12a90656 100644 --- a/juniper_subscriptions/release.toml +++ b/juniper_subscriptions/release.toml @@ -10,12 +10,6 @@ exactly = 1 search = "juniper_subscriptions = \\{ version = \"[^\"]+\"" replace = "juniper_subscriptions = { version = \"{{version}}\"" -[[pre-release-replacements]] -file = "../juniper_graphql_transport_ws/Cargo.toml" -exactly = 1 -search = "juniper_subscriptions = \\{ version = \"[^\"]+\"" -replace = "juniper_subscriptions = { version = \"{{version}}\"" - [[pre-release-replacements]] file = "CHANGELOG.md" max = 1 diff --git a/juniper_warp/Cargo.toml b/juniper_warp/Cargo.toml index 95e9c0b36..3c124b4ab 100644 --- a/juniper_warp/Cargo.toml +++ b/juniper_warp/Cargo.toml @@ -20,7 +20,6 @@ rustdoc-args = ["--cfg", "docsrs"] [features] subscriptions = [ - "dep:juniper_graphql_transport_ws", "dep:juniper_graphql_ws", "dep:log", "warp/websocket", @@ -30,8 +29,7 @@ subscriptions = [ anyhow = "1.0.47" futures = "0.3.22" juniper = { version = "0.16.0-dev", path = "../juniper", default-features = false } -juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", optional = true } -juniper_graphql_transport_ws = { version = "0.4.0-dev", path = "../juniper_graphql_transport_ws", optional = true } +juniper_graphql_ws = { version = "0.4.0-dev", path = "../juniper_graphql_ws", features = ["graphql-transport-ws", "graphql-ws"], optional = true } log = { version = "0.4", optional = true } serde = { version = "1.0.122", features = ["derive"] } serde_json = "1.0.18" diff --git a/juniper_warp/examples/subscription.rs b/juniper_warp/examples/subscription.rs index f01cdde2d..73b2c9fe0 100644 --- a/juniper_warp/examples/subscription.rs +++ b/juniper_warp/examples/subscription.rs @@ -7,7 +7,7 @@ use juniper::{ graphql_object, graphql_subscription, graphql_value, EmptyMutation, FieldError, GraphQLEnum, RootNode, }; -use juniper_graphql_transport_ws::ConnectionConfig; +use juniper_graphql_ws::ConnectionConfig; use warp::{http::Response, Filter}; #[derive(Clone)] diff --git a/juniper_warp/src/lib.rs b/juniper_warp/src/lib.rs index c95805a4b..595fe8dca 100644 --- a/juniper_warp/src/lib.rs +++ b/juniper_warp/src/lib.rs @@ -349,13 +349,12 @@ pub mod subscriptions { }, GraphQLSubscriptionType, GraphQLTypeAsync, RootNode, ScalarValue, }; - use juniper_graphql_transport_ws; - use juniper_graphql_ws; + use juniper_graphql_ws::{graphql_transport_ws, graphql_ws}; use warp::{filters::BoxedFilter, reply::Reply, Filter as _}; struct Message(warp::ws::Message); - impl TryFrom for juniper_graphql_ws::ClientMessage { + impl TryFrom for graphql_ws::ClientMessage { type Error = serde_json::Error; fn try_from(msg: Message) -> serde_json::Result { @@ -367,7 +366,7 @@ pub mod subscriptions { } } - impl TryFrom for juniper_graphql_transport_ws::Input { + impl TryFrom for graphql_transport_ws::Input { type Error = serde_json::Error; fn try_from(msg: Message) -> serde_json::Result { @@ -423,11 +422,10 @@ pub mod subscriptions { /// The `schema` argument is your [`juniper`] schema. /// /// The `init` argument is used to provide the custom [`juniper::Context`] and additional - /// configuration for connections. This can be a - /// [`juniper_graphql_transport_ws::ConnectionConfig`] if the context and configuration are - /// already known, or it can be a closure that gets executed asynchronously whenever a client - /// sends the subscription initialization message. Using a closure allows to perform an - /// authentication based on the parameters provided by a client. + /// configuration for connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the + /// context and configuration are already known, or it can be a closure that gets executed + /// asynchronously whenever a client sends the subscription initialization message. Using a + /// closure allows to perform an authentication based on the parameters provided by a client. /// /// # Example /// @@ -436,7 +434,7 @@ pub mod subscriptions { /// # /// # use futures::Stream; /// # use juniper::{graphql_object, graphql_subscription, EmptyMutation, RootNode}; - /// # use juniper_graphql_transport_ws::ConnectionConfig; + /// # use juniper_graphql_ws::ConnectionConfig; /// # use juniper_warp::make_graphql_filter; /// # use warp::Filter as _; /// # @@ -532,7 +530,7 @@ pub mod subscriptions { Subscription::TypeInfo: Send + Sync, CtxT: Unpin + Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static, - I: juniper_graphql_transport_ws::Init + Clone + Send + Sync, + I: juniper_graphql_ws::Init + Clone + Send + Sync, { let schema = schema.into(); @@ -598,8 +596,7 @@ pub mod subscriptions { { let (ws_tx, ws_rx) = websocket.split(); let (s_tx, s_rx) = - juniper_graphql_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init) - .split(); + graphql_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init).split(); let ws_rx = ws_rx.map(|r| r.map(Message)); let s_rx = s_rx.map(|msg| { @@ -622,10 +619,10 @@ pub mod subscriptions { /// Serves the [new `graphql-transport-ws` GraphQL over WebSocket Protocol][new]. /// /// The `init` argument is used to provide the context and additional configuration for - /// connections. This can be a [`juniper_graphql_transport_ws::ConnectionConfig`] if the context - /// and configuration are already known, or it can be a closure that gets executed - /// asynchronously when the client sends the `ConnectionInit` message. Using a closure allows to - /// perform an authentication based on the parameters provided by a client. + /// connections. This can be a [`juniper_graphql_ws::ConnectionConfig`] if the context and + /// configuration are already known, or it can be a closure that gets executed asynchronously + /// when the client sends the `ConnectionInit` message. Using a closure allows to perform an + /// authentication based on the parameters provided by a client. /// /// [new]: https://github.com/enisdenjo/graphql-ws/blob/v5.14.0/PROTOCOL.md pub async fn serve_graphql_transport_ws( @@ -642,21 +639,19 @@ pub mod subscriptions { Subscription::TypeInfo: Send + Sync, CtxT: Unpin + Send + Sync + 'static, S: ScalarValue + Send + Sync + 'static, - I: juniper_graphql_transport_ws::Init + Send, + I: juniper_graphql_ws::Init + Send, { let (ws_tx, ws_rx) = websocket.split(); - let (s_tx, s_rx) = juniper_graphql_transport_ws::Connection::new( - juniper_graphql_transport_ws::ArcSchema(root_node), - init, - ) - .split(); + let (s_tx, s_rx) = + graphql_transport_ws::Connection::new(juniper_graphql_ws::ArcSchema(root_node), init) + .split(); let ws_rx = ws_rx.map(|r| r.map(Message)); let s_rx = s_rx.map(|output| match output { - juniper_graphql_transport_ws::Output::Message(msg) => serde_json::to_string(&msg) + graphql_transport_ws::Output::Message(msg) => serde_json::to_string(&msg) .map(warp::ws::Message::text) .map_err(Error::Serde), - juniper_graphql_transport_ws::Output::Close { code, message } => { + graphql_transport_ws::Output::Close { code, message } => { Ok(warp::ws::Message::close_with(code, message)) } });