Skip to content

Commit

Permalink
Fix/error on shutdown (#7873)
Browse files Browse the repository at this point in the history
Co-authored-by: Lukasz Rozmej <[email protected]>
  • Loading branch information
asdacap and LukaszRozmej authored Dec 7, 2024
1 parent 8d7593e commit 6851a0e
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public void Setup()
// Per-test cleanup: tears down the peer pool and the dispatcher created in Setup().
// DisposeAsync on the dispatcher blocks until in-flight dispatch tasks have
// signalled completion (or its internal timeout elapses), so tests cannot leak
// background work into the next test.
// NOTE(review): the pool is disposed before the dispatcher — assumes active
// dispatch tasks do not require live peers during drain; confirm ordering.
public async Task TearDown()
{
    await _pool.DisposeAsync();
    await _dispatcher.DisposeAsync();
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,38 @@ public async Task Simple_test_sync()
}
}

// Verifies that SyncDispatcher.DisposeAsync blocks while a response handler is
// still running, and only completes once that handler finishes.
// Retried because it relies on real-time delays (Task.Delay) and can be flaky
// under scheduler pressure.
[Retry(tryCount: 5)]
[Test]
public async Task When_ConcurrentHandleResponseIsRunning_Then_BlockDispose()
{
    TestSyncFeed syncFeed = new(isMultiFeed: true);
    // Block HandleResponse so a dispatch task stays "active" for the duration.
    syncFeed.LockResponse();
    TestDownloader downloader = new TestDownloader();
    SyncDispatcher<TestBatch> dispatcher = new(
        new TestSyncConfig(),
        syncFeed,
        downloader,
        new TestSyncPeerPool(),
        new StaticPeerAllocationStrategyFactory<TestBatch>(FirstFree.Instance),
        LimboLogs.Instance);
    Task executorTask = dispatcher.Start(CancellationToken.None);

    // Load some requests
    syncFeed.Activate();
    // NOTE(review): fixed 100 ms delays assume the dispatch task starts (and,
    // below, that DisposeAsync reaches its wait) within that window — the
    // [Retry] attribute papers over the inherent race.
    await Task.Delay(100);
    syncFeed.Finish();

    // Dispose on a worker thread so this test thread can observe that it blocks.
    Task disposeTask = Task.Run(() => dispatcher.DisposeAsync());
    await Task.Delay(100);

    // The handler is still locked, so disposal must not have completed yet.
    disposeTask.IsCompletedSuccessfully.Should().BeFalse();

    // Releasing the handler should let both disposal and the executor finish.
    syncFeed.UnlockResponse();
    await disposeTask;
    await executorTask;
}

