Skip to content

Commit

Permalink
Implement disconnection event (raised after the connection was lost).
Browse files Browse the repository at this point in the history
  • Loading branch information
Marfusios committed Dec 10, 2018
1 parent 83af266 commit a1ed9bb
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 16 deletions.
33 changes: 33 additions & 0 deletions src/Websocket.Client/DisconnectionType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
namespace Websocket.Client
{
/// <summary>
/// Type that specify happenend disconnection
/// </summary>
public enum DisconnectionType
{
/// <summary>
/// Type used for exit event, disposing of the websocket client
/// </summary>
Exit = 0,

/// <summary>
/// Type used when connection to websocket was lost in meantime
/// </summary>
Lost = 1,

/// <summary>
/// Type used when connection to websocket was lost by not receiving any message in given timerange
/// </summary>
NoMessageReceived = 2,

/// <summary>
/// Type used when connection or reconnection returned error
/// </summary>
Error = 3,

/// <summary>
/// Type used when disconnection was requested by user
/// </summary>
ByUser = 4
}
}
5 changes: 5 additions & 0 deletions src/Websocket.Client/IWebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public interface IWebsocketClient : IDisposable
/// </summary>
IObservable<ReconnectionType> ReconnectionHappened { get; }

/// <summary>
/// Stream for disconnection event (trigerred after the connection was lost)
/// </summary>
IObservable<DisconnectionType> DisconnectionHappened { get; }

/// <summary>
/// Time range in ms, how long to wait before reconnecting if no message comes from server.
/// Default 60000 ms (1 minute)
Expand Down
13 changes: 8 additions & 5 deletions src/Websocket.Client/ReconnectionType.cs
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
namespace Websocket.Client
{
/// <summary>
/// Type that specify happenend reconnection
/// </summary>
public enum ReconnectionType
{
/// <summary>
/// Type used for initial connection to websocket stream
/// </summary>
Initial,
Initial = 0,

/// <summary>
/// Type used when connection to websocket was lost in meantime
/// </summary>
Lost,
Lost = 1,

/// <summary>
/// Type used when connection to websocket was lost by not receiving any message in given timerange
/// </summary>
NoMessageReceived,
NoMessageReceived = 2,

/// <summary>
/// Type used after unsuccessful previous reconnection
/// </summary>
Error,
Error = 3,

/// <summary>
/// Type used when reconnection was requested by user
/// </summary>
ByUser
ByUser = 4
}
}
52 changes: 42 additions & 10 deletions src/Websocket.Client/WebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class WebsocketClient : IWebsocketClient

private readonly Subject<string> _messageReceivedSubject = new Subject<string>();
private readonly Subject<ReconnectionType> _reconnectionSubject = new Subject<ReconnectionType>();
private readonly Subject<DisconnectionType> _disconnectedSubject = new Subject<DisconnectionType>();

private readonly BlockingCollection<string> _messagesToSendQueue = new BlockingCollection<string>();

Expand All @@ -53,6 +54,11 @@ public WebsocketClient(Uri url, Func<ClientWebSocket> clientFactory = null)
/// </summary>
public IObservable<ReconnectionType> ReconnectionHappened => _reconnectionSubject.AsObservable();

/// <summary>
/// Stream for disconnection event (trigerred after the connection was lost)
/// </summary>
public IObservable<DisconnectionType> DisconnectionHappened => _disconnectedSubject.AsObservable();

/// <summary>
/// Time range in ms, how long to wait before reconnecting if no message comes from server.
/// Default 60000 ms (1 minute)
Expand Down Expand Up @@ -82,15 +88,24 @@ public void Dispose()
{
_disposing = true;
Log.Debug(L("Disposing.."));
_lastChanceTimer?.Dispose();
_cancelation?.Cancel();
_cancelationTotal?.Cancel();
_client?.Abort();
_client?.Dispose();
_cancelation?.Dispose();
_cancelationTotal?.Dispose();
_messagesToSendQueue?.Dispose();
try
{
_lastChanceTimer?.Dispose();
_cancelation?.Cancel();
_cancelationTotal?.Cancel();
_client?.Abort();
_client?.Dispose();
_cancelation?.Dispose();
_cancelationTotal?.Dispose();
_messagesToSendQueue?.Dispose();
}
catch (Exception e)
{
Log.Error(e, L($"Failed to dispose client, error: {e.Message}"));
}

IsStarted = false;
_disconnectedSubject.OnNext(DisconnectionType.Exit);
}

/// <summary>
Expand Down Expand Up @@ -161,7 +176,14 @@ private async Task SendFromQueue()
{
foreach (var message in _messagesToSendQueue.GetConsumingEnumerable(_cancelationTotal.Token))
{
await SendInternal(message).ConfigureAwait(false);
try
{
await SendInternal(message).ConfigureAwait(false);
}
catch (Exception e)
{
Log.Error(L($"Failed to send message: '{message}'. Error: {e.Message}"));
}
}
}
catch (TaskCanceledException)
Expand Down Expand Up @@ -214,6 +236,7 @@ private async Task StartClient(Uri uri, CancellationToken token, ReconnectionTyp
}
catch (Exception e)
{
_disconnectedSubject.OnNext(DisconnectionType.Error);
Log.Error(e, L("Exception while connecting. " +
$"Waiting {ErrorReconnectTimeoutMs/1000} sec before next reconnection try."));
await Task.Delay(ErrorReconnectTimeoutMs, token).ConfigureAwait(false);
Expand All @@ -230,11 +253,14 @@ private async Task<ClientWebSocket> GetClient()
return _client;
}

private async Task Reconnect( ReconnectionType type)
private async Task Reconnect(ReconnectionType type)
{
IsRunning = false;
if (_disposing)
return;
if(type != ReconnectionType.Error)
_disconnectedSubject.OnNext(TranslateTypeToDisconnection(type));

Log.Debug(L("Reconnecting..."));
_cancelation.Cancel();
await Task.Delay(1000).ConfigureAwait(false);
Expand Down Expand Up @@ -313,5 +339,11 @@ private string L(string msg)
{
return $"[WEBSOCKET CLIENT] {msg}";
}

private DisconnectionType TranslateTypeToDisconnection(ReconnectionType type)
{
// beaware enum indexes must correspond to each other
return (DisconnectionType) type;
}
}
}
4 changes: 3 additions & 1 deletion test_integration/Websocket.Client.Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ static void Main(string[] args)
client.ReconnectTimeoutMs = (int)TimeSpan.FromSeconds(30).TotalMilliseconds;
client.ReconnectionHappened.Subscribe(type =>
Log.Information($"Reconnection happened, type: {type}"));
client.DisconnectionHappened.Subscribe(type =>
Log.Warning($"Disconnection happened, type: {type}"));

client.MessageReceived.Subscribe(msg => Log.Information($"Message received: {msg}"));

client.Start();
client.Start().Wait();

Task.Run(() => StartSendingPing(client));

Expand Down

0 comments on commit a1ed9bb

Please sign in to comment.