Skip to content

Commit

Permalink
Fixing reconnector issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Elad Zelingher committed Jan 24, 2017
1 parent dc6b466 commit 7c55a69
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ await mWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
catch (Exception ex)
{
RaiseConnectionError(ex);
RaiseConnectionClosed();
}
}

Expand Down
14 changes: 9 additions & 5 deletions src/net45/WampSharp/WAMP2/V2/Api/WampChannelReconnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class WampChannelReconnector : IDisposable
private IDisposable mDisposable = Disposable.Empty;
private bool mStarted = false;
private readonly object mLock = new object();
private IDisposable mConnectionBrokenDisposable;

/// <summary>
/// Initializes a new instance of <see cref="WampChannelReconnector"/>.
Expand All @@ -29,20 +30,22 @@ public WampChannelReconnector(IWampChannel channel, Func<Task> connector)

var connectionBrokenObservable =
Observable.FromEventPattern<WampSessionCloseEventArgs>
(x => monitor.ConnectionBroken += x,
x => monitor.ConnectionBroken -= x)
.Select(x => Unit.Default);
(x => monitor.ConnectionBroken += x,
x => monitor.ConnectionBroken -= x)
.Select(x => Unit.Default)
.Replay(1);

var onceAndConnectionBroken =
Observable.Return(Unit.Default).Concat
(connectionBrokenObservable);
connectionBrokenObservable.StartWith(Unit.Default);

IObservable<IObservable<Unit>> reconnect =
from connectionBroke in onceAndConnectionBroken
let tryReconnect = Observable.FromAsync(connector)
.Catch<Unit, Exception>(x => Observable.Empty<Unit>())
select tryReconnect;

mConnectionBrokenDisposable = connectionBrokenObservable.Connect();

mMerged = reconnect.Concat();
}

Expand Down Expand Up @@ -78,6 +81,7 @@ public void Dispose()
{
mMerged = null;
mDisposable.Dispose();
mConnectionBrokenDisposable.Dispose();
}
}
}
Expand Down

0 comments on commit 7c55a69

Please sign in to comment.