diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index fe87b10dd..09b3aefa5 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -131,8 +131,8 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev logIdx++ event.BlockNumber = detail.blockNumber - event.BlockHash = detail.blockHash.String() - event.TransactionHash = detail.trxSig.String() + event.BlockHash = detail.blockHash + event.TransactionHash = detail.trxSig event.TransactionIndex = detail.trxIdx event.TransactionLogIndex = logIdx diff --git a/pkg/solana/logpoller/loader.go b/pkg/solana/logpoller/loader.go index e152a6bd6..4757f73c4 100644 --- a/pkg/solana/logpoller/loader.go +++ b/pkg/solana/logpoller/loader.go @@ -50,7 +50,6 @@ type EncodedLogCollector struct { chJobs chan Job workers *WorkerGroup - loadingBlocks atomic.Bool highestSlot atomic.Uint64 highestSlotLoaded atomic.Uint64 lastSentSlot atomic.Uint64 @@ -64,7 +63,7 @@ func NewEncodedLogCollector( c := &EncodedLogCollector{ client: client, parser: parser, - chSlot: make(chan uint64, 1), + chSlot: make(chan uint64), chBlock: make(chan uint64, 1), chJobs: make(chan Job, 1), lggr: lggr, @@ -148,7 +147,7 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) { // not to be run as a job, but as a blocking call result, err := c.client.GetLatestBlockhash(ctxB, rpc.CommitmentFinalized) if err != nil { - c.lggr.Info("failed to get latest blockhash", "err", err) + c.lggr.Error("failed to get latest blockhash", "err", err) cancel() continue @@ -162,7 +161,11 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) { } c.lastSentSlot.Store(result.Context.Slot) - c.chSlot <- result.Context.Slot + + select { + case c.chSlot <- result.Context.Slot: + default: + } } timer.Stop() @@ -175,28 +178,11 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) { case <-ctx.Done(): return case slot := <-c.chSlot: - if slot > c.highestSlot.Load() { + if c.highestSlot.Load() < slot { c.highestSlot.Store(slot) - if !c.loadingBlocks.Load() { - c.loadingBlocks.Store(true) - - // run routine to load blocks in slot range - go func(start, end uint64) { - defer c.loadingBlocks.Store(false) - - if err := c.loadSlotBlocksRange(ctx, start, end); err != nil { - // TODO: probably log something here - // a retry will happen anyway on the next round of slots - // so the error is handled by doing nothing - c.lggr.Info("failed to load slot blocks range", "start", start, "end", end, "err", err) - - return - } - - c.highestSlotLoaded.Store(end) - }(c.highestSlotLoaded.Load()+1, slot) - } + // load blocks in slot range + c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot) } } } @@ -214,7 +200,7 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) { parser: c.parser, chJobs: c.chJobs, }); err != nil { - c.lggr.Infof("failed to add job to queue: %s", err) + c.lggr.Errorf("failed to add job to queue: %s", err) } } } @@ -227,12 +213,24 @@ func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) { return case job := <-c.chJobs: if err := c.workers.Do(ctx, job); err != nil { - c.lggr.Infof("failed to add job to queue: %s", err) + c.lggr.Errorf("failed to add job to queue: %s", err) } } } } +func (c *EncodedLogCollector) loadRange(ctx context.Context, start, end uint64) { + if err := c.loadSlotBlocksRange(ctx, start, end); err != nil { + // a retry will happen anyway on the next round of slots + // so the error is handled by doing nothing + c.lggr.Error("failed to load slot blocks range", "start", start, "end", end, "err", err) + + return + } + + c.highestSlotLoaded.Store(end) +} + func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error { ctx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit) defer cancel() diff --git a/pkg/solana/logpoller/parser.go b/pkg/solana/logpoller/parser.go index 13d69809d..f42ef2e2b 100644 --- a/pkg/solana/logpoller/parser.go +++ b/pkg/solana/logpoller/parser.go @@ -5,6 +5,8 @@ import ( "regexp" "strconv" "strings" + + "github.com/gagliardetto/solana-go" ) type MessageStyle string @@ -26,8 +28,8 @@ var ( type BlockData struct { BlockNumber uint64 - BlockHash string - TransactionHash string + BlockHash solana.Hash + TransactionHash solana.Signature TransactionIndex int TransactionLogIndex uint } @@ -71,12 +73,13 @@ func parseProgramLogs(logs []string) []ProgramOutput { instLogs := []ProgramOutput{} lastEventIdx := -1 + lastLogIdx := -1 for _, log := range logs { if strings.HasPrefix(log, "Program log:") { logDataMatches := logMatcher.FindStringSubmatch(log) - if len(logDataMatches) <= 1 { + if len(logDataMatches) <= 1 || lastLogIdx < 0 { continue } @@ -84,31 +87,39 @@ func parseProgramLogs(logs []string) []ProgramOutput { if len(instructionMatches) > 1 { // is an event which should be followed by: Program data: (.*) - instLogs[len(instLogs)-1].Events = append(instLogs[len(instLogs)-1].Events, ProgramEvent{ + instLogs[lastLogIdx].Events = append(instLogs[lastLogIdx].Events, ProgramEvent{ Prefix: prefixBuilder(depth), FunctionName: instructionMatches[1], }) - lastEventIdx = len(instLogs[len(instLogs)-1].Events) - 1 + lastEventIdx = len(instLogs[lastLogIdx].Events) - 1 } else { // if contains: Instruction: (.*) this is an event and should be followed by: Program data: // else this is a log - instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{ + instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{ Prefix: prefixBuilder(depth), Style: MessageStyleMuted, Text: log, }) } } else if strings.HasPrefix(log, "Program data:") { + if lastLogIdx < 0 { + continue + } + dataMatches := dataMatcher.FindStringSubmatch(log) if len(dataMatches) > 1 { if lastEventIdx > -1 { - instLogs[len(instLogs)-1].Events[lastEventIdx].Data = dataMatches[1] + instLogs[lastLogIdx].Events[lastEventIdx].Data = dataMatches[1] } } } else if strings.HasPrefix(log, "Log truncated") { - instLogs[len(instLogs)-1].Truncated = true + if lastLogIdx < 0 { + continue + } + + instLogs[lastLogIdx].Truncated = true } else { matches := invokeMatcher.FindStringSubmatch(log) @@ -121,8 +132,14 @@ func parseProgramLogs(logs []string) []ProgramOutput { Logs: []ProgramLog{}, Truncated: false, }) + + lastLogIdx = len(instLogs) - 1 } else { - instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{ + if lastLogIdx < 0 { + continue + } + + instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{ Prefix: prefixBuilder(depth), Style: MessageStyleInfo, Text: fmt.Sprintf("Program invoked: %s", matches[1]), @@ -131,7 +148,11 @@ func parseProgramLogs(logs []string) []ProgramOutput { depth++ } else if strings.Contains(log, "success") { - instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{ + if lastLogIdx < 0 { + continue + } + + instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{ Prefix: prefixBuilder(depth), Style: MessageStyleSuccess, Text: "Program returned success", @@ -139,7 +160,11 @@ func parseProgramLogs(logs []string) []ProgramOutput { depth-- } else if strings.Contains(log, "failed") { - instLogs[len(instLogs)-1].Failed = true + if lastLogIdx < 0 { + continue + } + + instLogs[lastLogIdx].Failed = true idx := strings.Index(log, ": ") + 2 currText := fmt.Sprintf(`Program returned error: "%s"`, log[idx:]) @@ -151,7 +176,7 @@ func parseProgramLogs(logs []string) []ProgramOutput { currText = strings.ToTitle(log) } - instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{ + instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{ Prefix: prefixBuilder(depth), Style: MessageStyleWarning, Text: currText, @@ -167,13 +192,19 @@ func parseProgramLogs(logs []string) []ProgramOutput { Logs: []ProgramLog{}, Truncated: false, }) + + lastLogIdx = len(instLogs) - 1 + } + + if lastLogIdx < 0 { + continue } matches := consumedMatcher.FindStringSubmatch(log) if len(matches) == 3 { if depth == 1 { if val, err := strconv.Atoi(matches[1]); err == nil { - instLogs[len(instLogs)-1].ComputeUnits = uint(val) //nolint:gosec + instLogs[lastLogIdx].ComputeUnits = uint(val) //nolint:gosec } } @@ -181,7 +212,7 @@ func parseProgramLogs(logs []string) []ProgramOutput { } // native program logs don't start with "Program log:" - instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{ + instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{ Prefix: prefixBuilder(depth), Style: MessageStyleMuted, Text: log, diff --git a/pkg/solana/logpoller/worker.go b/pkg/solana/logpoller/worker.go index f17e3ad20..0e7d31df0 100644 --- a/pkg/solana/logpoller/worker.go +++ b/pkg/solana/logpoller/worker.go @@ -39,7 +39,7 @@ type worker struct { func (w *worker) Do(ctx context.Context, job Job) { if ctx.Err() == nil { if err := job.Run(ctx); err != nil { - w.Lggr.Infof("job %s failed with error; retrying: %s", job, err) + w.Lggr.Errorf("job %s failed with error; retrying: %s", job, err) w.Retry <- job } } @@ -182,10 +182,6 @@ Loop: break Loop } } - - // run the job queue one more time just in case some - // new work items snuck in - g.processQueue(ctx) } func (g *WorkerGroup) runRetryQueue(ctx context.Context) { @@ -202,19 +198,19 @@ func (g *WorkerGroup) runRetryQueue(ctx context.Context) { retry.count++ if retry.count > g.maxRetryCount { - g.lggr.Infof("job %s dropped after max retries", job) + g.lggr.Errorf("job %s dropped after max retries", job) continue } wait := calculateExponentialBackoff(retry.count) - g.lggr.Infof("retrying job in %dms", wait/time.Millisecond) + g.lggr.Errorf("retrying job in %dms", wait/time.Millisecond) retry.when = time.Now().Add(wait) default: wait := calculateExponentialBackoff(0) - g.lggr.Infof("retrying job %s in %dms", job, wait/time.Millisecond) + g.lggr.Errorf("retrying job %s in %s", job, wait) retry = retryableJob{ name: createRandomString(12), @@ -227,7 +223,7 @@ func (g *WorkerGroup) runRetryQueue(ctx context.Context) { g.retryMap[retry.name] = retry if len(g.retryMap) >= DefaultNotifyRetryDepth { - g.lggr.Infof("retry queue depth: %d", len(g.retryMap)) + g.lggr.Errorf("retry queue depth: %d", len(g.retryMap)) } g.mu.Unlock() } @@ -277,7 +273,7 @@ func (g *WorkerGroup) processQueue(ctx context.Context) { } if g.queue.Len() >= DefaultNotifyQueueDepth { - g.lggr.Infof("queue depth: %d", g.queue.Len()) + g.lggr.Errorf("queue depth: %d", g.queue.Len()) } value, err := g.queue.Pop() diff --git a/pkg/solana/logpoller/worker_test.go b/pkg/solana/logpoller/worker_test.go index b6d967ce7..2d2afadec 100644 --- a/pkg/solana/logpoller/worker_test.go +++ b/pkg/solana/logpoller/worker_test.go @@ -116,6 +116,8 @@ func TestWorkerGroup_Close(t *testing.T) { var mu sync.RWMutex output := make([]int, 10) + chContinue := make(chan struct{}, 1) + for idx := range output { _ = group.Do(ctx, testJob{job: func(ctx context.Context) error { mu.Lock() @@ -140,12 +142,18 @@ func TestWorkerGroup_Close(t *testing.T) { output[idx] = 1 + select { + case chContinue <- struct{}{}: + default: + } + return nil }}) } - // wait for the first 9 to finish and close the group - time.Sleep(100 * time.Millisecond) + // wait for at least one job to finish and close the group + tests.RequireSignal(t, chContinue, "timed out waiting for at least one job to complete") + group.Close() mu.RLock()