Skip to content

Commit

Permalink
Fixing WebSocket multicast issues
Browse files Browse the repository at this point in the history
  • Loading branch information
chronoxor committed Dec 12, 2019
1 parent 8537f60 commit 6647fec
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 74 deletions.
70 changes: 35 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2369,12 +2369,12 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.002 s
Total data: 49.382 MiB
Total messages: 1617887
Data throughput: 4.958 MiB/s
Message latency: 6.182 mcs
Message throughput: 161742 msg/s
Total time: 10.001 s
Total data: 73.838 MiB
Total messages: 2418902
Data throughput: 7.389 MiB/s
Message latency: 4.134 mcs
Message throughput: 241846 msg/s
```

* [WsEchoServer](https://github.com/chronoxor/NetCoreServer/blob/master/performance/WsEchoServer/Program.cs)
Expand All @@ -2390,12 +2390,12 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.994 s
Total data: 22.548 MiB
Total messages: 738454
Data throughput: 2.051 MiB/s
Message latency: 14.888 mcs
Message throughput: 67168 msg/s
Total time: 12.189 s
Total data: 111.540 MiB
Total messages: 3654528
Data throughput: 9.153 MiB/s
Message latency: 3.335 mcs
Message throughput: 299815 msg/s
```

### WebSocket secure echo server
Expand All @@ -2414,12 +2414,12 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.003 s
Total data: 37.1007 MiB
Total messages: 1244649
Data throughput: 3.816 MiB/s
Message latency: 8.036 mcs
Message throughput: 124426 msg/s
Total time: 10.021 s
Total data: 45.372 MiB
Total messages: 1486484
Data throughput: 4.539 MiB/s
Message latency: 6.742 mcs
Message throughput: 148323 msg/s
```

* [WssEchoServer](https://github.com/chronoxor/NetCoreServer/blob/master/performance/WssEchoServer/Program.cs)
Expand Down Expand Up @@ -2592,11 +2592,11 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.002 s
Total data: 52.907 MiB
Total messages: 1732985
Data throughput: 5.294 MiB/s
Message latency: 5.771 mcs
Message throughput: 173253 msg/s
Total data: 20.880 MiB
Total messages: 683537
Data throughput: 2.087 MiB/s
Message latency: 14.633 mcs
Message throughput: 68335 msg/s
```

