Skip to content

Commit

Permalink
addressed feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
EasterTheBunny committed Nov 20, 2024
1 parent f02da3d commit 5a8683a
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 54 deletions.
4 changes: 2 additions & 2 deletions pkg/solana/logpoller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func messagesToEvents(messages []string, parser ProgramEventProcessor, detail ev
logIdx++

event.BlockNumber = detail.blockNumber
event.BlockHash = detail.blockHash.String()
event.TransactionHash = detail.trxSig.String()
event.BlockHash = detail.blockHash
event.TransactionHash = detail.trxSig
event.TransactionIndex = detail.trxIdx
event.TransactionLogIndex = logIdx

Expand Down
50 changes: 24 additions & 26 deletions pkg/solana/logpoller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type EncodedLogCollector struct {
chJobs chan Job
workers *WorkerGroup

loadingBlocks atomic.Bool
highestSlot atomic.Uint64
highestSlotLoaded atomic.Uint64
lastSentSlot atomic.Uint64
Expand All @@ -64,7 +63,7 @@ func NewEncodedLogCollector(
c := &EncodedLogCollector{
client: client,
parser: parser,
chSlot: make(chan uint64, 1),
chSlot: make(chan uint64),
chBlock: make(chan uint64, 1),
chJobs: make(chan Job, 1),
lggr: lggr,
Expand Down Expand Up @@ -148,7 +147,7 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) {
// not to be run as a job, but as a blocking call
result, err := c.client.GetLatestBlockhash(ctxB, rpc.CommitmentFinalized)
if err != nil {
c.lggr.Info("failed to get latest blockhash", "err", err)
c.lggr.Error("failed to get latest blockhash", "err", err)
cancel()

continue
Expand All @@ -162,7 +161,11 @@ func (c *EncodedLogCollector) runSlotPolling(ctx context.Context) {
}

c.lastSentSlot.Store(result.Context.Slot)
c.chSlot <- result.Context.Slot

select {
case c.chSlot <- result.Context.Slot:
default:
}
}

timer.Stop()
Expand All @@ -175,28 +178,11 @@ func (c *EncodedLogCollector) runSlotProcessing(ctx context.Context) {
case <-ctx.Done():
return
case slot := <-c.chSlot:
if slot > c.highestSlot.Load() {
if c.highestSlot.Load() < slot {
c.highestSlot.Store(slot)

if !c.loadingBlocks.Load() {
c.loadingBlocks.Store(true)

// run routine to load blocks in slot range
go func(start, end uint64) {
defer c.loadingBlocks.Store(false)

if err := c.loadSlotBlocksRange(ctx, start, end); err != nil {
// TODO: probably log something here
// a retry will happen anyway on the next round of slots
// so the error is handled by doing nothing
c.lggr.Info("failed to load slot blocks range", "start", start, "end", end, "err", err)

return
}

c.highestSlotLoaded.Store(end)
}(c.highestSlotLoaded.Load()+1, slot)
}
// load blocks in slot range
c.loadRange(ctx, c.highestSlotLoaded.Load()+1, slot)
}
}
}
Expand All @@ -214,7 +200,7 @@ func (c *EncodedLogCollector) runBlockProcessing(ctx context.Context) {
parser: c.parser,
chJobs: c.chJobs,
}); err != nil {
c.lggr.Infof("failed to add job to queue: %s", err)
c.lggr.Errorf("failed to add job to queue: %s", err)
}
}
}
Expand All @@ -227,12 +213,24 @@ func (c *EncodedLogCollector) runJobProcessing(ctx context.Context) {
return
case job := <-c.chJobs:
if err := c.workers.Do(ctx, job); err != nil {
c.lggr.Infof("failed to add job to queue: %s", err)
c.lggr.Errorf("failed to add job to queue: %s", err)
}
}
}
}

func (c *EncodedLogCollector) loadRange(ctx context.Context, start, end uint64) {
if err := c.loadSlotBlocksRange(ctx, start, end); err != nil {
// a retry will happen anyway on the next round of slots
// so the error is handled by doing nothing
c.lggr.Error("failed to load slot blocks range", "start", start, "end", end, "err", err)

return
}

c.highestSlotLoaded.Store(end)
}

