Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
juancastillo0 committed Oct 25, 2021
2 parents e5b6f66 + cd5120e commit 44ff1a0
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 9 deletions.
29 changes: 28 additions & 1 deletion links/gql_websocket_link/lib/src/link.dart
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ class WebSocketLink extends Link {
StreamController<GraphQLSocketMessage>.broadcast();
StreamSubscription<ConnectionKeepAlive>? _keepAliveSubscription;

/// Completes when the [WebSocketLink] is disposed.
/// Non-null when the Link is closing or already closed with [_close].
Completer<void>? _disposedCompleter;

/// true when the [WebSocketLink] can't send any more messages.
/// This happends after calling [dispose] or when [autoReconnect] is false
/// and the web socket disconnected.
bool get isDisabled => _disposedCompleter != null;

/// Initialize the [WebSocketLink] with a [uri].
/// You can customize the headers & protocols by passing [channelGenerator],
/// if [channelGenerator] is passed, [uri] must be null.
Expand Down Expand Up @@ -173,6 +182,7 @@ class WebSocketLink extends Link {
}
},
onError: response.addError,
onDone: response.close,
);
// Send the request.
_write(
Expand All @@ -185,6 +195,9 @@ class WebSocketLink extends Link {
};

response.onCancel = () {
if (isDisabled) {
return;
}
messagesSubscription?.cancel();
_write(StopOperation(id)).catchError(response.addError);
_requests.removeWhere((e) => e.context.entry<RequestId>()!.id == id);
Expand Down Expand Up @@ -219,6 +232,14 @@ class WebSocketLink extends Link {
_reConnectRequests.clear();
}
}, onDone: () {
assert(
!isDisabled || _connectionStateController.value == closed,
"_connectionStateController should be disposed with a closed state",
);
if (isDisabled) {
// already disposed
return;
}
_connectionStateController.add(closed);
if (autoReconnect) {
_reConnectRequests.clear();
Expand Down Expand Up @@ -312,7 +333,8 @@ class WebSocketLink extends Link {
case MessageTypes.data:
final dynamic data = payload["data"];
final dynamic errors = payload["errors"];
return SubscriptionData(id, data, errors);
final dynamic extensions = payload["extensions"];
return SubscriptionData(id, data, errors, extensions);
case MessageTypes.error:
return SubscriptionError(id, payload);
case MessageTypes.complete:
Expand All @@ -324,12 +346,17 @@ class WebSocketLink extends Link {

/// Close the WebSocket channel.
Future<void> _close() async {
if (_disposedCompleter != null) {
return _disposedCompleter!.future;
}
_disposedCompleter = Completer();
await _keepAliveSubscription?.cancel();
await _channel?.sink.close(websocket_status.goingAway);
_connectionStateController.add(closed);
await _connectionStateController.close();
await _messagesController.close();
_reconnectTimer?.cancel();
_disposedCompleter!.complete();
}

/// Disposes the underlying channel explicitly.
Expand Down
9 changes: 8 additions & 1 deletion links/gql_websocket_link/lib/src/messages.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,24 @@ class ConnectionKeepAlive extends GraphQLSocketMessage {
/// payload. The user should check the errors result before processing the
/// data value. These error are from the query resolvers.
class SubscriptionData extends GraphQLSocketMessage {
SubscriptionData(this.id, this.data, this.errors) : super(MessageTypes.data);
SubscriptionData(
this.id,
this.data,
this.errors,
this.extensions,
) : super(MessageTypes.data);

final String id;
final dynamic data;
final dynamic errors;
final dynamic extensions;

@override
Map<String, dynamic> toJson() => <String, dynamic>{
"type": type,
"data": data,
"errors": errors,
"extensions": extensions,
};

@override
Expand Down
145 changes: 138 additions & 7 deletions links/gql_websocket_link/test/gql_websocket_link_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ void main() {
WebSocket webSocket;
IOWebSocketChannel channel;
WebSocketLink link;
final responseExtensions = {
"customExtension": {"value": 1}
};
final responseData = {
"data": {
"pokemons": [
Expand Down Expand Up @@ -209,6 +212,7 @@ void main() {
"payload": {
"data": responseData,
"errors": null,
"extensions": responseExtensions,
},
},
),
Expand All @@ -228,6 +232,10 @@ void main() {
(Response response) {
expect(response.data, responseData);
expect(response.errors, null);
expect(
response.context.entry<ResponseExtensions>()?.extensions,
responseExtensions,
);
},
),
);
Expand All @@ -251,6 +259,8 @@ void main() {
]
}
};

final responseExtensions2 = {"customExtension": "1"};
final responseData2 = {
"data": {
"pokemons": [
Expand Down Expand Up @@ -308,6 +318,7 @@ void main() {
"payload": {
"data": responseData2,
"errors": null,
"extensions": responseExtensions2,
},
},
),
Expand All @@ -325,23 +336,32 @@ void main() {
// We expect responseData1, then responseData2 in order.
int callCounter = 0;
const maxCall = 2;
link
.request(request)
.map((Response response) => response.data)
.listen(
link.request(request).listen(
expectAsync1(
(data) {
(response) {
callCounter += 1;
if (callCounter == 1) {
expect(
data,
response.data,
responseData1,
);
expect(
response.context
.entry<ResponseExtensions>()
?.extensions,
null,
);
} else if (callCounter == 2) {
expect(
data,
response.data,
responseData2,
);
expect(
response.context
.entry<ResponseExtensions>()
?.extensions,
responseExtensions2,
);
}
},
count: maxCall,
Expand All @@ -366,6 +386,12 @@ void main() {
"path": ["path1", "path2"],
"extensions": {"key1": "val", "key2": 77},
};
final responseExtensions = {
"extensinon1": {"dw": 2},
"extensinon2": {
"l": [3]
},
};

request = Request(
operation: Operation(
Expand Down Expand Up @@ -401,6 +427,7 @@ void main() {
"payload": {
"data": null,
"errors": [responseError],
"extensions": responseExtensions,
},
},
),
Expand Down Expand Up @@ -436,6 +463,10 @@ void main() {
responseError["extensions"] as Map<String, dynamic>,
),
);
expect(
response.context.entry<ResponseExtensions>()?.extensions,
responseExtensions,
);
},
),
);
Expand Down Expand Up @@ -1085,6 +1116,106 @@ void main() {
link.request(request).listen((event) {});
link.request(request).listen((event) {});
});

test(
"Client dispose closes the server and request streams",
() async {
HttpServer server;
WebSocket webSocket;
IOWebSocketChannel channel;
WebSocketLink link;
Request request;

final responseData = {
"pokemons": [
{"name": "2"}
]
};

request = Request(
operation: Operation(
operationName: "pokemonsSubscription",
document: parseString(
r"subscription MySubscription { pokemons(first: $first) { name } }"),
),
variables: const <String, dynamic>{
"first": 3,
},
);

server = await HttpServer.bind("localhost", 0);
server.transform(WebSocketTransformer()).listen(
(webSocket) async {
final channel = IOWebSocketChannel(webSocket);
channel.stream.listen(
expectAsync1<void, dynamic>(
(dynamic message) async {
final map =
json.decode(message as String) as Map<String, dynamic>;
if (map["type"] == "connection_init") {
channel.sink.add(json.encode(ConnectionAck()));
} else if (map["type"] == "start") {
channel.sink.add(
json.encode(
<String, dynamic>{
"type": "data",
"id": map["id"],
"payload": {
"data": responseData,
},
},
),
);
}
},
count: 3,
),
onDone: expectAsync0(
() {
expect(channel.closeCode, websocket_status.goingAway);
},
count: 1,
),
);
},
);

webSocket = await WebSocket.connect("ws://localhost:${server.port}");
channel = IOWebSocketChannel(webSocket);

link = WebSocketLink(null, channelGenerator: () => channel);
bool received = false;
final firstFut = link
.request(request)
.listen(expectAsync1(
(Response response) async {
expect(response.data, responseData);
if (received) {
await link.dispose();
}
received = true;
},
count: 1,
))
.asFuture<Object?>();

final secondFut = link
.request(request)
.listen(expectAsync1(
(Response response) async {
expect(response.data, responseData);
if (received) {
await link.dispose();
}
received = true;
},
count: 1,
))
.asFuture<Object?>();

await Future.wait([firstFut, secondFut]);
},
);
},
);
}

0 comments on commit 44ff1a0

Please sign in to comment.