diff --git a/src/DotNext.Threading/Threading/Leases/LeaseConsumer.cs b/src/DotNext.Threading/Threading/Leases/LeaseConsumer.cs new file mode 100644 index 000000000..b205a6b29 --- /dev/null +++ b/src/DotNext.Threading/Threading/Leases/LeaseConsumer.cs @@ -0,0 +1,215 @@ +using System.Runtime.InteropServices; + +namespace DotNext.Threading.Leases; + +using Diagnostics; + +/// <summary> +/// Represents client side of a lease in a distributed environment. +/// </summary> +/// <seealso cref="LeaseProvider{TMetadata}"/> +public abstract class LeaseConsumer : Disposable +{ + private readonly double clockDriftBound; + private LeaseIdentity identity; + private CancellationTokenSource? source; + + /// <summary> + /// Initializes a new lease consumer. + /// </summary> + protected LeaseConsumer() => clockDriftBound = 0.5D; + + /// <summary> + /// Gets or sets wall clock desync degree in the cluster, in percents. + /// </summary> + /// <remarks> + /// 0 means that wall clocks for this consumer and lease provider are in sync. To reduce contention between + /// concurrent consumers it's better to renew a lease earlier than its expiration timeout. + /// </remarks> + /// <value>A value in range [0..1). The default value is 0.3.</value> + public double ClockDriftBound + { + get => clockDriftBound; + init => clockDriftBound = double.IsFinite(value) && value >= 0D ? value : throw new ArgumentOutOfRangeException(nameof(value)); + } + + private TimeSpan AdjustTimeToLive(TimeSpan originalTtl) + => originalTtl - (originalTtl * clockDriftBound); + + /// <summary> + /// Gets the token bounded to the lease lifetime. + /// </summary> + /// <remarks> + /// Use that token to perform lease-bounded operation. The token acquired before a call to + /// <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="TryRenewAsync(CancellationToken)"/> + /// should not be used after. The typical use case to invoke these methods and then obtain the token. + /// </remarks> + public CancellationToken Token => source?.Token ?? new(true); + + private void CancelAndDispose() + { + using (source) + { + source?.Cancel(throwOnFirstException: false); + } + } + + /// <summary> + /// Tries to acquire the lease. + /// </summary> + /// <remarks> + /// This method cancels <see cref="Token"/> immediately. If the method returns <see langword="true"/>, the token + /// can be used to perform async operation bounded to the lease lifetime. + /// </remarks> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns><see langword="true"/> if lease taken successfully; otherwise, <see langword="false"/>.</returns> + /// <exception cref="ObjectDisposedException">The consumer is disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public async ValueTask<bool> TryAcquireAsync(CancellationToken token = default) + { + ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this); + + CancelAndDispose(); + + var ts = new Timestamp(); + TimeSpan remainingTime; + if (await TryAcquireCoreAsync(token).ConfigureAwait(false) is { } response && (remainingTime = AdjustTimeToLive(response.TimeToLive - ts.Elapsed)) > TimeSpan.Zero) + { + source = new(); + identity = response.Identity; + source.CancelAfter(remainingTime); + return true; + } + + return false; + } + + /// <summary> + /// Performs a call to <see cref="LeaseProvider{TMetadata}.TryAcquireAsync(CancellationToken)"/> across the application boundaries. + /// </summary> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + protected abstract ValueTask<AcquisitionResult?> TryAcquireCoreAsync(CancellationToken token = default); + + /// <summary> + /// Tries to renew a lease. + /// </summary> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns><see langword="true"/> if lease renewed successfully; otherwise, <see langword="false"/>.</returns> + /// <exception cref="ObjectDisposedException">The consumer is disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + /// <exception cref="InvalidOperationException">This consumer never took the lease.</exception> + public async ValueTask<bool> TryRenewAsync(CancellationToken token = default) + { + ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this); + + if (identity.Version is LeaseIdentity.InitialVersion) + throw new InvalidOperationException(); + + var ts = new Timestamp(); + TimeSpan remainingTime; + if (await TryRenewAsync(identity, token).ConfigureAwait(false) is { } response && (remainingTime = AdjustTimeToLive(response.TimeToLive - ts.Elapsed)) > TimeSpan.Zero) + { + identity = response.Identity; + + if (source is null || !TryResetOrDestroy(source)) + source = new(); + + source.CancelAfter(remainingTime); + return true; + } + + return false; + + static bool TryResetOrDestroy(CancellationTokenSource source) + { + var result = source.TryReset(); + if (!result) + source.Dispose(); + + return result; + } + } + + /// <summary> + /// Performs a call to <see cref="LeaseProvider{TMetadata}.TryRenewAsync(LeaseIdentity, bool, CancellationToken)"/> or + /// <see cref="LeaseProvider{TMetadata}.TryAcquireOrRenewAsync(LeaseIdentity, CancellationToken)"/> across the application boundaries. + /// </summary> + /// <param name="identity">The identity of the lease to renew.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + protected abstract ValueTask<AcquisitionResult?> TryRenewAsync(LeaseIdentity identity, CancellationToken token); + + /// <summary> + /// Releases a lease. + /// </summary> + /// <remarks> + /// This method cancels <see cref="Token"/> immediately. + /// </remarks> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns><see langword="true"/> if lease canceled successfully; otherwise, <see langword="false"/>.</returns> + /// <exception cref="ObjectDisposedException">The consumer is disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + /// <exception cref="InvalidOperationException">This consumer never took the lease.</exception> + /// <exception cref="InvalidOperationException">This consumer never took the lease.</exception> + public async ValueTask<bool> ReleaseAsync(CancellationToken token = default) + { + ObjectDisposedException.ThrowIf(IsDisposingOrDisposed, this); + + if (this.identity.Version is LeaseIdentity.InitialVersion) + throw new InvalidOperationException(); + + CancelAndDispose(); + if (await ReleaseAsync(this.identity, token).ConfigureAwait(false) is { } identity) + { + this.identity = identity; + return true; + } + + return false; + } + + /// <summary> + /// Performs a call to <see cref="LeaseProvider{TMetadata}.ReleaseAsync(LeaseIdentity, CancellationToken)"/> across + /// the application boundaries. + /// </summary> + /// <param name="identity">The identity of the lease to renew.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The response from the lease provider; or <see langword="null"/> if the lease cannot be taken.</returns> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + protected abstract ValueTask<LeaseIdentity?> ReleaseAsync(LeaseIdentity identity, CancellationToken token); + + /// <inheritdoc/> + protected override void Dispose(bool disposing) + { + if (disposing) + { + if (source is not null) + { + source.Dispose(); + source = null; + } + } + + base.Dispose(disposing); + } + + /// <summary> + /// Represents a result of lease acquisition operation. + /// </summary> + [StructLayout(LayoutKind.Auto)] + protected readonly struct AcquisitionResult + { + /// <summary> + /// Gets or sets the identity of the lease. + /// </summary> + public required LeaseIdentity Identity { get; init; } + + /// <summary> + /// Gets or sets lease expiration time returned by the provider. + /// </summary> + public required TimeSpan TimeToLive { get; init; } + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/Leases/LeaseIdentity.cs b/src/DotNext.Threading/Threading/Leases/LeaseIdentity.cs new file mode 100644 index 000000000..557b76fc6 --- /dev/null +++ b/src/DotNext.Threading/Threading/Leases/LeaseIdentity.cs @@ -0,0 +1,70 @@ +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; + +namespace DotNext.Threading.Leases; + +/// <summary> +/// Represents a lease in the particular point in time. +/// </summary> +[StructLayout(LayoutKind.Auto)] +public readonly struct LeaseIdentity : IEquatable<LeaseIdentity> +{ + /// <summary> + /// Represents initial version of a lease which cannot be renewed or released but can be acquired. + /// </summary> + [CLSCompliant(false)] + public const ulong InitialVersion = 0UL; + + /// <summary> + /// Gets a version of the lease. + /// </summary> + [CLSCompliant(false)] + public required ulong Version { get; init; } + + /// <summary> + /// Gets an ID of the lease. + /// </summary> + /// <remarks> + /// This property can be used only if the provider supports the deletion of leases. + /// In that case, a newly created lease after its deletion must have unique random ID + /// to prevent its renewal from the stale client. + /// </remarks> + public Guid Id { get; init; } + + private bool Equals(in LeaseIdentity other) + => Version == other.Version && Id == other.Id; + + /// <summary> + /// Determines whether this identity is the same as the specified one. + /// </summary> + /// <param name="other">The identity to be compared.</param> + /// <returns><see langword="true"/> if this identity is the same as <paramref name="other"/>; otherwise, <see langword="false"/>.</returns> + public bool Equals(LeaseIdentity other) => Equals(in other); + + /// <inheritdoc/> + public override bool Equals([NotNullWhen(true)] object? other) + => other is LeaseIdentity identity && Equals(in identity); + + /// <inheritdoc/> + public override int GetHashCode() => HashCode.Combine(Version, Id); + + /// <summary> + /// Determines whether the two identities are equal. + /// </summary> + /// <param name="x">The first identity to compare.</param> + /// <param name="y">The second identity to compare.</param> + /// <returns><see langword="true"/> if <paramref name="x"/> is equal to <paramref name="y"/>; otherwise, <see langword="false"/>.</returns> + public static bool operator ==(in LeaseIdentity x, in LeaseIdentity y) + => x.Equals(in y); + + /// <summary> + /// Determines whether the two identities are not equal. + /// </summary> + /// <param name="x">The first identity to compare.</param> + /// <param name="y">The second identity to compare.</param> + /// <returns><see langword="true"/> if <paramref name="x"/> is not equal to <paramref name="y"/>; otherwise, <see langword="false"/>.</returns> + public static bool operator !=(in LeaseIdentity x, in LeaseIdentity y) + => x.Equals(in y) is false; + + internal LeaseIdentity BumpVersion() => this with { Version = Version + 1UL }; +} \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/Leases/LeaseProvider.Utils.cs b/src/DotNext.Threading/Threading/Leases/LeaseProvider.Utils.cs new file mode 100644 index 000000000..182190c39 --- /dev/null +++ b/src/DotNext.Threading/Threading/Leases/LeaseProvider.Utils.cs @@ -0,0 +1,64 @@ +using System.Runtime.InteropServices; + +namespace DotNext.Threading.Leases; + +using Patterns; + +public partial class LeaseProvider<TMetadata> +{ + private interface ITransitionCondition + { + bool Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime); + } + + private sealed class AcqusitionCondition : ITransitionCondition, ISingleton<AcqusitionCondition> + { + public static AcqusitionCondition Instance { get; } = new(); + + private AcqusitionCondition() + { + } + + bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime) + => state.IsExpired(provider, timeToLive, out remainingTime); + } + + [StructLayout(LayoutKind.Auto)] + private readonly struct RenewalCondition(LeaseIdentity identity, bool reacquire) : ITransitionCondition + { + bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime) + { + remainingTime = timeToLive; + return state.Identity == identity && (reacquire || !state.IsExpired(provider, timeToLive, out remainingTime)); + } + } + + [StructLayout(LayoutKind.Auto)] + private readonly struct AcquisitionOrRenewalCondition(LeaseIdentity identity) : ITransitionCondition + { + bool ITransitionCondition.Invoke(in State state, TimeProvider provider, TimeSpan timeToLive, out TimeSpan remainingTime) + { + remainingTime = timeToLive; + return state.Identity == identity || state.IsExpired(provider, timeToLive, out remainingTime); + } + } + + [StructLayout(LayoutKind.Auto)] + private readonly struct Updater<TArg>(TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater) : ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>> + { + ValueTask<TMetadata> ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>.Invoke(TMetadata metadata, CancellationToken token) + => updater(arg, metadata, token); + } + + private sealed class NoOpUpdater : ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>, ISingleton<NoOpUpdater> + { + public static NoOpUpdater Instance { get; } = new(); + + private NoOpUpdater() + { + } + + ValueTask<TMetadata> ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>>.Invoke(TMetadata metadata, CancellationToken token) + => ValueTask.FromResult(metadata); + } +} \ No newline at end of file diff --git a/src/DotNext.Threading/Threading/Leases/LeaseProvider.cs b/src/DotNext.Threading/Threading/Leases/LeaseProvider.cs new file mode 100644 index 000000000..744e6bb51 --- /dev/null +++ b/src/DotNext.Threading/Threading/Leases/LeaseProvider.cs @@ -0,0 +1,438 @@ +using System.ComponentModel; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.InteropServices; + +namespace DotNext.Threading.Leases; + +using Diagnostics; + +/// <summary> +/// Represents provider side of a lease in a distributed environment. +/// </summary> +/// <remarks> +/// An instance of this type must support concurrent calls. +/// </remarks> +/// <typeparam name="TMetadata">The type of metadata associated with a lease.</typeparam> +/// <seealso cref="LeaseConsumer"/> +public abstract partial class LeaseProvider<TMetadata> : Disposable +{ + private readonly TimeProvider provider; + + [SuppressMessage("Usage", "CA2213", Justification = "Disposed using DestroyLease() method")] + private volatile CancellationTokenSource? lifetimeTokenSource; + + /// <summary> + /// Initializes a new instance of lease provider. + /// </summary> + /// <param name="ttl">The lease expiration timeout.</param> + /// <param name="provider">The time provider.</param> + /// <exception cref="ArgumentOutOfRangeException"><paramref name="ttl"/> is not positive.</exception> + protected LeaseProvider(TimeSpan ttl, TimeProvider? provider = null) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(ttl, TimeSpan.Zero); + + this.provider = provider ?? TimeProvider.System; + TimeToLive = ttl; + lifetimeTokenSource = new(); + LifetimeToken = lifetimeTokenSource.Token; + } + + /// <summary> + /// A token that represents state of this object. + /// </summary> + /// <remarks> + /// A call to <see cref="Dispose(bool)"/> cancels the token. + /// </remarks> + protected CancellationToken LifetimeToken { get; } // cached to avoid ObjectDisposedException + + /// <summary> + /// Gets a lease time-to-live. + /// </summary> + public TimeSpan TimeToLive { get; } + + private async ValueTask<AcquisitionResult?> TryChangeStateAsync<TCondition, TUpdater>(TCondition condition, TUpdater updater, CancellationToken token = default) + where TCondition : notnull, ITransitionCondition + where TUpdater : notnull, ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>> + { + var cts = token.LinkTo(LifetimeToken); + try + { + var state = await GetStateAsync(token).ConfigureAwait(false); + + if (!condition.Invoke(in state, provider, TimeToLive, out var remainingTime)) + return null; + + state = state with + { + CreatedAt = provider.GetUtcNow(), + Identity = state.Identity.BumpVersion(), + Metadata = await updater.Invoke(state.Metadata, token).ConfigureAwait(false), + }; + + var ts = new Timestamp(); + if (!await TryUpdateStateAsync(state, token).ConfigureAwait(false)) + return null; + + remainingTime = TimeToLive - ts.Elapsed; + return remainingTime > TimeSpan.Zero + ? new(in state, remainingTime) + : null; + } + catch (OperationCanceledException e) when (e.CausedBy(cts, LifetimeToken)) + { + throw new ObjectDisposedException(GetType().Name); + } + finally + { + cts?.Dispose(); + } + } + + private async ValueTask<(LeaseIdentity Identity, TimeSpan RemainingTime, bool IsAcquired)> TryAcquireAsync<TUpdater>(TUpdater updater, CancellationToken token = default) + where TUpdater : notnull, ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>> + { + var cts = token.LinkTo(LifetimeToken); + try + { + var state = await GetStateAsync(token).ConfigureAwait(false); + + if (!state.IsExpired(provider, TimeToLive, out var remainingTime)) + return new(state.Identity, remainingTime, false); + + state = state with + { + CreatedAt = provider.GetUtcNow(), + Identity = state.Identity.BumpVersion(), + Metadata = await updater.Invoke(state.Metadata, token).ConfigureAwait(false), + }; + + var ts = new Timestamp(); + if (!await TryUpdateStateAsync(state, token).ConfigureAwait(false)) + return new(state.Identity, TimeToLive, false); + + remainingTime = TimeToLive - ts.Elapsed; + return remainingTime > TimeSpan.Zero + ? new(state.Identity, remainingTime, true) + : new(state.Identity, TimeSpan.Zero, false); + } + catch (OperationCanceledException e) when (e.CausedBy(cts, LifetimeToken)) + { + throw new ObjectDisposedException(GetType().Name); + } + finally + { + cts?.Dispose(); + } + } + + /// <summary> + /// Tries to acquire the lease. + /// </summary> + /// <typeparam name="TArg">The type of the argument to be passed to the metadata updater.</typeparam> + /// <param name="arg">The argument to be passed to the metadata updater.</param> + /// <param name="updater">An idempotent operation to update the metadata on successful acquisition of the lease.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The acquistion result; or <see langword="null"/> if the lease is already taken.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryAcquireAsync<TArg>(TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater, CancellationToken token = default) + => updater is not null ? TryChangeStateAsync(AcqusitionCondition.Instance, new Updater<TArg>(arg, updater), token) : ValueTask.FromException<AcquisitionResult?>(new ArgumentNullException(nameof(updater))); + + /// <summary> + /// Tries to acquire the lease. + /// </summary> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The acquistion result; or <see langword="null"/> if the lease is already taken.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryAcquireAsync(CancellationToken token = default) + => TryChangeStateAsync(AcqusitionCondition.Instance, NoOpUpdater.Instance, token); + + private async ValueTask<AcquisitionResult> AcquireAsync<TUpdater>(TUpdater updater, CancellationToken token = default) + where TUpdater : notnull, ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>> + { + var cts = token.LinkTo(LifetimeToken); + try + { + while (true) + { + var state = await GetStateAsync(token).ConfigureAwait(false); + + if (state.IsExpired(provider, TimeToLive, out var remainingTime)) + { + state = state with + { + CreatedAt = provider.GetUtcNow(), + Identity = state.Identity.BumpVersion(), + Metadata = await updater.Invoke(state.Metadata, token).ConfigureAwait(false), + }; + + var ts = new Timestamp(); + if (!await TryUpdateStateAsync(state, token).ConfigureAwait(false)) + continue; + + remainingTime = TimeToLive - ts.Elapsed; + + if (remainingTime <= TimeSpan.Zero) + continue; + + return new(in state, remainingTime); + } + + await Task.Delay(remainingTime, provider, token).ConfigureAwait(false); + } + } + catch (OperationCanceledException e) when (e.CausedBy(cts, LifetimeToken)) + { + throw new ObjectDisposedException(GetType().Name); + } + finally + { + cts?.Dispose(); + } + } + + /// <summary> + /// Acquires the lease. + /// </summary> + /// <typeparam name="TArg">The type of the argument to be passed to the metadata updater.</typeparam> + /// <param name="arg">The argument to be passed to the metadata updater.</param> + /// <param name="updater">An idempotent operation to update the metadata on successful acquisition of the lease.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult> AcquireAsync<TArg>(TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater, CancellationToken token = default) + => updater is not null ? AcquireAsync(new Updater<TArg>(arg, updater), token) : ValueTask.FromException<AcquisitionResult>(new ArgumentNullException(nameof(updater))); + + /// <summary> + /// Acquires the lease. + /// </summary> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult> AcquireAsync(CancellationToken token = default) + => AcquireAsync(NoOpUpdater.Instance, token); + + /// <summary> + /// Tries to renew the lease. + /// </summary> + /// <typeparam name="TArg">The type of the argument to be passed to the metadata updater.</typeparam> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="reacquire"><see langword="true"/> to acquire the lease on renewal if it is expired.</param> + /// <param name="arg">The argument to be passed to the metadata updater.</param> + /// <param name="updater">An idempotent operation to update the metadata on successful acquisition of the lease.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation; or <see langword="null"/> if the lease is taken by another process or expired.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryRenewAsync<TArg>(LeaseIdentity identity, bool reacquire, TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater, CancellationToken token = default) + => updater is not null ? TryChangeStateAsync(new RenewalCondition(identity, reacquire), new Updater<TArg>(arg, updater), token) : ValueTask.FromException<AcquisitionResult?>(new ArgumentNullException(nameof(updater))); + + /// <summary> + /// Tries to renew the lease. + /// </summary> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="reacquire"><see langword="true"/> to acquire the lease on renewal if it is expired.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation; or <see langword="null"/> if the lease is taken by another process or expired.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryRenewAsync(LeaseIdentity identity, bool reacquire, CancellationToken token = default) + => TryChangeStateAsync(new RenewalCondition(identity, reacquire), NoOpUpdater.Instance, token); + + private async ValueTask<LeaseIdentity?> ReleaseAsync<TUpdater>(LeaseIdentity identity, TUpdater updater, CancellationToken token = default) + where TUpdater : notnull, ISupplier<TMetadata, CancellationToken, ValueTask<TMetadata>> + { + if (identity.Version is LeaseIdentity.InitialVersion) + return null; + + var cts = token.LinkTo(LifetimeToken); + try + { + var state = await GetStateAsync(token).ConfigureAwait(false); + + if (state.IsExpired(provider, TimeToLive, out _) || state.Identity != identity) + return null; + + state = state with + { + CreatedAt = default, + Identity = identity.BumpVersion(), + Metadata = await updater.Invoke(state.Metadata, token).ConfigureAwait(false), + }; + + return await TryUpdateStateAsync(state, token).ConfigureAwait(false) ? state.Identity : null; + } + catch (OperationCanceledException e) when (e.CausedBy(cts, LifetimeToken)) + { + throw new ObjectDisposedException(GetType().Name); + } + finally + { + cts?.Dispose(); + } + } + + /// <summary> + /// Releases the lease. + /// </summary> + /// <typeparam name="TArg">The type of the argument to be passed to the metadata updater.</typeparam> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="arg">The argument to be passed to the metadata updater.</param> + /// <param name="updater">An idempotent operation to update the metadata on successful acquisition of the lease.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>Updated lease identity; or <see langword="null"/> if expired or taken by another process.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<LeaseIdentity?> ReleaseAsync<TArg>(LeaseIdentity identity, TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater, CancellationToken token = default) + => updater is not null ? ReleaseAsync(identity, new Updater<TArg>(arg, updater), token) : ValueTask.FromException<LeaseIdentity?>(new ArgumentNullException(nameof(updater))); + + /// <summary> + /// Releases the lease. + /// </summary> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>Updated lease identity; or <see langword="null"/> if expired or taken by another process.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<LeaseIdentity?> ReleaseAsync(LeaseIdentity identity, CancellationToken token = default) + => ReleaseAsync(identity, NoOpUpdater.Instance, token); + + /// <summary> + /// Tries to acquire or renew the lease. + /// </summary> + /// <typeparam name="TArg">The type of the argument to be passed to the metadata updater.</typeparam> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="arg">The argument to be passed to the metadata updater.</param> + /// <param name="updater">An idempotent operation to update the metadata on successful acquisition of the lease.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation; or <see langword="null"/> if the lease is taken by another process.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryAcquireOrRenewAsync<TArg>(LeaseIdentity identity, TArg arg, Func<TArg, TMetadata, CancellationToken, ValueTask<TMetadata>> updater, CancellationToken token = default) + => updater is not null ? TryChangeStateAsync(new AcquisitionOrRenewalCondition(identity), new Updater<TArg>(arg, updater), token) : ValueTask.FromException<AcquisitionResult?>(new ArgumentNullException(nameof(updater))); + + /// <summary> + /// Tries to acquire or renew the lease. + /// </summary> + /// <param name="identity">The identity of the lease obtained from <see cref="TryAcquireAsync(CancellationToken)"/> or <see cref="AcquireAsync(CancellationToken)"/>.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The status of the operation; or <see langword="null"/> if the lease is taken by another process.</returns> + /// <exception cref="ObjectDisposedException">The provider has been disposed.</exception> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + public ValueTask<AcquisitionResult?> TryAcquireOrRenewAsync(LeaseIdentity identity, CancellationToken token = default) + => TryChangeStateAsync(new AcquisitionOrRenewalCondition(identity), NoOpUpdater.Instance, token); + + /// <summary> + /// Loads the state of a lease from the underlying storage. + /// </summary> + /// <remarks> + /// The method can be called concurrently. + /// </remarks> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns>The state restored from the underlying storage.</returns> + /// <exception cref="OperationCanceledException">The operation has been canceled.</exception> + protected abstract ValueTask<State> GetStateAsync(CancellationToken token); + + /// <summary> + /// Attempts to update the state in the underlying storage using compare-and-set semantics. + /// </summary> + /// <remarks> + /// The operation must use compare-and-set semantics in the following way: save <paramref name="state"/> + /// only if <see cref="LeaseIdentity.Version"/> of the currently stored state is equal to the version of <paramref name="state"/> - 1. + /// Note that the method can be called concurrently. + /// </remarks> + /// <param name="state">The state to be stored in the underlying storage.</param> + /// <param name="token">The token that can be used to cancel the operation.</param> + /// <returns><see langword="true"/> if update is performed successfully; <see langword="false"/> is compare-and-set failed.</returns> + protected abstract ValueTask<bool> TryUpdateStateAsync(State state, CancellationToken token); + + /// <inheritdoc/> + protected override void Dispose(bool disposing) + { + if (disposing) + { + if (Interlocked.Exchange(ref lifetimeTokenSource, null) is { } cts) + { + try + { + cts.Cancel(throwOnFirstException: false); + } + finally + { + cts.Dispose(); + } + } + } + + base.Dispose(disposing); + } + + /// <summary> + /// Represents a state of the lease. + /// </summary> + [StructLayout(LayoutKind.Auto)] + public readonly struct State + { + /// <summary> + /// The metadata associated with the lease. + /// </summary> + public TMetadata Metadata { get; init; } + + /// <summary> + /// A version of the lease. + /// </summary> + /// <remarks> + /// Must be set of <see cref="LeaseIdentity.InitialVersion"/> if there is no state in the underlying persistent storage. + /// </remarks> + public required LeaseIdentity Identity { get; init; } + + /// <summary> + /// A timestamp of when this state was created. + /// </summary> + public required DateTimeOffset CreatedAt { get; init; } + + internal bool IsExpired(TimeProvider provider, TimeSpan ttl, out TimeSpan remaining) + => (remaining = provider.GetUtcNow() - CreatedAt) < ttl; + } + + /// <summary> + /// Represents a result of lease acquisition operation. + /// </summary> + [StructLayout(LayoutKind.Auto)] + public readonly struct AcquisitionResult + { + /// <summary> + /// A version of the lease. + /// </summary> + public readonly State State; + + /// <summary> + /// The remaining lease time. + /// </summary> + public readonly Timeout Lifetime; + + internal AcquisitionResult(in State state, TimeSpan ttl) + { + Debug.Assert(ttl > TimeSpan.Zero); + + State = state; + Lifetime = new(ttl); + } + + /// <summary> + /// Deconstructs the result. + /// </summary> + /// <param name="state">Same as <see cref="AcquisitionResult.State"/>.</param> + /// <param name="lifetime">Same as <paramref name="lifetime"/>.</param> + [EditorBrowsable(EditorBrowsableState.Never)] + public void Deconstruct(out State state, out Timeout lifetime) + { + state = State; + lifetime = Lifetime; + } + } +} \ No newline at end of file