Skip to content

Commit

Permalink
Added convenient method to wait for leadership
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Apr 30, 2024
1 parent f859533 commit 55b3879
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ public static async Task ConsensusToken()
True(GetLocalClusterView(host2).ConsensusToken.IsCancellationRequested);
await host2.StartAsync();

await GetLocalClusterView(host1).WaitForLeaderAsync(DefaultTimeout);
await GetLocalClusterView(host1).WaitForLeadershipAsync(DefaultTimeout);
Equal(GetLocalClusterView(host1).LeadershipToken, GetLocalClusterView(host1).ConsensusToken);
True(await GetLocalClusterView(host1).AddMemberAsync(GetLocalClusterView(host2).LocalMemberAddress));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public interface IRaftCluster : IReplicationCluster<IRaftLogEntry>, IPeerMesh<IR
/// has communication with the leader.
/// </summary>
/// <remarks>
/// The token moves to canceled state if the current node upgrades to the candidate state or looses connection with the leader.
/// The token moves to canceled state if the current node upgrades to the candidate state or loses connection with the leader.
/// </remarks>
CancellationToken ConsensusToken { get; }

Expand All @@ -72,4 +72,16 @@ public interface IRaftCluster : IReplicationCluster<IRaftLogEntry>, IPeerMesh<IR
/// <returns>The task representing asynchronous result.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
ValueTask ApplyReadBarrierAsync(CancellationToken token = default);

/// <summary>
/// Waits until the local node is elected as the leader.
/// </summary>
/// <param name="timeout">The time to wait; or <see cref="Timeout.InfiniteTimeSpan"/>.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The leadership token.</returns>
/// <exception cref="TimeoutException">The operation is timed out.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="ObjectDisposedException">The local node is disposed.</exception>
/// <seealso cref="LeadershipToken"/>
ValueTask<CancellationToken> WaitForLeadershipAsync(TimeSpan timeout, CancellationToken token = default);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public abstract partial class RaftCluster<TMember> : Disposable, IUnresponsiveCl

private volatile RaftState<TMember> state;
private volatile TaskCompletionSource<TMember> electionEvent;
private volatile TaskCompletionSource<CancellationToken> leadershipEvent;
private InvocationList<Action<RaftCluster<TMember>, TMember?>> leaderChangedHandlers;
private InvocationList<Action<RaftCluster<TMember>, TMember>> replicationHandlers;
private volatile int electionTimeout;
Expand Down Expand Up @@ -75,6 +76,7 @@ protected RaftCluster(IClusterMemberConfiguration config, in TagList measurement
readinessProbe = new(TaskCreationOptions.RunContinuationsAsynchronously);
aggressiveStickiness = config.AggressiveLeaderStickiness;
electionEvent = new(TaskCreationOptions.RunContinuationsAsynchronously);
leadershipEvent = new(TaskCreationOptions.RunContinuationsAsynchronously);
state = new StandbyState<TMember>(this, TimeSpan.FromMilliseconds(electionTimeout));
EndPointComparer = config.EndPointComparer;
this.measurementTags = measurementTags;
Expand Down Expand Up @@ -256,22 +258,18 @@ private set
}
}

/// <summary>
/// Waits for the leader election asynchronously.
/// </summary>
/// <param name="timeout">The time to wait; or <see cref="System.Threading.Timeout.InfiniteTimeSpan"/>.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The elected leader.</returns>
/// <exception cref="TimeoutException">The operation is timed out.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="ObjectDisposedException">The local node is disposed.</exception>
/// <inheritdoc cref="ICluster.WaitForLeaderAsync(TimeSpan, CancellationToken)"/>
public Task<TMember> WaitForLeaderAsync(TimeSpan timeout, CancellationToken token = default)
=> electionEvent.Task.WaitAsync(timeout, token);

/// <inheritdoc />
ValueTask<IClusterMember> ICluster.WaitForLeaderAsync(TimeSpan timeout, CancellationToken token)
=> new(WaitForLeaderAsync(timeout, token).Convert<TMember, IClusterMember>());

/// <inheritdoc cref="IRaftCluster.WaitForLeadershipAsync(TimeSpan, CancellationToken)"/>
public ValueTask<CancellationToken> WaitForLeadershipAsync(TimeSpan timeout, CancellationToken token = default)
=> new(leadershipEvent.Task.WaitAsync(timeout, token));

private ValueTask UnfreezeAsync()
{
ValueTask result;
Expand Down Expand Up @@ -441,7 +439,14 @@ private async Task CancelPendingRequestsAsync()
}

private ValueTask UpdateStateAsync(RaftState<TMember> newState)
=> Interlocked.Exchange(ref state, newState).DisposeAsync();
{
if (leadershipEvent is { Task.IsCompletedSuccessfully: true } leadershipEventCopy)
{
Interlocked.CompareExchange(ref leadershipEvent, new(TaskCreationOptions.RunContinuationsAsynchronously), leadershipEventCopy);
}

return Interlocked.Exchange(ref state, newState).DisposeAsync();
}

/// <summary>
/// Stops serving local member.
Expand Down Expand Up @@ -1118,11 +1123,13 @@ async void IRaftStateMachine<TMember>.MoveToLeaderState(IRaftStateMachine.IWeakC
};

await UpdateStateAsync(newState).ConfigureAwait(false);
await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false);

// ensure that the leader is visible to the consumers after no-op entry is added to the log (which acts as a write barrier)
Leader = newLeader;
await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false);
newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage);
leadershipEvent.TrySetResult(newState.Token);

newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage);
Logger.TransitionToLeaderStateCompleted(currentTerm);
}
}
Expand Down Expand Up @@ -1293,6 +1300,7 @@ protected override void Dispose(bool disposing)
memberAddedHandlers = memberRemovedHandlers = default;
leaderChangedHandlers = default;
TrySetDisposedException(electionEvent);
TrySetDisposedException(leadershipEvent);
}

base.Dispose(disposing);
Expand Down

0 comments on commit 55b3879

Please sign in to comment.