Skip to content

Commit

Permalink
feat: allow clients in sessionpool to connect to different endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
lausannel committed Jul 8, 2024
1 parent f44923f commit 77c3569
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 42 deletions.
29 changes: 18 additions & 11 deletions src/Apache.IoTDB/Apache.IoTDB.csproj
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>net5.0;net6.0;netstandard2.1;netstandard2.0;net461</TargetFrameworks>
<LangVersion>latest</LangVersion>
<PropertyGroup>
<TargetFrameworks>net5.0;net6.0;netstandard2.1;netstandard2.0;net461</TargetFrameworks>
<LangVersion>latest</LangVersion>
<Authors>eedalong, lausannel, MysticBoy, Aiemu, HTHou</Authors>
<Company>LiuLin Lab</Company>
<PackageDescription>C# client for Apache IoTDB</PackageDescription>
<PackageProjectUrl>https://github.com/apache/iotdb-client-csharp</PackageProjectUrl>
<RepositoryUrl>https://github.com/apache/iotdb-client-csharp</RepositoryUrl>

</PropertyGroup>

<ItemGroup>
<PackageReference Include="ApacheThrift" Version="0.14.1" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net461' or '$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="IndexRange" Version="1.0.2" />
</ItemGroup>
</PropertyGroup>

</Project>
<ItemGroup>
<PackageReference Include="ApacheThrift" Version="0.14.1" />
</ItemGroup>
<ItemGroup
Condition="'$(TargetFramework)' == 'net461' or '$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="IndexRange" Version="1.0.2" />
</ItemGroup>

</Project>
4 changes: 3 additions & 1 deletion src/Apache.IoTDB/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ public class Client
public long SessionId { get; }
public long StatementId { get; }
public TFramedTransport Transport { get; }
public TEndPoint EndPoint { get; }

public Client(IClientRPCService.Client client, long sessionId, long statementId, TFramedTransport transport)
public Client(IClientRPCService.Client client, long sessionId, long statementId, TFramedTransport transport, TEndPoint endpoint)
{
ServiceClient = client;
SessionId = sessionId;
StatementId = statementId;
Transport = transport;
EndPoint = endpoint;
}
}
}
75 changes: 45 additions & 30 deletions src/Apache.IoTDB/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public async Task<TResult> ExecuteClientOperationAsync<TResult>(AsyncOperation<T
{
if (retryOnFailure)
{
await Reconnect();
await Reconnect(client);
client = _clients.Take();
try
{
Expand Down Expand Up @@ -175,6 +175,7 @@ public async Task Open(CancellationToken cancellationToken = default)
{
_clients = new ConcurrentClientQueue();
_clients.Timeout = _timeout * 5;

if (_nodeUrls.Count == 0)
{
for (var index = 0; index < _poolSize; index++)
Expand All @@ -184,33 +185,46 @@ public async Task Open(CancellationToken cancellationToken = default)
}
else
{
foreach (var endPoint in _endPoints)
int startIndex = 0;
for (var index = 0; index < _poolSize; index++)
{
try
bool isConnected = false;
for (int i = 0; i < _endPoints.Count; i++)
{
for (var index = 0; index < _poolSize; index++)
var endPointIndex = (startIndex + i) % _endPoints.Count;
var endPoint = _endPoints[endPointIndex];
try
{
var client = await CreateAndOpen(endPoint.Ip, endPoint.Port, _enableRpcCompression, _timeout, cancellationToken);
_clients.Add(client);
isConnected = true;
startIndex = (endPointIndex + 1) % _endPoints.Count;
break;
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Currently connecting to {0}:{1} failed", endPoint.Ip, endPoint.Port);
}
}
break;
}
catch (Exception e) // CreateAndOpen will not throw TException
if (!isConnected) // current client could not connect to any endpoint
{
#if NET461_OR_GREATER || NETSTANDARD2_0
#else
_clients.ClientQueue.Clear();
#endif
continue;
throw new TException("Error occurs when opening session pool. Could not connect to any server", null);
}
}
if (_clients.ClientQueue.Count != _poolSize)
{
throw new TException("Error occurs when opening session pool. Could not connect to any server", null);
}
}

if (_clients.ClientQueue.Count != _poolSize)
{
throw new TException(string.Format("Error occurs when opening session pool. Client pool size is not equal to the expected size. Client pool size: {0}, expected size: {1}", _clients.ClientQueue.Count, _poolSize), null);
}
_isClose = false;
}
public async Task Reconnect(CancellationToken cancellationToken = default)


public async Task Reconnect(Client originalClient = null, CancellationToken cancellationToken = default)
{
if (_nodeUrls.Count == 0)
{
Expand All @@ -219,24 +233,24 @@ public async Task Reconnect(CancellationToken cancellationToken = default)
}

bool isConnected = false;
Random random = new Random();
originalClient.Transport.Close();

for (int i = 0; i < RetryNum && !isConnected; i++)
int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
if (startIndex == -1)
{
int currentHostIndex = random.Next(0, _endPoints.Count);
int attempts = 0;
throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
}

for (int i = 0; i < RetryNum && !isConnected; i++)
{
int attempts = 1;
while (attempts < _endPoints.Count)
{
int j = (currentHostIndex + attempts) % _endPoints.Count;

int j = (startIndex + attempts) % _endPoints.Count;
try
{
for (int index = 0; index < _poolSize; index++)
{
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
_clients.Add(client);
}
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
_clients.Add(client);
isConnected = true;
break;
}
Expand All @@ -248,7 +262,7 @@ public async Task Reconnect(CancellationToken cancellationToken = default)
}
}

if (!isConnected || _clients.ClientQueue.Count != _poolSize)
if (!isConnected)
{
throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null);
}
Expand Down Expand Up @@ -369,13 +383,14 @@ private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCo
var sessionId = openResp.SessionId;
var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken);

_isClose = false;
var endpoint = new TEndPoint(host, port);

var returnClient = new Client(
client,
sessionId,
statementId,
transport);
transport,
endpoint);

return returnClient;
}
Expand Down

0 comments on commit 77c3569

Please sign in to comment.