[Retry(tryCount: 5)]
[TestCase(false, 1, 1, 8)]
[TestCase(true, 1, 1, 24)]
Expand All @@ -294,6 +326,7 @@ public async Task Test_release_before_processing_complete(bool isMultiSync, int

Task _ = dispatcher.Start(CancellationToken.None);
syncFeed.Activate();

await Task.Delay(100);

Assert.That(() => syncFeed.HighestRequested, Is.EqualTo(expectedHighestRequest).After(4000, 100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain.Synchronization;
Expand All @@ -12,10 +13,11 @@

namespace Nethermind.Synchronization.ParallelSync
{
public class SyncDispatcher<T>
public class SyncDispatcher<T> : IAsyncDisposable
{
private readonly Lock _feedStateManipulation = new();
private SyncFeedState _currentFeedState = SyncFeedState.Dormant;
private static readonly TimeSpan ActiveTaskDisposeTimeout = TimeSpan.FromSeconds(10);

private IPeerAllocationStrategyFactory<T> PeerAllocationStrategyFactory { get; }

Expand All @@ -24,10 +26,13 @@ public class SyncDispatcher<T>
private ISyncDownloader<T> Downloader { get; }
private ISyncPeerPool SyncPeerPool { get; }

private readonly CountdownEvent _activeTasks = new CountdownEvent(1);
private readonly SemaphoreSlim _concurrentProcessingSemaphore;
private readonly TimeSpan _emptyRequestDelay;
private readonly int _allocateTimeoutMs;

private bool _disposed = false;

public SyncDispatcher(
ISyncConfig syncConfig,
ISyncFeed<T>? syncFeed,
Expand Down Expand Up @@ -109,7 +114,29 @@ public async Task Start(CancellationToken cancellationToken)
if (Logger.IsTrace) Logger.Trace($"SyncDispatcher request: {request}, AllocatedPeer {allocation.Current}");

// Use Task.Run to make sure it queues it instead of running part of it synchronously.
Task task = Task.Run(() => DoDispatch(cancellationToken, allocatedPeer, request, allocation), cancellationToken);
_activeTasks.AddCount();

Task task;
try
{
task = Task.Run(
() =>
{
try
{
return DoDispatch(cancellationToken, allocatedPeer, request, allocation);
}
finally
{
_activeTasks.Signal();
}
});
}
catch
{
_activeTasks.Signal();
throw;
}

if (!Feed.IsMultiFeed)
{
Expand Down Expand Up @@ -272,5 +299,22 @@ private void UpdateState(SyncFeedState state)
}
}
}

/// <summary>
/// Stops accepting new dispatch work and waits (bounded by
/// <see cref="ActiveTaskDisposeTimeout"/>) for in-flight dispatch tasks to drain
/// before releasing the concurrency primitives owned by this dispatcher.
/// </summary>
/// <returns>A completed <see cref="ValueTask"/>; the wait itself is synchronous.</returns>
public ValueTask DisposeAsync()
{
    // NOTE(review): this flag is read/written without synchronization; two
    // concurrent DisposeAsync calls could both proceed. Acceptable only if
    // disposal is single-threaded — confirm against callers.
    if (_disposed)
    {
        return ValueTask.CompletedTask;
    }
    _disposed = true;

    // Remove the initial count the CountdownEvent was constructed with, so the
    // event can reach zero once every outstanding dispatch task has signalled.
    _activeTasks.Signal();
    if (!_activeTasks.Wait(ActiveTaskDisposeTimeout))
    {
        if (Logger.IsWarn) Logger.Warn($"Timeout on waiting for active tasks for feed {Feed.GetType().Name} {_activeTasks.CurrentCount}");
        // On timeout, tasks may still call Signal() later, so the CountdownEvent
        // must be leaked rather than disposed (Signal on a disposed event throws).
    }
    else
    {
        // All tasks have signalled; no further Signal() can occur, so the
        // event's wait handle can be released.
        _activeTasks.Dispose();
    }
    _concurrentProcessingSemaphore.Dispose();
    return ValueTask.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ namespace Nethermind.Synchronization;
/// <summary>
/// Container to make it simpler to get a bunch of components within a same named scoped.
/// </summary>
public class SyncFeedComponent<TBatch>(Lazy<ISyncFeed<TBatch>> feed, Lazy<SyncDispatcher<TBatch>> dispatcher, Lazy<ISyncDownloader<TBatch>> blockDownloader, ILifetimeScope lifetimeScope) : IDisposable, IAsyncDisposable
public class SyncFeedComponent<TBatch>(ISyncFeed<TBatch> feed, SyncDispatcher<TBatch> dispatcher, Lazy<ISyncDownloader<TBatch>> blockDownloader, ILifetimeScope lifetimeScope) : IDisposable, IAsyncDisposable
{
public ISyncFeed<TBatch> Feed => feed.Value;
public SyncDispatcher<TBatch> Dispatcher => dispatcher.Value;
// Feed and dispatcher are now injected directly (previously Lazy-wrapped).
public ISyncFeed<TBatch> Feed => feed;
public SyncDispatcher<TBatch> Dispatcher => dispatcher;
// NOTE(review): unconditional cast assumes the registered ISyncDownloader is
// always a BlockDownloader for this component — confirm at the registration site.
public BlockDownloader BlockDownloader => (BlockDownloader)blockDownloader.Value;

public void Dispose()
Expand Down
10 changes: 8 additions & 2 deletions src/Nethermind/Nethermind.Synchronization/Synchronizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,9 @@ public async ValueTask DisposeAsync()
{
_syncCancellation?.Cancel();

await Task.WhenAny(
Task.Delay(FeedsTerminationTimeout),
Task timeout = Task.Delay(FeedsTerminationTimeout);
Task completedFirst = await Task.WhenAny(
timeout,
Task.WhenAll(
fullSyncComponent.Feed.FeedTask,
fastSyncComponent.Feed.FeedTask,
Expand All @@ -251,6 +252,11 @@ await Task.WhenAny(
oldBodiesComponent.Feed.FeedTask,
oldReceiptsComponent.Feed.FeedTask));

if (completedFirst == timeout)
{
if (_logger.IsWarn) _logger.Warn("Sync feeds dispose timeout");
}

CancellationTokenExtensions.CancelDisposeAndClear(ref _syncCancellation);
}

Expand Down

0 comments on commit 6851a0e

Please sign in to comment.