diff --git a/src/Apache.IoTDB/Apache.IoTDB.csproj b/src/Apache.IoTDB/Apache.IoTDB.csproj index 9441084..89d433b 100644 --- a/src/Apache.IoTDB/Apache.IoTDB.csproj +++ b/src/Apache.IoTDB/Apache.IoTDB.csproj @@ -1,16 +1,23 @@  - - net5.0;net6.0;netstandard2.1;netstandard2.0;net461 - latest + + net5.0;net6.0;netstandard2.1;netstandard2.0;net461 + latest + eedalong, lausannel, MysticBoy, Aiemu, HTHou + LiuLin Lab + C# client for Apache IoTDB + https://github.com/apache/iotdb-client-csharp + https://github.com/apache/iotdb-client-csharp - - - - - - - + - + + + + + + + + \ No newline at end of file diff --git a/src/Apache.IoTDB/Client.cs b/src/Apache.IoTDB/Client.cs index 0fb5507..8478706 100644 --- a/src/Apache.IoTDB/Client.cs +++ b/src/Apache.IoTDB/Client.cs @@ -8,13 +8,15 @@ public class Client public long SessionId { get; } public long StatementId { get; } public TFramedTransport Transport { get; } + public TEndPoint EndPoint { get; } - public Client(IClientRPCService.Client client, long sessionId, long statementId, TFramedTransport transport) + public Client(IClientRPCService.Client client, long sessionId, long statementId, TFramedTransport transport, TEndPoint endpoint) { ServiceClient = client; SessionId = sessionId; StatementId = statementId; Transport = transport; + EndPoint = endpoint; } } } \ No newline at end of file diff --git a/src/Apache.IoTDB/SessionPool.cs b/src/Apache.IoTDB/SessionPool.cs index 44c3d7b..1014416 100644 --- a/src/Apache.IoTDB/SessionPool.cs +++ b/src/Apache.IoTDB/SessionPool.cs @@ -122,7 +122,7 @@ public async Task ExecuteClientOperationAsync(AsyncOperation x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port); + if (startIndex == -1) { - int currentHostIndex = random.Next(0, _endPoints.Count); - int attempts = 0; + throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}"); + } + for (int i = 0; i < RetryNum && !isConnected; i++) + { + int attempts = 1; while (attempts < _endPoints.Count) { - int j = (currentHostIndex + attempts) % _endPoints.Count; - + int j = (startIndex + attempts) % _endPoints.Count; try { - for (int index = 0; index < _poolSize; index++) - { - var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken); - _clients.Add(client); - } + var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken); + _clients.Add(client); isConnected = true; break; } @@ -248,7 +262,7 @@ public async Task Reconnect(CancellationToken cancellationToken = default) } } - if (!isConnected || _clients.ClientQueue.Count != _poolSize) + if (!isConnected) { throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null); } @@ -369,13 +383,14 @@ private async Task CreateAndOpen(string host, int port, bool enableRpcCo var sessionId = openResp.SessionId; var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken); - _isClose = false; + var endpoint = new TEndPoint(host, port); var returnClient = new Client( client, sessionId, statementId, - transport); + transport, + endpoint); return returnClient; }