Skip to content

Commit

Permalink
Merge pull request #3 from rpcpool/add_target_group
Browse files Browse the repository at this point in the history
Add target group
  • Loading branch information
linuskendall authored Dec 19, 2024
2 parents caeefce + d9b15d7 commit d5fa2cb
Show file tree
Hide file tree
Showing 13 changed files with 166 additions and 47 deletions.
10 changes: 6 additions & 4 deletions internal/cmd/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var Cmd = cobra.Command{
var (
ledgerDir string
trackerURL string
group string
minSnapAge uint64
maxSnapAge uint64
baseSlot uint64
Expand All @@ -62,6 +63,7 @@ func init() {
flags := Cmd.Flags()
flags.StringVar(&ledgerDir, "ledger", "", "Path to ledger dir")
flags.StringVar(&trackerURL, "tracker", "", "Download as instructed by given tracker URL")
flags.StringVar(&group, "group", "", "Download from specified group")
flags.Uint64Var(&minSnapAge, "min-slots", 500, "Download only snapshots <n> slots newer than local")
flags.Uint64Var(&maxSnapAge, "max-slots", 10000, "Refuse to download <n> slots older than the newest")
flags.DurationVar(&requestTimeout, "request-timeout", 3*time.Second, "Max time to wait for headers (excluding download)")
Expand Down Expand Up @@ -109,7 +111,7 @@ func run() {
log.Info("Fetching snapshots at slot", zap.Uint64("base_slot", baseSlot))

// Ask tracker for snapshots at a specific location
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, baseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, baseSlot)
if err != nil {
log.Fatal("Failed to fetch snapshot info", zap.Error(err))
}
Expand All @@ -122,7 +124,7 @@ func run() {
log.Info("Finding best snapshot")

// Ask tracker for best snapshots.
remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, -1)
remoteSnaps, err = trackerClient.GetBestSnapshots(ctx, group, -1)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand All @@ -145,7 +147,7 @@ func run() {
// If we are not fetching a full snapshot and the base slot isn't matching
// we need to fetch an older incremental snapshot.
log.Info("Full snapshot is newer than local, but not requested")
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, localSnaps[0].BaseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, localSnaps[0].BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand Down Expand Up @@ -204,7 +206,7 @@ func run() {
// If we were downloading a full snapshot, check if there's a newer incremental snapshot we can fetch
// Find latest incremental snapshot
log.Info("Finding incremental snapshot for full slot", zap.Uint64("base_slot", snap.BaseSlot))
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, snap.BaseSlot)
remoteSnaps, err = trackerClient.GetSnapshotAtSlot(ctx, group, snap.BaseSlot)
if err != nil {
log.Fatal("Failed to request snapshot info", zap.Error(err))
}
Expand Down
6 changes: 4 additions & 2 deletions internal/fetch/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ func NewTrackerClientWithResty(client *resty.Client) *TrackerClient {
return &TrackerClient{resty: client}
}

func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (sources []types.SnapshotSource, err error) {
func (c *TrackerClient) GetBestSnapshots(ctx context.Context, group string, count int) (sources []types.SnapshotSource, err error) {
res, err := c.resty.R().
SetContext(ctx).
SetHeader("accept", "application/json").
SetQueryParam("max", strconv.Itoa(count)).
SetQueryParam("group", group).
SetResult(&sources).
Get("/v1/best_snapshots")
if err != nil {
Expand All @@ -54,11 +55,12 @@ func (c *TrackerClient) GetBestSnapshots(ctx context.Context, count int) (source
return
}

func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, slot uint64) (sources []types.SnapshotSource, err error) {
func (c *TrackerClient) GetSnapshotAtSlot(ctx context.Context, group string, slot uint64) (sources []types.SnapshotSource, err error) {
res, err := c.resty.R().
SetContext(ctx).
SetHeader("accept", "application/json").
SetQueryParam("slot", strconv.FormatUint(slot, 10)).
SetQueryParam("group", group).
SetResult(&sources).
Get("/v1/snapshots")
if err != nil {
Expand Down
49 changes: 39 additions & 10 deletions internal/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (d *DB) UpsertSnapshots(entries ...*SnapshotEntry) {

// GetSnapshotsByTarget returns all snapshots served by a host
// ordered by newest to oldest.
func (d *DB) GetSnapshotsByTarget(target string) (entries []*SnapshotEntry) {
res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "id_prefix", target)
func (d *DB) GetSnapshotsByTarget(group string, target string) (entries []*SnapshotEntry) {
res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "id_prefix", group, target)
if err != nil {
panic("getting snapshots by target failed: " + err.Error())
}
Expand All @@ -67,9 +67,8 @@ func (d *DB) GetSnapshotsByTarget(target string) (entries []*SnapshotEntry) {
return
}

// GetAllSnapshots returns a list of all snapshots.
func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) {
iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id", "", uint64(0))
func (d *DB) GetAllSnapshotsByGroup(group string) (entries []*SnapshotEntry) {
iter, err := d.DB.Txn(false).LowerBound(tableSnapshotEntry, "id", "", "", uint64(0))
if err != nil {
panic("getting best snapshots failed: " + err.Error())
}
Expand All @@ -78,16 +77,25 @@ func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) {
if el == nil {
break
}
if group != "" && el.(*SnapshotEntry).Group != group {
continue
}
entries = append(entries, el.(*SnapshotEntry))
}
return
}

// GetAllSnapshots returns a list of all snapshots.
func (d *DB) GetAllSnapshots() (entries []*SnapshotEntry) {
return d.GetAllSnapshotsByGroup("")
}

// GetBestSnapshots returns newest-to-oldest snapshots.
// The `max` argument controls the max number of snapshots to return.
// If max is negative, it returns all snapshots.
func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) {
func (d *DB) GetBestSnapshotsByGroup(group string, max int) (entries []*SnapshotEntry) {
res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "slot")

if err != nil {
panic("getting best snapshots failed: " + err.Error())
}
Expand All @@ -96,14 +104,25 @@ func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) {
if entry == nil {
break
}
if group != "" && entry.(*SnapshotEntry).Group != group {
continue
}
entries = append(entries, entry.(*SnapshotEntry))
}
return
}

// Fetches the snapshots that are at a given slot.
func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) {
res, err := d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot)
func (d *DB) GetSnapshotsAtSlotByGroup(group string, slot uint64) (entries []*SnapshotEntry) {
var res memdb.ResultIterator
var err error

if group != "" {
res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot_by_group", slot, group)
} else {
res, err = d.DB.Txn(false).Get(tableSnapshotEntry, "base_slot", slot)
}

if err != nil {
panic("getting best snapshots failed: " + err.Error())
}
Expand All @@ -114,6 +133,16 @@ func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) {
return
}

// Fetches the best snapshots
func (d *DB) GetBestSnapshots(max int) (entries []*SnapshotEntry) {
return d.GetBestSnapshotsByGroup("", max)
}

// Fetches the snapshots that are at a given slot.
func (d *DB) GetSnapshotsAtSlot(slot uint64) (entries []*SnapshotEntry) {
return d.GetSnapshotsAtSlotByGroup("", slot)
}

// DeleteOldSnapshots delete snapshot entry older than the given timestamp.
func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) {
txn := d.DB.Txn(true)
Expand All @@ -140,10 +169,10 @@ func (d *DB) DeleteOldSnapshots(minTime time.Time) (n int) {

// DeleteSnapshotsByTarget deletes all snapshots owned by a given target.
// Returns the number of deletions made.
func (d *DB) DeleteSnapshotsByTarget(target string) int {
func (d *DB) DeleteSnapshotsByTarget(group string, target string) int {
txn := d.DB.Txn(true)
defer txn.Abort()
n, err := txn.DeleteAll(tableSnapshotEntry, "id_prefix", target)
n, err := txn.DeleteAll(tableSnapshotEntry, "id_prefix", group, target)
if err != nil {
panic("failed to delete snapshots by target: " + err.Error())
}
Expand Down
88 changes: 70 additions & 18 deletions internal/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
var dummyTime1 = time.Date(2022, 4, 27, 15, 33, 20, 0, time.UTC)

var snapshotEntry1 = &SnapshotEntry{
SnapshotKey: NewSnapshotKey("host1", 100, 100),
SnapshotKey: NewSnapshotKey("mainnet", "host1", 100, 100),
UpdatedAt: dummyTime1,
Info: &types.SnapshotInfo{
Slot: 100,
Expand All @@ -37,7 +37,7 @@ var snapshotEntry1 = &SnapshotEntry{
}

var snapshotEntry2 = &SnapshotEntry{
SnapshotKey: NewSnapshotKey("host1", 99, 99),
SnapshotKey: NewSnapshotKey("mainnet", "host1", 99, 99),
UpdatedAt: dummyTime1.Add(-20 * time.Second),
Info: &types.SnapshotInfo{
Slot: 99,
Expand All @@ -48,7 +48,7 @@ var snapshotEntry2 = &SnapshotEntry{
}

var snapshotEntry3 = &SnapshotEntry{
SnapshotKey: NewSnapshotKey("host2", 100, 100),
SnapshotKey: NewSnapshotKey("mainnet", "host2", 100, 100),
UpdatedAt: dummyTime1,
Info: &types.SnapshotInfo{
Slot: 100,
Expand All @@ -58,24 +58,35 @@ var snapshotEntry3 = &SnapshotEntry{
},
}

var snapshotEntry4 = &SnapshotEntry{
SnapshotKey: NewSnapshotKey("devnet", "host1", 100, 100),
UpdatedAt: dummyTime1,
Info: &types.SnapshotInfo{
Slot: 151,
Hash: solana.Hash{0x03},
Files: []*types.SnapshotFile{},
TotalSize: 0,
},
}

func TestDB(t *testing.T) {
db := NewDB()

assert.Equal(t, 0, db.DeleteSnapshotsByTarget("host1"))
assert.Equal(t, 0, db.DeleteSnapshotsByTarget("host2"))
assert.Equal(t, 0, db.DeleteSnapshotsByTarget("mainnet", "host1"))
assert.Equal(t, 0, db.DeleteSnapshotsByTarget("mainnet", "host2"))

assert.Len(t, db.GetSnapshotsByTarget("host1"), 0)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 0)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 0)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0)
assert.Len(t, db.GetBestSnapshots(-1), 0)

db.UpsertSnapshots(snapshotEntry1)
assert.Len(t, db.GetSnapshotsByTarget("host1"), 1)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 0)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 1)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0)
assert.Len(t, db.GetBestSnapshots(-1), 1)

db.UpsertSnapshots(snapshotEntry1, snapshotEntry2)
assert.Len(t, db.GetSnapshotsByTarget("host1"), 2)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 0)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 0)
assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry1,
Expand All @@ -84,34 +95,75 @@ func TestDB(t *testing.T) {
db.GetBestSnapshots(-1))

db.UpsertSnapshots(snapshotEntry2, snapshotEntry3)
assert.Len(t, db.GetSnapshotsByTarget("host1"), 2)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 1)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1)
assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry1,
snapshotEntry3,
snapshotEntry2,
},
db.GetBestSnapshots(-1))

// Add a devnet target and ensure the result is the same
db.UpsertSnapshots(snapshotEntry4)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 2)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1)
assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry4,
snapshotEntry1,
snapshotEntry3,
snapshotEntry2,
},
db.GetBestSnapshots(-1))

