diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 1458abe39..93af60a85 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -29,6 +29,7 @@ type filters struct { loadedFilters atomic.Bool knownPrograms map[string]uint // fast lookup to see if a base58-encoded ProgramID matches any registered filters knownDiscriminators map[string]uint // fast lookup by first 10 characters (60-bits) of a base64-encoded discriminator + seqNums map[int64]int64 } func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { @@ -38,6 +39,11 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { } } +func (fl *filters) IncrementSeqNums(filterID int64) int64 { + fl.seqNums[filterID]++ + return fl.seqNums[filterID] +} + // PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. func (fl *filters) PruneFilters(ctx context.Context) error { err := fl.LoadFilters(ctx) @@ -385,6 +391,11 @@ func (fl *filters) LoadFilters(ctx context.Context) error { } } + fl.seqNums, err = fl.orm.SelectSeqNums(ctx) + if err != nil { + return fmt.Errorf("failed to select sequence numbers from db: %w", err) + } + fl.loadedFilters.Store(true) return nil diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 2abfe06ed..e454fdc77 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -30,6 +30,7 @@ type ORM interface { MarkFilterDeleted(ctx context.Context, id int64) (err error) MarkFilterBackfilled(ctx context.Context, id int64) (err error) InsertLogs(context.Context, []Log) (err error) + SelectSeqNums(ctx context.Context) (map[int64]int64, error) } type ILogPoller interface { @@ -134,10 +135,7 @@ func (lp *LogPoller) Process(programEvent ProgramEvent) (err error) { subKeyValues = append(subKeyValues, indexedVal) } - lp.seqNums[filter.ID]++ - log.SequenceNum = lp.seqNums - - // TODO: fill in, and keep track of SequenceNumber for each filter. (Initialize from db on LoadFilters, then increment each time?) + log.SequenceNum = lp.filters.IncrementSeqNums(filter.ID) expiresAt := time.Now() // TODO: account for possible discrepencies in time? Seems like retention should be passed directly to ORM expiresAt.Add(filter.Retention) diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go index 25bf0972e..1508ba4aa 100644 --- a/pkg/solana/logpoller/mock_orm.go +++ b/pkg/solana/logpoller/mock_orm.go @@ -369,6 +369,64 @@ func (_c *mockORM_SelectFilters_Call) RunAndReturn(run func(context.Context) ([] return _c } +// SelectSeqNums provides a mock function with given fields: ctx +func (_m *mockORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectSeqNums") + } + + var r0 map[int64]int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[int64]int64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[int64]int64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_SelectSeqNums_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectSeqNums' +type mockORM_SelectSeqNums_Call struct { + *mock.Call +} + +// SelectSeqNums is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockORM_Expecter) SelectSeqNums(ctx interface{}) *mockORM_SelectSeqNums_Call { + return &mockORM_SelectSeqNums_Call{Call: _e.mock.On("SelectSeqNums", ctx)} +} + +func (_c *mockORM_SelectSeqNums_Call) Run(run func(ctx context.Context)) *mockORM_SelectSeqNums_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) Return(_a0 map[int64]int64, _a1 error) *mockORM_SelectSeqNums_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) RunAndReturn(run func(context.Context) (map[int64]int64, error)) *mockORM_SelectSeqNums_Call { + _c.Call.Return(run) + return _c +} + // newMockORM creates a new instance of mockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockORM(t interface { diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 97f964a31..7371ed36d 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -204,3 +204,13 @@ func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address Public } return logs, nil } + +func (o *DSORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) { + seqNums := make(map[int64]int64) + query := "SELECT id, MAX(sequence_num) FROM solana.logs WHERE chain_id=%s GROUP BY id" + err := o.ds.SelectContext(ctx, &seqNums, query, o.chainID) + if err != nil { + return nil, err + } + return seqNums, nil +} diff --git a/pkg/solana/logpoller/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/pkg/solana/logpoller/mock_orm.go new file mode 100644 index 000000000..1508ba4aa --- /dev/null +++ b/pkg/solana/logpoller/pkg/solana/logpoller/mock_orm.go @@ -0,0 +1,442 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package logpoller + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// mockORM is an autogenerated mock type for the ORM type +type mockORM struct { + mock.Mock +} + +type mockORM_Expecter struct { + mock *mock.Mock +} + +func (_m *mockORM) EXPECT() *mockORM_Expecter { + return &mockORM_Expecter{mock: &_m.Mock} +} + +// ChainID provides a mock function with given fields: +func (_m *mockORM) ChainID() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ChainID") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// mockORM_ChainID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ChainID' +type mockORM_ChainID_Call struct { + *mock.Call +} + +// ChainID is a helper method to define mock.On call +func (_e *mockORM_Expecter) ChainID() *mockORM_ChainID_Call { + return &mockORM_ChainID_Call{Call: _e.mock.On("ChainID")} +} + +func (_c *mockORM_ChainID_Call) Run(run func()) *mockORM_ChainID_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *mockORM_ChainID_Call) Return(_a0 string) *mockORM_ChainID_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockORM_ChainID_Call) RunAndReturn(run func() string) *mockORM_ChainID_Call { + _c.Call.Return(run) + return _c +} + +// DeleteFilters provides a mock function with given fields: ctx, filters +func (_m *mockORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { + ret := _m.Called(ctx, filters) + + if len(ret) == 0 { + panic("no return value specified for DeleteFilters") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, map[int64]Filter) error); ok { + r0 = rf(ctx, filters) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_DeleteFilters_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteFilters' +type mockORM_DeleteFilters_Call struct { + *mock.Call +} + +// DeleteFilters is a helper method to define mock.On call +// - ctx context.Context +// - filters map[int64]Filter +func (_e *mockORM_Expecter) DeleteFilters(ctx interface{}, filters interface{}) *mockORM_DeleteFilters_Call { + return &mockORM_DeleteFilters_Call{Call: _e.mock.On("DeleteFilters", ctx, filters)} +} + +func (_c *mockORM_DeleteFilters_Call) Run(run func(ctx context.Context, filters map[int64]Filter)) *mockORM_DeleteFilters_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(map[int64]Filter)) + }) + return _c +} + +func (_c *mockORM_DeleteFilters_Call) Return(_a0 error) *mockORM_DeleteFilters_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockORM_DeleteFilters_Call) RunAndReturn(run func(context.Context, map[int64]Filter) error) *mockORM_DeleteFilters_Call { + _c.Call.Return(run) + return _c +} + +// InsertFilter provides a mock function with given fields: ctx, filter +func (_m *mockORM) InsertFilter(ctx context.Context, filter Filter) (int64, error) { + ret := _m.Called(ctx, filter) + + if len(ret) == 0 { + panic("no return value specified for InsertFilter") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, Filter) (int64, error)); ok { + return rf(ctx, filter) + } + if rf, ok := ret.Get(0).(func(context.Context, Filter) int64); ok { + r0 = rf(ctx, filter) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, Filter) error); ok { + r1 = rf(ctx, filter) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_InsertFilter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertFilter' +type mockORM_InsertFilter_Call struct { + *mock.Call +} + +// InsertFilter is a helper method to define mock.On call +// - ctx context.Context +// - filter Filter +func (_e *mockORM_Expecter) InsertFilter(ctx interface{}, filter interface{}) *mockORM_InsertFilter_Call { + return &mockORM_InsertFilter_Call{Call: _e.mock.On("InsertFilter", ctx, filter)} +} + +func (_c *mockORM_InsertFilter_Call) Run(run func(ctx context.Context, filter Filter)) *mockORM_InsertFilter_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(Filter)) + }) + return _c +} + +func (_c *mockORM_InsertFilter_Call) Return(id int64, err error) *mockORM_InsertFilter_Call { + _c.Call.Return(id, err) + return _c +} + +func (_c *mockORM_InsertFilter_Call) RunAndReturn(run func(context.Context, Filter) (int64, error)) *mockORM_InsertFilter_Call { + _c.Call.Return(run) + return _c +} + +// InsertLogs provides a mock function with given fields: _a0, _a1 +func (_m *mockORM) InsertLogs(_a0 context.Context, _a1 []Log) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for InsertLogs") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []Log) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_InsertLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InsertLogs' +type mockORM_InsertLogs_Call struct { + *mock.Call +} + +// InsertLogs is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []Log +func (_e *mockORM_Expecter) InsertLogs(_a0 interface{}, _a1 interface{}) *mockORM_InsertLogs_Call { + return &mockORM_InsertLogs_Call{Call: _e.mock.On("InsertLogs", _a0, _a1)} +} + +func (_c *mockORM_InsertLogs_Call) Run(run func(_a0 context.Context, _a1 []Log)) *mockORM_InsertLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]Log)) + }) + return _c +} + +func (_c *mockORM_InsertLogs_Call) Return(err error) *mockORM_InsertLogs_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_InsertLogs_Call) RunAndReturn(run func(context.Context, []Log) error) *mockORM_InsertLogs_Call { + _c.Call.Return(run) + return _c +} + +// MarkFilterBackfilled provides a mock function with given fields: ctx, id +func (_m *mockORM) MarkFilterBackfilled(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterBackfilled") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_MarkFilterBackfilled_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkFilterBackfilled' +type mockORM_MarkFilterBackfilled_Call struct { + *mock.Call +} + +// MarkFilterBackfilled is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *mockORM_Expecter) MarkFilterBackfilled(ctx interface{}, id interface{}) *mockORM_MarkFilterBackfilled_Call { + return &mockORM_MarkFilterBackfilled_Call{Call: _e.mock.On("MarkFilterBackfilled", ctx, id)} +} + +func (_c *mockORM_MarkFilterBackfilled_Call) Run(run func(ctx context.Context, id int64)) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *mockORM_MarkFilterBackfilled_Call) Return(err error) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_MarkFilterBackfilled_Call) RunAndReturn(run func(context.Context, int64) error) *mockORM_MarkFilterBackfilled_Call { + _c.Call.Return(run) + return _c +} + +// MarkFilterDeleted provides a mock function with given fields: ctx, id +func (_m *mockORM) MarkFilterDeleted(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterDeleted") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockORM_MarkFilterDeleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'MarkFilterDeleted' +type mockORM_MarkFilterDeleted_Call struct { + *mock.Call +} + +// MarkFilterDeleted is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *mockORM_Expecter) MarkFilterDeleted(ctx interface{}, id interface{}) *mockORM_MarkFilterDeleted_Call { + return &mockORM_MarkFilterDeleted_Call{Call: _e.mock.On("MarkFilterDeleted", ctx, id)} +} + +func (_c *mockORM_MarkFilterDeleted_Call) Run(run func(ctx context.Context, id int64)) *mockORM_MarkFilterDeleted_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *mockORM_MarkFilterDeleted_Call) Return(err error) *mockORM_MarkFilterDeleted_Call { + _c.Call.Return(err) + return _c +} + +func (_c *mockORM_MarkFilterDeleted_Call) RunAndReturn(run func(context.Context, int64) error) *mockORM_MarkFilterDeleted_Call { + _c.Call.Return(run) + return _c +} + +// SelectFilters provides a mock function with given fields: ctx +func (_m *mockORM) SelectFilters(ctx context.Context) ([]Filter, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectFilters") + } + + var r0 []Filter + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]Filter, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []Filter); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]Filter) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_SelectFilters_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectFilters' +type mockORM_SelectFilters_Call struct { + *mock.Call +} + +// SelectFilters is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockORM_Expecter) SelectFilters(ctx interface{}) *mockORM_SelectFilters_Call { + return &mockORM_SelectFilters_Call{Call: _e.mock.On("SelectFilters", ctx)} +} + +func (_c *mockORM_SelectFilters_Call) Run(run func(ctx context.Context)) *mockORM_SelectFilters_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockORM_SelectFilters_Call) Return(_a0 []Filter, _a1 error) *mockORM_SelectFilters_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockORM_SelectFilters_Call) RunAndReturn(run func(context.Context) ([]Filter, error)) *mockORM_SelectFilters_Call { + _c.Call.Return(run) + return _c +} + +// SelectSeqNums provides a mock function with given fields: ctx +func (_m *mockORM) SelectSeqNums(ctx context.Context) (map[int64]int64, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectSeqNums") + } + + var r0 map[int64]int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (map[int64]int64, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) map[int64]int64); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[int64]int64) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// mockORM_SelectSeqNums_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SelectSeqNums' +type mockORM_SelectSeqNums_Call struct { + *mock.Call +} + +// SelectSeqNums is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockORM_Expecter) SelectSeqNums(ctx interface{}) *mockORM_SelectSeqNums_Call { + return &mockORM_SelectSeqNums_Call{Call: _e.mock.On("SelectSeqNums", ctx)} +} + +func (_c *mockORM_SelectSeqNums_Call) Run(run func(ctx context.Context)) *mockORM_SelectSeqNums_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) Return(_a0 map[int64]int64, _a1 error) *mockORM_SelectSeqNums_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *mockORM_SelectSeqNums_Call) RunAndReturn(run func(context.Context) (map[int64]int64, error)) *mockORM_SelectSeqNums_Call { + _c.Call.Return(run) + return _c +} + +// newMockORM creates a new instance of mockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockORM(t interface { + mock.TestingT + Cleanup(func()) +}) *mockORM { + mock := &mockORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 9ec1a4e01..938fb12f3 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -159,16 +159,18 @@ func (v *IndexedValue) FromInt64(i int64) { v.FromUint64(uint64(i + math.MaxInt64)) } -func (v *IndexedValue) FromUint64(u uint64) []byte { - var b []byte - binary.BigEndian.PutUint64(b, u) +func (v *IndexedValue) FromUint64(u uint64) { + *v = make([]byte, 8) + binary.BigEndian.PutUint64(*v, u) } func (v *IndexedValue) FromFloat64(f float64) { - if f >= 0 { - v.FromUint64(math.Float64bits(f)) + if f > 0 { + v.FromUint64(math.Float64bits(f) + math.MaxInt64) + return } - v.FromUint64(math.Float64bits(math.Abs(f))) + v.FromUint64(math.MaxInt64 - math.Float64bits(f)) + return } func NewIndexedValue(typedVal any) (iVal IndexedValue, err error) { diff --git a/pkg/solana/logpoller/types_test.go b/pkg/solana/logpoller/types_test.go index 263c22bab..c60ca4794 100644 --- a/pkg/solana/logpoller/types_test.go +++ b/pkg/solana/logpoller/types_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -18,3 +19,37 @@ func newRandomEventSignature(t *testing.T) EventSignature { pubKey := newRandomPublicKey(t) return EventSignature(pubKey[:8]) } + +func TestIndexedValue(t *testing.T) { + cases := []struct { + typeName string + lower any + higher any + }{ + {"int32", int32(-5), int32(5)}, + {"int32", int32(-8), int32(-5)}, + {"int32", int32(5), int32(8)}, + {"int64", int64(-5), int64(5)}, + {"int64", int64(-8), int64(-5)}, + {"int64", int64(5), int64(8)}, + {"float32", float32(-5), float32(5)}, + {"float32", float32(-8), float32(-5)}, + {"float32", float32(5), float32(8)}, + {"float64", float64(-5), float64(5)}, + {"float64", float64(-8), float64(-5)}, + {"float64", float64(5), float64(8)}, + {"string", "abcc", "abcd"}, + {"string", "abcd", "abcdef"}, + {"[]byte", []byte("abcc"), []byte("abcd")}, + {"[]byte", []byte("abcd"), []byte("abcdef")}, + } + for _, c := range cases { + t.Run(c.typeName, func(t *testing.T) { + iVal1, err := NewIndexedValue(c.lower) + require.NoError(t, err) + iVal2, err := NewIndexedValue(c.higher) + require.NoError(t, err) + assert.Less(t, iVal1, iVal2) + }) + } +}