Skip to content

Commit

Permalink
Added additional cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Mar 8, 2024
1 parent bdf8181 commit 645a34d
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ private readonly int GetIndex(TMember member, out int hashCode)
private readonly ref ContextEntry? GetEntry(TMember member, out int hashCode)
=> ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(entries), GetIndex(member, out hashCode));

private void ResizeAndRemoveDeadEntries()
private void ResizeAndRemoveDeadEntries(CancellationToken token)
{
var oldEntries = entries;
entries = new ContextEntry?[Grow(oldEntries.Length)];

// copy elements from old array to a new one
for (var i = 0; i < oldEntries.Length; i++)
for (var i = 0; i < oldEntries.Length; i++, token.ThrowIfCancellationRequested())
{
ref var oldEntry = ref Unsafe.Add(ref MemoryMarshal.GetArrayDataReference(oldEntries), i);
for (ContextEntry? current = oldEntry, next; current is not null; current = next)
Expand Down Expand Up @@ -148,7 +148,7 @@ private readonly bool Insert(ContextEntry entry)
return collisions <= maxCollisions;
}

public Replicator GetOrCreate(TMember key, Func<TMember, Replicator> factory)
public Replicator GetOrCreate(TMember key, Func<TMember, Replicator> factory, CancellationToken token = default)
{
Debug.Assert(key is not null);

Expand All @@ -165,7 +165,7 @@ public Replicator GetOrCreate(TMember key, Func<TMember, Replicator> factory)
ContextEntry? entryToReuse = null;

// try to get from dictionary
for (var current = entry; current is not null; current = current.Next)
for (var current = entry; current is not null; current = current.Next, token.ThrowIfCancellationRequested())
{
if (current.Key is not { } tmp)
{
Expand All @@ -190,7 +190,7 @@ public Replicator GetOrCreate(TMember key, Func<TMember, Replicator> factory)
else if (!Insert(new(key, hashCode, factory, out result)))
{
// too many collisions, resize
ResizeAndRemoveDeadEntries();
ResizeAndRemoveDeadEntries(token);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ internal LeaderState(IRaftStateMachine<TMember> stateMachine, long term, TimeSpa
var precedingIndex = member.State.PrecedingIndex;

// fork replication procedure
replicator = context.GetOrCreate(member, replicatorFactory);
replicator = context.GetOrCreate(member, replicatorFactory, LeadershipToken);
replicator.Initialize(activeConfig, proposedConfig, commitIndex, currentTerm, precedingIndex);
response = SpawnReplicationAsync(replicator, auditTrail, currentIndex, LeadershipToken);
}
else
{
replicator = context.GetOrCreate(member, localReplicatorFactory);
replicator = context.GetOrCreate(member, localReplicatorFactory, LeadershipToken);
response = localMemberResponse;
}
}
Expand Down

0 comments on commit 645a34d

Please sign in to comment.