From 62c5aad3698cdd435f2d64bb32faeffec375b24e Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 04:08:01 +0000 Subject: [PATCH 1/7] Add target group support Allows a single indexer to handle multiple networks/datacenters/etc. --- internal/index/index.go | 21 +++++++++--- internal/index/index_test.go | 62 +++++++++++++++++++++++++---------- internal/index/schema.go | 11 +++++++ internal/index/types.go | 4 ++- internal/scraper/collector.go | 5 +-- internal/scraper/prober.go | 2 ++ internal/scraper/scraper.go | 1 + 7 files changed, 81 insertions(+), 25 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index 81fa337..a0fb908 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -52,8 +52,8 @@ func (d *DB) UpsertSnapshots(entries ...*SnapshotEntry) { // GetSnapshotsByTarget returns all snapshots served by a host // ordered by newest to oldest. -func (d *DB) GetSnapshotsByTarget(target string) (entries []*SnapshotEntry) { - res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "id_prefix", target) +func (d *DB) GetSnapshotsByTarget(group string, target string) (entries []*SnapshotEntry) { + res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "id_prefix", group, target) if err != nil { panic("getting snapshots by target failed: " + err.Error()) } @@ -114,6 +114,19 @@ func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) { return } +// Fetches the snapshots that are at a given slot. +func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*SnapshotEntry) { + res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot_by_group", group, slot) + if err != nil { + panic("getting best snapshots failed: " + err.Error()) + } + + for entry := res.Next(); entry != nil; entry = res.Next() { + entries = append(entries, entry.(*SnapshotEntry)) + } + return +} + // DeleteOldSnapshots delete snapshot entry older than the given timestamp. func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) { txn := d.DB.Txn(true) @@ -140,10 +153,10 @@ func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) { // DeleteSnapshotsByTarget deletes all snapshots owned by a given target. // Returns the number of deletions made. -func (d *DB) DeleteSnapshotsByTarget(target string) int { +func (d *DB) DeleteSnapshotsByTarget(group string, target string) int { txn := d.DB.Txn(true) defer txn.Abort() - n, err := txn.DeleteAll(tableSnapshotEntry, "id_prefix", target) + n, err := txn.DeleteAll(tableSnapshotEntry, "id_prefix", group, target) if err != nil { panic("failed to delete snapshots by target: " + err.Error()) } diff --git a/internal/index/index_test.go b/internal/index/index_test.go index d676d29..69abfa8 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -26,7 +26,7 @@ import ( var dummyTime1 = time.Date(2022, 4, 27, 15, 33, 20, 0, time.UTC) var snapshotEntry1 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 100, 100), + SnapshotKey: NewSnapshotKey("mainnet", "host1", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, @@ -37,7 +37,7 @@ var snapshotEntry1 = &SnapshotEntry{ } var snapshotEntry2 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host1", 99, 99), + SnapshotKey: NewSnapshotKey("mainnet", "host1", 99, 99), UpdatedAt: dummyTime1.Add(-20 * time.Second), Info: &types.SnapshotInfo{ Slot: 99, @@ -48,7 +48,7 @@ var snapshotEntry2 = &SnapshotEntry{ } var snapshotEntry3 = &SnapshotEntry{ - SnapshotKey: NewSnapshotKey("host2", 100, 100), + SnapshotKey: NewSnapshotKey("mainnet", "host2", 100, 100), UpdatedAt: dummyTime1, Info: &types.SnapshotInfo{ Slot: 100, @@ -58,24 +58,35 @@ var snapshotEntry3 = &SnapshotEntry{ }, } +var snapshotEntry4 = &SnapshotEntry{ + SnapshotKey: NewSnapshotKey("devnet", "host1", 100, 100), + UpdatedAt: dummyTime1, + Info: &types.SnapshotInfo{ + Slot: 151, + Hash: solana.Hash{0x03}, + Files: []*types.SnapshotFile{}, + TotalSize: 0, + }, +} + func TestDB(t *testing.T) { db := NewDB() - assert.Equal(t, 0, db.DeleteSnapshotsByTarget("host1")) - assert.Equal(t, 0, db.DeleteSnapshotsByTarget("host2")) + assert.Equal(t, 0, db.DeleteSnapshotsByTarget("mainnet", "host1")) + assert.Equal(t, 0, db.DeleteSnapshotsByTarget("mainnet", "host2")) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 0) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 0) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 0) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0) assert.Len(t, db.GetBestSnapshots(-1), 0) db.UpsertSnapshots(snapshotEntry1) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 1) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 0) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 1) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0) assert.Len(t, db.GetBestSnapshots(-1), 1) db.UpsertSnapshots(snapshotEntry1, snapshotEntry2) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 2) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 0) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0) assert.Equal(t, []*SnapshotEntry{ snapshotEntry1, @@ -84,21 +95,35 @@ func TestDB(t *testing.T) { db.GetBestSnapshots(-1)) db.UpsertSnapshots(snapshotEntry2, snapshotEntry3) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 2) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 1) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1) + assert.Equal(t, + []*SnapshotEntry{ + snapshotEntry1, + snapshotEntry3, + snapshotEntry2, + }, + db.GetBestSnapshots(-1)) + + // Add a devnet target and ensure the result is the same + db.UpsertSnapshots(snapshotEntry4) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1) assert.Equal(t, []*SnapshotEntry{ + snapshotEntry4, snapshotEntry1, snapshotEntry3, snapshotEntry2, }, db.GetBestSnapshots(-1)) - assert.Equal(t, 2, db.DeleteSnapshotsByTarget("host1")) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 0) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 1) + assert.Equal(t, 2, db.DeleteSnapshotsByTarget("mainnet", "host1")) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 0) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1) assert.Equal(t, []*SnapshotEntry{ + snapshotEntry4, snapshotEntry3, }, db.GetBestSnapshots(-1)) @@ -106,10 +131,11 @@ func TestDB(t *testing.T) { db.UpsertSnapshots(snapshotEntry1, snapshotEntry2) assert.Equal(t, 1, db.DeleteOldSnapshots(snapshotEntry2.UpdatedAt.Add(time.Second))) - assert.Len(t, db.GetSnapshotsByTarget("host1"), 1) - assert.Len(t, db.GetSnapshotsByTarget("host2"), 1) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 1) + assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1) assert.Equal(t, []*SnapshotEntry{ + snapshotEntry4, snapshotEntry1, snapshotEntry3, }, diff --git a/internal/index/schema.go b/internal/index/schema.go index fbfde7b..d156840 100644 --- a/internal/index/schema.go +++ b/internal/index/schema.go @@ -28,6 +28,7 @@ var schema = memdb.DBSchema{ Unique: true, Indexer: &memdb.CompoundIndex{ Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "Group"}, &memdb.StringFieldIndex{Field: "Target"}, &memdb.UintFieldIndex{Field: "InverseSlot"}, }, @@ -44,6 +45,16 @@ var schema = memdb.DBSchema{ Unique: false, Indexer: &memdb.UintFieldIndex{Field: "BaseSlot"}, }, + "base_slot_by_group": { + Name: "base_slot_by_group", + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "Group"}, + &memdb.UintFieldIndex{Field: "BaseSlot"}, + }, + }, + }, }, }, }, diff --git a/internal/index/types.go b/internal/index/types.go index 9819cea..71fb76d 100644 --- a/internal/index/types.go +++ b/internal/index/types.go @@ -27,13 +27,15 @@ type SnapshotEntry struct { } type SnapshotKey struct { + Group string `json:"group"` Target string `json:"target"` InverseSlot uint64 `json:"inverse_slot"` // newest-to-oldest sort BaseSlot uint64 `json:"base_slot"` } -func NewSnapshotKey(target string, slot uint64, base_slot uint64) SnapshotKey { +func NewSnapshotKey(group string, target string, slot uint64, base_slot uint64) SnapshotKey { return SnapshotKey{ + Group: group, Target: target, InverseSlot: ^slot, BaseSlot: base_slot, diff --git a/internal/scraper/collector.go b/internal/scraper/collector.go index 2f827b9..2146175 100644 --- a/internal/scraper/collector.go +++ b/internal/scraper/collector.go @@ -68,11 +68,11 @@ func (c *Collector) run() { c.Log.Debug("Scrape success", zap.String("target", res.Target), zap.Int("num_snapshots", len(res.Infos))) - c.DB.DeleteSnapshotsByTarget(res.Target) + c.DB.DeleteSnapshotsByTarget(res.Group, res.Target) entries := make([]*index.SnapshotEntry, len(res.Infos)) for i, info := range res.Infos { entries[i] = &index.SnapshotEntry{ - SnapshotKey: index.NewSnapshotKey(res.Target, info.Slot, info.BaseSlot), + SnapshotKey: index.NewSnapshotKey(res.Group, res.Target, info.Slot, info.BaseSlot), Info: info, UpdatedAt: res.Time, } @@ -84,6 +84,7 @@ func (c *Collector) run() { type ProbeResult struct { Time time.Time Target string + Group string Infos []*types.SnapshotInfo Err error } diff --git a/internal/scraper/prober.go b/internal/scraper/prober.go index 6142f77..a5cae37 100644 --- a/internal/scraper/prober.go +++ b/internal/scraper/prober.go @@ -28,6 +28,7 @@ import ( // Prober checks snapshot info from Solana nodes. type Prober struct { + group string client *http.Client scheme string apiPath string @@ -77,6 +78,7 @@ func NewProber(group *types.TargetGroup) (*Prober, error) { } return &Prober{ + group: group.Group, client: client, scheme: group.Scheme, apiPath: group.APIPath, diff --git a/internal/scraper/scraper.go b/internal/scraper/scraper.go index a0009c3..b4e4b74 100644 --- a/internal/scraper/scraper.go +++ b/internal/scraper/scraper.go @@ -95,6 +95,7 @@ func (s *Scraper) scrape(ctx context.Context, results chan<- ProbeResult) { defer wg.Done() infos, err := s.prober.Probe(ctx, target) results <- ProbeResult{ + Group: s.prober.group, Time: time.Now(), Target: s.prober.scheme + "://" + target, Infos: infos, From 7a6e3dd03b5bb8f7043081b09469e91451d8cc99 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 09:06:01 +0000 Subject: [PATCH 2/7] Add slot by group --- internal/index/schema.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/index/schema.go b/internal/index/schema.go index d156840..897e8af 100644 --- a/internal/index/schema.go +++ b/internal/index/schema.go @@ -40,6 +40,16 @@ var schema = memdb.DBSchema{ Unique: false, Indexer: &memdb.UintFieldIndex{Field: "InverseSlot"}, }, + "slot_by_group": { + Name: "slot_by_group", + Unique: false, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "Group"}, + &memdb.UintFieldIndex{Field: "InverseSlot"}, + }, + }, + }, "base_slot": { Name: "base_slot", Unique: false, From f396195bd879d06a160171939e8063c28e8b0d7f Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 10:47:14 +0000 Subject: [PATCH 3/7] add fetch methods --- internal/index/index.go | 40 +++++++++++++++++++++++------------- internal/index/index_test.go | 12 +++++++++++ 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index a0fb908..3b3cd0b 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -86,8 +86,15 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { // GetBestSnapshots returns newest-to-oldest snapshots. // The `max` argument controls the max number of snapshots to return. // If max is negative, it returns all snapshots. -func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { - res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "slot") +func (d *DB) GetBestSnapshotsByGroup(max int, group string) (entries []*SnapshotEntry) { + var res memdb.ResultIterator + var err error + if group != "" { + res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot_by_group", group) + } else { + res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot") + } + if err != nil { panic("getting best snapshots failed: " + err.Error()) } @@ -102,21 +109,16 @@ func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { } // Fetches the snapshots that are at a given slot. -func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) { - res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot) - if err != nil { - panic("getting best snapshots failed: " + err.Error()) - } +func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*SnapshotEntry) { + var res memdb.ResultIterator + var err error - for entry := res.Next(); entry != nil; entry = res.Next() { - entries = append(entries, entry.(*SnapshotEntry)) + if group != "" { + res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot_by_group", slot, group) + } else { + res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot) } - return -} -// Fetches the snapshots that are at a given slot. -func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*SnapshotEntry) { - res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot_by_group", group, slot) if err != nil { panic("getting best snapshots failed: " + err.Error()) } @@ -127,6 +129,16 @@ func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*Sn return } +// Fetches the best snapshots +func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { + return d.GetBestSnapshotsByGroup(max, "") +} + +// Fetches the snapshots that are at a given slot. +func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) { + return d.GetSnapshotsAtSlotByGroup("", slot) +} + // DeleteOldSnapshots delete snapshot entry older than the given timestamp. func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) { txn := d.DB.Txn(true) diff --git a/internal/index/index_test.go b/internal/index/index_test.go index 69abfa8..277ddda 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -140,4 +140,16 @@ func TestDB(t *testing.T) { snapshotEntry3, }, db.GetBestSnapshots(-1)) + + assert.Equal(t, + []*SnapshotEntry{ + snapshotEntry4, + }, + db.GetBestSnapshotsByGroup(-1, "devnet")) + + assert.Equal(t, + []*SnapshotEntry{ + snapshotEntry1, + snapshotEntry3 + }, ) } From 68bc95f19df1cc08328c9744f8b89941dbd86a9a Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 11:31:44 +0000 Subject: [PATCH 4/7] add filtering of best snaphots by group --- internal/index/index.go | 9 ++++----- internal/index/index_test.go | 7 ++++--- internal/index/schema.go | 10 ---------- 3 files changed, 8 insertions(+), 18 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index 3b3cd0b..14e2c21 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -89,11 +89,7 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { func (d *DB) GetBestSnapshotsByGroup(max int, group string) (entries []*SnapshotEntry) { var res memdb.ResultIterator var err error - if group != "" { - res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot_by_group", group) - } else { - res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot") - } + res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot") if err != nil { panic("getting best snapshots failed: " + err.Error()) @@ -103,6 +99,9 @@ func (d *DB) GetBestSnapshotsByGroup(max int, group string) (entries []*Snapshot if entry == nil { break } + if group != "" && entry.(*SnapshotEntry).Group != group { + continue + } entries = append(entries, entry.(*SnapshotEntry)) } return diff --git a/internal/index/index_test.go b/internal/index/index_test.go index 277ddda..0f2324d 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -141,7 +141,7 @@ func TestDB(t *testing.T) { }, db.GetBestSnapshots(-1)) - assert.Equal(t, + assert.Equal(t, []*SnapshotEntry{ snapshotEntry4, }, @@ -150,6 +150,7 @@ func TestDB(t *testing.T) { assert.Equal(t, []*SnapshotEntry{ snapshotEntry1, - snapshotEntry3 - }, ) + snapshotEntry3, + }, + db.GetBestSnapshotsByGroup(-1, "mainnet")) } diff --git a/internal/index/schema.go b/internal/index/schema.go index 897e8af..d156840 100644 --- a/internal/index/schema.go +++ b/internal/index/schema.go @@ -40,16 +40,6 @@ var schema = memdb.DBSchema{ Unique: false, Indexer: &memdb.UintFieldIndex{Field: "InverseSlot"}, }, - "slot_by_group": { - Name: "slot_by_group", - Unique: false, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{Field: "Group"}, - &memdb.UintFieldIndex{Field: "InverseSlot"}, - }, - }, - }, "base_slot": { Name: "base_slot", Unique: false, From 6c419cbe2b36adfbde6cafbb10625d1ed8beadf1 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 11:51:10 +0000 Subject: [PATCH 5/7] Finalized HTTP API for requesting snapshots by group --- internal/cmd/fetch/fetch.go | 10 ++++++---- internal/fetch/tracker.go | 6 ++++-- internal/index/index.go | 19 ++++++++++++++++-- internal/index/index_test.go | 4 ++-- internal/integrationtest/tracker_test.go | 2 +- internal/mirror/mirror.go | 2 +- internal/tracker/tracker.go | 25 +++++++++++++++++------- 7 files changed, 49 insertions(+), 19 deletions(-) diff --git a/internal/cmd/fetch/fetch.go b/internal/cmd/fetch/fetch.go index 417e7a4..c983eeb 100644 --- a/internal/cmd/fetch/fetch.go +++ b/internal/cmd/fetch/fetch.go @@ -49,6 +49,7 @@ var Cmd = cobra.Command{ var ( ledgerDir string trackerURL string + group string minSnapAge uint64 maxSnapAge uint64 baseSlot uint64 @@ -62,6 +63,7 @@ func init() { flags := Cmd.Flags() flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir") flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL") + flags.StringVar(&group, "group", "", "Download from specified group") flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots slots newer than local") flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download slots older than the newest") flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)") @@ -109,7 +111,7 @@ func run() { log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot)) // Ask tracker for snapshots at a specific location - remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, baseSlot) if err != nil { log.Fatal("Failed to fetch snapshot info", zap.Error(err)) } @@ -122,7 +124,7 @@ func run() { log.Info("Finding best snapshot") // Ask tracker for best snapshots. - remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1) + remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, group, -1) if err != nil { log.Fatal("Failed to request snapshot info", zap.Error(err)) } @@ -145,7 +147,7 @@ func run() { // If we are not fetching a full snapshot and the base slot isn't matching // we need to fetch an older incremental snapshot. log.Info("Full snapshot is newer than local, but not requested") - remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, localSnaps[0].BaseSlot) if err != nil { log.Fatal("Failed to request snapshot info", zap.Error(err)) } @@ -204,7 +206,7 @@ func run() { // If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch // Find latest incremental snapshot log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot)) - remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot) + remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, snap.BaseSlot) if err != nil { log.Fatal("Failed to request snapshot info", zap.Error(err)) } diff --git a/internal/fetch/tracker.go b/internal/fetch/tracker.go index 5b372c7..45505a6 100644 --- a/internal/fetch/tracker.go +++ b/internal/fetch/tracker.go @@ -38,11 +38,12 @@ func NewTrackerClientWithResty(client *resty.Client) *TrackerClient { return &TrackerClient{resty: client} } -func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (sources []types.SnapshotSource, err error) { +func (c *TrackerClient) GetBestSnapshots(ctx context.Context, group string, count int) (sources []types.SnapshotSource, err error) { res, err := c.resty.R(). SetContext(ctx). SetHeader("accept", "application/json"). SetQueryParam("max", strconv.Itoa(count)). + SetQueryParam("group", group). SetResult(&sources). Get("/v1/best_snapshots") if err != nil { @@ -54,11 +55,12 @@ func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (source return } -func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sources []types.SnapshotSource, err error) { +func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, group string, slot uint64) (sources []types.SnapshotSource, err error) { res, err := c.resty.R(). SetContext(ctx). SetHeader("accept", "application/json"). SetQueryParam("slot", strconv.FormatUint(slot, 10)). + SetQueryParam("group", group). SetResult(&sources). Get("/v1/snapshots") if err != nil { diff --git a/internal/index/index.go b/internal/index/index.go index 14e2c21..01b8546 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -83,10 +83,25 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { return } +func (d *DB) GetAllSnapshotsByGroup(group string) (entries []*SnapshotEntry) { + iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id_prefix", group, "", uint64(0)) + if err != nil { + panic("getting best snapshots failed: " + err.Error()) + } + for { + el := iter.Next() + if el == nil { + break + } + entries = append(entries, el.(*SnapshotEntry)) + } + return +} + // GetBestSnapshots returns newest-to-oldest snapshots. // The `max` argument controls the max number of snapshots to return. // If max is negative, it returns all snapshots. -func (d *DB) GetBestSnapshotsByGroup(max int, group string) (entries []*SnapshotEntry) { +func (d *DB) GetBestSnapshotsByGroup(group string, max int) (entries []*SnapshotEntry) { var res memdb.ResultIterator var err error res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot") @@ -130,7 +145,7 @@ func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*Sn // Fetches the best snapshots func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) { - return d.GetBestSnapshotsByGroup(max, "") + return d.GetBestSnapshotsByGroup("", max) } // Fetches the snapshots that are at a given slot. diff --git a/internal/index/index_test.go b/internal/index/index_test.go index 0f2324d..8cbce51 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -145,12 +145,12 @@ func TestDB(t *testing.T) { []*SnapshotEntry{ snapshotEntry4, }, - db.GetBestSnapshotsByGroup(-1, "devnet")) + db.GetBestSnapshotsByGroup("devnet", -1)) assert.Equal(t, []*SnapshotEntry{ snapshotEntry1, snapshotEntry3, }, - db.GetBestSnapshotsByGroup(-1, "mainnet")) + db.GetBestSnapshotsByGroup("mainnet", -1)) } diff --git a/internal/integrationtest/tracker_test.go b/internal/integrationtest/tracker_test.go index 31554d4..373844d 100644 --- a/internal/integrationtest/tracker_test.go +++ b/internal/integrationtest/tracker_test.go @@ -87,7 +87,7 @@ func TestTracker(t *testing.T) { // Create tracker client. client := fetch.NewTrackerClientWithResty(resty.NewWithClient(server.Client()).SetHostURL(server.URL)) - snaps, err := client.GetBestSnapshots(context.TODO(), -1) + snaps, err := client.GetBestSnapshots(context.TODO(), "", -1) require.NoError(t, err) // Remove timestamps and port numbers. for i := range snaps { diff --git a/internal/mirror/mirror.go b/internal/mirror/mirror.go index 6ec8d1a..833f052 100644 --- a/internal/mirror/mirror.go +++ b/internal/mirror/mirror.go @@ -65,7 +65,7 @@ func (w *Worker) Run(ctx context.Context) { func (w *Worker) tick(ctx context.Context) { w.Log.Debug("Tick") - sources, err := w.Tracker.GetBestSnapshots(ctx, w.SyncCount) + sources, err := w.Tracker.GetBestSnapshots(ctx, "", w.SyncCount) if err != nil { w.Log.Error("Failed to find new snapshots", zap.Error(err)) return diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index 8c115d8..53fbed1 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -59,17 +59,26 @@ func (h *Handler) createJson(c *gin.Context, entries []*index.SnapshotEntry) { func (h *Handler) GetSnapshots(c *gin.Context) { var query struct { - Slot uint64 `form:"slot"` + Slot uint64 `form:"slot"` + Group string `form:"group"` } if err := c.BindQuery(&query); err != nil { return } var entries []*index.SnapshotEntry - if query.Slot == 0 { - entries = h.DB.GetAllSnapshots() + if query.Group == "" { + if query.Slot == 0 { + entries = h.DB.GetAllSnapshots() + } else { + entries = h.DB.GetSnapshotsAtSlot(query.Slot) + } } else { - entries = h.DB.GetSnapshotsAtSlot(query.Slot) + if query.Slot == 0 { + entries = h.DB.GetAllSnapshotsByGroup(query.Group) + } else { + entries = h.DB.GetSnapshotsAtSlotByGroup(query.Group, query.Slot) + } } h.createJson(c, entries) @@ -78,7 +87,8 @@ func (h *Handler) GetSnapshots(c *gin.Context) { // GetBestSnapshots returns the currently available best snapshots. func (h *Handler) GetBestSnapshots(c *gin.Context) { var query struct { - Max int `form:"max"` + Max int `form:"max"` + Group string `form:"group"` } if err := c.BindQuery(&query); err != nil { return @@ -87,13 +97,14 @@ func (h *Handler) GetBestSnapshots(c *gin.Context) { if query.Max < 0 || query.Max > maxItems { query.Max = maxItems } - entries := h.DB.GetBestSnapshots(query.Max) + entries := h.DB.GetBestSnapshotsByGroup(query.Group, query.Max) h.createJson(c, entries) } func (h *Handler) Health(c *gin.Context) { var query struct { - Max int `form:"max"` + Max int `form:"max"` + Group string `form:"group"` } if err := c.BindQuery(&query); err != nil { return From d6d04b3acb23f56dc5c6d60350621b7e8379fc6b Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 12:26:34 +0000 Subject: [PATCH 6/7] Fixed the snapshots by traget --- internal/index/index.go | 28 ++++++++---------------- internal/index/index_test.go | 13 +++++++++++ internal/integrationtest/tracker_test.go | 4 ++++ internal/tracker/tracker.go | 3 ++- types/snapshot.go | 1 + 5 files changed, 29 insertions(+), 20 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index 01b8546..9b0f4dc 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -67,9 +67,8 @@ func (d *DB) GetSnapshotsByTarget(group string, target string) (entries []*Snaps return } -// GetAllSnapshots returns a list of all snapshots. -func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { - iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id", "", uint64(0)) +func (d *DB) GetAllSnapshotsByGroup(group string) (entries []*SnapshotEntry) { + iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id", "", "", uint64(0)) if err != nil { panic("getting best snapshots failed: " + err.Error()) } @@ -78,33 +77,24 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { if el == nil { break } + if group != "" && el.(*SnapshotEntry).Group != group { + continue + } entries = append(entries, el.(*SnapshotEntry)) } return } -func (d *DB) GetAllSnapshotsByGroup(group string) (entries []*SnapshotEntry) { - iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id_prefix", group, "", uint64(0)) - if err != nil { - panic("getting best snapshots failed: " + err.Error()) - } - for { - el := iter.Next() - if el == nil { - break - } - entries = append(entries, el.(*SnapshotEntry)) - } - return +// GetAllSnapshots returns a list of all snapshots. +func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) { + return d.GetAllSnapshotsByGroup("") } // GetBestSnapshots returns newest-to-oldest snapshots. // The `max` argument controls the max number of snapshots to return. // If max is negative, it returns all snapshots. func (d *DB) GetBestSnapshotsByGroup(group string, max int) (entries []*SnapshotEntry) { - var res memdb.ResultIterator - var err error - res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "slot") + res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "slot") if err != nil { panic("getting best snapshots failed: " + err.Error()) diff --git a/internal/index/index_test.go b/internal/index/index_test.go index 8cbce51..38a89de 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -147,10 +147,23 @@ func TestDB(t *testing.T) { }, db.GetBestSnapshotsByGroup("devnet", -1)) + assert.Equal(t, + []*SnapshotEntry{ + snapshotEntry4, + }, + db.GetAllSnapshotsByGroup("devnet")) + assert.Equal(t, []*SnapshotEntry{ snapshotEntry1, snapshotEntry3, }, db.GetBestSnapshotsByGroup("mainnet", -1)) + + assert.Equal(t, + []*SnapshotEntry{ + snapshotEntry1, + snapshotEntry3, + }, + db.GetAllSnapshotsByGroup("mainnet")) } diff --git a/internal/integrationtest/tracker_test.go b/internal/integrationtest/tracker_test.go index 373844d..1994fb4 100644 --- a/internal/integrationtest/tracker_test.go +++ b/internal/integrationtest/tracker_test.go @@ -119,6 +119,7 @@ func TestTracker(t *testing.T) { }, TotalSize: 1, }, + Group: "test", }, { SnapshotInfo: types.SnapshotInfo{ @@ -136,6 +137,7 @@ func TestTracker(t *testing.T) { }, TotalSize: 1, }, + Group: "test", }, { SnapshotInfo: types.SnapshotInfo{ @@ -153,6 +155,7 @@ func TestTracker(t *testing.T) { }, TotalSize: 1, }, + Group: "test", }, { SnapshotInfo: types.SnapshotInfo{ @@ -170,6 +173,7 @@ func TestTracker(t *testing.T) { }, TotalSize: 1, }, + Group: "test", }, }, snaps) diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go index 53fbed1..ebc4df8 100644 --- a/internal/tracker/tracker.go +++ b/internal/tracker/tracker.go @@ -51,6 +51,7 @@ func (h *Handler) createJson(c *gin.Context, entries []*index.SnapshotEntry) { sources[i] = types.SnapshotSource{ SnapshotInfo: *entry.Info, Target: entry.Target, + Group: entry.Group, UpdatedAt: entry.UpdatedAt, } } @@ -110,7 +111,7 @@ func (h *Handler) Health(c *gin.Context) { return } query.Max = 1 - entries := h.DB.GetBestSnapshots(query.Max) + entries := h.DB.GetBestSnapshotsByGroup(query.Group, query.Max) var health struct { MaxSnapshot uint64 diff --git a/types/snapshot.go b/types/snapshot.go index 4f1e469..1565413 100644 --- a/types/snapshot.go +++ b/types/snapshot.go @@ -25,6 +25,7 @@ import ( type SnapshotSource struct { SnapshotInfo Target string `json:"target"` + Group string `json:"group"` UpdatedAt time.Time `json:"updated_at"` } From d9b15d7ae6e2e0b88e97f969e52d9e2335e37818 Mon Sep 17 00:00:00 2001 From: Linus Kendall Date: Thu, 19 Dec 2024 12:32:10 +0000 Subject: [PATCH 7/7] goformat --- internal/index/types.go | 4 ++-- internal/scraper/collector.go | 2 +- internal/scraper/prober.go | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/index/types.go b/internal/index/types.go index 71fb76d..e8845f2 100644 --- a/internal/index/types.go +++ b/internal/index/types.go @@ -27,7 +27,7 @@ type SnapshotEntry struct { } type SnapshotKey struct { - Group string `json:"group"` + Group string `json:"group"` Target string `json:"target"` InverseSlot uint64 `json:"inverse_slot"` // newest-to-oldest sort BaseSlot uint64 `json:"base_slot"` @@ -35,7 +35,7 @@ type SnapshotKey struct { func NewSnapshotKey(group string, target string, slot uint64, base_slot uint64) SnapshotKey { return SnapshotKey{ - Group: group, + Group: group, Target: target, InverseSlot: ^slot, BaseSlot: base_slot, diff --git a/internal/scraper/collector.go b/internal/scraper/collector.go index 2146175..2aa928f 100644 --- a/internal/scraper/collector.go +++ b/internal/scraper/collector.go @@ -84,7 +84,7 @@ func (c *Collector) run() { type ProbeResult struct { Time time.Time Target string - Group string + Group string Infos []*types.SnapshotInfo Err error } diff --git a/internal/scraper/prober.go b/internal/scraper/prober.go index a5cae37..0572fc6 100644 --- a/internal/scraper/prober.go +++ b/internal/scraper/prober.go @@ -28,7 +28,7 @@ import ( // Prober checks snapshot info from Solana nodes. type Prober struct { - group string + group string client *http.Client scheme string apiPath string @@ -78,7 +78,7 @@ func NewProber(group *types.TargetGroup) (*Prober, error) { } return &Prober{ - group: group.Group, + group: group.Group, client: client, scheme: group.Scheme, apiPath: group.APIPath,