diff --git a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart index dc72592a..cf702509 100644 --- a/links/gql_websocket_link/lib/src/graphql_transport_ws.dart +++ b/links/gql_websocket_link/lib/src/graphql_transport_ws.dart @@ -625,6 +625,8 @@ class _ConnectionState { // TODO: WebSocketChannel should have a `state` getter and `onStateChange` stream bool isOpen = false; + Map> nextOrErrorMsgWaitMap = {}; + /// Checks the `connect` problem and evaluates if the client should retry. bool shouldRetryConnectOrThrow(Object errOrCloseEvent) { options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent"); @@ -813,6 +815,29 @@ class _ConnectionState { } // parseMessage(msg!, reviver: options.jsonMessageReviver); if (!isOpen) return; + + // wait for next or error message (result) to be processed before process complete message + if (message is CompleteMessage && + nextOrErrorMsgWaitMap.containsKey(message.id)) { + final completer = nextOrErrorMsgWaitMap[message.id]; + + if (completer != null) { + if (completer.isCompleted) { + nextOrErrorMsgWaitMap.remove(message.id); + } else { + final timer = Timer(const Duration(seconds: 60), () { + // Timeout => let's return an error + if (!completer.isCompleted) { + completer.complete(); + } + }); + + await completer.future; + timer.cancel(); + } + } + } + emitter.emit(TransportWsEvent.message(message)); if (message is PingMessage || message is PongMessage) { final msgPayload = message is PingMessage @@ -911,16 +936,16 @@ class _ConnectionState { // if (socket.readyState == WebSocketImpl.CLOSING) await throwOnClose; final _releaseComp = Completer(); - final void Function() release = _releaseComp.complete; final released = _releaseComp.future; return _Connection( socket: socket, - release: release, + release: _releaseComp, waitForReleaseOrThrowOnClose: Future.any([ // wait for released.then((_) { - if (locks == 0) { + // if released, no other operations, and not keep alive, wait for the socket to close + if (locks == 0 && options.keepAlive == Duration.zero) { // and if no more locks are present, complete the connection final complete = () { isOpen = false; @@ -988,20 +1013,37 @@ class _Client extends TransportWsClient { final socket = _c.socket; final release = _c.release; final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose; - + // print("isolate debug name: ${Isolate.current.debugName}"); + // print(payload.operation.toString()); + // print(payload.variables.toString()); + // print(payload.context.toString()); + // print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); // if done while waiting for connect, release the connection lock right away final _subscribeMsg = await options.graphQLSocketMessageEncoder( SubscribeMessage(id, options.serializer.serializeRequest(payload)), ); - if (done) return release(); + // print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}"); + if (done) { + if (!release.isCompleted) release.complete(); + } final unlisten = emitter.onMessage(id, (message) { if (message is NextMessage) { sink.add(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); } else if (message is ErrorMessage) { errored = true; done = true; sink.addError(message.payload); + final completer = state.nextOrErrorMsgWaitMap[id]; + if (completer != null && !completer.isCompleted) { + completer.complete(); + } + state.nextOrErrorMsgWaitMap.remove(id); releaser(); } else if (message is CompleteMessage) { done = true; @@ -1011,6 +1053,8 @@ class _Client extends TransportWsClient { socket.sink.add(_subscribeMsg); + state.nextOrErrorMsgWaitMap[id] = Completer(); + releaser = () async { final _completeMsg = await options.graphQLSocketMessageEncoder(CompleteMessage(id)); @@ -1020,7 +1064,7 @@ class _Client extends TransportWsClient { } state.locks--; done = true; - release(); + if (!release.isCompleted) release.complete(); }; // either the releaser will be called, connection completed and @@ -1077,7 +1121,7 @@ class _Client extends TransportWsClient { class _Connection { final WebSocketChannel socket; - final void Function() release; + final Completer release; final Future waitForReleaseOrThrowOnClose; _Connection({ diff --git a/links/gql_websocket_link/test/gql_websocket_link_test.dart b/links/gql_websocket_link/test/gql_websocket_link_test.dart index 6d881d97..d19aba70 100644 --- a/links/gql_websocket_link/test/gql_websocket_link_test.dart +++ b/links/gql_websocket_link/test/gql_websocket_link_test.dart @@ -117,6 +117,7 @@ void _testLinks( }) { final dataMessageType = isApolloSubProtocol ? "data" : "next"; final startMessageType = isApolloSubProtocol ? "start" : "subscribe"; + final endMessageType = isApolloSubProtocol ? "stop" : "complete"; test( "send connection_init", @@ -134,6 +135,13 @@ void _testLinks( ), ); + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.completeError("Timeout"); + } + }); + server = await HttpServer.bind("localhost", 0); server.transform(WebSocketTransformer()).listen( (webSocket) async { @@ -144,6 +152,9 @@ void _testLinks( final map = json.decode(message as String) as Map; expect(map["type"], MessageTypes.connectionInit); + if (!completer.isCompleted) { + completer.complete(); + } }, ), ); @@ -156,6 +167,9 @@ void _testLinks( link = makeLink(null, channelGenerator: () => channel); // link.request(request).listen(print); + + await completer.future; + timer.cancel(); }, ); @@ -176,6 +190,13 @@ void _testLinks( ), ); + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.completeError("Timeout"); + } + }); + server = await HttpServer.bind("localhost", 0); server.transform(WebSocketTransformer()).listen( (webSocket) async { @@ -187,6 +208,9 @@ void _testLinks( json.decode(message as String) as Map; expect(map["type"], MessageTypes.connectionInit); expect(map["payload"], initialPayload); + if (!completer.isCompleted) { + completer.complete(); + } }, ), ); @@ -204,6 +228,8 @@ void _testLinks( ); // link.request(request).listen(print); + await completer.future; + timer.cancel(); }, ); @@ -227,6 +253,13 @@ void _testLinks( ), ); + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.completeError("Timeout"); + } + }); + server = await HttpServer.bind("localhost", 0); server.transform(WebSocketTransformer()).listen( (webSocket) async { @@ -238,6 +271,9 @@ void _testLinks( json.decode(message as String) as Map; expect(map["type"], MessageTypes.connectionInit); expect(map["payload"], baseInitialPayload); + if (!completer.isCompleted) { + completer.complete(); + } }, ), ); @@ -254,6 +290,9 @@ void _testLinks( ); // link.request(request).listen(print); + + await completer.future; + timer.cancel(); }, ); @@ -706,6 +745,370 @@ void _testLinks( }, ); + test( + "yield complete/stop message before next/data message", + () async { + HttpServer server; + WebSocket webSocket; + IOWebSocketChannel channel; + Link link; + Request request; + final responseData1 = { + "data": { + "pokemons": [ + {"name": "Bulbasaur"}, + {"name": "Ivysaur"}, + {"name": "Venusaur"} + ] + } + }; + + request = Request( + operation: Operation( + operationName: "pokemonsSubscription", + document: parseString( + r"subscription MySubscription { pokemons(first: $first) { name } }"), + ), + variables: const { + "first": 3, + }, + ); + + server = await HttpServer.bind("localhost", 0); + server.transform(WebSocketTransformer()).listen( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + channel.stream.listen( + (dynamic message) async { + final map = + json.decode(message as String) as Map; + if (map["type"] == "connection_init") { + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (map["type"] == startMessageType) { + // send end message to complete the operation + channel.sink.add( + json.encode( + { + "type": endMessageType, + "id": map["id"], + }, + ), + ); + await Future.delayed(Duration(seconds: 1)); + // client should wait for this message to come before completing the operation + channel.sink.add( + json.encode( + { + "type": dataMessageType, + "id": map["id"], + "payload": { + "data": responseData1, + "errors": null, + }, + }, + ), + ); + } + }, + ); + }, + ); + + webSocket = await WebSocket.connect("ws://localhost:${server.port}"); + channel = IOWebSocketChannel(webSocket); + link = makeLink(null, channelGenerator: () => channel); + link.request(request).listen( + expectAsync1( + (response) { + expect( + response.data, + responseData1, + ); + expect(response.errors, null); + expect( + response.context.entry()?.extensions, + null, + ); + }, + ), + ); + }, + ); + + test( + "conneciton is kept alive when no operations are in process", + () async { + // only test for transport ws sub-protocol + if (isApolloSubProtocol) { + return; + } + HttpServer server; + WebSocket webSocket; + IOWebSocketChannel channel; + Link link; + Request request; + final responseData1 = { + "data": { + "pokemons": [ + {"name": "Bulbasaur"}, + {"name": "Ivysaur"}, + {"name": "Venusaur"} + ] + } + }; + + request = Request( + operation: Operation( + operationName: "pokemonsSubscription", + document: parseString( + r"subscription MySubscription { pokemons(first: $first) { name } }"), + ), + variables: const { + "first": 3, + }, + ); + + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.complete(); + } + }); + + server = await HttpServer.bind("localhost", 0); + server.transform(WebSocketTransformer()).listen( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + channel.stream.listen( + (dynamic message) async { + final map = + json.decode(message as String) as Map; + if (map["type"] == "connection_init") { + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (map["type"] == startMessageType) { + channel.sink.add( + json.encode( + { + "type": dataMessageType, + "id": map["id"], + "payload": { + "data": responseData1, + "errors": null, + }, + }, + ), + ); + // send end message to complete the operation + channel.sink.add( + json.encode( + { + "type": endMessageType, + "id": map["id"], + }, + ), + ); + } + }, + ); + }, + ); + + // test case 1: The connection should be kept alive + webSocket = await WebSocket.connect("ws://localhost:${server.port}"); + channel = IOWebSocketChannel(webSocket); + final cHashCode = channel.hashCode; + final wsHashCode = webSocket.hashCode; + link = makeLink(null, + channelGenerator: () => channel, + inactivityTimeout: Duration(seconds: 5)); + link.request(request).listen( + expectAsync1( + (response) { + expect( + response.data, + responseData1, + ); + expect(response.errors, null); + expect( + response.context.entry()?.extensions, + null, + ); + if (!completer.isCompleted) { + completer.complete(); + } + }, + ), + ); + + await completer.future; + timer.cancel(); + // The connection should be kept alive + await Future.delayed(Duration(seconds: 1)); + expect(webSocket.closeCode, null); + expect(channel.hashCode, cHashCode); + expect(webSocket.hashCode, wsHashCode); + await channel.sink.close(1000, "Normal Closure"); + + // test case 2: The connection should be closed, not keepAlive + final completer1 = Completer(); + final timer1 = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.complete(); + } + }); + + webSocket = await WebSocket.connect("ws://localhost:${server.port}"); + channel = IOWebSocketChannel(webSocket); + link = makeLink(null, channelGenerator: () => channel); + link.request(request).listen( + expectAsync1( + (response) { + print(response); + expect( + response.data, + responseData1, + ); + expect(response.errors, null); + expect( + response.context.entry()?.extensions, + null, + ); + if (!completer1.isCompleted) { + completer1.complete(); + } + }, + ), + ); + + await completer1.future; + timer1.cancel(); + await Future.delayed(Duration(seconds: 1)); + expect(webSocket.closeCode, 1000); + }, + ); + + test( + "operation is completed by error", + () async { + // only test for transport ws sub-protocol + if (isApolloSubProtocol) { + return; + } + HttpServer server; + WebSocket webSocket; + IOWebSocketChannel channel; + Link link; + Request request; + + request = Request( + operation: Operation( + operationName: "pokemonsSubscription", + document: parseString( + r"subscription MySubscription { pokemons(first: $first) { name } }"), + ), + variables: const { + "first": 3, + }, + ); + + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.complete(); + } + }); + + server = await HttpServer.bind("localhost", 0); + server.transform(WebSocketTransformer()).listen( + (webSocket) async { + final channel = IOWebSocketChannel(webSocket); + channel.stream.listen( + (dynamic message) async { + final map = + json.decode(message as String) as Map; + if (map["type"] == "connection_init") { + channel.sink.add( + json.encode( + ConnectionAck(), + ), + ); + } else if (map["type"] == startMessageType) { + channel.sink.add( + json.encode( + { + "type": "error", + "id": map["id"], + "payload": [ + { + "message": "error message 2.1", + "locations": [ + {"column": 1, "line": 2} + ] + }, + { + "message": "error message 2.2", + } + ], + }, + ), + ); + } + }, + ); + }, + ); + + webSocket = await WebSocket.connect("ws://localhost:${server.port}"); + channel = IOWebSocketChannel(webSocket); + final cHashCode = channel.hashCode; + final wsHashCode = webSocket.hashCode; + link = makeLink(null, + channelGenerator: () => channel, + inactivityTimeout: Duration(seconds: 5)); + link.request(request).listen( + (event) {}, + onError: (Object err, StackTrace stack) { + final errors = err as List; + expect(errors.length, 2); + expect( + errors.first.message, + "error message 2.1", + ); + expect( + errors.first.locations! + .map((e) => {"column": e.column, "line": e.line}), + [ + {"column": 1, "line": 2} + ], + ); + expect( + errors.last.message, + "error message 2.2", + ); + if (!completer.isCompleted) { + completer.complete(); + } + }, + ); + + await completer.future; + timer.cancel(); + // The connection should be kept alive + expect(webSocket.closeCode, null); + expect(channel.hashCode, cHashCode); + expect(webSocket.hashCode, wsHashCode); + await channel.sink.close(1000, "Normal Closure"); + }, + ); + test("throw WebSocketLinkParserException when unable to parse response", () async { HttpServer server; @@ -861,6 +1264,13 @@ void _testLinks( ), ); + final completer = Completer(); + final timer = Timer(const Duration(seconds: 5), () { + if (!completer.isCompleted) { + completer.completeError("Timeout"); + } + }); + server = await HttpServer.bind("localhost", 0); server.transform(WebSocketTransformer()).listen( (webSocket) async { @@ -898,7 +1308,13 @@ void _testLinks( } messageCount++; }, - () => messageCount == 3, + () { + final isDone = messageCount == 3; + if (isDone && !completer.isCompleted) { + completer.complete(); + } + return isDone; + }, ), ); }, @@ -908,6 +1324,9 @@ void _testLinks( channel = IOWebSocketChannel(webSocket); link = makeLink(null, channelGenerator: () => channel); responseSub = link.request(request).listen(print); + + await completer.future; + timer.cancel(); }); test(