Skip to content

Commit

Permalink
fix(graphql-transport-ws): ensure result message is processed before … (
Browse files Browse the repository at this point in the history
#466)

* fix(graphql-transport-ws): ensure result message is processed before complete message; do not close connection if keepalive is on

* chore(gql_websocket_link): format graphql_transport_ws.dart

* fix(graphql-transport-ws): added timeout logic for processing received complete message wait loop

* refactor(graphql_transport_ws): change while loop to completer for async tasks management

* fix(graphql_transport_ws): completer error Future already completed

* test(gql_websocket_link): new async task management tests
  • Loading branch information
agufagit authored Nov 4, 2024
1 parent 656a39c commit 09709e5
Show file tree
Hide file tree
Showing 2 changed files with 471 additions and 8 deletions.
58 changes: 51 additions & 7 deletions links/gql_websocket_link/lib/src/graphql_transport_ws.dart
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ class _ConnectionState {
// TODO: WebSocketChannel should have a `state` getter and `onStateChange` stream
bool isOpen = false;

Map<String, Completer<void>> nextOrErrorMsgWaitMap = {};

/// Checks the `connect` problem and evaluates if the client should retry.
bool shouldRetryConnectOrThrow(Object errOrCloseEvent) {
options.log?.call("shouldRetryConnectOrThrow $errOrCloseEvent");
Expand Down Expand Up @@ -813,6 +815,29 @@ class _ConnectionState {
}
// parseMessage(msg!, reviver: options.jsonMessageReviver);
if (!isOpen) return;

// wait for next or error message (result) to be processed before process complete message
if (message is CompleteMessage &&
nextOrErrorMsgWaitMap.containsKey(message.id)) {
final completer = nextOrErrorMsgWaitMap[message.id];

if (completer != null) {
if (completer.isCompleted) {
nextOrErrorMsgWaitMap.remove(message.id);
} else {
final timer = Timer(const Duration(seconds: 60), () {
// Timeout => let's return an error
if (!completer.isCompleted) {
completer.complete();
}
});

await completer.future;
timer.cancel();
}
}
}

emitter.emit(TransportWsEvent.message(message));
if (message is PingMessage || message is PongMessage) {
final msgPayload = message is PingMessage
Expand Down Expand Up @@ -911,16 +936,16 @@ class _ConnectionState {
// if (socket.readyState == WebSocketImpl.CLOSING) await throwOnClose;

final _releaseComp = Completer<void>();
final void Function() release = _releaseComp.complete;
final released = _releaseComp.future;

return _Connection(
socket: socket,
release: release,
release: _releaseComp,
waitForReleaseOrThrowOnClose: Future.any([
// wait for
released.then((_) {
if (locks == 0) {
// if released, no other operations, and not keep alive, wait for the socket to close
if (locks == 0 && options.keepAlive == Duration.zero) {
// and if no more locks are present, complete the connection
final complete = () {
isOpen = false;
Expand Down Expand Up @@ -988,20 +1013,37 @@ class _Client extends TransportWsClient {
final socket = _c.socket;
final release = _c.release;
final waitForReleaseOrThrowOnClose = _c.waitForReleaseOrThrowOnClose;

// print("isolate debug name: ${Isolate.current.debugName}");
// print(payload.operation.toString());
// print(payload.variables.toString());
// print(payload.context.toString());
// print("graphQLSocketMessageEncoder: ${Isolate.current.debugName}");
// if done while waiting for connect, release the connection lock right away
final _subscribeMsg = await options.graphQLSocketMessageEncoder(
SubscribeMessage(id, options.serializer.serializeRequest(payload)),
);
if (done) return release();
// print("after graphQLSocketMessageEncoder: ${Isolate.current.debugName}");
if (done) {
if (!release.isCompleted) release.complete();
}

final unlisten = emitter.onMessage(id, (message) {
if (message is NextMessage) {
sink.add(message.payload);
final completer = state.nextOrErrorMsgWaitMap[id];
if (completer != null && !completer.isCompleted) {
completer.complete();
}
state.nextOrErrorMsgWaitMap.remove(id);
} else if (message is ErrorMessage) {
errored = true;
done = true;
sink.addError(message.payload);
final completer = state.nextOrErrorMsgWaitMap[id];
if (completer != null && !completer.isCompleted) {
completer.complete();
}
state.nextOrErrorMsgWaitMap.remove(id);
releaser();
} else if (message is CompleteMessage) {
done = true;
Expand All @@ -1011,6 +1053,8 @@ class _Client extends TransportWsClient {

socket.sink.add(_subscribeMsg);

state.nextOrErrorMsgWaitMap[id] = Completer();

releaser = () async {
final _completeMsg =
await options.graphQLSocketMessageEncoder(CompleteMessage(id));
Expand All @@ -1020,7 +1064,7 @@ class _Client extends TransportWsClient {
}
state.locks--;
done = true;
release();
if (!release.isCompleted) release.complete();
};

// either the releaser will be called, connection completed and
Expand Down Expand Up @@ -1077,7 +1121,7 @@ class _Client extends TransportWsClient {

class _Connection {
final WebSocketChannel socket;
final void Function() release;
final Completer<void> release;
final Future<void> waitForReleaseOrThrowOnClose;

_Connection({
Expand Down
Loading

0 comments on commit 09709e5

Please sign in to comment.