Skip to content

Commit

Permalink
First prototype of lease provider/consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Jun 14, 2024
1 parent 37c90f6 commit cd5b927
Show file tree
Hide file tree
Showing 4 changed files with 787 additions and 0 deletions.
215 changes: 215 additions & 0 deletions src/DotNext.Threading/Threading/Leases/LeaseConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
using System.Runtime.InteropServices;

namespace DotNext.Threading.Leases;

using Diagnostics;

/// <summary>
/// Represents client side of a lease in a distributed environment.
/// </summary>
/// <seealso cref="LeaseProvider{TMetadata}"/>
public abstract class LeaseConsumer : Disposable
{
private readonly double clockDriftBound;
private LeaseIdentity identity;
private CancellationTokenSource? source;

/// <summary>
/// Initializes a new lease consumer.
/// </summary>
protected LeaseConsumer() => clockDriftBound = 0.5D;

/// <summary>
/// Gets or sets wall clock desync degree in the cluster, in percents.
/// </summary>
/// <remarks>
/// 0 means that wall clocks for this consumer and lease provider are in sync. To reduce contention between
/// concurrent consumers it's better to renew a lease earlier than its expiration timeout.
/// </remarks>
/// <value>A value in range [0..1). The default value is 0.3.</value>
public double ClockDriftBound
{
get => clockDriftBound;
init => clockDriftBound = double.IsFinite(value) && value >= 0D ? value : throw new ArgumentOutOfRangeException(nameof(value));
}

private TimeSpan AdjustTimeToLive(TimeSpan originalTtl)
=> originalTtl - (originalTtl * clockDriftBound);

/// <summary>
/// Gets the token bounded to the lease lifetime.
/// </summary>
/// <remarks>
/// Use that token to perform lease-bounded operation. The token acquired before a call to
/// <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="TryRenewAsync(CancellationToken)"/>
/// should not be used after. The typical use case to invoke these methods and then obtain the token.
/// </remarks>
public CancellationToken Token => source?.Token ?? new(true);

private void CancelAndDispose()
{
using (source)
{
source?.Cancel(throwOnFirstException: false);
}
}

/// <summary>
/// Tries to acquire the lease.
/// </summary>
/// <remarks>
/// This method cancels <see cref="Token"/> immediately. If the method returns <see langword="true"/>, the token
/// can be used to perform async operation bounded to the lease lifetime.
/// </remarks>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns><see langword="true"/> if lease taken successfully; otherwise, <see langword="false"/>.</returns>
/// <exception cref="ObjectDisposedException">The consumer is disposed.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
public async ValueTask<bool> TryAcquireAsync(CancellationToken token = default)
{
ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this);

CancelAndDispose();

var ts = new Timestamp();
TimeSpan remainingTime;
if (await TryAcquireCoreAsync(token).ConfigureAwait(false) is { } response && (remainingTime = AdjustTimeToLive(response.TimeToLive - ts.Elapsed)) > TimeSpan.Zero)
{
source = new();
identity = response.Identity;
source.CancelAfter(remainingTime);
return true;
}

return false;
}

/// <summary>
/// Performs a call to <see cref="LeaseProvider{TMetadata}.TryAcquireAsync(CancellationToken)"/> across the application boundaries.
/// </summary>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected abstract ValueTask<AcquisitionResult?> TryAcquireCoreAsync(CancellationToken token = default);

/// <summary>
/// Tries to renew a lease.
/// </summary>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns><see langword="true"/> if lease renewed successfully; otherwise, <see langword="false"/>.</returns>
/// <exception cref="ObjectDisposedException">The consumer is disposed.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="InvalidOperationException">This consumer never took the lease.</exception>
public async ValueTask<bool> TryRenewAsync(CancellationToken token = default)
{
ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this);

if (identity.Version is LeaseIdentity.InitialVersion)
throw new InvalidOperationException();

var ts = new Timestamp();
TimeSpan remainingTime;
if (await TryRenewAsync(identity, token).ConfigureAwait(false) is { } response && (remainingTime = AdjustTimeToLive(response.TimeToLive - ts.Elapsed)) > TimeSpan.Zero)
{
identity = response.Identity;

if (source is null || !TryResetOrDestroy(source))
source = new();

source.CancelAfter(remainingTime);
return true;
}

return false;

static bool TryResetOrDestroy(CancellationTokenSource source)
{
var result = source.TryReset();
if (!result)
source.Dispose();

return result;
}
}

/// <summary>
/// Performs a call to <see cref="LeaseProvider{TMetadata}.TryRenewAsync(LeaseIdentity, bool, CancellationToken)"/> or
/// <see cref="LeaseProvider{TMetadata}.TryAcquireOrRenewAsync(LeaseIdentity, CancellationToken)"/> across the application boundaries.
/// </summary>
/// <param name="identity">The identity of the lease to renew.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected abstract ValueTask<AcquisitionResult?> TryRenewAsync(LeaseIdentity identity, CancellationToken token);

/// <summary>
/// Releases a lease.
/// </summary>
/// <remarks>
/// This method cancels <see cref="Token"/> immediately.
/// </remarks>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns><see langword="true"/> if lease canceled successfully; otherwise, <see langword="false"/>.</returns>
/// <exception cref="ObjectDisposedException">The consumer is disposed.</exception>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
/// <exception cref="InvalidOperationException">This consumer never took the lease.</exception>
/// <exception cref="InvalidOperationException">This consumer never took the lease.</exception>
public async ValueTask<bool> ReleaseAsync(CancellationToken token = default)
{
ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this);

if (this.identity.Version is LeaseIdentity.InitialVersion)
throw new InvalidOperationException();