assert.Equal(t, 2, db.DeleteSnapshotsByTarget("host1"))
assert.Len(t, db.GetSnapshotsByTarget("host1"), 0)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 1)
assert.Equal(t, 2, db.DeleteSnapshotsByTarget("mainnet", "host1"))
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 0)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1)
assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry4,
snapshotEntry3,
},
db.GetBestSnapshots(-1))

db.UpsertSnapshots(snapshotEntry1, snapshotEntry2)

assert.Equal(t, 1, db.DeleteOldSnapshots(snapshotEntry2.UpdatedAt.Add(time.Second)))
assert.Len(t, db.GetSnapshotsByTarget("host1"), 1)
assert.Len(t, db.GetSnapshotsByTarget("host2"), 1)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host1"), 1)
assert.Len(t, db.GetSnapshotsByTarget("mainnet", "host2"), 1)
assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry4,
snapshotEntry1,
snapshotEntry3,
},
db.GetBestSnapshots(-1))

assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry4,
},
db.GetBestSnapshotsByGroup("devnet", -1))

assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry4,
},
db.GetAllSnapshotsByGroup("devnet"))

assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry1,
snapshotEntry3,
},
db.GetBestSnapshotsByGroup("mainnet", -1))

assert.Equal(t,
[]*SnapshotEntry{
snapshotEntry1,
snapshotEntry3,
},
db.GetAllSnapshotsByGroup("mainnet"))
}
11 changes: 11 additions & 0 deletions internal/index/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var schema = memdb.DBSchema{
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "Group"},
&memdb.StringFieldIndex{Field: "Target"},
&memdb.UintFieldIndex{Field: "InverseSlot"},
},
Expand All @@ -44,6 +45,16 @@ var schema = memdb.DBSchema{
Unique: false,
Indexer: &memdb.UintFieldIndex{Field: "BaseSlot"},
},
"base_slot_by_group": {
Name: "base_slot_by_group",
Unique: false,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{Field: "Group"},
&memdb.UintFieldIndex{Field: "BaseSlot"},
},
},
},
},
},
},
Expand Down
4 changes: 3 additions & 1 deletion internal/index/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ type SnapshotEntry struct {
}

type SnapshotKey struct {
Group string `json:"group"`
Target string `json:"target"`
InverseSlot uint64 `json:"inverse_slot"` // newest-to-oldest sort
BaseSlot uint64 `json:"base_slot"`
}

func NewSnapshotKey(target string, slot uint64, base_slot uint64) SnapshotKey {
func NewSnapshotKey(group string, target string, slot uint64, base_slot uint64) SnapshotKey {
return SnapshotKey{
Group: group,
Target: target,
InverseSlot: ^slot,
BaseSlot: base_slot,
Expand Down
Loading

0 comments on commit d5fa2cb

Please sign in to comment.