Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split storage ranges to parallelize execution #7733

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ public class AccountWithStorageStartingHash
{
public PathWithAccount PathAndAccount { get; set; }
public ValueHash256 StorageStartingHash { get; set; }
public ValueHash256 StorageHashLimit { get; set; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,71 @@
memDb[ProgressTracker.ACC_PROGRESS_KEY].Should().BeEquivalentTo(Keccak.MaxValue.BytesToArray());
}

[TestCase("0x0000000000000000000000000000000000000000000000000000000000000000", "0x2000000000000000000000000000000000000000000000000000000000000000", null, "0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
[TestCase("0x2000000000000000000000000000000000000000000000000000000000000000", "0x4000000000000000000000000000000000000000000000000000000000000000", "0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0x67ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
[TestCase("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0xbfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", null, "0xdfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")]
public void Should_partition_storage_request_if_last_processed_less_than_threshold(string start, string lastProcessed, string? limit, string expectedSplit)
{
    // Build the tracker through the fixture helper (its default is a single account-range partition).
    // Calling the ProgressTracker constructor inline with (blockTree, db, logManager, 1) does not compile:
    // the third argument must be an ISyncConfig and the fourth an ILogManager.
    using ProgressTracker progressTracker = CreateProgressTracker();

    var startHash = new ValueHash256(start);
    var lastProcessedHash = new ValueHash256(lastProcessed);
    ValueHash256? limitHash = limit is null ? (ValueHash256?)null : new ValueHash256(limit);

    progressTracker.EnqueueStorageRange(TestItem.Tree.AccountsWithPaths[0], startHash, lastProcessedHash, limitHash);

    // Ignore the account range batch that is handed out first.
    progressTracker.IsFinished(out _);

    // Expecting 2 storage batches: the remaining range split in two.
    progressTracker.IsFinished(out SnapSyncBatch? batch1).Should().BeFalse();
    batch1.Should().NotBeNull();
    var firstRange = batch1!.StorageRangeRequest;
    // Assert explicitly: a null request would otherwise make every check below a silent no-op.
    firstRange.Should().NotBeNull();

    progressTracker.IsFinished(out SnapSyncBatch? batch2).Should().BeFalse();
    batch2.Should().NotBeNull();
    var secondRange = batch2!.StorageRangeRequest;
    secondRange.Should().NotBeNull();

    // The two halves must be contiguous and together cover [lastProcessed, limit].
    secondRange!.StartingHash.Should().Be(firstRange!.LimitHash);
    firstRange.StartingHash.Should().Be(lastProcessedHash);
    secondRange.LimitHash.Should().Be(limitHash ?? Keccak.MaxValue);

    firstRange.LimitHash.Should().Be(new ValueHash256(expectedSplit));
}


[TestCase("0x0000000000000000000000000000000000000000000000000000000000000000", "0xb100000000000000000000000000000000000000000000000000000000000000", null)]
[TestCase("0x8fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", "0xdfffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff", null)]
public void Should_not_partition_storage_request_if_last_processed_more_than_threshold(string start, string lastProcessed, string? limit)
{
    // Build the tracker through the fixture helper (its default is a single account-range partition).
    // Calling the ProgressTracker constructor inline with (blockTree, db, logManager, 1) does not compile:
    // the third argument must be an ISyncConfig and the fourth an ILogManager.
    using ProgressTracker progressTracker = CreateProgressTracker();

    var startHash = new ValueHash256(start);
    var lastProcessedHash = new ValueHash256(lastProcessed);
    ValueHash256? limitHash = limit is null ? (ValueHash256?)null : new ValueHash256(limit);

    progressTracker.EnqueueStorageRange(TestItem.Tree.AccountsWithPaths[0], startHash, lastProcessedHash, limitHash);

    // Ignore the account range batch that is handed out first.
    progressTracker.IsFinished(out _);

    // Expecting 1 storage batch: the remaining range is re-queued unsplit.
    progressTracker.IsFinished(out SnapSyncBatch? batch1).Should().BeFalse();
    batch1.Should().NotBeNull();
    var remainingRange = batch1!.StorageRangeRequest;
    // Assert explicitly: a null request would otherwise make the checks below a silent no-op.
    remainingRange.Should().NotBeNull();

    remainingRange!.StartingHash.Should().Be(lastProcessedHash);
    remainingRange.LimitHash.Should().Be(limitHash ?? Keccak.MaxValue);
}

private ProgressTracker CreateProgressTracker(int accountRangePartition = 1)
{
BlockTree blockTree = Build.A.BlockTree().WithBlocks(Build.A.Block.WithStateRoot(Keccak.EmptyTreeHash).TestObject).TestObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Db;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.State.Snap;

Expand All @@ -28,6 +29,7 @@ public class ProgressTracker : IDisposable
public const int HIGH_STORAGE_QUEUE_SIZE = STORAGE_BATCH_SIZE * 100;
private const int CODES_BATCH_SIZE = 1_000;
public const int HIGH_CODES_QUEUE_SIZE = CODES_BATCH_SIZE * 5;
private const uint StorageRangeSplitFactor = 2;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you experimented with more aggressive split factors, e.g. 4, 8, 16, or 32?

internal static readonly byte[] ACC_PROGRESS_KEY = Encoding.ASCII.GetBytes("AccountProgressKey");

// This does not need to be a lot as it spawns other requests. In fact 8 is probably too much. It is severely
Expand Down Expand Up @@ -311,9 +313,9 @@ public void EnqueueAccountStorage(PathWithAccount pwa)
StoragesToRetrieve.Enqueue(pwa);
}

public void EnqueueAccountRefresh(PathWithAccount pathWithAccount, in ValueHash256? startingHash)
public void EnqueueAccountRefresh(PathWithAccount pathWithAccount, in ValueHash256? startingHash, in ValueHash256? hashLimit)
{
AccountsToRefresh.Enqueue(new AccountWithStorageStartingHash() { PathAndAccount = pathWithAccount, StorageStartingHash = startingHash.GetValueOrDefault() });
AccountsToRefresh.Enqueue(new AccountWithStorageStartingHash() { PathAndAccount = pathWithAccount, StorageStartingHash = startingHash.GetValueOrDefault(), StorageHashLimit = hashLimit ?? Keccak.MaxValue });
}

public void ReportFullStorageRequestFinished(IEnumerable<PathWithAccount> storages = default)
Expand All @@ -337,6 +339,54 @@ public void EnqueueStorageRange(StorageRange storageRange)
}
}

/// <summary>
/// Queues the still-unprocessed part of an account's storage range, i.e. the span from
/// <paramref name="lastProcessedHash"/> up to <paramref name="limitHash"/>.
/// </summary>
/// <remarks>
/// While <paramref name="lastProcessedHash"/> is still below
/// <c>start + (limit - start) / StorageRangeSplitFactor</c>, the remaining span is cut at its midpoint
/// and queued as two ranges; otherwise it is queued as a single range.
/// </remarks>
/// <param name="account">Account whose storage is being requested.</param>
/// <param name="startingHash">Start of the range that was originally requested; treated as zero when null.</param>
/// <param name="lastProcessedHash">Hash the new request(s) start from.</param>
/// <param name="limitHash">Upper bound of the range; <c>Keccak.MaxValue</c> when null.</param>
public void EnqueueStorageRange(PathWithAccount account, ValueHash256? startingHash, ValueHash256 lastProcessedHash, ValueHash256? limitHash)
{
    limitHash ??= Keccak.MaxValue;

    // Nothing is queued once processing has moved past the limit.
    if (lastProcessedHash > limitHash)
    {
        return;
    }

    // Hashes are read as big-endian 256-bit integers so that range arithmetic can be done on them.
    UInt256 rangeStart = startingHash.HasValue ? new UInt256(startingHash.Value.Bytes, true) : UInt256.Zero;
    UInt256 rangeEnd = new UInt256(limitHash.Value.Bytes, true);
    UInt256 processedUpTo = new UInt256(lastProcessedHash.Bytes, true);

    UInt256 splitThreshold = (rangeEnd - rangeStart) / StorageRangeSplitFactor + rangeStart;

    // Default - no split: progress has already reached the threshold.
    if (processedUpTo >= splitThreshold)
    {
        EnqueuePart(lastProcessedHash, limitHash);
        return;
    }

    // Cut what is left at its midpoint and queue both halves.
    UInt256 midpoint = (rangeEnd - processedUpTo) / 2 + processedUpTo;
    ValueHash256 midpointHash = new ValueHash256(midpoint);

    EnqueuePart(lastProcessedHash, midpointHash);
    EnqueuePart(midpointHash, limitHash);

    if (_logger.IsTrace)
    {
        _logger.Trace($"EnqueueStorageRange account {account.Path} start hash: {startingHash} | last processed: {lastProcessedHash} | limit: {limitHash} | split {midpointHash}");
    }

    // Queues one single-account storage range covering [from, to].
    void EnqueuePart(ValueHash256 from, ValueHash256? to)
    {
        NextSlotRange.Enqueue(new StorageRange()
        {
            Accounts = new ArrayPoolList<PathWithAccount>(1) { account },
            StartingHash = from,
            LimitHash = to
        });
    }
}

public void ReportStorageRangeRequestFinished(StorageRange storageRange = null)
{
EnqueueStorageRange(storageRange);
Expand Down
21 changes: 8 additions & 13 deletions src/Nethermind/Nethermind.Synchronization/SnapSync/SnapProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public AddRangeResult AddStorageRange(StorageRange request, SlotsAndProofs respo
}

PathWithAccount account = request.Accounts[i];
result = AddStorageRange(request.BlockNumber.Value, account, account.Account.StorageRoot, request.StartingHash, responses[i], proofs);
result = AddStorageRange(request.BlockNumber.Value, account, account.Account.StorageRoot, request.StartingHash, responses[i], proofs, request.LimitHash);

slotCount += responses[i].Count;
}
Expand All @@ -194,7 +194,7 @@ public AddRangeResult AddStorageRange(StorageRange request, SlotsAndProofs respo
return result;
}

public AddRangeResult AddStorageRange(long blockNumber, PathWithAccount pathWithAccount, in ValueHash256 expectedRootHash, in ValueHash256? startingHash, IReadOnlyList<PathWithStorageSlot> slots, IReadOnlyList<byte[]>? proofs = null)
public AddRangeResult AddStorageRange(long blockNumber, PathWithAccount pathWithAccount, in ValueHash256 expectedRootHash, in ValueHash256? startingHash, IReadOnlyList<PathWithStorageSlot> slots, IReadOnlyList<byte[]>? proofs = null, Hash256? hashLimit = null)
{
ITrieStore store = _trieStorePool.Get();
StorageTree tree = new(store.GetTrieStore(pathWithAccount.Path.ToCommitment()), _logManager);
Expand All @@ -206,26 +206,20 @@ public AddRangeResult AddStorageRange(long blockNumber, PathWithAccount pathWith
{
if (moreChildrenToRight)
{
StorageRange range = new()
{
Accounts = new ArrayPoolList<PathWithAccount>(1) { pathWithAccount },
StartingHash = slots[^1].Path
};

_progressTracker.EnqueueStorageRange(range);
_progressTracker.EnqueueStorageRange(pathWithAccount, startingHash, slots[^1].Path, hashLimit);
}
}
else if (result == AddRangeResult.MissingRootHashInProofs)
{
_logger.Trace($"SNAP - AddStorageRange failed, missing root hash {expectedRootHash} in the proofs, startingHash:{startingHash}");

_progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash);
_progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash, hashLimit);
}
else if (result == AddRangeResult.DifferentRootHash)
{
_logger.Trace($"SNAP - AddStorageRange failed, expected storage root hash:{expectedRootHash} but was {tree.RootHash}, startingHash:{startingHash}");

_progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash);
_progressTracker.EnqueueAccountRefresh(pathWithAccount, startingHash, hashLimit);
}

return result;
Expand Down Expand Up @@ -272,7 +266,8 @@ public void RefreshAccounts(AccountsToRefreshRequest request, IOwnedReadOnlyList
StorageRange range = new()
{
Accounts = new ArrayPoolList<PathWithAccount>(1) { requestedPath.PathAndAccount },
StartingHash = requestedPath.StorageStartingHash
StartingHash = requestedPath.StorageStartingHash,
LimitHash = requestedPath.StorageHashLimit
};

_progressTracker.EnqueueStorageRange(range);
Expand Down Expand Up @@ -305,7 +300,7 @@ public void RefreshAccounts(AccountsToRefreshRequest request, IOwnedReadOnlyList

private void RetryAccountRefresh(AccountWithStorageStartingHash requestedPath)
{
_progressTracker.EnqueueAccountRefresh(requestedPath.PathAndAccount, requestedPath.StorageStartingHash);
_progressTracker.EnqueueAccountRefresh(requestedPath.PathAndAccount, requestedPath.StorageStartingHash, requestedPath.StorageHashLimit);
}

public void AddCodes(IReadOnlyList<ValueHash256> requestedHashes, IOwnedReadOnlyList<byte[]> codes)
Expand Down
Loading