Skip to content

Commit

Permalink
Fix floating point encoding, add tests for NewIndexedValue
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Jan 2, 2025
1 parent 81edc0a commit 0bab544
Show file tree
Hide file tree
Showing 7 changed files with 566 additions and 10 deletions.
11 changes: 11 additions & 0 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type filters struct {
loadedFilters atomic.Bool
knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters
knownDiscriminators map[string]uint // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator
seqNums map[int64]int64
}

func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
Expand All @@ -38,6 +39,11 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters {
}
}

func (fl *filters) IncrementSeqNums(filterID int64) int64 {
fl.seqNums[filterID]++
return fl.seqNums[filterID]
}

// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs.
func (fl *filters) PruneFilters(ctx context.Context) error {
err := fl.LoadFilters(ctx)
Expand Down Expand Up @@ -385,6 +391,11 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
}
}

fl.seqNums, err = fl.orm.SelectSeqNums(ctx)
if err != nil {
return fmt.Errorf("failed to select sequence numbers from db: %w", err)
}

fl.loadedFilters.Store(true)

return nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ORM interface {
MarkFilterDeleted(ctx context.Context, id int64) (err error)
MarkFilterBackfilled(ctx context.Context, id int64) (err error)
InsertLogs(context.Context, []Log) (err error)
SelectSeqNums(ctx context.Context) (map[int64]int64, error)
}

type ILogPoller interface {
Expand Down Expand Up @@ -134,10 +135,7 @@ func (lp *LogPoller) Process(programEvent ProgramEvent) (err error) {
subKeyValues = append(subKeyValues, indexedVal)
}

lp.seqNums[filter.ID]++
log.SequenceNum = lp.seqNums

// TODO: fill in, and keep track of SequenceNumber for each filter. (Initialize from db on LoadFilters, then increment each time?)
log.SequenceNum = lp.filters.IncrementSeqNums(filter.ID)

expiresAt := time.Now() // TODO: account for possible discrepencies in time? Seems like retention should be passed directly to ORM
expiresAt.Add(filter.Retention)
Expand Down
58 changes: 58 additions & 0 deletions pkg/solana/logpoller/mock_orm.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/solana/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address Public
}
return logs, nil
}

func (o *DSORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) {
seqNums := make(map[int64]int64)
query := "SELECT id, MAX(sequence_num) FROM solana.logs WHERE chain_id=%s GROUP BY id"
err := o.ds.SelectContext(ctx, &seqNums, query, o.chainID)
if err != nil {
return nil, err
}
return seqNums, nil
}
Loading

0 comments on commit 0bab544

Please sign in to comment.