Skip to content

Commit

Permalink
Merge pull request #1011 from appwrite/feat-realtime-heartbeat
Browse files Browse the repository at this point in the history
feat: realtime heartbeat
  • Loading branch information
abnegate authored Dec 10, 2024
2 parents 481487d + e91ef58 commit abdba83
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
private companion object {
private const val TYPE_ERROR = "error"
private const val TYPE_EVENT = "event"
private const val TYPE_PONG = "pong"
private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds

private const val DEBOUNCE_MILLIS = 1L

Expand All @@ -40,6 +42,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
private var reconnectAttempts = 0
private var subscriptionsCounter = 0
private var reconnect = true
private var heartbeatJob: Job? = null
}

private fun createSocket() {
Expand Down Expand Up @@ -80,9 +83,25 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
}

private fun closeSocket() {
stopHeartbeat()
socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
}

private fun startHeartbeat() {
stopHeartbeat()
heartbeatJob = launch {
while (isActive) {
delay(HEARTBEAT_INTERVAL)
socket?.send("""{"type":"ping"}""")
}
}
}

private fun stopHeartbeat() {
heartbeatJob?.cancel()
heartbeatJob = null
}

private fun getTimeout() = when {
reconnectAttempts < 5 -> 1000L
reconnectAttempts < 15 -> 5000L
Expand Down Expand Up @@ -145,6 +164,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
override fun onOpen(webSocket: WebSocket, response: Response) {
super.onOpen(webSocket, response)
reconnectAttempts = 0
startHeartbeat()
}

override fun onMessage(webSocket: WebSocket, text: String) {
Expand Down Expand Up @@ -181,6 +201,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {

override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
super.onClosing(webSocket, code, reason)
stopHeartbeat()
if (!reconnect || code == RealtimeCode.POLICY_VIOLATION.value) {
reconnect = true
return
Expand All @@ -203,6 +224,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
super.onFailure(webSocket, t, response)
stopHeartbeat()
t.printStackTrace()
}
}
Expand Down
26 changes: 25 additions & 1 deletion templates/flutter/lib/src/realtime_mixin.dart.twig
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,33 @@ mixin RealtimeMixin {
int _retries = 0;
StreamSubscription? _websocketSubscription;
bool _creatingSocket = false;
Timer? _heartbeatTimer;

Future<dynamic> _closeConnection() async {
_stopHeartbeat();
await _websocketSubscription?.cancel();
await _websok?.sink.close(status.normalClosure, 'Ending session');
_lastUrl = null;
_retries = 0;
_reconnect = false;
}

void _startHeartbeat() {
_stopHeartbeat();
_heartbeatTimer = Timer.periodic(Duration(seconds: 20), (_) {
if (_websok != null) {
_websok!.sink.add(jsonEncode({
"type": "ping"
}));
}
});
}

void _stopHeartbeat() {
_heartbeatTimer?.cancel();
_heartbeatTimer = null;
}

_createSocket() async {
if(_creatingSocket || _channels.isEmpty) return;
_creatingSocket = true;
Expand Down Expand Up @@ -78,6 +96,10 @@ mixin RealtimeMixin {
}));
}
}
_startHeartbeat(); // Start heartbeat after successful connection
break;
case 'pong':
debugPrint('Received heartbeat response from realtime server');
break;
case 'event':
final message = RealtimeMessage.fromMap(data.data);
Expand All @@ -91,8 +113,10 @@ mixin RealtimeMixin {
break;
}
}, onDone: () {
_stopHeartbeat();
_retry();
}, onError: (err, stack) {
_stopHeartbeat();
for (var subscription in _subscriptions.values) {
subscription.controller.addError(err, stack);
}
Expand Down Expand Up @@ -187,4 +211,4 @@ mixin RealtimeMixin {
_retry();
}
}
}
}
33 changes: 33 additions & 0 deletions templates/swift/Sources/Services/Realtime.swift.twig
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ open class Realtime : Service {

private let TYPE_ERROR = "error"
private let TYPE_EVENT = "event"
private let TYPE_PONG = "pong"
private let DEBOUNCE_NANOS = 1_000_000
private let HEARTBEAT_INTERVAL: UInt64 = 20_000_000_000 // 20 seconds in nanoseconds

private var socketClient: WebSocketClient? = nil
private var activeChannels = Set<String>()
private var activeSubscriptions = [Int: RealtimeCallback]()
private var heartbeatTask: Task<Void, Swift.Error>? = nil

let connectSync = DispatchQueue(label: "ConnectSync")

Expand All @@ -20,6 +23,29 @@ open class Realtime : Service {
private var subscriptionsCounter = 0
private var reconnect = true

private func startHeartbeat() {
stopHeartbeat()
heartbeatTask = Task {
do {
while !Task.isCancelled {
if let client = socketClient, client.isConnected {
client.send(text: #"{"type": "ping"}"#)
}
try await Task.sleep(nanoseconds: HEARTBEAT_INTERVAL)
}
} catch {
if !Task.isCancelled {
print("Heartbeat task failed: \(error.localizedDescription)")
}
}
}
}

private func stopHeartbeat() {
heartbeatTask?.cancel()
heartbeatTask = nil
}

private func createSocket() async throws {
guard activeChannels.count > 0 else {
reconnect = false
Expand Down Expand Up @@ -50,6 +76,8 @@ open class Realtime : Service {
}

private func closeSocket() async throws {
stopHeartbeat()

guard let client = socketClient,
let group = client.threadGroup else {
return
Expand Down Expand Up @@ -163,6 +191,7 @@ extension Realtime: WebSocketClientDelegate {

public func onOpen(channel: Channel) {
self.reconnectAttempts = 0
startHeartbeat()
}

public func onMessage(text: String) {
Expand All @@ -172,13 +201,16 @@ extension Realtime: WebSocketClientDelegate {
switch type {
case TYPE_ERROR: try! handleResponseError(from: json)
case TYPE_EVENT: handleResponseEvent(from: json)
case TYPE_PONG: break // Handle pong response if needed
default: break
}
}
}
}

public func onClose(channel: Channel, data: Data) async throws {
stopHeartbeat()

if (!reconnect) {
reconnect = true
return
Expand All @@ -196,6 +228,7 @@ extension Realtime: WebSocketClientDelegate {
}

public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
stopHeartbeat()
print(error?.localizedDescription ?? "Unknown error")
}

Expand Down

0 comments on commit abdba83

Please sign in to comment.