diff --git a/.mockery.yaml b/.mockery.yaml index 1df96bfec..8923cfb95 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -40,3 +40,4 @@ packages: github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller: interfaces: RPCClient: + EventSaver: diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 69a37702b..79dfc76a8 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -53,33 +53,7 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { require.NoError(t, collector.Close()) }) - slot := uint64(42) - sig := solana.Signature{2, 1, 4, 2} - blockHeight := uint64(21) - - client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{ - RPCContext: rpc.RPCContext{ - Context: rpc.Context{ - Slot: slot, - }, - }, - }, nil) - - client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool { - return val != nil && *val == slot - }), mock.Anything).Return(rpc.BlocksResult{slot}, nil) - - client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{ - Transactions: []rpc.TransactionWithMeta{ - { - Meta: &rpc.TransactionMeta{ - LogMessages: messages, - }, - }, - }, - Signatures: []solana.Signature{sig}, - BlockHeight: &blockHeight, - }, nil).Twice() + clientExpectSingleEvent(client) tests.AssertEventually(t, func() bool { return parser.Called() @@ -364,3 +338,33 @@ func (p *testParser) Called() bool { func (p *testParser) Count() uint64 { return p.count.Load() } + +func clientExpectSingleEvent(client *mocks.RPCClient) { + slot := uint64(42) + sig := solana.Signature{2, 1, 4, 2} + blockHeight := uint64(21) + + client.EXPECT().GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized).Return(&rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: slot, + }, + }, + }, nil) + + client.EXPECT().GetBlocks(mock.Anything, uint64(1), mock.MatchedBy(func(val *uint64) bool { + return val != nil && *val == slot + }), mock.Anything).Return(rpc.BlocksResult{slot}, nil) + + client.EXPECT().GetBlockWithOpts(mock.Anything, slot, mock.Anything).Return(&rpc.GetBlockResult{ + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + }, + Signatures: []solana.Signature{sig}, + BlockHeight: &blockHeight, + }, nil).Twice() +} diff --git a/pkg/solana/logpoller/mocks/event_saver.go b/pkg/solana/logpoller/mocks/event_saver.go new file mode 100644 index 000000000..3ef56441b --- /dev/null +++ b/pkg/solana/logpoller/mocks/event_saver.go @@ -0,0 +1,81 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + logpoller "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + mock "github.com/stretchr/testify/mock" +) + +// EventSaver is an autogenerated mock type for the EventSaver type +type EventSaver struct { + mock.Mock +} + +type EventSaver_Expecter struct { + mock *mock.Mock +} + +func (_m *EventSaver) EXPECT() *EventSaver_Expecter { + return &EventSaver_Expecter{mock: &_m.Mock} +} + +// SaveEvent provides a mock function with given fields: evt +func (_m *EventSaver) SaveEvent(evt logpoller.ProgramEvent) error { + ret := _m.Called(evt) + + if len(ret) == 0 { + panic("no return value specified for SaveEvent") + } + + var r0 error + if rf, ok := ret.Get(0).(func(logpoller.ProgramEvent) error); ok { + r0 = rf(evt) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventSaver_SaveEvent_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SaveEvent' +type EventSaver_SaveEvent_Call struct { + *mock.Call +} + +// SaveEvent is a helper method to define mock.On call +// - evt logpoller.ProgramEvent +func (_e *EventSaver_Expecter) SaveEvent(evt interface{}) *EventSaver_SaveEvent_Call { + return &EventSaver_SaveEvent_Call{Call: _e.mock.On("SaveEvent", evt)} +} + +func (_c *EventSaver_SaveEvent_Call) Run(run func(evt logpoller.ProgramEvent)) *EventSaver_SaveEvent_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(logpoller.ProgramEvent)) + }) + return _c +} + +func (_c *EventSaver_SaveEvent_Call) Return(_a0 error) *EventSaver_SaveEvent_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventSaver_SaveEvent_Call) RunAndReturn(run func(logpoller.ProgramEvent) error) *EventSaver_SaveEvent_Call { + _c.Call.Return(run) + return _c +} + +// NewEventSaver creates a new instance of EventSaver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEventSaver(t interface { + mock.TestingT + Cleanup(func()) +}) *EventSaver { + mock := &EventSaver{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/solana/logpoller/poller.go b/pkg/solana/logpoller/poller.go new file mode 100644 index 000000000..e73f8b71a --- /dev/null +++ b/pkg/solana/logpoller/poller.go @@ -0,0 +1,121 @@ +package logpoller + +import ( + "context" + "crypto/sha256" + "encoding/base64" + "fmt" + "strings" + "sync" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/services" +) + +type EventSaver interface { + SaveEvent(evt ProgramEvent) error +} + +type Service struct { + // dependencies + lggr logger.Logger + saver EventSaver + + // internal + loader *EncodedLogCollector + mu sync.RWMutex + discriminators map[string]struct{} + chSave chan ProgramEvent + + // service state management + services.Service + engine *services.Engine +} + +func New(client RPCClient, lggr logger.Logger, saver EventSaver) *Service { + p := &Service{ + saver: saver, + discriminators: make(map[string]struct{}), + chSave: make(chan ProgramEvent), + } + + p.Service, p.engine = services.Config{ + Name: "LogPollerService", + NewSubServices: func(lggr logger.Logger) []services.Service { + p.loader = NewEncodedLogCollector(client, p, lggr) + + return []services.Service{p.loader} + }, + Start: p.start, + Close: p.close, + }.NewServiceEngine(lggr) + p.lggr = p.engine.SugaredLogger + + return p +} + +func (p *Service) AddFilter(name string) error { + p.mu.Lock() + defer p.mu.Unlock() + + hash := sha256.New() + hash.Write([]byte(fmt.Sprintf("event:%s", name))) + + p.discriminators[string(hash.Sum(nil)[:8])] = struct{}{} + + return nil +} + +func (p *Service) start(_ context.Context) error { + p.engine.Go(p.runSaveProcess) + + return nil +} + +func (p *Service) close() error { + return nil +} + +func (p *Service) Process(evt ProgramEvent) error { + encodedData := strings.TrimSpace(evt.Data) + data, err := base64.StdEncoding.DecodeString(encodedData) + if err != nil { + // don't return an error here, just log it + // returning an error will trigger a retry + p.lggr.Errorw("failed to base64 decode data", "err", err) + + return nil + } + + // silently discard events that don't match any expected event signatures + if !p.dataMatchesEventSig(data[:8]) { + return nil + } + + p.chSave <- evt + + return nil +} + +func (p *Service) runSaveProcess(ctx context.Context) { + // this process should ensure ordered batches before saving atomically to the database + for { + select { + case <-ctx.Done(): + return + case evt := <-p.chSave: + if err := p.saver.SaveEvent(evt); err != nil { + p.lggr.Errorw("failed to save event", "err", err) + } + } + } +} + +func (p *Service) dataMatchesEventSig(sig []byte) bool { + p.mu.RLock() + defer p.mu.RUnlock() + + _, ok := p.discriminators[string(sig)] + + return ok +} diff --git a/pkg/solana/logpoller/poller_test.go b/pkg/solana/logpoller/poller_test.go new file mode 100644 index 000000000..901cde69a --- /dev/null +++ b/pkg/solana/logpoller/poller_test.go @@ -0,0 +1,58 @@ +package logpoller_test + +import ( + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" +) + +func TestLogPoller_ProcessAndSave(t *testing.T) { + t.Parallel() + + client := new(mocks.RPCClient) + saver := new(testSaver) + poller := logpoller.New(client, logger.Nop(), saver) + + require.NoError(t, poller.AddFilter("TestEvent")) + + clientExpectSingleEvent(client) + + require.NoError(t, poller.Start(tests.Context(t))) + + t.Cleanup(func() { + require.NoError(t, poller.Close()) + }) + + tests.AssertEventually(t, func() bool { + return saver.Called() + }) + + client.AssertExpectations(t) +} + +type testSaver struct { + called atomic.Bool + count atomic.Uint64 +} + +func (s *testSaver) SaveEvent(event logpoller.ProgramEvent) error { + s.called.Store(true) + s.count.Store(s.count.Load() + 1) + + return nil +} + +func (s *testSaver) Called() bool { + return s.called.Load() +} + +func (s *testSaver) Count() uint64 { + return s.count.Load() +}