
Commit

Merge branch 'master' into 7748_additional_rpc_url_table
ssonthal authored Dec 5, 2024
2 parents 6934e06 + a0e0594 commit 5ba3705
Showing 31 changed files with 622 additions and 124 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
@@ -92,6 +92,6 @@ csharp_style_var_elsewhere = false:suggestion

# Coding convention

dotnet_diagnostic.IDE0005.severity = error
dotnet_diagnostic.IDE0005.severity = suggestion
dotnet_diagnostic.IDE0041.severity = error

3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Benchmark.Runner/Program.cs
@@ -26,6 +26,7 @@ public DashboardConfig(params Job[] jobs)
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Descriptor);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Statistics);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Params);
AddColumnProvider(BenchmarkDotNet.Columns.DefaultColumnProviders.Metrics);
AddLogger(BenchmarkDotNet.Loggers.ConsoleLogger.Default);
AddExporter(BenchmarkDotNet.Exporters.Json.JsonExporter.FullCompressed);
AddDiagnoser(BenchmarkDotNet.Diagnosers.MemoryDiagnoser.Default);
@@ -59,7 +60,7 @@ public static void Main(string[] args)
{
foreach (Assembly assembly in additionalJobAssemblies)
{
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core80)), args);
BenchmarkRunner.Run(assembly, new DashboardConfig(Job.MediumRun.WithRuntime(CoreRuntime.Core90)), args);
}

foreach (Assembly assembly in simpleJobAssemblies)
59 changes: 59 additions & 0 deletions src/Nethermind/Nethermind.Benchmark/Core/ParallelBenchmark.cs
@@ -0,0 +1,59 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using BenchmarkDotNet.Attributes;
using Nethermind.Core.Threading;
using System.Threading;
using System.Threading.Tasks;

namespace Nethermind.Benchmarks.Core;

[HideColumns("Job", "RatioSD")]
public class ParallelBenchmark
{
private int[] _times;

[GlobalSetup]
public void Setup()
{
_times = new int[200];

for (int i = 0; i < _times.Length; i++)
{
_times[i] = i % 100;
}
}

[Benchmark(Baseline = true)]
public void ParallelFor()
{
Parallel.For(
0,
_times.Length,
(i) => Thread.Sleep(_times[i]));
}

[Benchmark]
public void ParallelForEach()
{
Parallel.ForEach(
_times,
(time) => Thread.Sleep(time));
}

[Benchmark]
public void UnbalancedParallel()
{
ParallelUnbalancedWork.For<int[]>(
0,
_times.Length,
ParallelUnbalancedWork.DefaultOptions,
_times,
(i, value) =>
{
Thread.Sleep(value[i]);
return value;
},
(value) => { });
}
}
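
For context on what the new benchmark measures: the sleep times cycle from 0 to 99 ms, so the work items are deliberately uneven. Parallel.For hands chunks of the index range to each worker, so a chunk of slow items can keep one worker busy long after the others have finished, whereas handing out one index at a time keeps every worker fed. The snippet below is a minimal, hypothetical sketch of that claiming strategy; UnbalancedForSketch and Demo are illustrative stand-ins, not the Nethermind.Core.Threading implementation.

using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an unbalanced-work loop: each worker repeatedly claims the
// next index with Interlocked.Increment, so no worker is stuck with a pre-assigned
// range of slow items.
static class UnbalancedForSketch
{
    public static void For(int fromInclusive, int toExclusive, Action<int> body)
    {
        int next = fromInclusive - 1;
        Parallel.For(0, Environment.ProcessorCount, _ =>
        {
            int i;
            while ((i = Interlocked.Increment(ref next)) < toExclusive)
            {
                body(i); // indices are handed out one at a time, in order of claiming
            }
        });
    }
}

class Demo
{
    static void Main()
    {
        int[] times = new int[200];
        for (int i = 0; i < times.Length; i++) times[i] = i % 100;

        // Mirrors the UnbalancedParallel benchmark above, minus the state-passing overload.
        UnbalancedForSketch.For(0, times.Length, i => Thread.Sleep(times[i]));
    }
}
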
@@ -238,6 +238,35 @@ public void logs_should_be_empty_for_existing_block_and_addresses_and_non_existi
.WithLogs(Build.A.LogEntry.WithAddress(TestItem.AddressA)
.WithTopics(TestItem.KeccakB, TestItem.KeccakC).TestObject));

[Test, MaxTime(Timeout.MaxTestTime)]
[TestCase(1, 1)]
[TestCase(5, 3)]
public void logs_should_have_correct_log_indexes(int filtersCount, int logsPerTx)
{
const int txCount = 10;

Assert(
filterBuilder: (builder, _) => builder
.FromBlock(1L)
.ToBlock(10L)
.WithAddresses(TestItem.AddressA, TestItem.AddressB)
.WithTopicExpressions(TestTopicExpressions.Or(
TestTopicExpressions.Specific(TestItem.KeccakB),
TestTopicExpressions.Specific(TestItem.KeccakD)
)),
filterCount: filtersCount,
receiptBuilder: (builder, _) => builder
.WithBlockNumber(6L)
.WithLogs(Enumerable.Range(0, logsPerTx).Select(_ =>
Build.A.LogEntry
.WithAddress(TestItem.AddressA)
.WithTopics(TestItem.KeccakB, TestItem.KeccakC).TestObject
).ToArray()),
receiptCount: txCount,
logsAssertion: logs => logs.Select(l => l.LogIndex).Should().BeEquivalentTo(Enumerable.Range(0, txCount * logsPerTx))
);
}


private void LogsShouldNotBeEmpty(Action<FilterBuilder> filterBuilder,
Action<ReceiptBuilder> receiptBuilder)
@@ -255,6 +284,15 @@ private void LogsShouldBeEmpty(IEnumerable<Action<FilterBuilder>> filterBuilders
IEnumerable<Action<ReceiptBuilder>> receiptBuilders)
=> Assert(filterBuilders, receiptBuilders, logs => logs.Should().BeEmpty());

private void Assert(Action<FilterBuilder, int> filterBuilder, int filterCount,
Action<ReceiptBuilder, int> receiptBuilder, int receiptCount,
Action<IEnumerable<FilterLog>> logsAssertion)
=> Assert(
Enumerable.Range(0, filterCount).Select<int, Action<FilterBuilder>>(i => builder => filterBuilder(builder, i)),
Enumerable.Range(0, receiptCount).Select<int, Action<ReceiptBuilder>>(i => builder => receiptBuilder(builder, i)),
logsAssertion
);

private void Assert(IEnumerable<Action<FilterBuilder>> filterBuilders,
IEnumerable<Action<ReceiptBuilder>> receiptBuilders,
Action<IEnumerable<FilterLog>> logsAssertion)
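
The new logs_should_have_correct_log_indexes test builds ten receipts in the same block, each with logsPerTx matching logs, and asserts that the returned LogIndex values are exactly the range 0 .. txCount * logsPerTx - 1. In other words, log indexes are expected to be assigned per block, continuing across receipts rather than restarting at zero for every transaction. A rough illustration of that indexing rule, with simplified types rather than the actual Nethermind implementation:

using System;
using System.Linq;

class Demo
{
    // Block-level index of a log: all logs of earlier receipts come first, then the
    // log's position within its own receipt.
    static int BlockLogIndex(int txIndex, int indexInTx, int[] logsPerReceipt)
        => logsPerReceipt.Take(txIndex).Sum() + indexInTx;

    static void Main()
    {
        const int txCount = 10;
        const int logsPerTx = 3;
        int[] logsPerReceipt = Enumerable.Repeat(logsPerTx, txCount).ToArray();

        var indexes = Enumerable.Range(0, txCount)
            .SelectMany(tx => Enumerable.Range(0, logsPerTx)
                .Select(i => BlockLogIndex(tx, i, logsPerReceipt)));

        // Matches the test's assertion: indexes are exactly 0 .. txCount * logsPerTx - 1.
        Console.WriteLine(indexes.SequenceEqual(Enumerable.Range(0, txCount * logsPerTx))); // True
    }
}
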
@@ -98,24 +98,21 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe
{
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot),
static (i, state) =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot);
scope.WorldState.WarmUp(state.block.Withdrawals[i].Address);
}
finally
{
_envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
}
@@ -135,24 +132,19 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp

try
{
int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
ParallelUnbalancedWork.For<BlockState>(0, block.Transactions.Length, parallelOptions, new(this, block, stateRoot, spec), static (i, state) =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
IReadOnlyTxProcessorSource env = state.PreWarmer._envPool.Get();
SystemTransaction systemTransaction = state.PreWarmer._systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;
if (state.Block.TransactionProcessed > i) return state;

tx = block.Transactions[i];
tx = state.Block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);

Address senderAddress = tx.SenderAddress!;
if (!scope.WorldState.AccountExists(senderAddress))
@@ -163,7 +155,7 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
UInt256 nonceDelta = UInt256.Zero;
for (int prev = 0; prev < i; prev++)
{
if (senderAddress == block.Transactions[prev].SenderAddress)
if (senderAddress == state.Block.Transactions[prev].SenderAddress)
{
nonceDelta++;
}
@@ -174,26 +166,28 @@ private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec sp
scope.WorldState.IncrementNonce(senderAddress, nonceDelta);
}

if (spec.UseTxAccessLists)
if (state.Spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
TransactionResult result = scope.TransactionProcessor.Warmup(systemTransaction, new BlockExecutionContext(state.Block.Header.Clone()), NullTxTracer.Instance);
if (state.PreWarmer._logger.IsTrace) state.PreWarmer._logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex) when (ex is EvmException or OverflowException)
{
// Ignore, regular tx processing exceptions
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
if (state.PreWarmer._logger.IsDebug) state.PreWarmer._logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
state.PreWarmer._systemTransactionPool.Return(systemTransaction);
state.PreWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
@@ -273,21 +267,16 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot),
static (i, state) =>
{
int i = 0;
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Transaction tx = state.block.Transactions[i];
Address? sender = tx.SenderAddress;

var env = PreWarmer._envPool.Get();
var env = state.preWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
@@ -300,8 +289,10 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
finally
{
PreWarmer._envPool.Return(env);
state.preWarmer._envPool.Return(env);
}

return state;
});
}
catch (OperationCanceledException)
@@ -316,4 +307,13 @@ private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvF
public IReadOnlyTxProcessorSource Create() => envFactory.Create();
public bool Return(IReadOnlyTxProcessorSource obj) => true;
}

private struct BlockState(BlockCachePreWarmer preWarmer, Block block, Hash256 stateRoot, IReleaseSpec spec)
{
public BlockCachePreWarmer PreWarmer = preWarmer;
public Block Block = block;
public Hash256 StateRoot = stateRoot;
public IReleaseSpec Spec = spec;
}
}
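
The refactor in this file follows one pattern: Parallel.For bodies that captured this, block, stateRoot, and a hand-rolled Interlocked counter become static lambdas that receive everything through an explicit state value, either the new BlockState struct or an ad-hoc tuple for the simpler loops, and return that state from each iteration. Because the lambdas are static they allocate no closure per dispatch, and the helper supplies the index instead of the manual Interlocked.Increment bookkeeping. Below is a minimal, self-contained sketch of that state-passing shape; StatefulFor and SleepState are illustrative stand-ins (the real ParallelUnbalancedWork.For dispatches iterations across worker threads, which the plain loop here leaves out).

using System;
using System.Threading;

// Stand-in for the state-passing overload used above: the state value goes in, is handed
// to every call of the body, and is returned at the end. A sequential loop keeps the
// sketch short; the real helper runs the body on multiple workers.
static class StatefulFor
{
    public static TState For<TState>(int from, int to, TState state, Func<int, TState, TState> body)
    {
        for (int i = from; i < to; i++)
        {
            state = body(i, state);
        }
        return state;
    }
}

// Stand-in for BlockState: a plain struct bundling everything the callback needs, so the
// lambda can be declared static and captures nothing.
readonly struct SleepState(int[] delays)
{
    public int[] Delays { get; } = delays;
}

class Demo
{
    static void Main()
    {
        int[] delays = { 5, 50, 1, 80, 3, 40 };

        StatefulFor.For(0, delays.Length, new SleepState(delays),
            static (i, state) =>
            {
                Thread.Sleep(state.Delays[i]); // static lambda: no closure allocation
                return state;
            });
    }
}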

11 changes: 8 additions & 3 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
@@ -19,6 +19,7 @@
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Specs;
using Nethermind.Core.Threading;
using Nethermind.Crypto;
using Nethermind.Evm;
using Nethermind.Evm.Tracing;
@@ -333,11 +334,15 @@ protected virtual TxReceipt[] ProcessBlock(
[MethodImpl(MethodImplOptions.NoInlining)]
private static void CalculateBlooms(TxReceipt[] receipts)
{
int index = 0;
Parallel.For(0, receipts.Length, _ =>
ParallelUnbalancedWork.For(
0,
receipts.Length,
ParallelUnbalancedWork.DefaultOptions,
receipts,
static (i, receipts) =>
{
int i = Interlocked.Increment(ref index) - 1;
receipts[i].CalculateBloom();
return receipts;
});
}

