Skip to content

Commit

Permalink
Fixed issue with WebSocket receive lock
Browse files Browse the repository at this point in the history
  • Loading branch information
chronoxor committed Dec 12, 2019
1 parent 6647fec commit ad3fcd1
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 126 deletions.
14 changes: 8 additions & 6 deletions source/NetCoreServer/FileCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FileCache
/// <returns>'true' if the cache value was added, 'false' if the given key was not added</returns>
public bool Add(string key, byte[] value, TimeSpan timeout = new TimeSpan())
{
lock(_lock)
lock (_lock)
{
// Try to find and remove the previous key
RemoveInternal(key);
Expand Down Expand Up @@ -90,7 +90,7 @@ public class FileCache
/// <returns>'true' and cache value if the cache value was found, 'false' if the given key was not found</returns>
public Tuple<bool, byte[]> Find(string key)
{
lock(_lock)
lock (_lock)
{
// Try to find the given key
if (!_entriesByKey.TryGetValue(key, out var cacheValue))
Expand Down Expand Up @@ -129,8 +129,10 @@ public Tuple<bool, byte[]> Find(string key, out DateTime timeout)
/// <returns>'true' if the cache value was removed, 'false' if the given key was not found</returns>
public bool Remove(string key)
{
lock(_lock)
lock (_lock)
{
return RemoveInternal(key);
}
}

/// <summary>
Expand All @@ -152,7 +154,7 @@ public bool Remove(string key)
if (!InsertPathInternal(path, prefix, timeout, handler))
return false;

lock(_lock)
lock (_lock)
{
// Update the cache entry
if (timeout.Ticks > 0)
Expand Down Expand Up @@ -220,7 +222,7 @@ public bool RemovePath(string path)
/// </summary>
public void Clear()
{
lock(_lock)
lock (_lock)
{
// Clear all cache entries
_entriesByKey.Clear();
Expand Down Expand Up @@ -378,7 +380,7 @@ private bool InsertPathInternal(string path, string prefix, TimeSpan timeout, In

private bool RemovePathInternal(string path)
{
lock(_lock)
lock (_lock)
{
// Try to find the given path
if (!_pathsByKey.TryGetValue(path, out var cacheValue))
Expand Down
249 changes: 130 additions & 119 deletions source/NetCoreServer/WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public bool PerformClientUpgrade(HttpResponse response, Guid id)
accept = true;
}
}

// Failed to perform WebSocket handshake
if (!accept || !connection || !upgrade)
{
Expand Down Expand Up @@ -303,20 +303,10 @@ public void PrepareSendFrame(byte opcode, bool mask, byte[] buffer, long offset,
/// <param name="size">Buffer size</param>
public void PrepareReceiveFrame(byte[] buffer, long offset, long size)
{
var index = 0;

// Clear received data after WebSocket frame was processed
if (WsReceived)
lock (WsReceiveLock)
{
WsReceived = false;
WsHeaderSize = 0;
WsPayloadSize = 0;
WsReceiveBuffer.Clear();
Array.Clear(WsReceiveMask, 0, WsReceiveMask.Length);
}
var index = 0;

while (size > 0)
{
// Clear received data after WebSocket frame was processed
if (WsReceived)
{
Expand All @@ -327,32 +317,20 @@ public void PrepareReceiveFrame(byte[] buffer, long offset, long size)
Array.Clear(WsReceiveMask, 0, WsReceiveMask.Length);
}

// Prepare WebSocket frame opcode and mask flag
if (WsReceiveBuffer.Count < 2)
while (size > 0)
{
for (int i = 0; i < 2; ++i, ++index, --size)
// Clear received data after WebSocket frame was processed
if (WsReceived)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
WsReceived = false;
WsHeaderSize = 0;
WsPayloadSize = 0;
WsReceiveBuffer.Clear();
Array.Clear(WsReceiveMask, 0, WsReceiveMask.Length);
}
}

byte opcode = (byte) (WsReceiveBuffer[0] & 0x0F);
bool fin = ((WsReceiveBuffer[0] >> 7) & 0x01) != 0;
bool mask = ((WsReceiveBuffer[1] >> 7) & 0x01) != 0;
int payload = WsReceiveBuffer[1] & (~0x80);

// Prepare WebSocket frame size
if (payload <= 125)
{
WsHeaderSize = 2 + (mask ? 4 : 0);
WsPayloadSize = payload;
WsReceiveBuffer.Capacity = WsHeaderSize + WsPayloadSize;
}
else if (payload == 126)
{
if (WsReceiveBuffer.Count < 4)
// Prepare WebSocket frame opcode and mask flag
if (WsReceiveBuffer.Count < 2)
{
for (int i = 0; i < 2; ++i, ++index, --size)
{
Expand All @@ -362,83 +340,108 @@ public void PrepareReceiveFrame(byte[] buffer, long offset, long size)
}
}

payload = ((WsReceiveBuffer[2] << 8) | (WsReceiveBuffer[3] << 0));
WsHeaderSize = 4 + (mask ? 4 : 0);
WsPayloadSize = payload;
byte opcode = (byte) (WsReceiveBuffer[0] & 0x0F);
bool fin = ((WsReceiveBuffer[0] >> 7) & 0x01) != 0;
bool mask = ((WsReceiveBuffer[1] >> 7) & 0x01) != 0;
int payload = WsReceiveBuffer[1] & (~0x80);

// Prepare WebSocket frame size
if (payload <= 125)
{
WsHeaderSize = 2 + (mask ? 4 : 0);
WsPayloadSize = payload;
WsReceiveBuffer.Capacity = WsHeaderSize + WsPayloadSize;
}
else if (payload == 127)
{
if (WsReceiveBuffer.Count < 10)
}
else if (payload == 126)
{
for (int i = 0; i < 8; ++i, ++index, --size)
if (WsReceiveBuffer.Count < 4)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
for (int i = 0; i < 2; ++i, ++index, --size)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
}
}

payload = ((WsReceiveBuffer[2] << 8) | (WsReceiveBuffer[3] << 0));
WsHeaderSize = 4 + (mask ? 4 : 0);
WsPayloadSize = payload;
WsReceiveBuffer.Capacity = WsHeaderSize + WsPayloadSize;
}
else if (payload == 127)
{
if (WsReceiveBuffer.Count < 10)
{
for (int i = 0; i < 8; ++i, ++index, --size)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
}
}

payload = ((WsReceiveBuffer[2] << 56) | (WsReceiveBuffer[3] << 48) | (WsReceiveBuffer[4] << 40) | (WsReceiveBuffer[5] << 32) | (WsReceiveBuffer[6] << 24) | (WsReceiveBuffer[7] << 16) | (WsReceiveBuffer[8] << 8) | (WsReceiveBuffer[9] << 0));
WsHeaderSize = 10 + (mask ? 4 : 0);
WsPayloadSize = payload;
payload = ((WsReceiveBuffer[2] << 56) | (WsReceiveBuffer[3] << 48) | (WsReceiveBuffer[4] << 40) | (WsReceiveBuffer[5] << 32) | (WsReceiveBuffer[6] << 24) | (WsReceiveBuffer[7] << 16) | (WsReceiveBuffer[8] << 8) | (WsReceiveBuffer[9] << 0));
WsHeaderSize = 10 + (mask ? 4 : 0);
WsPayloadSize = payload;
WsReceiveBuffer.Capacity = WsHeaderSize + WsPayloadSize;
}
}

// Prepare WebSocket frame mask
if (mask)
{
if (WsReceiveBuffer.Count < WsHeaderSize)
// Prepare WebSocket frame mask
if (mask)
{
for (int i = 0; i < 4; ++i, ++index, --size)
if (WsReceiveBuffer.Count < WsHeaderSize)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
WsReceiveMask[i] = buffer[offset + index];
for (int i = 0; i < 4; ++i, ++index, --size)
{
if (size == 0)
return;
WsReceiveBuffer.Add(buffer[offset + index]);
WsReceiveMask[i] = buffer[offset + index];
}
}
}
}

int total = WsHeaderSize + WsPayloadSize;
int length = Math.Min(total - WsReceiveBuffer.Count, (int)size);
int total = WsHeaderSize + WsPayloadSize;
int length = Math.Min(total - WsReceiveBuffer.Count, (int)size);

// Prepare WebSocket frame payload
WsReceiveBuffer.AddRange(buffer[((int)offset + index)..((int)offset + index + length)]);
index += length;
size -= length;
// Prepare WebSocket frame payload
WsReceiveBuffer.AddRange(buffer[((int)offset + index)..((int)offset + index + length)]);
index += length;
size -= length;

// Process WebSocket frame
if (WsReceiveBuffer.Count == total)
{
int bufferOffset = WsHeaderSize;
// Process WebSocket frame
if (WsReceiveBuffer.Count == total)
{
int bufferOffset = WsHeaderSize;

// Unmask WebSocket frame content
if (mask)
for (int i = 0; i < WsPayloadSize; ++i)
WsReceiveBuffer[bufferOffset + i] ^= WsReceiveMask[i % 4];
// Unmask WebSocket frame content
if (mask)
for (int i = 0; i < WsPayloadSize; ++i)
WsReceiveBuffer[bufferOffset + i] ^= WsReceiveMask[i % 4];

WsReceived = true;
WsReceived = true;

if ((opcode & WS_PING) == WS_PING)
{
// Call the WebSocket ping handler
_wsHandler.OnWsPing(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if ((opcode & WS_PONG) == WS_PONG)
{
// Call the WebSocket pong handler
_wsHandler.OnWsPong(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if ((opcode & WS_CLOSE) == WS_CLOSE)
{
// Call the WebSocket close handler
_wsHandler.OnWsClose(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if (((opcode & WS_TEXT) == WS_TEXT) || ((opcode & WS_BINARY) == WS_BINARY))
{
// Call the WebSocket received handler
_wsHandler.OnWsReceived(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
if ((opcode & WS_PING) == WS_PING)
{
// Call the WebSocket ping handler
_wsHandler.OnWsPing(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if ((opcode & WS_PONG) == WS_PONG)
{
// Call the WebSocket pong handler
_wsHandler.OnWsPong(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if ((opcode & WS_CLOSE) == WS_CLOSE)
{
// Call the WebSocket close handler
_wsHandler.OnWsClose(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
else if (((opcode & WS_TEXT) == WS_TEXT) || ((opcode & WS_BINARY) == WS_BINARY))
{
// Call the WebSocket received handler
_wsHandler.OnWsReceived(WsReceiveBuffer.ToArray(), bufferOffset, WsPayloadSize);
}
}
}
}
Expand All @@ -449,42 +452,48 @@ public void PrepareReceiveFrame(byte[] buffer, long offset, long size)
/// </summary>
public int RequiredReceiveFrameSize()
{
if (WsReceived)
return 0;
lock (WsReceiveLock)
{
if (WsReceived)
return 0;

// Required WebSocket frame opcode and mask flag
if (WsReceiveBuffer.Count < 2)
return 2 - WsReceiveBuffer.Count;
// Required WebSocket frame opcode and mask flag
if (WsReceiveBuffer.Count < 2)
return 2 - WsReceiveBuffer.Count;

bool mask = ((WsReceiveBuffer[1] >> 7) & 0x01) != 0;
int payload = WsReceiveBuffer[1] & (~0x80);
bool mask = ((WsReceiveBuffer[1] >> 7) & 0x01) != 0;
int payload = WsReceiveBuffer[1] & (~0x80);

// Required WebSocket frame size
if ((payload == 126) && (WsReceiveBuffer.Count < 4))
return 4 - WsReceiveBuffer.Count;
if ((payload == 127) && (WsReceiveBuffer.Count < 10))
return 10 - WsReceiveBuffer.Count;
// Required WebSocket frame size
if ((payload == 126) && (WsReceiveBuffer.Count < 4))
return 4 - WsReceiveBuffer.Count;
if ((payload == 127) && (WsReceiveBuffer.Count < 10))
return 10 - WsReceiveBuffer.Count;

// Required WebSocket frame mask
if ((mask) && (WsReceiveBuffer.Count < WsHeaderSize))
return WsHeaderSize - WsReceiveBuffer.Count;
// Required WebSocket frame mask
if ((mask) && (WsReceiveBuffer.Count < WsHeaderSize))
return WsHeaderSize - WsReceiveBuffer.Count;

// Required WebSocket frame payload
return WsHeaderSize + WsPayloadSize - WsReceiveBuffer.Count;
// Required WebSocket frame payload
return WsHeaderSize + WsPayloadSize - WsReceiveBuffer.Count;
}
}

/// <summary>
/// Clear WebSocket send/receive buffers
/// </summary>
public void ClearWsBuffers()
{
WsReceived = false;
WsHeaderSize = 0;
WsPayloadSize = 0;
WsReceiveBuffer.Clear();
Array.Clear(WsReceiveMask, 0, WsReceiveMask.Length);
lock (WsReceiveLock)
{
WsReceived = false;
WsHeaderSize = 0;
WsPayloadSize = 0;
WsReceiveBuffer.Clear();
Array.Clear(WsReceiveMask, 0, WsReceiveMask.Length);
}

lock(WsSendLock)
lock (WsSendLock)
{
WsSendBuffer.Clear();
Array.Clear(WsSendMask, 0, WsSendMask.Length);
Expand All @@ -508,6 +517,10 @@ public void ClearWsBuffers()
/// </summary>
internal int WsPayloadSize;

/// <summary>
/// Receive buffer lock
/// </summary>
internal readonly object WsReceiveLock = new object();
/// <summary>
/// Receive buffer
/// </summary>
Expand All @@ -531,5 +544,3 @@ public void ClearWsBuffers()
internal readonly byte[] WsSendMask = new byte[4];
}
}


2 changes: 1 addition & 1 deletion source/NetCoreServer/WsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class WsClient : HttpClient, IWebSocket

public long SendText(byte[] buffer, long offset, long size)
{
lock(WebSocket.WsSendLock)
lock (WebSocket.WsSendLock)
{
WebSocket.PrepareSendFrame(WebSocket.WS_FIN | WebSocket.WS_TEXT, true, buffer, offset, size);
return base.Send(WebSocket.WsSendBuffer.ToArray());
Expand Down

0 comments on commit ad3fcd1

Please sign in to comment.