Skip to content

Commit

Permalink
fix: only reconnect one client when there is only 1 node url
Browse files Browse the repository at this point in the history
  • Loading branch information
lausannel authored Aug 26, 2024
1 parent c5e86d7 commit acf44e2
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions src/Apache.IoTDB/SessionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,39 +235,55 @@ public async Task Open(CancellationToken cancellationToken = default)

public async Task<Client> Reconnect(Client originalClient = null, CancellationToken cancellationToken = default)
{
originalClient.Transport.Close();
originalClient?.Transport.Close();

if (_nodeUrls.Count == 0)
{
await Open(_enableRpcCompression);
return _clients.Take();
}

int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
if (startIndex == -1)
{
throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
}

for (int attempt = 1; attempt <= RetryNum; attempt++)
{
for (int i = 0; i < _endPoints.Count; i++)
for (int attempt = 1; attempt <= RetryNum; attempt++)
{
int j = (startIndex + i) % _endPoints.Count;
try
{
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
var client = await CreateAndOpen(_host, _port, _enableRpcCompression, _timeout, cancellationToken);
return client;
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
_logger.LogWarning(e, "Attempt reconnecting to {0}:{1} failed", _host, _port);
}
}
}
}
else
{
int startIndex = _endPoints.FindIndex(x => x.Ip == originalClient.EndPoint.Ip && x.Port == originalClient.EndPoint.Port);
if (startIndex == -1)
{
throw new ArgumentException($"The original client is not in the list of endpoints. Original client: {originalClient.EndPoint.Ip}:{originalClient.EndPoint.Port}");
}

for (int attempt = 1; attempt <= RetryNum; attempt++)
{
for (int i = 0; i < _endPoints.Count; i++)
{
int j = (startIndex + i) % _endPoints.Count;
try
{
var client = await CreateAndOpen(_endPoints[j].Ip, _endPoints[j].Port, _enableRpcCompression, _timeout, cancellationToken);
return client;
}
catch (Exception e)
{
if (_debugMode)
{
_logger.LogWarning(e, "Attempt connecting to {0}:{1} failed", _endPoints[j].Ip, _endPoints[j].Port);
}
}
}
}
}

throw new TException("Error occurs when reconnecting session pool. Could not connect to any server", null);
}

Expand Down

0 comments on commit acf44e2

Please sign in to comment.