diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 1d827a85b..ce62de190 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -33,7 +33,8 @@ func (j retryableJob) Run(ctx context.Context) error { } type eventDetail struct { - blockNumber uint64 + slotNumber uint64 + blockHeight uint64 blockHash solana.Hash trxIdx int trxSig solana.Signature @@ -59,7 +60,7 @@ func (j *processEventJob) Run(_ context.Context) error { type getTransactionsFromBlockJob struct { slotNumber uint64 client RPCClient - parser ProgramEventProcessor + parser *orderedParser chJobs chan Job } @@ -103,17 +104,20 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error { } detail := eventDetail{ - blockHash: block.Blockhash, + slotNumber: j.slotNumber, + blockHash: block.Blockhash, } if block.BlockHeight != nil { - detail.blockNumber = *block.BlockHeight + detail.blockHeight = *block.BlockHeight } if len(block.Transactions) != len(blockSigsOnly.Signatures) { return fmt.Errorf("block %d has %d transactions but %d signatures", j.slotNumber, len(block.Transactions), len(blockSigsOnly.Signatures)) } + j.parser.ExpectTxs(j.slotNumber, len(block.Transactions)) + for idx, trx := range block.Transactions { detail.trxIdx = idx if len(blockSigsOnly.Signatures)-1 <= idx { @@ -130,14 +134,15 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev var logIdx uint for _, outputs := range parseProgramLogs(messages) { for _, event := range outputs.Events { - logIdx++ - - event.BlockNumber = detail.blockNumber + event.SlotNumber = detail.slotNumber + event.BlockHeight = detail.blockHeight event.BlockHash = detail.blockHash event.TransactionHash = detail.trxSig event.TransactionIndex = detail.trxIdx event.TransactionLogIndex = logIdx + logIdx++ + chJobs <- &processEventJob{ parser: parser, event: event, diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index 56fcef25c..25118c6bc 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -3,6 +3,10 @@ package logpoller import ( "context" "errors" + "fmt" + "reflect" + "sort" + "sync" "sync/atomic" "time" @@ -40,7 +44,7 @@ type EncodedLogCollector struct { // dependencies and configuration client RPCClient - parser ProgramEventProcessor + parser *orderedParser lggr logger.Logger rpcTimeLimit time.Duration @@ -62,7 +66,7 @@ func NewEncodedLogCollector( ) *EncodedLogCollector { c := &EncodedLogCollector{ client: client, - parser: parser, + parser: newOrderedParser(parser), chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), @@ -201,10 +205,15 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) { continue } + from := c.highestSlot.Load() + 1 + if c.highestSlot.Load() == 0 { + from = slot + } + c.highestSlot.Store(slot) // load blocks in slot range - c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot) + c.loadRange(ctx, from, slot) } } } @@ -214,9 +223,9 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { select { case <-ctx.Done(): return - case block := <-c.chBlock: + case slot := <-c.chBlock: if err := c.workers.Do(ctx, &getTransactionsFromBlockJob{ - slotNumber: block, + slotNumber: slot, client: c.client, parser: c.parser, chJobs: c.chJobs, @@ -270,6 +279,7 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en } for _, block := range result { + c.parser.ExpectBlock(block) select { case <-ctx.Done(): return nil @@ -279,3 +289,165 @@ func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, en return nil } + +type orderedParser struct { + parser ProgramEventProcessor + mu sync.Mutex + blocks []uint64 + ready []uint64 + expect map[uint64]int + actual map[uint64][]ProgramEvent +} + +func newOrderedParser(parser ProgramEventProcessor) *orderedParser { + return &orderedParser{ + parser: parser, + blocks: make([]uint64, 0), + ready: make([]uint64, 0), + expect: make(map[uint64]int), + actual: make(map[uint64][]ProgramEvent), + } +} + +func (p *orderedParser) ExpectBlock(block uint64) { + p.mu.Lock() + defer p.mu.Unlock() + + p.blocks = append(p.blocks, block) + + // ensure sort ascending + sort.Slice(p.blocks, func(i, j int) bool { + return p.blocks[i] < p.blocks[j] + }) +} + +func (p *orderedParser) ExpectTxs(block uint64, quantity int) { + p.mu.Lock() + defer p.mu.Unlock() + + p.expect[block] = quantity + p.actual[block] = make([]ProgramEvent, 0, quantity) +} + +func (p *orderedParser) Process(event ProgramEvent) error { + p.mu.Lock() + defer p.mu.Unlock() + + meetsExpectations, err := p.addAndCompareExpectations(event) + if err != nil { + return err + } + + // incoming event does not meet expectations for transaction + // event is added to actual and no error is returned + if !meetsExpectations { + return nil + } + + p.clearEmptyBlocks() + p.setReady(event.SlotNumber) + + return p.sendReadySlots() +} + +func (p *orderedParser) addAndCompareExpectations(evt ProgramEvent) (bool, error) { + expectations, ok := p.expect[evt.SlotNumber] + if !ok { + return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + evts, ok := p.actual[evt.SlotNumber] + if !ok { + return false, fmt.Errorf("%w: %d", errExpectationsNotSet, evt.SlotNumber) + } + + p.actual[evt.SlotNumber] = append(evts, evt) + + return expectations == len(evts)+1, nil +} + +func (p *orderedParser) clearEmptyBlocks() { + rmvIdx := make([]int, 0) + + for idx, block := range p.blocks { + exp, ok := p.expect[block] + if !ok { + // transaction expectations have not been applied for this block yet + continue + } + + if exp == 0 { + rmvIdx = append(rmvIdx, idx) + + delete(p.expect, block) + delete(p.actual, block) + } + } + + for count, idx := range rmvIdx { + p.blocks = remove(p.blocks, idx-count) + } +} + +func (p *orderedParser) setReady(slot uint64) { + p.ready = append(p.ready, slot) +} + +func (p *orderedParser) sendReadySlots() error { + rmvIdx := make([]int, 0) + + // start at the lowest block and find ready blocks + for idx, block := range p.blocks { + // to ensure ordered delivery, break from the loop if a ready block isn't found + // this function should be preceded by clearEmptyBlocks + rIdx, ok := getIdx(p.ready, block) + if !ok { + return nil + } + + evts, ok := p.actual[block] + if !ok { + return errors.New("invalid state") + } + + var errs error + for _, evt := range evts { + errs = errors.Join(errs, p.parser.Process(evt)) + } + + // need possible retry + if errs != nil { + return errs + } + + p.ready = remove(p.ready, rIdx) + rmvIdx = append(rmvIdx, idx) + + delete(p.expect, block) + delete(p.actual, block) + } + + for count, idx := range rmvIdx { + p.blocks = remove(p.blocks, idx-count) + } + + return nil +} + +func getIdx[T any](slice []T, match T) (int, bool) { + for idx, value := range slice { + if reflect.DeepEqual(value, match) { + return idx, true + } + } + + return -1, false +} + +func remove[T any](slice []T, s int) []T { + return append(slice[:s], slice[s+1:]...) +} + +var ( + errExpectationsNotSet = errors.New("expectations not set") +) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 69a37702b..d752b0054 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -3,6 +3,7 @@ package logpoller_test import ( "context" "crypto/rand" + "reflect" "sync" "sync/atomic" "testing" @@ -88,6 +89,143 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { client.AssertExpectations(t) } +func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { + t.Parallel() + + client := new(mocks.RPCClient) + parser := new(testParser) + ctx := tests.Context(t) + + collector := logpoller.NewEncodedLogCollector(client, parser, logger.Nop()) + + require.NoError(t, collector.Start(ctx)) + t.Cleanup(func() { + require.NoError(t, collector.Close()) + }) + + var latest atomic.Uint64 + + latest.Store(uint64(40)) + + slots := []uint64{43, 42} + sigs := make([]solana.Signature, len(slots)) + hashes := make([]solana.Hash, len(slots)) + + for idx := range len(sigs) { + _, _ = rand.Read(sigs[idx][:]) + _, _ = rand.Read(hashes[idx][:]) + } + + client.EXPECT(). + GetLatestBlockhash(mock.Anything, rpc.CommitmentFinalized). + RunAndReturn(func(ctx context.Context, ct rpc.CommitmentType) (*rpc.GetLatestBlockhashResult, error) { + defer func() { + latest.Store(latest.Load() + 2) + }() + + return &rpc.GetLatestBlockhashResult{ + RPCContext: rpc.RPCContext{ + Context: rpc.Context{ + Slot: latest.Load(), + }, + }, + }, nil + }) + + client.EXPECT(). + GetBlocks(mock.Anything, mock.MatchedBy(func(val uint64) bool { + return val > uint64(0) + }), mock.MatchedBy(func(val *uint64) bool { + return val != nil && *val <= latest.Load() + }), mock.Anything). + RunAndReturn(func(_ context.Context, u1 uint64, u2 *uint64, _ rpc.CommitmentType) (rpc.BlocksResult, error) { + blocks := make([]uint64, *u2-u1+1) + for idx := range blocks { + blocks[idx] = u1 + uint64(idx) + } + + return rpc.BlocksResult(blocks), nil + }) + + client.EXPECT(). + GetBlockWithOpts(mock.Anything, mock.MatchedBy(func(val uint64) bool { + return true + }), mock.Anything). + RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { + slotIdx := -1 + for idx, slt := range slots { + if slt == slot { + slotIdx = idx + + break + } + } + + if slot == 42 { + // force slot 42 to return after 43 + time.Sleep(1 * time.Second) + } + + height := slot - 1 + + if slotIdx == -1 { + var hash solana.Hash + _, _ = rand.Read(hash[:]) + + return &rpc.GetBlockResult{ + Blockhash: hash, + Transactions: []rpc.TransactionWithMeta{}, + Signatures: []solana.Signature{}, + BlockHeight: &height, + }, nil + } + + return &rpc.GetBlockResult{ + Blockhash: hashes[slotIdx], + Transactions: []rpc.TransactionWithMeta{ + { + Meta: &rpc.TransactionMeta{ + LogMessages: messages, + }, + }, + }, + Signatures: []solana.Signature{sigs[slotIdx]}, + BlockHeight: &height, + }, nil + }) + + tests.AssertEventually(t, func() bool { + return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, + }, + Prefix: ">", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }) + }) + + client.AssertExpectations(t) +} + func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { client := new(mocks.RPCClient) parser := new(testParser) @@ -347,12 +485,16 @@ func (p *testBlockProducer) GetTransaction(_ context.Context, sig solana.Signatu type testParser struct { called atomic.Bool - count atomic.Uint64 + mu sync.Mutex + events []logpoller.ProgramEvent } func (p *testParser) Process(event logpoller.ProgramEvent) error { p.called.Store(true) - p.count.Store(p.count.Load() + 1) + + p.mu.Lock() + p.events = append(p.events, event) + p.mu.Unlock() return nil } @@ -362,5 +504,15 @@ func (p *testParser) Called() bool { } func (p *testParser) Count() uint64 { - return p.count.Load() + p.mu.Lock() + defer p.mu.Unlock() + + return uint64(len(p.events)) +} + +func (p *testParser) Events() []logpoller.ProgramEvent { + p.mu.Lock() + defer p.mu.Unlock() + + return p.events } diff --git a/pkg/solana/logpoller/log_data_parser.go b/pkg/solana/logpoller/log_data_parser.go index 4cfd04470..4080a09e2 100644 --- a/pkg/solana/logpoller/log_data_parser.go +++ b/pkg/solana/logpoller/log_data_parser.go @@ -16,7 +16,8 @@ var ( ) type BlockData struct { - BlockNumber uint64 + SlotNumber uint64 + BlockHeight uint64 BlockHash solana.Hash TransactionHash solana.Signature TransactionIndex int