Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
## [1.0.0-pre.10] - 2021-12-02

### Fixes
* On fragmented and reliable pipelines, sending a large packet when the reliable window was almost full could result in the packet being lost.
* Fixed "pending sends" warning being emitted very often when sending to remote hosts.
  • Loading branch information
Unity Technologies committed Dec 2, 2021
1 parent e5c8ed9 commit c96ecc4
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 23 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Change log

## [1.0.0-pre.10] - 2021-12-02

### Fixes
* On fragmented and reliable pipelines, sending a large packet when the reliable window was almost full could result in the packet being lost.
* Fixed "pending sends" warning being emitted very often when sending to remote hosts.

## [1.0.0-pre.9] - 2021-11-26

### Changes
Expand Down
50 changes: 38 additions & 12 deletions Runtime/BaselibNetworkInterface.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,23 +350,38 @@ struct FlushSendJob : IJob
public NativeArray<BaselibData> Baselib;
public unsafe void Execute()
{
// We process the results in batches because if we allocate a big number of results here,
// it can cause a stack overflow
const uint k_ResultsBufferSize = 64;
const int k_MaxIterations = 500;
var results = stackalloc Binding.Baselib_RegisteredNetwork_CompletionResult[(int)k_ResultsBufferSize];

var error = default(ErrorState);
var pollCount = 0;
var status = default(Binding.Baselib_RegisteredNetwork_ProcessStatus);
var retryCount = 1000;
var pendingSend = Tx.InUse > 0;
var maxIterations = k_MaxIterations;

while (Tx.InUse > 0 && retryCount-- > 0)
while (pendingSend)
{
while ((status = Binding.Baselib_RegisteredNetwork_Socket_UDP_ProcessSend(Baselib[0].m_Socket, &error)) == Binding.Baselib_RegisteredNetwork_ProcessStatus.Pending
&& pollCount++ < Tx.Capacity) {}
// We ensure we never process more than the actual capacity to prevent unexpected deadlocks
while (pollCount++ < Tx.Capacity)
{
if (Binding.Baselib_RegisteredNetwork_Socket_UDP_ProcessSend(Baselib[0].m_Socket, &error) != Binding.Baselib_RegisteredNetwork_ProcessStatus.Pending)
break;
}

int count;
// InUse is not thread safe, needs to be called in a single threaded flush job
var inFlight = Tx.InUse;
if (inFlight > 0)
// At this point all the packets have been processed or the network can't send a packet right now
// so we yield execution to give the opportunity to other threads to process as the potential network
// pressure releases.
Binding.Baselib_Thread_YieldExecution();
// Next step we wait until processing sends complete.
// The timeout was arbitrarily chosen as it seems to be enough for sending 5000 packets in our tests.
Binding.Baselib_RegisteredNetwork_Socket_UDP_WaitForCompletedSend(Baselib[0].m_Socket, 20, &error);

var count = 0;
var resultBatchesCount = Tx.Capacity / k_ResultsBufferSize + 1;
while ((count = (int)Binding.Baselib_RegisteredNetwork_Socket_UDP_DequeueSend(Baselib[0].m_Socket, results, k_ResultsBufferSize, &error)) > 0)
{
var results = stackalloc Binding.Baselib_RegisteredNetwork_CompletionResult[inFlight];
count = (int)Binding.Baselib_RegisteredNetwork_Socket_UDP_DequeueSend(Baselib[0].m_Socket, results, (uint)inFlight, &error);
if (error.code != ErrorCode.Success)
{
// copy recv flow? e.g. pass
Expand All @@ -378,13 +393,24 @@ public unsafe void Execute()
// pass through a new NetworkPacketSender.?
Tx.ReleaseHandle((int)results[i].requestUserdata - 1);
}

if (resultBatchesCount-- < 0) // Deadlock guard
break;
}

// We set the pollCount to zero that way we try and process any previous packets that may have failed because
// internal buffers were full via a EAGAIN error which we don't actually know happened but assume it may have happen
pollCount = 0;

// InUse is not thread safe, needs to be called in a single threaded flush job
// We ensure we never process more than the actual capacity to prevent unexpected deadlocks
pendingSend = Tx.InUse > 0 && maxIterations-- > 0;
}

#if ENABLE_UNITY_COLLECTIONS_CHECKS
if (Tx.InUse > 0)
{
UnityEngine.Debug.LogWarning("There are pending send packets after the baselib process send");
UnityEngine.Debug.LogWarning(string.Format("There are {0} pending send packets after the baselib process send", Tx.InUse));
}
#endif
}
Expand Down
2 changes: 1 addition & 1 deletion Runtime/NetworkDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ public void Execute()
private SessionIdToken GenerateRandomSessionIdToken(ref SessionIdToken token)
{
//SessionIdToken token = new SessionIdToken();
for (uint i = 0; i <= SessionIdToken.k_Length; ++i)
for (uint i = 0; i < SessionIdToken.k_Length; ++i)
{
unsafe
{
Expand Down
8 changes: 5 additions & 3 deletions Runtime/NetworkPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,14 @@ internal unsafe int ProcessPipelineSend(NetworkDriver.Concurrent driver, int sta
var connectionId = connection.m_NetworkId;
var systemHeaderSize = driver.MaxProtocolHeaderSize();

// If the call comes from update, the sendHandle is set to default.
bool inUpdateCall = sendHandle.data == IntPtr.Zero;

var resumeQ = new NativeList<int>(16, Allocator.Temp);
int resumeQStart = 0;

// If the call comes from update, the sendHandle is set to default.
var inboundBuffer = default(InboundSendBuffer);
if (sendHandle.data != IntPtr.Zero)
if (!inUpdateCall)
{
inboundBuffer.bufferWithHeaders = (byte*)sendHandle.data + initialHeaderSize + 1;
inboundBuffer.bufferWithHeadersLength = sendHandle.size - initialHeaderSize - 1;
Expand Down Expand Up @@ -625,7 +627,7 @@ internal unsafe int ProcessPipelineSend(NetworkDriver.Concurrent driver, int sta

if (inboundBuffer.bufferWithHeadersLength == 0)
{
if ((requests & NetworkPipelineStage.Requests.Error) != 0 && sendHandle.data != IntPtr.Zero)
if ((requests & NetworkPipelineStage.Requests.Error) != 0 && !inUpdateCall)
retval = sendResult;
break;
}
Expand Down
47 changes: 47 additions & 0 deletions Tests/Runtime/BaselibNetworkInterfaceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using NUnit.Framework;
using Unity.Networking.Transport;
using UnityEngine.TestTools;
using System.Linq;

namespace Unity.Networking.Transport.Tests
{
public class BaselibNetworkInterfaceTests
{
[Test]
public unsafe void Baselib_Send_WaitForCompletion()
{
var settings = new NetworkSettings();
settings.WithBaselibNetworkInterfaceParameters(sendQueueCapacity: 2000);

using (var baselibInterface = new BaselibNetworkInterface())
{
baselibInterface.Initialize(settings);
baselibInterface.CreateInterfaceEndPoint(NetworkEndPoint.AnyIpv4, out var endpoint);
Assert.Zero(baselibInterface.Bind(endpoint));

// This tests is only valid when sending packets to a public IP.
// So we use an invalid one: https://stackoverflow.com/questions/10456044/what-is-a-good-invalid-ip-address-to-use-for-unit-tests/
baselibInterface.CreateInterfaceEndPoint(NetworkEndPoint.Parse("192.0.2.0", 1234), out var destination);
var queueHandle = default(NetworkSendQueueHandle);

var sendInterface = baselibInterface.CreateSendInterface();

for (int i = 0; i < settings.GetBaselibNetworkInterfaceParameters().sendQueueCapacity; i++)
{
sendInterface.BeginSendMessage.Ptr.Invoke(out var sendHandle, sendInterface.UserData, NetworkParameterConstants.MTU);
sendHandle.size = sendHandle.capacity;
var data = (byte*)sendHandle.data;
for (int j = 0; j < sendHandle.size; j++)
{
data[j] = (byte)j;
}
Assert.AreEqual(sendHandle.capacity, sendInterface.EndSendMessage.Ptr.Invoke(ref sendHandle, ref destination, sendInterface.UserData, ref queueHandle));
}

baselibInterface.ScheduleSend(default, default).Complete();

LogAssert.NoUnexpectedReceived();
}
}
}
}
11 changes: 11 additions & 0 deletions Tests/Runtime/BaselibNetworkInterfaceTests.cs.meta

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Tests/Runtime/com.unity.transport.Tests.asmdef
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{
"name": "Unity.Networking.Transport.PlayTests",
"references": [
"Unity.Networking.Transport"
"references": [
"Unity.Collections",
"Unity.Networking.Transport",
"Unity.Burst"
],
"optionalUnityReferences": [
"TestAssemblies"
Expand Down
4 changes: 2 additions & 2 deletions ValidationExceptions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
{
"ValidationTest": "Restricted File Type Validation",
"ExceptionMessage": "/Samples~/CustomNetworkInterface/Scripts/network.bindings~/build.bat cannot be included in a package.",
"PackageVersion": "1.0.0-pre.9"
"PackageVersion": "1.0.0-pre.10"
},
{
"ValidationTest": "Restricted File Type Validation",
"ExceptionMessage": "/Samples~/CustomNetworkInterface/Scripts/network.bindings~/shell.bat cannot be included in a package.",
"PackageVersion": "1.0.0-pre.9"
"PackageVersion": "1.0.0-pre.10"
}
],
"WarningExceptions": []
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "com.unity.transport",
"displayName": "Unity Transport",
"version": "1.0.0-pre.9",
"version": "1.0.0-pre.10",
"unity": "2020.3",
"unityRelease": "0f1",
"description": "Unity network transport layer - the low-level interface for sending UDP data",
Expand All @@ -11,12 +11,12 @@
"com.unity.mathematics": "1.2.1"
},
"upmCi": {
"footprint": "a44523c3c6bf06fc16c1c7e10cb639178c93aaf7"
"footprint": "f3dc8eba91dfd32393f7fa090ad6edcb829cda68"
},
"repository": {
"url": "https://github.cds.internal.unity3d.com/unity/com.unity.transport.git",
"type": "git",
"revision": "19e6f0975064ffd6eb9882715550c4e8709adf09"
"revision": "5acd333c1b77b8eee2eed9803cbf5b30a2f38b83"
},
"samples": [
{
Expand Down

0 comments on commit c96ecc4

Please sign in to comment.