From ee63576ee6e57148dd8b68ff3ab3cf558c458bc0 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 21 Nov 2024 15:32:30 +0100 Subject: [PATCH 1/8] implement register unregister filter operations --- pkg/solana/logpoller/log_poller.go | 317 ++++++++++++++++++++++++ pkg/solana/logpoller/log_poller_test.go | 221 +++++++++++++++++ pkg/solana/logpoller/mock_orm.go | 140 +++++++++++ pkg/solana/logpoller/models.go | 5 +- pkg/solana/logpoller/orm.go | 37 ++- pkg/solana/logpoller/orm_test.go | 118 ++++++--- pkg/solana/logpoller/parser.go | 3 +- pkg/solana/logpoller/query.go | 8 +- pkg/solana/logpoller/types.go | 14 ++ 9 files changed, 815 insertions(+), 48 deletions(-) create mode 100644 pkg/solana/logpoller/log_poller.go create mode 100644 pkg/solana/logpoller/log_poller_test.go create mode 100644 pkg/solana/logpoller/mock_orm.go diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go new file mode 100644 index 000000000..fd1c207b8 --- /dev/null +++ b/pkg/solana/logpoller/log_poller.go @@ -0,0 +1,317 @@ +package logpoller + +import ( + "context" + "errors" + "fmt" + "iter" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +var ( + ErrFilterNameConflict = errors.New("filter with such name already exists") +) + +//go:generate mockery --name ORM --inpackage --structname mockORM --filename mock_orm.go +type ORM interface { + InsertFilter(ctx context.Context, filter Filter) (id int64, err error) + SelectFilters(ctx context.Context) ([]Filter, error) + DeleteFilters(ctx context.Context, filters []Filter) error + MarkFilterDeleted(ctx context.Context, id int64) (err error) +} + +type LogPoller struct { + services.StateMachine + lggr logger.SugaredLogger + orm ORM + + filtersByName map[string]Filter + filtersByAddress map[PublicKey]map[EventSignature][]Filter + filtersToBackfill []Filter + filtersToDelete []Filter // 
populated on start from db and pruned on first iteration of run + filtersMutex sync.RWMutex + loadedFilters atomic.Bool + + chStop services.StopChan + wg sync.WaitGroup +} + +func NewLogPoller(lggr logger.SugaredLogger, orm ORM) *LogPoller { + return &LogPoller{ + orm: orm, + lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), + + filtersByName: make(map[string]Filter), + filtersByAddress: map[PublicKey]map[EventSignature][]Filter{}, + } +} + +func (lp *LogPoller) Start(context.Context) error { + return lp.StartOnce("LogPoller", func() error { + lp.wg.Add(2) + go lp.run() + go lp.backgroundWorkerRun() + return nil + }) +} + +func (lp *LogPoller) Close() error { + return lp.StopOnce("LogPoller", func() error { + close(lp.chStop) + lp.wg.Wait() + return nil + }) +} + +func (lp *LogPoller) run() { + defer lp.wg.Done() + ctx, cancel := lp.chStop.NewCtx() + defer cancel() + + var blocks chan struct { + BlockNumber int64 + Logs any // to be defined + } + + for { + select { + case <-ctx.Done(): + return + case block := <-blocks: + lp.filtersMutex.Lock() + filtersToBackfill := lp.filtersToBackfill + lp.filtersToBackfill = nil + lp.filtersMutex.Unlock() + // TODO: NONEVM-916 parse, filters and persist logs + // NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert + // of log corresponding filter won't be present in the db. 
Ensure to refilter and retry on insert error + for _, filter := range filtersToBackfill { + go lp.startFilterBackfill(ctx, filter, block.BlockNumber) + } + } + } +} + +func (lp *LogPoller) backgroundWorkerRun() { + defer lp.wg.Done() + ctx, cancel := lp.chStop.NewCtx() + defer cancel() + + pruneFilters := services.NewTicker(time.Minute + 618*time.Millisecond) // try to minimize collisions with one-second period + defer pruneFilters.Stop() + for { + select { + case <-ctx.Done(): + return + case <-pruneFilters.C: + err := lp.pruneFilters(ctx) + if err != nil { + lp.lggr.Errorw("Failed to prune filters", "err", err) + } + } + } +} + +func (lp *LogPoller) pruneFilters(ctx context.Context) error { + err := lp.loadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + filtersToDelete := lp.filtersToDelete + lp.filtersToDelete = nil + lp.filtersMutex.Unlock() + + if len(filtersToDelete) == 0 { + return nil + } + + err = lp.orm.DeleteFilters(ctx, filtersToDelete) + if err != nil { + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) + return fmt.Errorf("failed to delete filters: %w", err) + } + + return nil +} + +func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { + // TODO: NONEVM-916 start backfill + lp.lggr.Debugw("Starting filter backfill", "filter", filter) +} + +// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address +// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. +// filter.Name must be unique otherwise ErrFilterNameConflict is returned. // TODO: not sure this is a good idea. Callers are most likely going to ignore this error +// The filter may be unregistered later by Filter.Name +// Warnings/debug information is keyed by filter name. 
+func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { + if len(filter.Name) == 0 { + return errors.New("name is required") + } + + err := lp.loadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + if _, ok := lp.filtersByName[filter.Name]; ok { + return ErrFilterNameConflict + } + + filterID, err := lp.orm.InsertFilter(ctx, filter) + if err != nil { + return fmt.Errorf("failed to insert filter: %w", err) + } + + filter.ID = filterID + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig = make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + return nil +} + +// UnregisterFilter will remove the filter with the given name and prune all corresponding logs. +// If the name does not exist, it will log an error but not return an error. +// Warnings/debug information is keyed by filter name. 
+func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { + err := lp.loadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + filter, ok := lp.filtersByName[name] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByName", "name", name) + return nil + } + + if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { + return fmt.Errorf("failed to mark filter deleted: %w", err) + } + + delete(lp.filtersByName, filter.Name) + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByAddress", "name", name, "address", filter.Address) + return nil + } + + filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) + if !ok { + lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", name, "address", filter.Address) + } + + if len(filtersByEventSig[filter.EventSig]) == 0 { + delete(filtersByEventSig, filter.EventSig) + } + + if len(lp.filtersByAddress[filter.Address]) == 0 { + delete(lp.filtersByAddress, filter.Address) + } + + // remove or ensure that filters was not present in the slice to backfill + lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) + lp.filtersToDelete = append(lp.filtersToDelete, filter) + return nil +} + +func (lp *LogPoller) loadFilters(ctx context.Context) error { + if lp.loadedFilters.Load() { + return nil + } + + lp.lggr.Debugw("Loading filters from db") + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + // reset filters' indexes to ensure we do not have partial data from the previous run + lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) + lp.filtersByName = make(map[string]Filter) + lp.filtersToBackfill = nil + lp.filtersToDelete = nil + + ctx, cancel := lp.chStop.Ctx(ctx) + defer cancel() + filters, err := lp.orm.SelectFilters(ctx) + if err 
!= nil { + return fmt.Errorf("failed to select filters from db: %w", err) + } + + for _, filter := range filters { + if filter.IsDeleted { + lp.filtersToDelete = append(lp.filtersToDelete, filter) + continue + } + + if _, ok := lp.filtersByName[filter.Name]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) + lp.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig = make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + } + + lp.loadedFilters.Store(true) + return nil +} + +func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { + index := slices.IndexFunc(filters, func(item Filter) bool { + return item.ID == filter.ID + }) + if index == -1 { + return filters, false + } + + lastIdx := len(filters) - 1 + filters[index], filters[lastIdx] = filters[lastIdx], filters[index] + return filters[:lastIdx], true +} + +// matchingFilters - allows to iterate through filters that match provided keys +func (lp *LogPoller) matchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + return func(yield func(Filter) bool) { + lp.filtersMutex.RLock() + defer lp.filtersMutex.RUnlock() + filters, ok := lp.filtersByAddress[addr] + if !ok { + return + } + + for _, filter := range filters[eventSignature] { + if !yield(filter) { + return + } + } + } +} diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go new file mode 100644 index 000000000..56b89dac0 --- /dev/null +++ b/pkg/solana/logpoller/log_poller_test.go @@ -0,0 +1,221 @@ +package logpoller + +import ( + "errors" + "testing" + + 
"github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestLogPoller_LoadFilters(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(logger.Sugared(logger.Test(t)), orm) + ctx := tests.Context(t) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + deleted := Filter{ + ID: 3, + Name: "Deleted", + IsDeleted: true, + } + happyPath := Filter{ + ID: 1, + Name: "Happy path", + } + happyPath2 := Filter{ + ID: 1, + Name: "Happy path 2", + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + deleted, + happyPath, + happyPath2, + }, nil).Once() + + err := lp.loadFilters(ctx) + require.EqualError(t, err, "failed to select filters from db: db failed") + err = lp.loadFilters(ctx) + require.NoError(t, err) + // only one filter to delete + require.Len(t, lp.filtersToDelete, 1) + require.Equal(t, deleted, lp.filtersToDelete[0]) + // backfill and happy path both indexed + require.Len(t, lp.filtersByAddress, 1) + require.Len(t, lp.filtersByAddress[happyPath.Address], 1) + require.Len(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) + require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) + require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) + require.Len(t, lp.filtersByName, 2) + require.Equal(t, lp.filtersByName[happyPath.Name], happyPath) + require.Equal(t, lp.filtersByName[happyPath2.Name], happyPath2) + // any call following successful should be noop + err = lp.loadFilters(ctx) + require.NoError(t, err) +} + +func TestLogPoller_RegisterFilter(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Returns an error if name is empty", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + err := lp.RegisterFilter(tests.Context(t), Filter{}) + 
require.EqualError(t, err, "name is required") + }) + t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + err := lp.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) + require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") + }) + t.Run("Returns an error if filter with the same name is present in db", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return([]Filter{{Name: filterName}}, nil).Once() + err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.EqualError(t, err, ErrFilterNameConflict.Error()) + }) + t.Run("Returns an error if adding the same filter twice", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() + err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.NoError(t, err) + err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.EqualError(t, err, ErrFilterNameConflict.Error()) + }) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() + err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.Error(t, err) + // can readd after db issue is resovled + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() + err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + 
require.NoError(t, err) + }) + t.Run("Can reregister after unregister", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + const filterID = int64(10) + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() + err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.NoError(t, err) + orm.On("MarkFilterDeleted", mock.Anything, filterID).Return(nil).Once() + err = lp.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID+1, nil).Once() + err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + require.NoError(t, err) + require.Len(t, lp.filtersToDelete, 1) + require.Contains(t, lp.filtersToDelete, Filter{Name: filterName, ID: filterID}) + require.Len(t, lp.filtersToBackfill, 1) + require.Contains(t, lp.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) + }) +} + +func TestLogPoller_UnregisterFilter(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() + err := lp.UnregisterFilter(tests.Context(t), "Filter") + require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") + }) + t.Run("Noop if filter is not present", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + err := lp.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + }) + t.Run("Returns error if fails to mark filter as deleted", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = 
"Filter" + const id int64 = 10 + orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() + err := lp.UnregisterFilter(tests.Context(t), filterName) + require.EqualError(t, err, "failed to mark filter deleted: db query failed") + }) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + const filterName = "Filter" + const id int64 = 10 + orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() + err := lp.UnregisterFilter(tests.Context(t), filterName) + require.NoError(t, err) + require.Len(t, lp.filtersToDelete, 1) + require.Len(t, lp.filtersToBackfill, 0) + require.Len(t, lp.filtersByName, 0) + require.Len(t, lp.filtersByAddress, 0) + }) +} + +func TestLogPoller_pruneFilters(t *testing.T) { + lggr := logger.Sugared(logger.Test(t)) + t.Run("Happy path", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + toDelete := Filter{ + ID: 1, + Name: "To delete", + IsDeleted: true, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + toDelete, + { + ID: 2, + Name: "To keep", + }, + }, nil).Once() + orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(nil).Once() + err := lp.pruneFilters(tests.Context(t)) + require.NoError(t, err) + require.Len(t, lp.filtersToDelete, 0) + }) + t.Run("If DB removal fails will add filters back into removal slice ", func(t *testing.T) { + orm := newMockORM(t) + lp := NewLogPoller(lggr, orm) + toDelete := Filter{ + ID: 1, + Name: "To delete", + IsDeleted: true, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{ + toDelete, + { + ID: 2, + Name: "To keep", + }, + }, nil).Once() + newToDelete := Filter{ + ID: 3, + Name: "To delete 2", + } + orm.On("DeleteFilters", mock.Anything, 
[]Filter{toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { + orm.On("MarkFilterDeleted", mock.Anything, newToDelete.ID).Return(nil).Once() + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(newToDelete.ID, nil).Once() + require.NoError(t, lp.RegisterFilter(tests.Context(t), newToDelete)) + require.NoError(t, lp.UnregisterFilter(tests.Context(t), newToDelete.Name)) + }).Once() + err := lp.pruneFilters(tests.Context(t)) + require.EqualError(t, err, "failed to delete filters: db failed") + require.Equal(t, lp.filtersToDelete, []Filter{newToDelete, toDelete}) + }) +} diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go new file mode 100644 index 000000000..fd47a3271 --- /dev/null +++ b/pkg/solana/logpoller/mock_orm.go @@ -0,0 +1,140 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package logpoller + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// mockORM is an autogenerated mock type for the ORM type +type mockORM struct { + mock.Mock +} + +// DeleteFilters provides a mock function with given fields: ctx, filters +func (_m *mockORM) DeleteFilters(ctx context.Context, filters []Filter) error { + ret := _m.Called(ctx, filters) + + if len(ret) == 0 { + panic("no return value specified for DeleteFilters") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []Filter) error); ok { + r0 = rf(ctx, filters) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// InsertFilter provides a mock function with given fields: ctx, filter +func (_m *mockORM) InsertFilter(ctx context.Context, filter Filter) (int64, error) { + ret := _m.Called(ctx, filter) + + if len(ret) == 0 { + panic("no return value specified for InsertFilter") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, Filter) (int64, error)); ok { + return rf(ctx, filter) + } + if rf, ok := ret.Get(0).(func(context.Context, Filter) int64); ok { + r0 = rf(ctx, filter) 
+ } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, Filter) error); ok { + r1 = rf(ctx, filter) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// SelectFilters provides a mock function with given fields: ctx +func (_m *mockORM) SelectFilters(ctx context.Context) ([]Filter, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for SelectFilters") + } + + var r0 []Filter + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]Filter, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []Filter); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]Filter) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MarkFilterBackfilled provides a mock function with given fields: ctx, id, earliestBlock +func (_m *mockORM) MarkFilterBackfilled(ctx context.Context, id int64, earliestBlock int64) error { + ret := _m.Called(ctx, id, earliestBlock) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterBackfilled") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64, int64) error); ok { + r0 = rf(ctx, id, earliestBlock) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MarkFilterDeleted provides a mock function with given fields: ctx, id +func (_m *mockORM) MarkFilterDeleted(ctx context.Context, id int64) error { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for MarkFilterDeleted") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, int64) error); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// newMockORM creates a new instance of mockORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+// The first argument is typically a *testing.T value. +func newMockORM(t interface { + mock.TestingT + Cleanup(func()) +}) *mockORM { + mock := &mockORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index fe6092333..b04a5ae38 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -11,12 +11,13 @@ type Filter struct { Name string Address PublicKey EventName string - EventSig []byte + EventSig EventSignature StartingBlock int64 EventIDL string SubkeyPaths SubkeyPaths Retention time.Duration MaxLogsKept int64 + IsDeleted bool } type Log struct { @@ -28,7 +29,7 @@ type Log struct { BlockNumber int64 BlockTimestamp time.Time Address PublicKey - EventSig []byte + EventSig EventSignature SubkeyValues pq.ByteaArray TxHash Signature Data []byte diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index b9daec231..247a3d611 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -9,6 +9,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" ) +var _ ORM = (*DSORM)(nil) + type DSORM struct { chainID string ds sqlutil.DataSource @@ -72,13 +74,42 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err // GetFilterByID returns filter by ID func (o *DSORM) GetFilterByID(ctx context.Context, id int64) (Filter, error) { - query := `SELECT id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept - FROM solana.log_poller_filters WHERE id = $1` + query := filtersQuery("WHERE id = $1") var result Filter err := o.ds.GetContext(ctx, &result, query, id) return result, err } +func (o *DSORM) MarkFilterDeleted(ctx context.Context, id int64) (err error) { + query := `UPDATE solana.log_poller_filters SET is_deleted = true WHERE id = $1` + _, err = o.ds.ExecContext(ctx, query, id) + return err +} + +func (o 
*DSORM) DeleteFilter(ctx context.Context, id int64) (err error) { + query := `DELETE FROM solana.log_poller_filters WHERE id = $1` + _, err = o.ds.ExecContext(ctx, query, id) + return err +} + +func (o *DSORM) DeleteFilters(ctx context.Context, filters []Filter) error { + for _, filter := range filters { + err := o.DeleteFilter(ctx, filter.ID) + if err != nil { + return fmt.Errorf("error deleting filter %s (%d): %w", filter.Name, filter.ID, err) + } + } + + return nil +} + +func (o *DSORM) SelectFilters(ctx context.Context) ([]Filter, error) { + query := filtersQuery("WHERE chain_id = $1") + var filters []Filter + err := o.ds.SelectContext(ctx, &filters, query, o.chainID) + return filters, err +} + // InsertLogs is idempotent to support replays. func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error { if err := o.validateLogs(logs); err != nil { @@ -127,7 +158,7 @@ func (o *DSORM) validateLogs(logs []Log) error { } // SelectLogs finds the logs in a given block range. -func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig []byte) ([]Log, error) { +func (o *DSORM) SelectLogs(ctx context.Context, start, end int64, address PublicKey, eventSig EventSignature) ([]Log, error) { args, err := newQueryArgsForEvent(o.chainID, address, eventSig). withStartBlock(start). withEndBlock(end). 
diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 09c91c5c7..aa78c81f2 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -10,7 +10,6 @@ import ( "github.com/gagliardetto/solana-go" "github.com/google/uuid" _ "github.com/jackc/pgx/v4/stdlib" - "github.com/lib/pq" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/pg" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" @@ -23,20 +22,19 @@ func TestLogPollerFilters(t *testing.T) { lggr := logger.Test(t) dbURL, ok := os.LookupEnv("CL_DATABASE_URL") require.True(t, ok, "CL_DATABASE_URL must be set") - chainID := uuid.NewString() - dbx := pg.NewSqlxDB(t, dbURL) - orm := NewORM(chainID, dbx, lggr) - - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() t.Run("Ensure all fields are readable/writable", func(t *testing.T) { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + chainID := uuid.NewString() + dbx := pg.NewSqlxDB(t, dbURL) + orm := NewORM(chainID, dbx, lggr) filters := []Filter{ { Name: "happy path", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: SubkeyPaths([][]string{{"a", "b"}, {"c"}}), @@ -47,7 +45,7 @@ func TestLogPollerFilters(t *testing.T) { Name: "empty sub key paths", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: SubkeyPaths([][]string{}), @@ -58,7 +56,7 @@ func TestLogPollerFilters(t *testing.T) { Name: "nil sub key paths", Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: nil, @@ -80,14 +78,61 @@ func 
TestLogPollerFilters(t *testing.T) { } }) t.Run("Returns and error if name is not unique", func(t *testing.T) { + chainID := uuid.NewString() + dbx := pg.NewSqlxDB(t, dbURL) + orm := NewORM(chainID, dbx, lggr) filter := newRandomFilter(t) ctx := tests.Context(t) - _, err = orm.InsertFilter(ctx, filter) + _, err := orm.InsertFilter(ctx, filter) require.NoError(t, err) - filter.EventSig = []byte(uuid.NewString()) _, err = orm.InsertFilter(ctx, filter) require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`) }) + t.Run("Allows reuse name of a filter marked as deleted", func(t *testing.T) { + chainID := uuid.NewString() + dbx := pg.NewSqlxDB(t, dbURL) + orm := NewORM(chainID, dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + // mark deleted + err = orm.MarkFilterDeleted(ctx, filterID) + require.NoError(t, err) + // ensure marked as deleted + dbFilter, err := orm.GetFilterByID(ctx, filterID) + require.NoError(t, err) + require.True(t, dbFilter.IsDeleted, "expected to be deleted") + newFilterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + require.NotEqual(t, newFilterID, filterID, "expected db to generate new filter as we can not be sure that new one matches the same logs") + }) + t.Run("Deletes log on parent filter deletion", func(t *testing.T) { + dbx := pg.NewSqlxDB(t, dbURL) + chainID := uuid.NewString() + orm := NewORM(chainID, dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + log := newRandomLog(t, filterID, chainID) + err = orm.InsertLogs(ctx, []Log{log}) + require.NoError(t, err) + logs, err := orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 1) + err = orm.MarkFilterDeleted(ctx, filterID) + require.NoError(t, err) 
+ // logs are expected to be present in db even if filter was marked as deleted + logs, err = orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 1) + err = orm.DeleteFilter(ctx, filterID) + require.NoError(t, err) + logs, err = orm.SelectLogs(ctx, 0, log.BlockNumber, log.Address, log.EventSig) + require.NoError(t, err) + require.Len(t, logs, 0) + }) } func newRandomFilter(t *testing.T) Filter { @@ -98,7 +143,7 @@ func newRandomFilter(t *testing.T) Filter { Name: uuid.NewString(), Address: PublicKey(pubKey), EventName: "event", - EventSig: []byte{1, 2, 3}, + EventSig: EventSignature{1, 2, 3}, StartingBlock: 1, EventIDL: "{}", SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, @@ -107,36 +152,14 @@ func newRandomFilter(t *testing.T) Filter { } } -func TestLogPollerLogs(t *testing.T) { - lggr := logger.Test(t) - dbURL, ok := os.LookupEnv("CL_DATABASE_URL") - require.True(t, ok, "CL_DATABASE_URL must be set") - chainID := uuid.NewString() - dbx := pg.NewSqlxDB(t, dbURL) - orm := NewORM(chainID, dbx, lggr) - +func newRandomLog(t *testing.T, filterID int64, chainID string) Log { privateKey, err := solana.NewRandomPrivateKey() require.NoError(t, err) pubKey := privateKey.PublicKey() - - ctx := tests.Context(t) - // create filter as it's required for a log - filterID, err := orm.InsertFilter(ctx, Filter{ - Name: "awesome filter", - Address: PublicKey(pubKey), - EventName: "event", - EventSig: []byte{1, 2, 3}, - StartingBlock: 1, - EventIDL: "{}", - SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, - Retention: 1000, - MaxLogsKept: 3, - }) - require.NoError(t, err) data := []byte("solana is fun") signature, err := privateKey.Sign(data) require.NoError(t, err) - log := Log{ + return Log{ FilterId: filterID, ChainId: chainID, LogIndex: 1, @@ -144,11 +167,26 @@ func TestLogPollerLogs(t *testing.T) { BlockNumber: 10, BlockTimestamp: time.Unix(1731590113, 0), Address: PublicKey(pubKey), - EventSig: []byte{3, 2, 1}, - 
SubkeyValues: pq.ByteaArray([][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}), + EventSig: EventSignature{3, 2, 1}, + SubkeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, TxHash: Signature(signature), Data: data, } +} + +func TestLogPollerLogs(t *testing.T) { + lggr := logger.Test(t) + dbURL, ok := os.LookupEnv("CL_DATABASE_URL") + require.True(t, ok, "CL_DATABASE_URL must be set") + chainID := uuid.NewString() + dbx := pg.NewSqlxDB(t, dbURL) + orm := NewORM(chainID, dbx, lggr) + + ctx := tests.Context(t) + // create filter as it's required for a log + filterID, err := orm.InsertFilter(ctx, newRandomFilter(t)) + require.NoError(t, err) + log := newRandomLog(t, filterID, chainID) err = orm.InsertLogs(ctx, []Log{log}) require.NoError(t, err) // insert of the same Log should not produce two instances diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index a0dd44b52..a3a054594 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -1,5 +1,6 @@ package logpoller var ( - logsFields = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number", "block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at", "expires_at", "sequence_num"} + logsFields = [...]string{"id", "filter_id", "chain_id", "log_index", "block_hash", "block_number", "block_timestamp", "address", "event_sig", "subkey_values", "tx_hash", "data", "created_at", "expires_at", "sequence_num"} + filterFields = [...]string{"id", "name", "address", "event_name", "event_sig", "starting_block", "event_idl", "subkey_paths", "retention", "max_logs_kept", "is_deleted"} ) diff --git a/pkg/solana/logpoller/query.go b/pkg/solana/logpoller/query.go index bb516d4da..1f0fb8b7f 100644 --- a/pkg/solana/logpoller/query.go +++ b/pkg/solana/logpoller/query.go @@ -79,7 +79,7 @@ func (q *queryArgs) withEventName(eventName string) *queryArgs { } // withEventSig sets the EventSig field in queryArgs. 
-func (q *queryArgs) withEventSig(eventSig []byte) *queryArgs { +func (q *queryArgs) withEventSig(eventSig EventSignature) *queryArgs { return q.withField("event_sig", eventSig) } @@ -173,7 +173,7 @@ func (q *queryArgs) withSequenceNum(sequenceNum int64) *queryArgs { return q.withField("sequence_num", sequenceNum) } -func newQueryArgsForEvent(chainId string, address PublicKey, eventSig []byte) *queryArgs { +func newQueryArgsForEvent(chainId string, address PublicKey, eventSig EventSignature) *queryArgs { return newQueryArgs(chainId). withAddress(address). withEventSig(eventSig) @@ -191,6 +191,10 @@ func logsQuery(clause string) string { return fmt.Sprintf(`SELECT %s FROM solana.logs %s`, strings.Join(logsFields[:], ", "), clause) } +func filtersQuery(clause string) string { + return fmt.Sprintf(`SELECT %s FROM solana.log_poller_filters %s`, strings.Join(filterFields[:], ", "), clause) +} + func (q *queryArgs) toArgs() (map[string]any, error) { if len(q.err) > 0 { return nil, errors.Join(q.err...) diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 5b42877a6..95122bc86 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -97,3 +97,17 @@ func (p *SubkeyPaths) Scan(src interface{}) error { return nil } + +const EventSignatureLength = 8 + +type EventSignature [EventSignatureLength]byte + +// Scan implements Scanner for database/sql. +func (s *EventSignature) Scan(src interface{}) error { + return scanFixedLengthArray("EventSignature", EventSignatureLength, src, s[:]) +} + +// Value implements valuer for database/sql. 
+func (s EventSignature) Value() (driver.Value, error) { + return s[:], nil +} From f29db30336ef6db065ab1b95f1d0a48a442ffb02 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Mon, 25 Nov 2024 17:03:08 +0100 Subject: [PATCH 2/8] move filters management into separate struct & allow update of non primary fields --- pkg/solana/logpoller/filters.go | 251 +++++++++++++++++ .../{log_poller_test.go => filters_test.go} | 172 +++++++----- pkg/solana/logpoller/log_poller.go | 259 ++++-------------- pkg/solana/logpoller/models.go | 8 +- pkg/solana/logpoller/orm.go | 5 + pkg/solana/logpoller/orm_test.go | 29 +- pkg/solana/logpoller/types.go | 5 + 7 files changed, 439 insertions(+), 290 deletions(-) create mode 100644 pkg/solana/logpoller/filters.go rename pkg/solana/logpoller/{log_poller_test.go => filters_test.go} (57%) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go new file mode 100644 index 000000000..44e1e7d94 --- /dev/null +++ b/pkg/solana/logpoller/filters.go @@ -0,0 +1,251 @@ +package logpoller + +import ( + "context" + "errors" + "fmt" + "iter" + "slices" + "sync" + "sync/atomic" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" +) + +type filters struct { + orm ORM + lggr logger.SugaredLogger + + filtersByName map[string]Filter + filtersByAddress map[PublicKey]map[EventSignature][]Filter + filtersToBackfill []Filter + filtersToDelete []Filter // populated on start from db and pruned on first iteration of run + filtersMutex sync.RWMutex + loadedFilters atomic.Bool +} + +func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { + return &filters{ + orm: orm, + lggr: lggr, + + filtersByName: make(map[string]Filter), + filtersByAddress: map[PublicKey]map[EventSignature][]Filter{}, + } +} + +// PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. 
+func (lp *filters) PruneFilters(ctx context.Context) error { + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + filtersToDelete := lp.filtersToDelete + lp.filtersToDelete = nil + lp.filtersMutex.Unlock() + + if len(filtersToDelete) == 0 { + return nil + } + + err = lp.orm.DeleteFilters(ctx, filtersToDelete) + if err != nil { + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) + return fmt.Errorf("failed to delete filters: %w", err) + } + + return nil +} + +// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address +// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. +// The filter may be unregistered later by filter.Name. +// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if +// one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. +// Otherwise, updates remaining fields and schedules backfill. +// Warnings/debug information is keyed by filter name. 
+func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { + if len(filter.Name) == 0 { + return errors.New("name is required") + } + + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + if existingFilter, ok := lp.filtersByName[filter.Name]; ok { + if !existingFilter.MatchSameLogs(filter) { + return ErrFilterNameConflict + } + + lp.removeFilterFromIndexes(existingFilter) + } + + filterID, err := lp.orm.InsertFilter(ctx, filter) + if err != nil { + return fmt.Errorf("failed to insert filter: %w", err) + } + + filter.ID = filterID + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig = make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + return nil +} + +// UnregisterFilter will remove the filter with the given name and async prune all corresponding logs. +// If the name does not exist, it will log an error but not return an error. +// Warnings/debug information is keyed by filter name. 
+func (lp *filters) UnregisterFilter(ctx context.Context, name string) error { + err := lp.LoadFilters(ctx) + if err != nil { + return fmt.Errorf("failed to load filters: %w", err) + } + + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + + filter, ok := lp.filtersByName[name] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByName", "name", name) + return nil + } + + if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { + return fmt.Errorf("failed to mark filter deleted: %w", err) + } + + lp.removeFilterFromIndexes(filter) + + lp.filtersToDelete = append(lp.filtersToDelete, filter) + return nil +} + +func (lp *filters) removeFilterFromIndexes(filter Filter) { + delete(lp.filtersByName, filter.Name) + lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) + + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + lp.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address) + return + } + + filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) + if !ok { + lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) + return + } + + if len(filtersByEventSig[filter.EventSig]) == 0 { + delete(filtersByEventSig, filter.EventSig) + } + + if len(lp.filtersByAddress[filter.Address]) == 0 { + delete(lp.filtersByAddress, filter.Address) + } +} + +// MatchingFilters - returns iterator to go through all matching filters. +// Requires LoadFilters to be called at least once. 
+func (lp *filters) MatchingFilters(ctx context.Context, addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + return func(yield func(Filter) bool) { + lp.filtersMutex.RLock() + defer lp.filtersMutex.RUnlock() + filters, ok := lp.filtersByAddress[addr] + if !ok { + return + } + + for _, filter := range filters[eventSignature] { + if !yield(filter) { + return + } + } + } +} + +// ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller. +// Requires LoadFilters to be called at least once. +func (lp *filters) ConsumeFiltersToBackfill(ctx context.Context) []Filter { + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + filtersToBackfill := lp.filtersToBackfill + lp.filtersToBackfill = nil + return filtersToBackfill +} + +// LoadFilters - loads filters from database. Can be called multiple times without side effects. +func (lp *filters) LoadFilters(ctx context.Context) error { + if lp.loadedFilters.Load() { + return nil + } + + lp.lggr.Debugw("Loading filters from db") + lp.filtersMutex.Lock() + defer lp.filtersMutex.Unlock() + // reset filters' indexes to ensure we do not have partial data from the previous run + lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) + lp.filtersByName = make(map[string]Filter) + lp.filtersToBackfill = nil + lp.filtersToDelete = nil + + filters, err := lp.orm.SelectFilters(ctx) + if err != nil { + return fmt.Errorf("failed to select filters from db: %w", err) + } + + for _, filter := range filters { + if filter.IsDeleted { + lp.filtersToDelete = append(lp.filtersToDelete, filter) + continue + } + + if _, ok := lp.filtersByName[filter.Name]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) + lp.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + lp.filtersByName[filter.Name] = filter + filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + if !ok { + filtersByEventSig 
= make(map[EventSignature][]Filter) + lp.filtersByAddress[filter.Address] = filtersByEventSig + } + + filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) + lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + } + + lp.loadedFilters.Store(true) + + return nil +} + +func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { + index := slices.IndexFunc(filters, func(item Filter) bool { + return item.ID == filter.ID + }) + if index == -1 { + return filters, false + } + + lastIdx := len(filters) - 1 + filters[index], filters[lastIdx] = filters[lastIdx], filters[index] + return filters[:lastIdx], true +} diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/filters_test.go similarity index 57% rename from pkg/solana/logpoller/log_poller_test.go rename to pkg/solana/logpoller/filters_test.go index 56b89dac0..f4d8a7046 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -2,17 +2,20 @@ package logpoller import ( "errors" + "fmt" "testing" + "github.com/gagliardetto/solana-go" + "github.com/google/uuid" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestLogPoller_LoadFilters(t *testing.T) { +func TestFilters_LoadFilters(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(logger.Sugared(logger.Test(t)), orm) + fs := newFilters(logger.Sugared(logger.Test(t)), orm) ctx := tests.Context(t) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() deleted := Filter{ @@ -34,144 +37,171 @@ func TestLogPoller_LoadFilters(t *testing.T) { happyPath2, }, nil).Once() - err := lp.loadFilters(ctx) + err := fs.LoadFilters(ctx) require.EqualError(t, err, "failed to select filters from db: db failed") - err = lp.loadFilters(ctx) + err = fs.LoadFilters(ctx) require.NoError(t, 
err) // only one filter to delete - require.Len(t, lp.filtersToDelete, 1) - require.Equal(t, deleted, lp.filtersToDelete[0]) + require.Len(t, fs.filtersToDelete, 1) + require.Equal(t, deleted, fs.filtersToDelete[0]) // backfill and happy path both indexed - require.Len(t, lp.filtersByAddress, 1) - require.Len(t, lp.filtersByAddress[happyPath.Address], 1) - require.Len(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) - require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) - require.Contains(t, lp.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) - require.Len(t, lp.filtersByName, 2) - require.Equal(t, lp.filtersByName[happyPath.Name], happyPath) - require.Equal(t, lp.filtersByName[happyPath2.Name], happyPath2) + require.Len(t, fs.filtersByAddress, 1) + require.Len(t, fs.filtersByAddress[happyPath.Address], 1) + require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) + require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) + require.Len(t, fs.filtersByName, 2) + require.Equal(t, fs.filtersByName[happyPath.Name], happyPath) + require.Equal(t, fs.filtersByName[happyPath2.Name], happyPath2) // any call following successful should be noop - err = lp.loadFilters(ctx) + err = fs.LoadFilters(ctx) require.NoError(t, err) } -func TestLogPoller_RegisterFilter(t *testing.T) { +func TestFilters_RegisterFilter(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Returns an error if name is empty", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - err := lp.RegisterFilter(tests.Context(t), Filter{}) + fs := newFilters(lggr, orm) + err := fs.RegisterFilter(tests.Context(t), Filter{}) require.EqualError(t, err, "name is required") }) t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { orm := 
newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: "Filter"}) require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") }) - t.Run("Returns an error if filter with the same name is present in db", func(t *testing.T) { - orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - const filterName = "Filter" - orm.On("SelectFilters", mock.Anything).Return([]Filter{{Name: filterName}}, nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.EqualError(t, err, ErrFilterNameConflict.Error()) - }) - t.Run("Returns an error if adding the same filter twice", func(t *testing.T) { - orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) - const filterName = "Filter" - orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() - orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.NoError(t, err) - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) - require.EqualError(t, err, ErrFilterNameConflict.Error()) + t.Run("Returns an error if trying to update primary fields", func(t *testing.T) { + testCases := []struct { + Name string + ModifyField func(*Filter) + }{ + { + Name: "Address", + ModifyField: func(f *Filter) { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + f.Address = PublicKey(privateKey.PublicKey()) + }, + }, + { + Name: "EventSig", + ModifyField: func(f *Filter) { + f.EventSig = EventSignature{3, 2, 1} + }, + }, + { + Name: "EventIDL", + ModifyField: func(f *Filter) { + f.EventIDL = uuid.NewString() + }, + }, + { + Name: "SubkeyPaths", + ModifyField: func(f *Filter) { + f.SubkeyPaths = 
[][]string{{uuid.NewString()}} + }, + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("Updating %s", tc.Name), func(t *testing.T) { + orm := newMockORM(t) + fs := newFilters(lggr, orm) + const filterName = "Filter" + dbFilter := Filter{Name: filterName} + orm.On("SelectFilters", mock.Anything).Return([]Filter{dbFilter}, nil).Once() + newFilter := dbFilter + tc.ModifyField(&newFilter) + err := fs.RegisterFilter(tests.Context(t), newFilter) + require.EqualError(t, err, ErrFilterNameConflict.Error()) + }) + } }) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.Error(t, err) // can re-add after the db issue is resolved orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) }) t.Run("Can reregister after unregister", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() const filterID = int64(10) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() - err := lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) orm.On("MarkFilterDeleted", mock.Anything, filterID).Return(nil).Once() - err = lp.UnregisterFilter(tests.Context(t), filterName) + err = fs.UnregisterFilter(tests.Context(t), 
filterName) require.NoError(t, err) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID+1, nil).Once() - err = lp.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 1) - require.Contains(t, lp.filtersToDelete, Filter{Name: filterName, ID: filterID}) - require.Len(t, lp.filtersToBackfill, 1) - require.Contains(t, lp.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) + require.Len(t, fs.filtersToDelete, 1) + require.Contains(t, fs.filtersToDelete, Filter{Name: filterName, ID: filterID}) + require.Len(t, fs.filtersToBackfill, 1) + require.Contains(t, fs.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) }) } -func TestLogPoller_UnregisterFilter(t *testing.T) { +func TestFilters_UnregisterFilter(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Returns an error if fails to load filters from db", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) orm.On("SelectFilters", mock.Anything).Return(nil, errors.New("db failed")).Once() - err := lp.UnregisterFilter(tests.Context(t), "Filter") + err := fs.UnregisterFilter(tests.Context(t), "Filter") require.EqualError(t, err, "failed to load filters: failed to select filters from db: db failed") }) t.Run("Noop if filter is not present", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) }) t.Run("Returns error if fails to mark filter as deleted", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" const id int64 = 10 
orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.EqualError(t, err, "failed to mark filter deleted: db query failed") }) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() - err := lp.UnregisterFilter(tests.Context(t), filterName) + err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 1) - require.Len(t, lp.filtersToBackfill, 0) - require.Len(t, lp.filtersByName, 0) - require.Len(t, lp.filtersByAddress, 0) + require.Len(t, fs.filtersToDelete, 1) + require.Len(t, fs.filtersToBackfill, 0) + require.Len(t, fs.filtersByName, 0) + require.Len(t, fs.filtersByAddress, 0) }) } -func TestLogPoller_pruneFilters(t *testing.T) { +func TestFilters_pruneFilters(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + fs := newFilters(lggr, orm) toDelete := Filter{ ID: 1, Name: "To delete", @@ -185,13 +215,13 @@ func TestLogPoller_pruneFilters(t *testing.T) { }, }, nil).Once() orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(nil).Once() - err := lp.pruneFilters(tests.Context(t)) + err := fs.PruneFilters(tests.Context(t)) require.NoError(t, err) - require.Len(t, lp.filtersToDelete, 0) + require.Len(t, fs.filtersToDelete, 0) }) t.Run("If DB removal fails will add filters back into removal slice ", func(t *testing.T) { orm := newMockORM(t) - lp := NewLogPoller(lggr, orm) + 
fs := newFilters(lggr, orm) toDelete := Filter{ ID: 1, Name: "To delete", @@ -211,11 +241,11 @@ func TestLogPoller_pruneFilters(t *testing.T) { orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { orm.On("MarkFilterDeleted", mock.Anything, newToDelete.ID).Return(nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(newToDelete.ID, nil).Once() - require.NoError(t, lp.RegisterFilter(tests.Context(t), newToDelete)) - require.NoError(t, lp.UnregisterFilter(tests.Context(t), newToDelete.Name)) + require.NoError(t, fs.RegisterFilter(tests.Context(t), newToDelete)) + require.NoError(t, fs.UnregisterFilter(tests.Context(t), newToDelete.Name)) }).Once() - err := lp.pruneFilters(tests.Context(t)) + err := fs.PruneFilters(tests.Context(t)) require.EqualError(t, err, "failed to delete filters: db failed") - require.Equal(t, lp.filtersToDelete, []Filter{newToDelete, toDelete}) + require.Equal(t, fs.filtersToDelete, []Filter{newToDelete, toDelete}) }) } diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 8bcbf37ec..b075a7098 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -3,11 +3,7 @@ package logpoller import ( "context" "errors" - "fmt" - "iter" - "slices" "sync" - "sync/atomic" "time" "github.com/smartcontractkit/chainlink-common/pkg/logger" @@ -30,24 +26,18 @@ type LogPoller struct { lggr logger.SugaredLogger orm ORM - filtersByName map[string]Filter - filtersByAddress map[PublicKey]map[EventSignature][]Filter - filtersToBackfill []Filter - filtersToDelete []Filter // populated on start from db and pruned on first iteration of run - filtersMutex sync.RWMutex - loadedFilters atomic.Bool + filters *filters chStop services.StopChan wg sync.WaitGroup } func NewLogPoller(lggr logger.SugaredLogger, orm ORM) *LogPoller { + lggr = logger.Sugared(logger.Named(lggr, "LogPoller")) return &LogPoller{ - orm: orm, - lggr: 
logger.Sugared(logger.Named(lggr, "LogPoller")), - - filtersByName: make(map[string]Filter), - filtersByAddress: map[PublicKey]map[EventSignature][]Filter{}, + orm: orm, + lggr: lggr, + filters: newFilters(lggr, orm), } } @@ -68,10 +58,48 @@ func (lp *LogPoller) Close() error { }) } +// RegisterFilter - refer to filters.RegisterFilter for details. +func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { + ctx, cancel := lp.chStop.Ctx(ctx) + defer cancel() + return lp.filters.RegisterFilter(ctx, filter) +} + +// UnregisterFilter refer to filters.UnregisterFilter for details +func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { + ctx, cancel := lp.chStop.Ctx(ctx) + defer cancel() + return lp.filters.UnregisterFilter(ctx, name) +} + +func (lp *LogPoller) loadFilters(ctx context.Context) error { + retryTicker := services.TickerConfig{Initial: 0, JitterPct: services.DefaultJitter}.NewTicker(time.Second) + defer retryTicker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-retryTicker.C: + } + err := lp.filters.LoadFilters(ctx) + if err != nil { + lp.lggr.Errorw("Failed loading filters in init logpoller loop, retrying later", "err", err) + } + + return nil + } +} + func (lp *LogPoller) run() { defer lp.wg.Done() ctx, cancel := lp.chStop.NewCtx() defer cancel() + err := lp.loadFilters(ctx) + if err != nil { + lp.lggr.Warnw("Failed loading filters", "err", err) + return + } var blocks chan struct { BlockNumber int64 @@ -83,10 +111,8 @@ func (lp *LogPoller) run() { case <-ctx.Done(): return case block := <-blocks: - lp.filtersMutex.Lock() - filtersToBackfill := lp.filtersToBackfill - lp.filtersToBackfill = nil - lp.filtersMutex.Unlock() + filtersToBackfill := lp.filters.ConsumeFiltersToBackfill(ctx) + // TODO: NONEVM-916 parse, filters and persist logs // NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert // of log corresponding filter won't be present 
in the db. Ensure to refilter and retry on insert error @@ -109,7 +135,7 @@ func (lp *LogPoller) backgroundWorkerRun() { case <-ctx.Done(): return case <-pruneFilters.C: - err := lp.pruneFilters(ctx) + err := lp.filters.PruneFilters(ctx) if err != nil { lp.lggr.Errorw("Failed to prune filters", "err", err) } @@ -117,200 +143,7 @@ func (lp *LogPoller) backgroundWorkerRun() { } } -func (lp *LogPoller) pruneFilters(ctx context.Context) error { - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - filtersToDelete := lp.filtersToDelete - lp.filtersToDelete = nil - lp.filtersMutex.Unlock() - - if len(filtersToDelete) == 0 { - return nil - } - - err = lp.orm.DeleteFilters(ctx, filtersToDelete) - if err != nil { - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) - return fmt.Errorf("failed to delete filters: %w", err) - } - - return nil -} - func (lp *LogPoller) startFilterBackfill(ctx context.Context, filter Filter, toBlock int64) { // TODO: NONEVM-916 start backfill lp.lggr.Debugw("Starting filter backfill", "filter", filter) } - -// RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address -// that matches filter.EventSig signature will be captured starting from filter.StartingBlock. -// filter.Name must be unique otherwise ErrFilterNameConflict is returned. // TODO: not sure this is a good idea. Callers are most likely going to ignore this error -// The filter may be unregistered later by Filter.Name -// Warnings/debug information is keyed by filter name. 
-func (lp *LogPoller) RegisterFilter(ctx context.Context, filter Filter) error { - if len(filter.Name) == 0 { - return errors.New("name is required") - } - - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - - if _, ok := lp.filtersByName[filter.Name]; ok { - return ErrFilterNameConflict - } - - filterID, err := lp.orm.InsertFilter(ctx, filter) - if err != nil { - return fmt.Errorf("failed to insert filter: %w", err) - } - - filter.ID = filterID - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig - } - - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) - return nil -} - -// UnregisterFilter will remove the filter with the given name and prune all corresponding logs. -// If the name does not exist, it will log an error but not return an error. -// Warnings/debug information is keyed by filter name. 
-func (lp *LogPoller) UnregisterFilter(ctx context.Context, name string) error { - err := lp.loadFilters(ctx) - if err != nil { - return fmt.Errorf("failed to load filters: %w", err) - } - - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - - filter, ok := lp.filtersByName[name] - if !ok { - lp.lggr.Warnw("Filter not found in filtersByName", "name", name) - return nil - } - - if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { - return fmt.Errorf("failed to mark filter deleted: %w", err) - } - - delete(lp.filtersByName, filter.Name) - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - lp.lggr.Warnw("Filter not found in filtersByAddress", "name", name, "address", filter.Address) - return nil - } - - filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) - if !ok { - lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", name, "address", filter.Address) - } - - if len(filtersByEventSig[filter.EventSig]) == 0 { - delete(filtersByEventSig, filter.EventSig) - } - - if len(lp.filtersByAddress[filter.Address]) == 0 { - delete(lp.filtersByAddress, filter.Address) - } - - // remove or ensure that filters was not present in the slice to backfill - lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) - lp.filtersToDelete = append(lp.filtersToDelete, filter) - return nil -} - -func (lp *LogPoller) loadFilters(ctx context.Context) error { - if lp.loadedFilters.Load() { - return nil - } - - lp.lggr.Debugw("Loading filters from db") - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - // reset filters' indexes to ensure we do not have partial data from the previous run - lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) - lp.filtersByName = make(map[string]Filter) - lp.filtersToBackfill = nil - lp.filtersToDelete = nil - - ctx, cancel := lp.chStop.Ctx(ctx) - defer cancel() - filters, err := lp.orm.SelectFilters(ctx) - if err 
!= nil { - return fmt.Errorf("failed to select filters from db: %w", err) - } - - for _, filter := range filters { - if filter.IsDeleted { - lp.filtersToDelete = append(lp.filtersToDelete, filter) - continue - } - - if _, ok := lp.filtersByName[filter.Name]; ok { - errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) - lp.lggr.Critical(errMsg) - return errors.New(errMsg) - } - - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] - if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig - } - - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) - } - - lp.loadedFilters.Store(true) - return nil -} - -func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { - index := slices.IndexFunc(filters, func(item Filter) bool { - return item.ID == filter.ID - }) - if index == -1 { - return filters, false - } - - lastIdx := len(filters) - 1 - filters[index], filters[lastIdx] = filters[lastIdx], filters[index] - return filters[:lastIdx], true -} - -// matchingFilters - allows to iterate through filters that match provided keys -func (lp *LogPoller) matchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { - return func(yield func(Filter) bool) { - lp.filtersMutex.RLock() - defer lp.filtersMutex.RUnlock() - filters, ok := lp.filtersByAddress[addr] - if !ok { - return - } - - for _, filter := range filters[eventSignature] { - if !yield(filter) { - return - } - } - } -} diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index b04a5ae38..cee0ea381 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -7,7 +7,7 @@ import ( ) type Filter struct { - ID int64 + ID int64 // only for internal usage. 
Values set externally are ignored. Name string Address PublicKey EventName string @@ -17,7 +17,11 @@ type Filter struct { SubkeyPaths SubkeyPaths Retention time.Duration MaxLogsKept int64 - IsDeleted bool + IsDeleted bool // only for internal usage. Values set externally are ignored. +} + +func (f Filter) MatchSameLogs(other Filter) bool { + return f.Address == other.Address && f.EventSig == other.EventSig && f.EventIDL == other.EventIDL && f.SubkeyPaths.Equal(other.SubkeyPaths) } type Log struct { diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index 247a3d611..f79e242fb 100644 --- a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -60,6 +60,11 @@ func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (id int64, err INSERT INTO solana.log_poller_filters (chain_id, name, address, event_name, event_sig, starting_block, event_idl, subkey_paths, retention, max_logs_kept) VALUES (:chain_id, :name, :address, :event_name, :event_sig, :starting_block, :event_idl, :subkey_paths, :retention, :max_logs_kept) + ON CONFLICT (chain_id, name) WHERE NOT is_deleted DO UPDATE SET + event_name = EXCLUDED.event_name, + starting_block = EXCLUDED.starting_block, + retention = EXCLUDED.retention, + max_logs_kept = EXCLUDED.max_logs_kept RETURNING id;` query, sqlArgs, err := o.ds.BindNamed(query, args) diff --git a/pkg/solana/logpoller/orm_test.go b/pkg/solana/logpoller/orm_test.go index 217bccb17..ca533d0eb 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -76,16 +76,25 @@ func TestLogPollerFilters(t *testing.T) { }) } }) - t.Run("Returns and error if name is not unique", func(t *testing.T) { + t.Run("Updates non primary fields if name and chainID is not unique", func(t *testing.T) { chainID := uuid.NewString() dbx := pg.NewSqlxDB(t, dbURL) orm := NewORM(chainID, dbx, lggr) filter := newRandomFilter(t) ctx := tests.Context(t) - _, err := orm.InsertFilter(ctx, filter) + id, err := orm.InsertFilter(ctx, 
filter) require.NoError(t, err) - _, err = orm.InsertFilter(ctx, filter) - require.EqualError(t, err, `ERROR: duplicate key value violates unique constraint "solana_log_poller_filter_name" (SQLSTATE 23505)`) + filter.EventName = uuid.NewString() + filter.StartingBlock++ + filter.Retention++ + filter.MaxLogsKept++ + id2, err := orm.InsertFilter(ctx, filter) + require.NoError(t, err) + require.Equal(t, id, id2) + dbFilter, err := orm.GetFilterByID(ctx, id) + require.NoError(t, err) + filter.ID = id + require.Equal(t, filter, dbFilter) }) t.Run("Allows reuse name of a filter marked as deleted", func(t *testing.T) { chainID := uuid.NewString() @@ -106,6 +115,18 @@ func TestLogPollerFilters(t *testing.T) { require.NoError(t, err) require.NotEqual(t, newFilterID, filterID, "expected db to generate new filter as we can not be sure that new one matches the same logs") }) + t.Run("Allows reuse name for a filter with different chainID", func(t *testing.T) { + dbx := pg.NewSqlxDB(t, dbURL) + orm1 := NewORM(uuid.NewString(), dbx, lggr) + orm2 := NewORM(uuid.NewString(), dbx, lggr) + filter := newRandomFilter(t) + ctx := tests.Context(t) + filterID1, err := orm1.InsertFilter(ctx, filter) + require.NoError(t, err) + filterID2, err := orm2.InsertFilter(ctx, filter) + require.NoError(t, err) + require.NotEqual(t, filterID1, filterID2) + }) t.Run("Deletes log on parent filter deletion", func(t *testing.T) { dbx := pg.NewSqlxDB(t, dbURL) chainID := uuid.NewString() diff --git a/pkg/solana/logpoller/types.go b/pkg/solana/logpoller/types.go index 95122bc86..ab918c0d3 100644 --- a/pkg/solana/logpoller/types.go +++ b/pkg/solana/logpoller/types.go @@ -4,6 +4,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "slices" "github.com/gagliardetto/solana-go" ) @@ -98,6 +99,10 @@ func (p *SubkeyPaths) Scan(src interface{}) error { } +func (k SubkeyPaths) Equal(o SubkeyPaths) bool { + return slices.EqualFunc(k, o, slices.Equal) +} + const EventSignatureLength = 8 type EventSignature 
[EventSignatureLength]byte From f3dae353eb6efe7d8669a5558b2df89a04c1bd0d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Mon, 25 Nov 2024 19:42:28 +0100 Subject: [PATCH 3/8] fix comments --- pkg/solana/logpoller/filters.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 44e1e7d94..184ef9e16 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -64,7 +64,7 @@ func (lp *filters) PruneFilters(ctx context.Context) error { // RegisterFilter persists provided filter and ensures that any log emitted by a contract with filter.Address // that matches filter.EventSig signature will be captured starting from filter.StartingBlock. // The filter may be unregistered later by filter.Name. -// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if +// In case of Filter.Name collision (within the chain scope) returns ErrFilterNameConflict if // one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. // Otherwise, updates remaining fields and schedules backfill. // Warnings/debug information is keyed by filter name. @@ -107,7 +107,7 @@ func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { return nil } -// UnregisterFilter will remove the filter with the given name and async prune all corresponding logs. +// UnregisterFilter will mark the filter with the given name for pruning and async prune all corresponding logs. // If the name does not exist, it will log an error but not return an error. // Warnings/debug information is keyed by filter name. 
func (lp *filters) UnregisterFilter(ctx context.Context, name string) error { From 219d0da51b3b80e424ef5b4b531cc57a9e97f817 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Mon, 25 Nov 2024 19:43:02 +0100 Subject: [PATCH 4/8] MatchingFilters tests --- go.mod | 2 +- pkg/solana/logpoller/filters.go | 2 +- pkg/solana/logpoller/filters_test.go | 42 +++++++++++++++++++- pkg/solana/logpoller/models_test.go | 58 ++++++++++++++++++++++++++++ pkg/solana/logpoller/orm_test.go | 40 ------------------- 5 files changed, 101 insertions(+), 43 deletions(-) create mode 100644 pkg/solana/logpoller/models_test.go diff --git a/go.mod b/go.mod index bf4f8cb33..68672b78b 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink-solana -go 1.22.0 +go 1.23 toolchain go1.23.1 diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 184ef9e16..76ac20745 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -162,7 +162,7 @@ func (lp *filters) removeFilterFromIndexes(filter Filter) { // MatchingFilters - returns iterator to go through all matching filters. // Requires LoadFilters to be called at least once. 
-func (lp *filters) MatchingFilters(ctx context.Context, addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { +func (lp *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { return func(yield func(Filter) bool) { lp.filtersMutex.RLock() defer lp.filtersMutex.RUnlock() diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index f4d8a7046..42f650d7f 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -197,7 +197,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { }) } -func TestFilters_pruneFilters(t *testing.T) { +func TestFilters_PruneFilters(t *testing.T) { lggr := logger.Sugared(logger.Test(t)) t.Run("Happy path", func(t *testing.T) { orm := newMockORM(t) @@ -249,3 +249,43 @@ func TestFilters_pruneFilters(t *testing.T) { require.Equal(t, fs.filtersToDelete, []Filter{newToDelete, toDelete}) }) } + +func TestFilters_MatchingFilters(t *testing.T) { + orm := newMockORM(t) + lggr := logger.Sugared(logger.Test(t)) + expectedFilter1 := Filter{ + Name: "expectedFilter1", + Address: newRandomPublicKey(t), + EventSig: newRandomEventSignature(t), + } + expectedFilter2 := Filter{ + Name: "expectedFilter2", + Address: expectedFilter1.Address, + EventSig: expectedFilter1.EventSig, + } + sameAddress := Filter{ + Name: "sameAddressWrongEventSig", + Address: expectedFilter1.Address, + EventSig: newRandomEventSignature(t), + } + + sameEventSig := Filter{ + Name: "wrongAddressSameEventSig", + Address: newRandomPublicKey(t), + EventSig: expectedFilter1.EventSig, + } + orm.On("SelectFilters", mock.Anything).Return([]Filter{expectedFilter1, expectedFilter2, sameAddress, sameEventSig}, nil).Once() + expectedFilters := map[string]struct{}{ + expectedFilter1.Name: {}, + expectedFilter2.Name: {}, + } + filters := newFilters(lggr, orm) + err := filters.LoadFilters(tests.Context(t)) + require.NoError(t, err) + for filter := range 
filters.MatchingFilters(expectedFilter1.Address, expectedFilter1.EventSig) { + _, ok := expectedFilters[filter.Name] + require.True(t, ok, "MatchingFilters returned unexpected filter %s", filter.Name) + delete(expectedFilters, filter.Name) + } + require.Len(t, expectedFilters, 0) +} diff --git a/pkg/solana/logpoller/models_test.go b/pkg/solana/logpoller/models_test.go new file mode 100644 index 000000000..7927b1481 --- /dev/null +++ b/pkg/solana/logpoller/models_test.go @@ -0,0 +1,58 @@ +package logpoller + +import ( + "testing" + "time" + + "github.com/gagliardetto/solana-go" + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func newRandomFilter(t *testing.T) Filter { + return Filter{ + Name: uuid.NewString(), + Address: newRandomPublicKey(t), + EventName: "event", + EventSig: newRandomEventSignature(t), + StartingBlock: 1, + EventIDL: "{}", + SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, + Retention: 1000, + MaxLogsKept: 3, + } +} + +func newRandomPublicKey(t *testing.T) PublicKey { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + return PublicKey(pubKey) +} + +func newRandomEventSignature(t *testing.T) EventSignature { + pubKey := newRandomPublicKey(t) + return EventSignature(pubKey[:8]) +} + +func newRandomLog(t *testing.T, filterID int64, chainID string) Log { + privateKey, err := solana.NewRandomPrivateKey() + require.NoError(t, err) + pubKey := privateKey.PublicKey() + data := []byte("solana is fun") + signature, err := privateKey.Sign(data) + require.NoError(t, err) + return Log{ + FilterId: filterID, + ChainId: chainID, + LogIndex: 1, + BlockHash: Hash(pubKey), + BlockNumber: 10, + BlockTimestamp: time.Unix(1731590113, 0), + Address: PublicKey(pubKey), + EventSig: EventSignature{3, 2, 1}, + SubkeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, + TxHash: Signature(signature), + Data: data, + } +} diff --git a/pkg/solana/logpoller/orm_test.go 
b/pkg/solana/logpoller/orm_test.go index ca533d0eb..bb12e9564 100644 --- a/pkg/solana/logpoller/orm_test.go +++ b/pkg/solana/logpoller/orm_test.go @@ -5,7 +5,6 @@ package logpoller import ( "os" "testing" - "time" "github.com/gagliardetto/solana-go" "github.com/google/uuid" @@ -155,45 +154,6 @@ func TestLogPollerFilters(t *testing.T) { }) } -func newRandomFilter(t *testing.T) Filter { - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() - return Filter{ - Name: uuid.NewString(), - Address: PublicKey(pubKey), - EventName: "event", - EventSig: EventSignature{1, 2, 3}, - StartingBlock: 1, - EventIDL: "{}", - SubkeyPaths: [][]string{{"a", "b"}, {"c"}}, - Retention: 1000, - MaxLogsKept: 3, - } -} - -func newRandomLog(t *testing.T, filterID int64, chainID string) Log { - privateKey, err := solana.NewRandomPrivateKey() - require.NoError(t, err) - pubKey := privateKey.PublicKey() - data := []byte("solana is fun") - signature, err := privateKey.Sign(data) - require.NoError(t, err) - return Log{ - FilterId: filterID, - ChainId: chainID, - LogIndex: 1, - BlockHash: Hash(pubKey), - BlockNumber: 10, - BlockTimestamp: time.Unix(1731590113, 0), - Address: PublicKey(pubKey), - EventSig: EventSignature{3, 2, 1}, - SubkeyValues: [][]byte{{3, 2, 1}, {1}, {1, 2}, pubKey.Bytes()}, - TxHash: Signature(signature), - Data: data, - } -} - func TestLogPollerLogs(t *testing.T) { lggr := logger.Test(t) dbURL, ok := os.LookupEnv("CL_DATABASE_URL") From 2fcc013cc0007f74ad92e2f58e9340f94d9be952 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Mon, 25 Nov 2024 19:45:01 +0100 Subject: [PATCH 5/8] remove redundant time offset --- pkg/solana/logpoller/log_poller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index b075a7098..8b4d1a795 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -128,7 +128,7 @@ 
func (lp *LogPoller) backgroundWorkerRun() { ctx, cancel := lp.chStop.NewCtx() defer cancel() - pruneFilters := services.NewTicker(time.Minute + 618*time.Millisecond) // try to minimize collisions with one-second period + pruneFilters := services.NewTicker(time.Minute) defer pruneFilters.Stop() for { select { From 0ab8f13616f410a8b7656c91bae179d6ebd835b0 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Mon, 25 Nov 2024 20:33:15 +0100 Subject: [PATCH 6/8] switch to mapByID instead of slice --- integration-tests/go.mod | 2 +- pkg/solana/logpoller/filters.go | 89 ++++++++++++++++------------ pkg/solana/logpoller/filters_test.go | 22 ++++--- pkg/solana/logpoller/log_poller.go | 4 +- pkg/solana/logpoller/mock_orm.go | 12 ++-- pkg/solana/logpoller/orm.go | 2 +- 6 files changed, 73 insertions(+), 58 deletions(-) diff --git a/integration-tests/go.mod b/integration-tests/go.mod index 7f5450c7d..ca86de258 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -1,6 +1,6 @@ module github.com/smartcontractkit/chainlink-solana/integration-tests -go 1.22.8 +go 1.23 toolchain go1.23.2 diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 76ac20745..a5dd596d8 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -5,7 +5,7 @@ import ( "errors" "fmt" "iter" - "slices" + "maps" "sync" "sync/atomic" @@ -17,9 +17,9 @@ type filters struct { lggr logger.SugaredLogger filtersByName map[string]Filter - filtersByAddress map[PublicKey]map[EventSignature][]Filter - filtersToBackfill []Filter - filtersToDelete []Filter // populated on start from db and pruned on first iteration of run + filtersByAddress map[PublicKey]map[EventSignature]map[int64]Filter + filtersToBackfill map[int64]Filter + filtersToDelete map[int64]Filter filtersMutex sync.RWMutex loadedFilters atomic.Bool } @@ -28,9 +28,6 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { return &filters{ orm: orm, lggr: lggr, - - 
filtersByName: make(map[string]Filter), - filtersByAddress: map[PublicKey]map[EventSignature][]Filter{}, } } @@ -43,7 +40,7 @@ func (lp *filters) PruneFilters(ctx context.Context) error { lp.filtersMutex.Lock() filtersToDelete := lp.filtersToDelete - lp.filtersToDelete = nil + lp.filtersToDelete = make(map[int64]Filter) lp.filtersMutex.Unlock() if len(filtersToDelete) == 0 { @@ -54,7 +51,7 @@ func (lp *filters) PruneFilters(ctx context.Context) error { if err != nil { lp.filtersMutex.Lock() defer lp.filtersMutex.Unlock() - lp.filtersToDelete = append(lp.filtersToDelete, filtersToDelete...) + maps.Copy(lp.filtersToDelete, filtersToDelete) return fmt.Errorf("failed to delete filters: %w", err) } @@ -98,12 +95,18 @@ func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { lp.filtersByName[filter.Name] = filter filtersByEventSig, ok := lp.filtersByAddress[filter.Address] if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) + filtersByEventSig = make(map[EventSignature]map[int64]Filter) lp.filtersByAddress[filter.Address] = filtersByEventSig } - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + filtersByID, ok := filtersByEventSig[filter.EventSig] + if !ok { + filtersByID = make(map[int64]Filter) + filtersByEventSig[filter.EventSig] = filtersByID + } + + filtersByID[filter.ID] = filter + lp.filtersToBackfill[filterID] = filter return nil } @@ -131,13 +134,13 @@ func (lp *filters) UnregisterFilter(ctx context.Context, name string) error { lp.removeFilterFromIndexes(filter) - lp.filtersToDelete = append(lp.filtersToDelete, filter) + lp.filtersToDelete[filter.ID] = filter return nil } func (lp *filters) removeFilterFromIndexes(filter Filter) { delete(lp.filtersByName, filter.Name) - lp.filtersToBackfill, _ = removeFilterFromSlice(lp.filtersToBackfill, filter) + delete(lp.filtersToBackfill, filter.ID) filtersByEventSig, ok := 
lp.filtersByAddress[filter.Address] if !ok { @@ -145,17 +148,18 @@ func (lp *filters) removeFilterFromIndexes(filter Filter) { return } - filtersByEventSig[filter.EventSig], ok = removeFilterFromSlice(filtersByEventSig[filter.EventSig], filter) + filtersByID, ok := filtersByEventSig[filter.EventSig] if !ok { lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) return } - if len(filtersByEventSig[filter.EventSig]) == 0 { + delete(filtersByID, filter.ID) + if len(filtersByID) == 0 { delete(filtersByEventSig, filter.EventSig) } - if len(lp.filtersByAddress[filter.Address]) == 0 { + if len(filtersByEventSig) == 0 { delete(lp.filtersByAddress, filter.Address) } } @@ -163,6 +167,10 @@ func (lp *filters) removeFilterFromIndexes(filter Filter) { // MatchingFilters - returns iterator to go through all matching filters. // Requires LoadFilters to be called at least once. func (lp *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + if !lp.loadedFilters.Load() { + lp.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") + return nil + } return func(yield func(Filter) bool) { lp.filtersMutex.RLock() defer lp.filtersMutex.RUnlock() @@ -181,11 +189,15 @@ func (lp *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature // ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller. // Requires LoadFilters to be called at least once. 
-func (lp *filters) ConsumeFiltersToBackfill(ctx context.Context) []Filter { +func (lp *filters) ConsumeFiltersToBackfill() map[int64]Filter { + if !lp.loadedFilters.Load() { + lp.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") + return nil + } lp.filtersMutex.Lock() defer lp.filtersMutex.Unlock() filtersToBackfill := lp.filtersToBackfill - lp.filtersToBackfill = nil + lp.filtersToBackfill = make(map[int64]Filter) return filtersToBackfill } @@ -199,10 +211,10 @@ func (lp *filters) LoadFilters(ctx context.Context) error { lp.filtersMutex.Lock() defer lp.filtersMutex.Unlock() // reset filters' indexes to ensure we do not have partial data from the previous run - lp.filtersByAddress = make(map[PublicKey]map[EventSignature][]Filter) lp.filtersByName = make(map[string]Filter) - lp.filtersToBackfill = nil - lp.filtersToDelete = nil + lp.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]Filter) + lp.filtersToBackfill = make(map[int64]Filter) + lp.filtersToDelete = make(map[int64]Filter) filters, err := lp.orm.SelectFilters(ctx) if err != nil { @@ -211,7 +223,7 @@ func (lp *filters) LoadFilters(ctx context.Context) error { for _, filter := range filters { if filter.IsDeleted { - lp.filtersToDelete = append(lp.filtersToDelete, filter) + lp.filtersToDelete[filter.ID] = filter continue } @@ -224,28 +236,27 @@ func (lp *filters) LoadFilters(ctx context.Context) error { lp.filtersByName[filter.Name] = filter filtersByEventSig, ok := lp.filtersByAddress[filter.Address] if !ok { - filtersByEventSig = make(map[EventSignature][]Filter) + filtersByEventSig = make(map[EventSignature]map[int64]Filter) lp.filtersByAddress[filter.Address] = filtersByEventSig } - filtersByEventSig[filter.EventSig] = append(filtersByEventSig[filter.EventSig], filter) - lp.filtersToBackfill = append(lp.filtersToBackfill, filter) + filtersByID, ok := filtersByEventSig[filter.EventSig] + if !ok { + filtersByID = make(map[int64]Filter) + 
filtersByEventSig[filter.EventSig] = filtersByID + } + + if _, ok := filtersByID[filter.ID]; ok { + errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique ID: %d ", filter.ID) + lp.lggr.Critical(errMsg) + return errors.New(errMsg) + } + + filtersByID[filter.ID] = filter + lp.filtersToBackfill[filter.ID] = filter } lp.loadedFilters.Store(true) return nil } - -func removeFilterFromSlice(filters []Filter, filter Filter) ([]Filter, bool) { - index := slices.IndexFunc(filters, func(item Filter) bool { - return item.ID == filter.ID - }) - if index == -1 { - return filters, false - } - - lastIdx := len(filters) - 1 - filters[index], filters[lastIdx] = filters[lastIdx], filters[index] - return filters[:lastIdx], true -} diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 42f650d7f..54f996d25 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -28,7 +28,7 @@ func TestFilters_LoadFilters(t *testing.T) { Name: "Happy path", } happyPath2 := Filter{ - ID: 1, + ID: 2, Name: "Happy path 2", } orm.On("SelectFilters", mock.Anything).Return([]Filter{ @@ -43,13 +43,13 @@ func TestFilters_LoadFilters(t *testing.T) { require.NoError(t, err) // only one filter to delete require.Len(t, fs.filtersToDelete, 1) - require.Equal(t, deleted, fs.filtersToDelete[0]) + require.Equal(t, deleted, fs.filtersToDelete[deleted.ID]) // backfill and happy path both indexed require.Len(t, fs.filtersByAddress, 1) require.Len(t, fs.filtersByAddress[happyPath.Address], 1) require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) - require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath) - require.Contains(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], happyPath2) + require.Equal(t, happyPath, fs.filtersByAddress[happyPath.Address][happyPath.EventSig][happyPath.ID]) + require.Equal(t, happyPath2, 
fs.filtersByAddress[happyPath.Address][happyPath.EventSig][happyPath2.ID]) require.Len(t, fs.filtersByName, 2) require.Equal(t, fs.filtersByName[happyPath.Name], happyPath) require.Equal(t, fs.filtersByName[happyPath2.Name], happyPath2) @@ -148,9 +148,9 @@ func TestFilters_RegisterFilter(t *testing.T) { err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) require.NoError(t, err) require.Len(t, fs.filtersToDelete, 1) - require.Contains(t, fs.filtersToDelete, Filter{Name: filterName, ID: filterID}) + require.Equal(t, Filter{Name: filterName, ID: filterID}, fs.filtersToDelete[filterID]) require.Len(t, fs.filtersToBackfill, 1) - require.Contains(t, fs.filtersToBackfill, Filter{Name: filterName, ID: filterID + 1}) + require.Equal(t, Filter{Name: filterName, ID: filterID + 1}, fs.filtersToBackfill[filterID+1]) }) } @@ -214,7 +214,7 @@ func TestFilters_PruneFilters(t *testing.T) { Name: "To keep", }, }, nil).Once() - orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(nil).Once() + orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(nil).Once() err := fs.PruneFilters(tests.Context(t)) require.NoError(t, err) require.Len(t, fs.filtersToDelete, 0) @@ -238,7 +238,7 @@ func TestFilters_PruneFilters(t *testing.T) { ID: 3, Name: "To delete 2", } - orm.On("DeleteFilters", mock.Anything, []Filter{toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { + orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(errors.New("db failed")).Run(func(_ mock.Arguments) { orm.On("MarkFilterDeleted", mock.Anything, newToDelete.ID).Return(nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(newToDelete.ID, nil).Once() require.NoError(t, fs.RegisterFilter(tests.Context(t), newToDelete)) @@ -246,7 +246,7 @@ func TestFilters_PruneFilters(t *testing.T) { }).Once() err := fs.PruneFilters(tests.Context(t)) require.EqualError(t, err, "failed to delete filters: db failed") 
- require.Equal(t, fs.filtersToDelete, []Filter{newToDelete, toDelete}) + require.Equal(t, fs.filtersToDelete, map[int64]Filter{newToDelete.ID: newToDelete, toDelete.ID: toDelete}) }) } @@ -254,22 +254,26 @@ func TestFilters_MatchingFilters(t *testing.T) { orm := newMockORM(t) lggr := logger.Sugared(logger.Test(t)) expectedFilter1 := Filter{ + ID: 1, Name: "expectedFilter1", Address: newRandomPublicKey(t), EventSig: newRandomEventSignature(t), } expectedFilter2 := Filter{ + ID: 2, Name: "expectedFilter2", Address: expectedFilter1.Address, EventSig: expectedFilter1.EventSig, } sameAddress := Filter{ + ID: 3, Name: "sameAddressWrongEventSig", Address: expectedFilter1.Address, EventSig: newRandomEventSignature(t), } sameEventSig := Filter{ + ID: 4, Name: "wrongAddressSameEventSig", Address: newRandomPublicKey(t), EventSig: expectedFilter1.EventSig, diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 8b4d1a795..c5c64f2b0 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -17,7 +17,7 @@ var ( type ORM interface { InsertFilter(ctx context.Context, filter Filter) (id int64, err error) SelectFilters(ctx context.Context) ([]Filter, error) - DeleteFilters(ctx context.Context, filters []Filter) error + DeleteFilters(ctx context.Context, filters map[int64]Filter) error MarkFilterDeleted(ctx context.Context, id int64) (err error) } @@ -111,7 +111,7 @@ func (lp *LogPoller) run() { case <-ctx.Done(): return case block := <-blocks: - filtersToBackfill := lp.filters.ConsumeFiltersToBackfill(ctx) + filtersToBackfill := lp.filters.ConsumeFiltersToBackfill() // TODO: NONEVM-916 parse, filters and persist logs // NOTE: removal of filters occurs in the separate goroutine, so there is a chance that upon insert diff --git a/pkg/solana/logpoller/mock_orm.go b/pkg/solana/logpoller/mock_orm.go index a30fba87f..8d8ae6787 100644 --- a/pkg/solana/logpoller/mock_orm.go +++ b/pkg/solana/logpoller/mock_orm.go @@ -22,7 
+22,7 @@ func (_m *mockORM) EXPECT() *mockORM_Expecter { } // DeleteFilters provides a mock function with given fields: ctx, filters -func (_m *mockORM) DeleteFilters(ctx context.Context, filters []Filter) error { +func (_m *mockORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { ret := _m.Called(ctx, filters) if len(ret) == 0 { @@ -30,7 +30,7 @@ func (_m *mockORM) DeleteFilters(ctx context.Context, filters []Filter) error { } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []Filter) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, map[int64]Filter) error); ok { r0 = rf(ctx, filters) } else { r0 = ret.Error(0) @@ -46,14 +46,14 @@ type mockORM_DeleteFilters_Call struct { // DeleteFilters is a helper method to define mock.On call // - ctx context.Context -// - filters []Filter +// - filters map[int64]Filter func (_e *mockORM_Expecter) DeleteFilters(ctx interface{}, filters interface{}) *mockORM_DeleteFilters_Call { return &mockORM_DeleteFilters_Call{Call: _e.mock.On("DeleteFilters", ctx, filters)} } -func (_c *mockORM_DeleteFilters_Call) Run(run func(ctx context.Context, filters []Filter)) *mockORM_DeleteFilters_Call { +func (_c *mockORM_DeleteFilters_Call) Run(run func(ctx context.Context, filters map[int64]Filter)) *mockORM_DeleteFilters_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]Filter)) + run(args[0].(context.Context), args[1].(map[int64]Filter)) }) return _c } @@ -63,7 +63,7 @@ func (_c *mockORM_DeleteFilters_Call) Return(_a0 error) *mockORM_DeleteFilters_C return _c } -func (_c *mockORM_DeleteFilters_Call) RunAndReturn(run func(context.Context, []Filter) error) *mockORM_DeleteFilters_Call { +func (_c *mockORM_DeleteFilters_Call) RunAndReturn(run func(context.Context, map[int64]Filter) error) *mockORM_DeleteFilters_Call { _c.Call.Return(run) return _c } diff --git a/pkg/solana/logpoller/orm.go b/pkg/solana/logpoller/orm.go index f79e242fb..e390f94ba 100644 --- 
a/pkg/solana/logpoller/orm.go +++ b/pkg/solana/logpoller/orm.go @@ -97,7 +97,7 @@ func (o *DSORM) DeleteFilter(ctx context.Context, id int64) (err error) { return err } -func (o *DSORM) DeleteFilters(ctx context.Context, filters []Filter) error { +func (o *DSORM) DeleteFilters(ctx context.Context, filters map[int64]Filter) error { for _, filter := range filters { err := o.DeleteFilter(ctx, filter.ID) if err != nil { From 770c0c028a50bd5c0b6f8cefca982885d77a465c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 26 Nov 2024 15:40:10 +0100 Subject: [PATCH 7/8] rename filters --- pkg/solana/logpoller/filters.go | 138 ++++++++++++++++---------------- 1 file changed, 69 insertions(+), 69 deletions(-) diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index a5dd596d8..1f8bd4933 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -32,26 +32,26 @@ func newFilters(lggr logger.SugaredLogger, orm ORM) *filters { } // PruneFilters - prunes all filters marked to be deleted from the database and all corresponding logs. 
-func (lp *filters) PruneFilters(ctx context.Context) error { - err := lp.LoadFilters(ctx) +func (fl *filters) PruneFilters(ctx context.Context) error { + err := fl.LoadFilters(ctx) if err != nil { return fmt.Errorf("failed to load filters: %w", err) } - lp.filtersMutex.Lock() - filtersToDelete := lp.filtersToDelete - lp.filtersToDelete = make(map[int64]Filter) - lp.filtersMutex.Unlock() + fl.filtersMutex.Lock() + filtersToDelete := fl.filtersToDelete + fl.filtersToDelete = make(map[int64]Filter) + fl.filtersMutex.Unlock() if len(filtersToDelete) == 0 { return nil } - err = lp.orm.DeleteFilters(ctx, filtersToDelete) + err = fl.orm.DeleteFilters(ctx, filtersToDelete) if err != nil { - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - maps.Copy(lp.filtersToDelete, filtersToDelete) + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + maps.Copy(fl.filtersToDelete, filtersToDelete) return fmt.Errorf("failed to delete filters: %w", err) } @@ -65,38 +65,38 @@ func (lp *filters) PruneFilters(ctx context.Context) error { // one of the fields defining resulting logs (Address, EventSig, EventIDL, SubkeyPaths) does not match original filter. // Otherwise, updates remaining fields and schedules backfill. // Warnings/debug information is keyed by filter name. 
-func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { +func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { if len(filter.Name) == 0 { return errors.New("name is required") } - err := lp.LoadFilters(ctx) + err := fl.LoadFilters(ctx) if err != nil { return fmt.Errorf("failed to load filters: %w", err) } - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() - if existingFilter, ok := lp.filtersByName[filter.Name]; ok { + if existingFilter, ok := fl.filtersByName[filter.Name]; ok { if !existingFilter.MatchSameLogs(filter) { return ErrFilterNameConflict } - lp.removeFilterFromIndexes(existingFilter) + fl.removeFilterFromIndexes(existingFilter) } - filterID, err := lp.orm.InsertFilter(ctx, filter) + filterID, err := fl.orm.InsertFilter(ctx, filter) if err != nil { return fmt.Errorf("failed to insert filter: %w", err) } filter.ID = filterID - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + fl.filtersByName[filter.Name] = filter + filtersByEventSig, ok := fl.filtersByAddress[filter.Address] if !ok { filtersByEventSig = make(map[EventSignature]map[int64]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig + fl.filtersByAddress[filter.Address] = filtersByEventSig } filtersByID, ok := filtersByEventSig[filter.EventSig] @@ -106,51 +106,51 @@ func (lp *filters) RegisterFilter(ctx context.Context, filter Filter) error { } filtersByID[filter.ID] = filter - lp.filtersToBackfill[filterID] = filter + fl.filtersToBackfill[filterID] = filter return nil } // UnregisterFilter will mark the filter with the given name for pruning and async prune all corresponding logs. // If the name does not exist, it will log an error but not return an error. // Warnings/debug information is keyed by filter name. 
-func (lp *filters) UnregisterFilter(ctx context.Context, name string) error { - err := lp.LoadFilters(ctx) +func (fl *filters) UnregisterFilter(ctx context.Context, name string) error { + err := fl.LoadFilters(ctx) if err != nil { return fmt.Errorf("failed to load filters: %w", err) } - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() - filter, ok := lp.filtersByName[name] + filter, ok := fl.filtersByName[name] if !ok { - lp.lggr.Warnw("Filter not found in filtersByName", "name", name) + fl.lggr.Warnw("Filter not found in filtersByName", "name", name) return nil } - if err := lp.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { + if err := fl.orm.MarkFilterDeleted(ctx, filter.ID); err != nil { return fmt.Errorf("failed to mark filter deleted: %w", err) } - lp.removeFilterFromIndexes(filter) + fl.removeFilterFromIndexes(filter) - lp.filtersToDelete[filter.ID] = filter + fl.filtersToDelete[filter.ID] = filter return nil } -func (lp *filters) removeFilterFromIndexes(filter Filter) { - delete(lp.filtersByName, filter.Name) - delete(lp.filtersToBackfill, filter.ID) +func (fl *filters) removeFilterFromIndexes(filter Filter) { + delete(fl.filtersByName, filter.Name) + delete(fl.filtersToBackfill, filter.ID) - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + filtersByEventSig, ok := fl.filtersByAddress[filter.Address] if !ok { - lp.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address) + fl.lggr.Warnw("Filter not found in filtersByAddress", "name", filter.Name, "address", filter.Address) return } filtersByID, ok := filtersByEventSig[filter.EventSig] if !ok { - lp.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) + fl.lggr.Warnw("Filter not found in filtersByEventSig", "name", filter.Name, "address", filter.Address) return } @@ -160,21 +160,21 @@ func (lp *filters) removeFilterFromIndexes(filter 
Filter) { } if len(filtersByEventSig) == 0 { - delete(lp.filtersByAddress, filter.Address) + delete(fl.filtersByAddress, filter.Address) } } // MatchingFilters - returns iterator to go through all matching filters. // Requires LoadFilters to be called at least once. -func (lp *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { - if !lp.loadedFilters.Load() { - lp.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") +func (fl *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature) iter.Seq[Filter] { + if !fl.loadedFilters.Load() { + fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") return nil } return func(yield func(Filter) bool) { - lp.filtersMutex.RLock() - defer lp.filtersMutex.RUnlock() - filters, ok := lp.filtersByAddress[addr] + fl.filtersMutex.RLock() + defer fl.filtersMutex.RUnlock() + filters, ok := fl.filtersByAddress[addr] if !ok { return } @@ -189,55 +189,55 @@ func (lp *filters) MatchingFilters(addr PublicKey, eventSignature EventSignature // ConsumeFiltersToBackfill - removes all filters from the backfill queue and returns them to caller. // Requires LoadFilters to be called at least once. 
-func (lp *filters) ConsumeFiltersToBackfill() map[int64]Filter { - if !lp.loadedFilters.Load() { - lp.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") +func (fl *filters) ConsumeFiltersToBackfill() map[int64]Filter { + if !fl.loadedFilters.Load() { + fl.lggr.Critical("Invariant violation: expected filters to be loaded before call to MatchingFilters") return nil } - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() - filtersToBackfill := lp.filtersToBackfill - lp.filtersToBackfill = make(map[int64]Filter) + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() + filtersToBackfill := fl.filtersToBackfill + fl.filtersToBackfill = make(map[int64]Filter) return filtersToBackfill } // LoadFilters - loads filters from database. Can be called multiple times without side effects. -func (lp *filters) LoadFilters(ctx context.Context) error { - if lp.loadedFilters.Load() { +func (fl *filters) LoadFilters(ctx context.Context) error { + if fl.loadedFilters.Load() { return nil } - lp.lggr.Debugw("Loading filters from db") - lp.filtersMutex.Lock() - defer lp.filtersMutex.Unlock() + fl.lggr.Debugw("Loading filters from db") + fl.filtersMutex.Lock() + defer fl.filtersMutex.Unlock() // reset filters' indexes to ensure we do not have partial data from the previous run - lp.filtersByName = make(map[string]Filter) - lp.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]Filter) - lp.filtersToBackfill = make(map[int64]Filter) - lp.filtersToDelete = make(map[int64]Filter) + fl.filtersByName = make(map[string]Filter) + fl.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]Filter) + fl.filtersToBackfill = make(map[int64]Filter) + fl.filtersToDelete = make(map[int64]Filter) - filters, err := lp.orm.SelectFilters(ctx) + filters, err := fl.orm.SelectFilters(ctx) if err != nil { return fmt.Errorf("failed to select filters from db: %w", err) } for _, filter := range filters { if filter.IsDeleted { - 
lp.filtersToDelete[filter.ID] = filter + fl.filtersToDelete[filter.ID] = filter continue } - if _, ok := lp.filtersByName[filter.Name]; ok { + if _, ok := fl.filtersByName[filter.Name]; ok { errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique name: %s ", filter.Name) - lp.lggr.Critical(errMsg) + fl.lggr.Critical(errMsg) return errors.New(errMsg) } - lp.filtersByName[filter.Name] = filter - filtersByEventSig, ok := lp.filtersByAddress[filter.Address] + fl.filtersByName[filter.Name] = filter + filtersByEventSig, ok := fl.filtersByAddress[filter.Address] if !ok { filtersByEventSig = make(map[EventSignature]map[int64]Filter) - lp.filtersByAddress[filter.Address] = filtersByEventSig + fl.filtersByAddress[filter.Address] = filtersByEventSig } filtersByID, ok := filtersByEventSig[filter.EventSig] @@ -248,15 +248,15 @@ func (lp *filters) LoadFilters(ctx context.Context) error { if _, ok := filtersByID[filter.ID]; ok { errMsg := fmt.Sprintf("invariant violation while loading from db: expected filters to have unique ID: %d ", filter.ID) - lp.lggr.Critical(errMsg) + fl.lggr.Critical(errMsg) return errors.New(errMsg) } filtersByID[filter.ID] = filter - lp.filtersToBackfill[filter.ID] = filter + fl.filtersToBackfill[filter.ID] = filter } - lp.loadedFilters.Store(true) + fl.loadedFilters.Store(true) return nil } From b51faa857930cfe13597c80b882e9c3a7f4daf34 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 26 Nov 2024 15:40:32 +0100 Subject: [PATCH 8/8] tests improvements --- pkg/solana/logpoller/filters_test.go | 38 ++++++++++++++++++---------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 5b2fe892c..6341d6793 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -3,6 +3,7 @@ package logpoller import ( "errors" "fmt" + "slices" "testing" "github.com/gagliardetto/solana-go" @@ -45,7 
+46,7 @@ func TestFilters_LoadFilters(t *testing.T) { // only one filter to delete require.Len(t, fs.filtersToDelete, 1) require.Equal(t, deleted, fs.filtersToDelete[deleted.ID]) - // backfill and happy path both indexed + // both happy path are indexed require.Len(t, fs.filtersByAddress, 1) require.Len(t, fs.filtersByAddress[happyPath.Address], 1) require.Len(t, fs.filtersByAddress[happyPath.Address][happyPath.EventSig], 2) @@ -126,12 +127,25 @@ func TestFilters_RegisterFilter(t *testing.T) { const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() - err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + filter := Filter{Name: filterName} + err := fs.RegisterFilter(tests.Context(t), filter) require.Error(t, err) // can readd after db issue is resolved orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() - err = fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) + err = fs.RegisterFilter(tests.Context(t), filter) + require.NoError(t, err) + // can update non-primary fields + filter.EventName = uuid.NewString() + filter.StartingBlock++ + filter.Retention++ + filter.MaxLogsKept++ + orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(1), nil).Once() + err = fs.RegisterFilter(tests.Context(t), filter) require.NoError(t, err) + storedFilters := slices.Collect(fs.MatchingFilters(filter.Address, filter.EventSig)) + require.Len(t, storedFilters, 1) + filter.ID = 1 + require.Equal(t, filter, storedFilters[0]) }) t.Run("Can reregister after unregister", func(t *testing.T) { orm := newMockORM(t) @@ -280,17 +294,15 @@ func TestFilters_MatchingFilters(t *testing.T) { EventSig: expectedFilter1.EventSig, } orm.On("SelectFilters", mock.Anything).Return([]Filter{expectedFilter1, expectedFilter2, sameAddress, sameEventSig}, nil).Once() - expectedFilters := 
map[string]struct{}{ - expectedFilter1.Name: {}, - expectedFilter2.Name: {}, - } filters := newFilters(lggr, orm) err := filters.LoadFilters(tests.Context(t)) require.NoError(t, err) - for filter := range filters.MatchingFilters(expectedFilter1.Address, expectedFilter1.EventSig) { - _, ok := expectedFilters[filter.Name] - require.True(t, ok, "MatchingFilters returned unexpected filter %s", filter.Name) - delete(expectedFilters, filter.Name) - } - require.Len(t, expectedFilters, 0) + matchingFilters := slices.Collect(filters.MatchingFilters(expectedFilter1.Address, expectedFilter1.EventSig)) + require.Len(t, matchingFilters, 2) + require.Contains(t, matchingFilters, expectedFilter1) + require.Contains(t, matchingFilters, expectedFilter2) + // if at least one key does not match - returns empty iterator + require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), expectedFilter1.EventSig))) + require.Empty(t, slices.Collect(filters.MatchingFilters(expectedFilter1.Address, newRandomEventSignature(t)))) + require.Empty(t, slices.Collect(filters.MatchingFilters(newRandomPublicKey(t), newRandomEventSignature(t)))) }