* [WsMulticastServer](https://github.com/chronoxor/NetCoreServer/blob/master/performance/WsMulticastServer/Program.cs)
Expand All @@ -2611,12 +2611,12 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.023 s
Total data: 16.098 MiB
Total messages: 527442
Data throughput: 1.620 MiB/s
Message latency: 19.003 mcs
Message throughput: 52623 msg/s
Total time: 10.017 s
Total data: 14.797 MiB
Total messages: 484275
Data throughput: 1.486 MiB/s
Message latency: 20.685 mcs
Message throughput: 48344 msg/s
```

### WebSocket secure multicast server
Expand Down Expand Up @@ -2653,12 +2653,12 @@ Seconds to benchmarking: 10
Errors: 0
Total time: 10.014 s
Total data: 18.815 MiB
Total messages: 615923
Data throughput: 1.897 MiB/s
Message latency: 16.259 mcs
Message throughput: 61502 msg/s
Total time: 10.098 s
Total data: 21.020 MiB
Total messages: 688799
Data throughput: 2.083 MiB/s
Message latency: 14.660 mcs
Message throughput: 68210 msg/s
```

## Benchmark: Web Server
Expand Down
14 changes: 7 additions & 7 deletions source/NetCoreServer/SslServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public SslServer(SslContext context, IPEndPoint endpoint)
/// <summary>
/// Number of sessions connected to the server
/// </summary>
public long ConnectedSessions { get { return _sessions.Count; } }
public long ConnectedSessions { get { return Sessions.Count; } }
/// <summary>
/// Number of bytes pending sent by the server
/// </summary>
Expand Down Expand Up @@ -289,7 +289,7 @@ private void OnAsyncCompleted(object sender, SocketAsyncEventArgs e)
#region Session management

// Server sessions
private readonly ConcurrentDictionary<Guid, SslSession> _sessions = new ConcurrentDictionary<Guid, SslSession>();
protected readonly ConcurrentDictionary<Guid, SslSession> Sessions = new ConcurrentDictionary<Guid, SslSession>();

/// <summary>
/// Disconnect all connected sessions
Expand All @@ -301,7 +301,7 @@ public virtual bool DisconnectAll()
return false;

// Disconnect all sessions
foreach (var session in _sessions.Values)
foreach (var session in Sessions.Values)
session.Disconnect();

return true;
Expand All @@ -315,7 +315,7 @@ public virtual bool DisconnectAll()
public SslSession FindSession(Guid id)
{
// Try to find the required session
return _sessions.TryGetValue(id, out SslSession result) ? result : null;
return Sessions.TryGetValue(id, out SslSession result) ? result : null;
}

/// <summary>
Expand All @@ -325,7 +325,7 @@ public SslSession FindSession(Guid id)
internal void RegisterSession(SslSession session)
{
// Register a new session
_sessions.TryAdd(session.Id, session);
Sessions.TryAdd(session.Id, session);
}

/// <summary>
Expand All @@ -335,7 +335,7 @@ internal void RegisterSession(SslSession session)
internal void UnregisterSession(Guid id)
{
// Unregister session by Id
_sessions.TryRemove(id, out SslSession temp);
Sessions.TryRemove(id, out SslSession temp);
}

#endregion
Expand Down Expand Up @@ -365,7 +365,7 @@ public virtual bool Multicast(byte[] buffer, long offset, long size)
return true;

// Multicast data to all sessions
foreach (var session in _sessions.Values)
foreach (var session in Sessions.Values)
session.SendAsync(buffer, offset, size);

return true;
Expand Down
14 changes: 7 additions & 7 deletions source/NetCoreServer/TcpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public TcpServer(IPEndPoint endpoint)
/// <summary>
/// Number of sessions connected to the server
/// </summary>
public long ConnectedSessions { get { return _sessions.Count; } }
public long ConnectedSessions { get { return Sessions.Count; } }
/// <summary>
/// Number of bytes pending sent by the server
/// </summary>
Expand Down Expand Up @@ -280,7 +280,7 @@ private void OnAsyncCompleted(object sender, SocketAsyncEventArgs e)
#region Session management

// Server sessions
private readonly ConcurrentDictionary<Guid, TcpSession> _sessions = new ConcurrentDictionary<Guid, TcpSession>();
protected readonly ConcurrentDictionary<Guid, TcpSession> Sessions = new ConcurrentDictionary<Guid, TcpSession>();

/// <summary>
/// Disconnect all connected sessions
Expand All @@ -292,7 +292,7 @@ public virtual bool DisconnectAll()
return false;

// Disconnect all sessions
foreach (var session in _sessions.Values)
foreach (var session in Sessions.Values)
session.Disconnect();

return true;
Expand All @@ -306,7 +306,7 @@ public virtual bool DisconnectAll()
public TcpSession FindSession(Guid id)
{
// Try to find the required session
return _sessions.TryGetValue(id, out TcpSession result) ? result : null;
return Sessions.TryGetValue(id, out TcpSession result) ? result : null;
}

/// <summary>
Expand All @@ -316,7 +316,7 @@ public TcpSession FindSession(Guid id)
internal void RegisterSession(TcpSession session)
{
// Register a new session
_sessions.TryAdd(session.Id, session);
Sessions.TryAdd(session.Id, session);
}

/// <summary>
Expand All @@ -326,7 +326,7 @@ internal void RegisterSession(TcpSession session)
internal void UnregisterSession(Guid id)
{
// Unregister session by Id
_sessions.TryRemove(id, out TcpSession temp);
Sessions.TryRemove(id, out TcpSession temp);
}

#endregion
Expand Down Expand Up @@ -356,7 +356,7 @@ public virtual bool Multicast(byte[] buffer, long offset, long size)
return true;

// Multicast data to all sessions
foreach (var session in _sessions.Values)
foreach (var session in Sessions.Values)
session.SendAsync(buffer, offset, size);

return true;
Expand Down
8 changes: 4 additions & 4 deletions source/NetCoreServer/WsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace NetCoreServer
/// <remarks>WebSocket client is used to communicate with WebSocket server. Thread-safe.</remarks>
public class WsClient : HttpClient, IWebSocket
{
protected readonly WebSocket WebSocket;
internal readonly WebSocket WebSocket;

/// <summary>
/// Initialize WebSocket client with a given IP address and port number
Expand Down Expand Up @@ -41,13 +41,13 @@ public class WsClient : HttpClient, IWebSocket

#region WebSocket send text methods

public long SendText(byte[] buffer, long offset, long size)
{
public long SendText(byte[] buffer, long offset, long size)
{
lock(WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_TEXT, true, buffer, offset, size);
return base.Send(WebSocket.WsSendBuffer.ToArray());
}
}
}

public long SendText(string text)
Expand Down
45 changes: 36 additions & 9 deletions source/NetCoreServer/WsServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NetCoreServer
/// <remarks> WebSocket server is used to communicate with clients using WebSocket protocol. Thread-safe.</remarks>
public class WsServer : HttpServer, IWebSocket
{
protected readonly WebSocket WebSocket;
internal readonly WebSocket WebSocket;

/// <summary>
/// Initialize WebSocket server with a given IP address and port number
Expand All @@ -34,18 +34,45 @@ public virtual bool CloseAll(int status)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_CLOSE, false, null, 0, 0, status);
if (!Multicast(WebSocket.WsSendBuffer.ToArray()))
return false;

return base.DisconnectAll();
}
}

