Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync2: ATX integration #6448

Open
wants to merge 12 commits into
base: sync2/fix-multipeer
Choose a base branch
from
14 changes: 14 additions & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -77,6 +78,14 @@ func MainnetConfig() Config {

hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = time.Hour
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute
Comment on lines +82 to +87
Copy link
Member

@fasmat fasmat Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like how this config is structured, what is the meaning of OldAtxSyncCfg and NewAtxSyncCfg?
Is there maybe a better name for them?

Why are they part of "main" -> "Syncer" -> "V2"? Shouldn't they be part of the "main" -> "Syncer" config object?

Also if these are the defaults anyway, no need to overwrite them with the same values here again 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: fixed spelling

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause confusion as there's currently V1 sync for ATXs, too, which is also configured in the syncer config.
When V2 is used in server-only mode, both V1 and V2 syncs are used at the same time.

Copy link
Contributor Author

@ivan4th ivan4th Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to use reasonable defaults so that most users will not have to adjust sync settings.
Then at some time in the future we might want to restructure the config to remove all the v1 stuff and keep v2 sync only, moving it out of this v2 field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

old vs new in this context means old epochs vs the current epoch

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so it is actually PrevEpochSyncConfig and CurrentEpochSyncConfig? 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to use reasonable defaults so that most users will not have to adjust sync settings. Then at some time in the future we might want to restructure the config to remove all the v1 stuff and keep v2 sync only, moving it out of this v2 field.

The reasonable defaults are already specified as part of the syncer.DefaultConfig, my point was that we don't need to overwrite this values again with the same values in config.MainnetConfig 🙂

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might cause confusion as there's currently V1 sync for ATXs, too, which is also configured in the syncer config. When V2 is used in server-only mode, both V1 and V2 syncs are used at the same time.

But this is a technical detail that I think should not be exposed via the config, or at least in a different way. Maybe instead of V2 we can call it something like "reconciliationSync" or similar. Especially since "V2" has negative connotations: https://en.wikipedia.org/wiki/V-2_rocket

Copy link
Contributor Author

@ivan4th ivan4th Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually there's a bit of confusion re current vs prev, and I was imprecise in my above comment.
When a new epoch starts, activations don't start to be published in that epoch immediately, and consequently, the more efficient (but more memory and CPU hungry per missing/extra set element) sync is to be applied to the new epoch some time later (EpochEndFraction in the existing sync config). That's why the "new" epoch is not necessarily the current one, and the "old" epoch is not necessarily the previous. We could probably come up with better naming but I'm unsure current / prev is the right choice here.

Otherwise agree, will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to v2 part to reconc-sync (ReconcSync).


return Config{
BaseConfig: BaseConfig{
DataDirParent: defaultDataDir,
Expand Down Expand Up @@ -212,6 +221,11 @@ func MainnetConfig() Config {
DisableMeshAgreement: true,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
V2: syncer.SyncV2Config{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
13 changes: 13 additions & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spacemeshos/go-spacemesh/hare4"
"github.com/spacemeshos/go-spacemesh/miner"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/sync2"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/malsync"
Expand Down Expand Up @@ -65,6 +66,13 @@ func testnet() config.Config {
hare4conf := hare4.DefaultConfig()
hare4conf.Enable = false
defaultdir := filepath.Join(home, "spacemesh-testnet", "/")

oldAtxSyncCfg := sync2.DefaultConfig()
oldAtxSyncCfg.MaxDepth = 16
newAtxSyncCfg := sync2.DefaultConfig()
newAtxSyncCfg.MaxDepth = 21
newAtxSyncCfg.MultiPeerReconcilerConfig.SyncInterval = 5 * time.Minute

return config.Config{
Preset: "testnet",
BaseConfig: config.BaseConfig{
Expand Down Expand Up @@ -163,6 +171,11 @@ func testnet() config.Config {
OutOfSyncThresholdLayers: 10,
AtxSync: atxsync.DefaultConfig(),
MalSync: malsync.DefaultConfig(),
V2: syncer.SyncV2Config{
OldAtxSyncCfg: oldAtxSyncCfg,
NewAtxSyncCfg: newAtxSyncCfg,
ParallelLoadLimit: 10,
},
},
Recovery: checkpoint.DefaultConfig(),
Cache: datastore.DefaultConfig(),
Expand Down
9 changes: 9 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"sync"
"time"

corehost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1013,3 +1014,11 @@
})
return peers
}

func (f *Fetch) Host() corehost.Host {
return f.host.(corehost.Host)

Check warning on line 1019 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L1018-L1019

Added lines #L1018 - L1019 were not covered by tests
}

func (f *Fetch) Peers() *peers.Peers {
return f.peers

Check warning on line 1023 in fetch/fetch.go

View check run for this annotation

Codecov / codecov/patch

fetch/fetch.go#L1022-L1023

Added lines #L1022 - L1023 were not covered by tests
}
16 changes: 13 additions & 3 deletions fetch/mesh_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
return nil
}

options := system.GetAtxOpts{}
var options system.GetAtxOpts
for _, opt := range opts {
opt(&options)
}
Expand All @@ -41,10 +41,20 @@
zap.Bool("limiting", !options.LimitingOff),
)
hashes := types.ATXIDsToHashes(ids)
handler := f.validators.atx.HandleMessage
if options.RecvChannel != nil {
handler = func(ctx context.Context, id types.Hash32, p p2p.Peer, data []byte) error {
if err := f.validators.atx.HandleMessage(ctx, id, p, data); err != nil {
return err
}

Check warning on line 49 in fetch/mesh_data.go

View check run for this annotation

Codecov / codecov/patch

fetch/mesh_data.go#L48-L49

Added lines #L48 - L49 were not covered by tests
options.RecvChannel <- types.ATXID(id)
return nil
}
}
if options.LimitingOff {
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage)
return f.getHashes(ctx, hashes, datastore.ATXDB, handler)
}
return f.getHashes(ctx, hashes, datastore.ATXDB, f.validators.atx.HandleMessage, withLimiter(f.getAtxsLimiter))
return f.getHashes(ctx, hashes, datastore.ATXDB, handler, withLimiter(f.getAtxsLimiter))
}

type dataReceiver func(context.Context, types.Hash32, p2p.Peer, []byte) error
Expand Down
21 changes: 17 additions & 4 deletions fetch/mesh_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func startTestLoop(tb testing.TB, f *Fetch, eg *errgroup.Group, stop chan struct
default:
f.mu.Lock()
for h, req := range f.unprocessed {
require.NoError(tb, req.validator(req.ctx, types.Hash32{}, p2p.NoPeer, []byte{}))
require.NoError(tb, req.validator(req.ctx, h, p2p.NoPeer, []byte{}))
close(req.promise.completed)
delete(f.unprocessed, h)
}
Expand Down Expand Up @@ -591,7 +591,7 @@ func genATXs(tb testing.TB, num uint32) []*types.ActivationTx {
}

func TestGetATXs(t *testing.T) {
atxs := genATXs(t, 2)
atxs := genATXs(t, 4)
f := createFetch(t)
f.mAtxH.EXPECT().
HandleMessage(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand All @@ -602,10 +602,23 @@ func TestGetATXs(t *testing.T) {
var eg errgroup.Group
startTestLoop(t, f.Fetch, &eg, stop)

atxIDs := types.ToATXIDs(atxs)
require.NoError(t, f.GetAtxs(context.Background(), atxIDs))
atxIDs1 := types.ToATXIDs(atxs[:2])
require.NoError(t, f.GetAtxs(context.Background(), atxIDs1))

recvCh := make(chan types.ATXID)
atxIDs2 := types.ToATXIDs(atxs[2:])
var recvIDs []types.ATXID
eg.Go(func() error {
for id := range recvCh {
recvIDs = append(recvIDs, id)
}
return nil
})
require.NoError(t, f.GetAtxs(context.Background(), atxIDs2, system.WithRecvChannel(recvCh)))
close(recvCh)
close(stop)
require.NoError(t, eg.Wait())
require.ElementsMatch(t, atxIDs2, recvIDs)
}

func TestGetActiveSet(t *testing.T) {
Expand Down
Loading
Loading