func (c *EncodedLogCollector) loadSlotBlocksRange(ctx context.Context, start, end uint64) error {
ctx, cancel := context.WithTimeout(ctx, c.rpcTimeLimit)
defer cancel()
Expand Down
59 changes: 45 additions & 14 deletions pkg/solana/logpoller/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"regexp"
"strconv"
"strings"

"github.com/gagliardetto/solana-go"
)

type MessageStyle string
Expand All @@ -26,8 +28,8 @@ var (

type BlockData struct {
BlockNumber uint64
BlockHash string
TransactionHash string
BlockHash solana.Hash
TransactionHash solana.Signature
TransactionIndex int
TransactionLogIndex uint
}
Expand Down Expand Up @@ -71,44 +73,53 @@ func parseProgramLogs(logs []string) []ProgramOutput {

instLogs := []ProgramOutput{}
lastEventIdx := -1
lastLogIdx := -1

for _, log := range logs {
if strings.HasPrefix(log, "Program log:") {
logDataMatches := logMatcher.FindStringSubmatch(log)

if len(logDataMatches) <= 1 {
if len(logDataMatches) <= 1 || lastLogIdx < 0 {
continue
}

instructionMatches := instructionMatcher.FindStringSubmatch(logDataMatches[1])

if len(instructionMatches) > 1 {
// is an event which should be followed by: Program data: (.*)
instLogs[len(instLogs)-1].Events = append(instLogs[len(instLogs)-1].Events, ProgramEvent{
instLogs[lastLogIdx].Events = append(instLogs[lastLogIdx].Events, ProgramEvent{
Prefix: prefixBuilder(depth),
FunctionName: instructionMatches[1],
})

lastEventIdx = len(instLogs[len(instLogs)-1].Events) - 1
lastEventIdx = len(instLogs[lastLogIdx].Events) - 1
} else {
// if contains: Instruction: (.*) this is an event and should be followed by: Program data:
// else this is a log
instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{
instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{
Prefix: prefixBuilder(depth),
Style: MessageStyleMuted,
Text: log,
})
}
} else if strings.HasPrefix(log, "Program data:") {
if lastLogIdx < 0 {
continue
}

dataMatches := dataMatcher.FindStringSubmatch(log)

if len(dataMatches) > 1 {
if lastEventIdx > -1 {
instLogs[len(instLogs)-1].Events[lastEventIdx].Data = dataMatches[1]
instLogs[lastLogIdx].Events[lastEventIdx].Data = dataMatches[1]
}
}
} else if strings.HasPrefix(log, "Log truncated") {
instLogs[len(instLogs)-1].Truncated = true
if lastLogIdx < 0 {
continue
}

instLogs[lastLogIdx].Truncated = true
} else {
matches := invokeMatcher.FindStringSubmatch(log)

Expand All @@ -121,8 +132,14 @@ func parseProgramLogs(logs []string) []ProgramOutput {
Logs: []ProgramLog{},
Truncated: false,
})

lastLogIdx = len(instLogs) - 1
} else {
instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{
if lastLogIdx < 0 {
continue
}

instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{
Prefix: prefixBuilder(depth),
Style: MessageStyleInfo,
Text: fmt.Sprintf("Program invoked: %s", matches[1]),
Expand All @@ -131,15 +148,23 @@ func parseProgramLogs(logs []string) []ProgramOutput {

depth++
} else if strings.Contains(log, "success") {
instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{
if lastLogIdx < 0 {
continue
}

instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{
Prefix: prefixBuilder(depth),
Style: MessageStyleSuccess,
Text: "Program returned success",
})

depth--
} else if strings.Contains(log, "failed") {
instLogs[len(instLogs)-1].Failed = true
if lastLogIdx < 0 {
continue
}

instLogs[lastLogIdx].Failed = true

idx := strings.Index(log, ": ") + 2
currText := fmt.Sprintf(`Program returned error: "%s"`, log[idx:])
Expand All @@ -151,7 +176,7 @@ func parseProgramLogs(logs []string) []ProgramOutput {
currText = strings.ToTitle(log)
}

instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{
instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{
Prefix: prefixBuilder(depth),
Style: MessageStyleWarning,
Text: currText,
Expand All @@ -167,21 +192,27 @@ func parseProgramLogs(logs []string) []ProgramOutput {
Logs: []ProgramLog{},
Truncated: false,
})

lastLogIdx = len(instLogs) - 1
}

if lastLogIdx < 0 {
continue
}

matches := consumedMatcher.FindStringSubmatch(log)
if len(matches) == 3 {
if depth == 1 {
if val, err := strconv.Atoi(matches[1]); err == nil {
instLogs[len(instLogs)-1].ComputeUnits = uint(val) //nolint:gosec
instLogs[lastLogIdx].ComputeUnits = uint(val) //nolint:gosec
}
}

log = fmt.Sprintf("Program consumed: %s %s", matches[1], matches[2])
}

// native program logs don't start with "Program log:"
instLogs[len(instLogs)-1].Logs = append(instLogs[len(instLogs)-1].Logs, ProgramLog{
instLogs[lastLogIdx].Logs = append(instLogs[lastLogIdx].Logs, ProgramLog{
Prefix: prefixBuilder(depth),
Style: MessageStyleMuted,
Text: log,
Expand Down
16 changes: 6 additions & 10 deletions pkg/solana/logpoller/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type worker struct {
func (w *worker) Do(ctx context.Context, job Job) {
if ctx.Err() == nil {
if err := job.Run(ctx); err != nil {
w.Lggr.Infof("job %s failed with error; retrying: %s", job, err)
w.Lggr.Errorf("job %s failed with error; retrying: %s", job, err)
w.Retry <- job
}
}
Expand Down Expand Up @@ -182,10 +182,6 @@ Loop:
break Loop
}
}

// run the job queue one more time just in case some
// new work items snuck in
g.processQueue(ctx)
}

func (g *WorkerGroup) runRetryQueue(ctx context.Context) {
Expand All @@ -202,19 +198,19 @@ func (g *WorkerGroup) runRetryQueue(ctx context.Context) {
retry.count++

if retry.count > g.maxRetryCount {
g.lggr.Infof("job %s dropped after max retries", job)
g.lggr.Errorf("job %s dropped after max retries", job)

continue
}

wait := calculateExponentialBackoff(retry.count)
g.lggr.Infof("retrying job in %dms", wait/time.Millisecond)
g.lggr.Errorf("retrying job in %dms", wait/time.Millisecond)

retry.when = time.Now().Add(wait)
default:
wait := calculateExponentialBackoff(0)

g.lggr.Infof("retrying job %s in %dms", job, wait/time.Millisecond)
g.lggr.Errorf("retrying job %s in %s", job, wait)

retry = retryableJob{
name: createRandomString(12),
Expand All @@ -227,7 +223,7 @@ func (g *WorkerGroup) runRetryQueue(ctx context.Context) {
g.retryMap[retry.name] = retry

if len(g.retryMap) >= DefaultNotifyRetryDepth {
g.lggr.Infof("retry queue depth: %d", len(g.retryMap))
g.lggr.Errorf("retry queue depth: %d", len(g.retryMap))
}
g.mu.Unlock()
}
Expand Down Expand Up @@ -277,7 +273,7 @@ func (g *WorkerGroup) processQueue(ctx context.Context) {
}

if g.queue.Len() >= DefaultNotifyQueueDepth {
g.lggr.Infof("queue depth: %d", g.queue.Len())
g.lggr.Errorf("queue depth: %d", g.queue.Len())
}

value, err := g.queue.Pop()
Expand Down
12 changes: 10 additions & 2 deletions pkg/solana/logpoller/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func TestWorkerGroup_Close(t *testing.T) {
var mu sync.RWMutex
output := make([]int, 10)

chContinue := make(chan struct{}, 1)

for idx := range output {
_ = group.Do(ctx, testJob{job: func(ctx context.Context) error {
mu.Lock()
Expand All @@ -140,12 +142,18 @@ func TestWorkerGroup_Close(t *testing.T) {

output[idx] = 1

select {
case chContinue <- struct{}{}:
default:
}

return nil
}})
}

// wait for the first 9 to finish and close the group
time.Sleep(100 * time.Millisecond)
// wait for at least one job to finish and close the group
tests.RequireSignal(t, chContinue, "timed out waiting for at least one job to complete")

group.Close()

mu.RLock()
Expand Down

0 comments on commit 5a8683a

Please sign in to comment.