CancelAndDispose();
if (await ReleaseAsync(this.identity, token).ConfigureAwait(false) is { } identity)
{
this.identity = identity;
return true;
}

return false;
}

/// <summary>
/// Performs a call to <see cref="LeaseProvider{TMetadata}.ReleaseAsync(LeaseIdentity, CancellationToken)"/> across
/// the application boundaries.
/// </summary>
/// <param name="identity">The identity of the lease to renew.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns>
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
protected abstract ValueTask<LeaseIdentity?> ReleaseAsync(LeaseIdentity identity, CancellationToken token);

/// <inheritdoc/>
protected override void Dispose(bool disposing)
{
if (disposing)
{
if (source is not null)
{
source.Dispose();
source = null;
}
}

base.Dispose(disposing);
}

/// <summary>
/// Represents a result of lease acquisition operation.
/// </summary>
[StructLayout(LayoutKind.Auto)]
protected readonly struct AcquisitionResult
{
/// <summary>
/// Gets or sets the identity of the lease.
/// </summary>
public required LeaseIdentity Identity { get; init; }

/// <summary>
/// Gets or sets lease expiration time returned by the provider.
/// </summary>
public required TimeSpan TimeToLive { get; init; }
}
}
70 changes: 70 additions & 0 deletions src/DotNext.Threading/Threading/Leases/LeaseIdentity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;

namespace DotNext.Threading.Leases;

/// <summary>
/// Represents a lease in the particular point in time.
/// </summary>
[StructLayout(LayoutKind.Auto)]
public readonly struct LeaseIdentity : IEquatable<LeaseIdentity>
{
/// <summary>
/// Represents initial version of a lease which cannot be renewed or released but can be acquired.
/// </summary>
[CLSCompliant(false)]
public const ulong InitialVersion = 0UL;

/// <summary>
/// Gets a version of the lease.
/// </summary>
[CLSCompliant(false)]
public required ulong Version { get; init; }

/// <summary>
/// Gets an ID of the lease.
/// </summary>
/// <remarks>
/// This property can be used only if the provider supports the deletion of leases.
/// In that case, a newly created lease after its deletion must have unique random ID
/// to prevent its renewal from the stale client.
/// </remarks>
public Guid Id { get; init; }

private bool Equals(in LeaseIdentity other)
=> Version == other.Version && Id == other.Id;

/// <summary>
/// Determines whether this identity is the same as the specified one.
/// </summary>
/// <param name="other">The identity to be compared.</param>
/// <returns><see langword="true"/> if this identity is the same as <paramref name="other"/>; otherwise, <see langword="false"/>.</returns>
public bool Equals(LeaseIdentity other) => Equals(in other);

/// <inheritdoc/>
public override bool Equals([NotNullWhen(true)] object? other)
=> other is LeaseIdentity identity && Equals(in identity);

/// <inheritdoc/>
public override int GetHashCode() => HashCode.Combine(Version, Id);

/// <summary>
/// Determines whether the two identities are equal.
/// </summary>
/// <param name="x">The first identity to compare.</param>
/// <param name="y">The second identity to compare.</param>
/// <returns><see langword="true"/> if <paramref name="x"/> is equal to <paramref name="y"/>; otherwise, <see langword="false"/>.</returns>
public static bool operator ==(in LeaseIdentity x, in LeaseIdentity y)
=> x.Equals(in y);

/// <summary>
/// Determines whether the two identities are not equal.
/// </summary>
/// <param name="x">The first identity to compare.</param>
/// <param name="y">The second identity to compare.</param>
/// <returns><see langword="true"/> if <paramref name="x"/> is not equal to <paramref name="y"/>; otherwise, <see langword="false"/>.</returns>
public static bool operator !=(in LeaseIdentity x, in LeaseIdentity y)
=> x.Equals(in y) is false;

internal LeaseIdentity BumpVersion() => this with { Version = Version + 1UL };
}
64 changes: 64 additions & 0 deletions src/DotNext.Threading/Threading/Leases/LeaseProvider.Utils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Runtime.InteropServices;

namespace DotNext.Threading.Leases;

using Patterns;

public partial class LeaseProvider<TMetadata>
{
private interface ITransitionCondition
{
bool Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime);
}

private sealed class AcqusitionCondition : ITransitionCondition, ISingleton<AcqusitionCondition>
{
public static AcqusitionCondition Instance { get; } = new();

private AcqusitionCondition()
{
}

bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime)
=> state.IsExpired(provider, timeToLive, out remainingTime);
}

[StructLayout(LayoutKind.Auto)]
private readonly struct RenewalCondition(LeaseIdentity identity, bool reacquire) : ITransitionCondition
{
bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime)
{
remainingTime = timeToLive;
return state.Identity == identity && (reacquire || !state.IsExpired(provider, timeToLive, out remainingTime));
}
}

[StructLayout(LayoutKind.Auto)]
private readonly struct AcquisitionOrRenewalCondition(LeaseIdentity identity) : ITransitionCondition
{
bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime)
{
remainingTime = timeToLive;
return state.Identity == identity || state.IsExpired(provider, timeToLive, out remainingTime);
}
}

[StructLayout(LayoutKind.Auto)]
private readonly struct Updater<TArg>(TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater) : ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>
{
ValueTask<TMetadata> ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>.Invoke(TMetadata metadata, CancellationToken token)
=> updater(arg, metadata, token);
}

private sealed class NoOpUpdater : ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>, ISingleton<NoOpUpdater>
{
public static NoOpUpdater Instance { get; } = new();

private NoOpUpdater()
{
}

ValueTask<TMetadata> ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>.Invoke(TMetadata metadata, CancellationToken token)
=> ValueTask.FromResult(metadata);
}
}
Loading

0 comments on commit cd5b927

Please sign in to comment.