public override bool Multicast(byte[] buffer, long offset, long size)
{
if (!IsStarted)
return false;

if (size == 0)
return true;

// Multicast data to all WebSocket sessions
foreach (var session in Sessions.Values)
{
if (session is WsSession wsSession)
{
lock (wsSession.WebSocket.WsSendLock)
{
if (wsSession.WebSocket.WsHandshaked)
wsSession.SendAsync(buffer, offset, size);
}
}
}

return true;
}

#region WebSocket multicast text methods

public bool MulticastText(byte[] buffer, long offset, long size)
{
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_TEXT, false, buffer, offset, size);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -54,7 +81,7 @@ public bool MulticastText(string text)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_TEXT, false, Encoding.UTF8.GetBytes(text), 0, text.Length);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -67,7 +94,7 @@ public bool MulticastBinary(byte[] buffer, long offset, long size)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_BINARY, false, buffer, offset, size);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -76,7 +103,7 @@ public bool MulticastBinary(string text)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_BINARY, false, Encoding.UTF8.GetBytes(text), 0, text.Length);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -89,7 +116,7 @@ public bool SendPing(byte[] buffer, long offset, long size)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_PING, false, buffer, offset, size);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -98,7 +125,7 @@ public bool SendPing(string text)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_PING, false, Encoding.UTF8.GetBytes(text), 0, text.Length);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -111,7 +138,7 @@ public bool SendPong(byte[] buffer, long offset, long size)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_PONG, false, buffer, offset, size);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand All @@ -120,7 +147,7 @@ public bool SendPong(string text)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_PONG, false, Encoding.UTF8.GetBytes(text), 0, text.Length);
return base.Multicast(WebSocket.WsSendBuffer.ToArray());
return Multicast(WebSocket.WsSendBuffer.ToArray());
}
}

Expand Down
2 changes: 1 addition & 1 deletion source/NetCoreServer/WsSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NetCoreServer
/// <remarks> WebSocket session is used to read and write data from the connected WebSocket client. Thread-safe.</remarks>
public class WsSession : HttpSession, IWebSocket
{
protected readonly WebSocket WebSocket;
internal readonly WebSocket WebSocket;

public WsSession(WsServer server) : base(server) { WebSocket = new WebSocket(this); }

Expand Down
Loading

0 comments on commit 6647fec

Please sign in to comment.