Skip to content

Commit

Permalink
Enqueue Signal requests while reconnecting (#81)
Browse files Browse the repository at this point in the history
* implement queue to events emitter

* check `isDisposed` instead

* update message

* implement queue to events emitter

* update lock

* impl
  • Loading branch information
hiroshihorie authored Mar 1, 2022
1 parent da8fe67 commit 811ded7
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 10 deletions.
3 changes: 3 additions & 0 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -545,12 +545,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
if (_connectionState == ConnectionState.connected) {
if (didReconnect) {
events.emit(const EngineReconnectedEvent());
// send queued requests if engine re-connected
signalClient.sendQueuedRequests();
} else {
events.emit(const EngineConnectedEvent());
}
} else if (_connectionState == ConnectionState.reconnecting) {
events.emit(const EngineReconnectingEvent());
} else if (_connectionState == ConnectionState.disconnected) {
signalClient.cleanUp();
events.emit(const EngineDisconnectedEvent());
}
}
Expand Down
69 changes: 59 additions & 10 deletions lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:collection';

import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:http/http.dart' as http;
Expand Down Expand Up @@ -26,6 +27,8 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
final WebSocketConnector _wsConnector;
LiveKitWebSocket? _ws;

final _queue = Queue<lk_rtc.SignalRequest>();

@internal
SignalClient(WebSocketConnector wsConnector) : _wsConnector = wsConnector {
events.listen((event) {
Expand All @@ -34,7 +37,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {

onDispose(() async {
await events.dispose();
await _cleanUp();
await cleanUp();
});
}

Expand All @@ -59,7 +62,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
? ConnectionState.reconnecting
: ConnectionState.connecting);
// Clean up existing socket
await _cleanUp();
await cleanUp();
// Attempt to connect
_ws = await _wsConnector(
rtcUri,
Expand Down Expand Up @@ -103,26 +106,41 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
}
}

Future<void> _cleanUp() async {
@internal
Future<void> cleanUp() async {
await _ws?.dispose();
_ws = null;
_queue.clear();
}

@internal
Future<void> disconnect() async {
logger.fine('SignalClient disconnect');
await _cleanUp();
await cleanUp();
}

void _sendRequest(lk_rtc.SignalRequest req) {
if (_ws == null || isDisposed) {
logger.warning(
'[$objectId] Could not send message, not connected or already disposed');
void _sendRequest(
lk_rtc.SignalRequest req, {
bool enqueueIfReconnecting = true,
}) {
if (isDisposed) {
logger.warning('[$objectId] Could not send message, already disposed');
return;
}

if (_connectionState == ConnectionState.reconnecting &&
req._canQueue() &&
enqueueIfReconnecting) {
_queue.add(req);
return;
}

final buf = req.writeToBuffer();
_ws?.send(buf);
if (_ws == null) {
logger.warning('[$objectId] Could not send message, socket is null');
return;
}

_ws?.send(req.writeToBuffer());
}

void _updateConnectionState(ConnectionState newValue) {
Expand Down Expand Up @@ -379,3 +397,34 @@ extension SignalClientRequests on SignalClient {
),
));
}

// private methods
extension on lk_rtc.SignalRequest {
// returns if this request can be queued
bool _canQueue() => ![
// list of types that cannot be queued
lk_rtc.SignalRequest_Message.syncState,
lk_rtc.SignalRequest_Message.trickle,
lk_rtc.SignalRequest_Message.offer,
lk_rtc.SignalRequest_Message.answer,
lk_rtc.SignalRequest_Message.simulate
].contains(whichMessage());
}

// internal methods

extension SignalClientInternalMethods on SignalClient {
@internal
void sendQueuedRequests() {
// queue is empty
if (_queue.isEmpty) return;
// send requests
for (final request in _queue) {
_sendRequest(request, enqueueIfReconnecting: false);
}
_queue.clear();
}

@internal
void clearQueue() => _queue.clear();
}

0 comments on commit 811ded7

Please sign in to comment.