diff --git a/.changeset/eighty-hotels-sit.md b/.changeset/eighty-hotels-sit.md new file mode 100644 index 00000000000..e83b70c7695 --- /dev/null +++ b/.changeset/eighty-hotels-sit.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Fix panic if mercury server returns error #bugfix diff --git a/CHANGELOG.md b/CHANGELOG.md index 0523236e22d..6b767f6973d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ You may disable if this results in excessive log volume. Disable like so: ``` - [Pipeline] + [JobPipeline] VerboseLogging = false ``` @@ -79,7 +79,7 @@ - [#12404](https://github.com/smartcontractkit/chainlink/pull/12404) [`b74079b672`](https://github.com/smartcontractkit/chainlink/commit/b74079b672f36fb0c241f90ea1e875ea3a9524da) Thanks [@HenryNguyen5](https://github.com/HenryNguyen5)! - Add OCR3 capability contract wrapper -- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option Pipeline.VerboseLogging +- [#12498](https://github.com/smartcontractkit/chainlink/pull/12498) [`1c576d0e34`](https://github.com/smartcontractkit/chainlink/commit/1c576d0e34d93a6298ddcb662ee89fd04eeda53e) Thanks [@samsondav](https://github.com/samsondav)! - Add new config option JobPipeline.VerboseLogging VerboseLogging enables detailed logging of pipeline execution steps. This is disabled by default because it increases log volume for pipeline runs, but can @@ -90,7 +90,7 @@ Set it like the following example: ``` - [Pipeline] + [JobPipeline] VerboseLogging = true ``` diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 8a89f47302b..30a6e5e6eac 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -25,7 +25,7 @@ type asyncDeleter interface { AsyncDelete(req *pb.TransmitRequest) } -var _ services.Service = (*TransmitQueue)(nil) +var _ services.Service = (*transmitQueue)(nil) var transmitQueueLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "mercury_transmit_queue_load", @@ -40,7 +40,7 @@ const promInterval = 6500 * time.Millisecond // TransmitQueue is the high-level package that everything outside of this file should be using // It stores pending transmissions, yielding the latest (highest priority) first to the caller -type TransmitQueue struct { +type transmitQueue struct { services.StateMachine cond sync.Cond @@ -62,11 +62,20 @@ type Transmission struct { ReportCtx ocrtypes.ReportContext // contains priority information (latest epoch/round wins) } +type TransmitQueue interface { + services.Service + + BlockingPop() (t *Transmission) + Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) + Init(transmissions []*Transmission) + IsEmpty() bool +} + // maxlen controls how many items will be stored in the queue // 0 means unlimited - be careful, this can cause memory leaks -func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) *TransmitQueue { +func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, asyncDeleter asyncDeleter) TransmitQueue { mu := new(sync.RWMutex) - return &TransmitQueue{ + return &transmitQueue{ services.StateMachine{}, sync.Cond{L: mu}, lggr.Named("TransmitQueue"), @@ -80,13 +89,13 @@ func NewTransmitQueue(lggr logger.Logger, serverURL, feedID string, maxlen int, } } -func (tq *TransmitQueue) Init(transmissions []*Transmission) { +func (tq *transmitQueue) Init(transmissions []*Transmission) { pq := priorityQueue(transmissions) heap.Init(&pq) // ensure the heap is ordered tq.pq = &pq } -func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { +func (tq *transmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { tq.cond.L.Lock() defer tq.cond.L.Unlock() @@ -111,7 +120,7 @@ func (tq *TransmitQueue) Push(req *pb.TransmitRequest, reportCtx ocrtypes.Report // BlockingPop will block until at least one item is in the heap, and then return it // If the queue is closed, it will immediately return nil -func (tq *TransmitQueue) BlockingPop() (t *Transmission) { +func (tq *transmitQueue) BlockingPop() (t *Transmission) { tq.cond.L.Lock() defer tq.cond.L.Unlock() if tq.closed { @@ -126,13 +135,13 @@ func (tq *TransmitQueue) BlockingPop() (t *Transmission) { return t } -func (tq *TransmitQueue) IsEmpty() bool { +func (tq *transmitQueue) IsEmpty() bool { tq.mu.RLock() defer tq.mu.RUnlock() return tq.pq.Len() == 0 } -func (tq *TransmitQueue) Start(context.Context) error { +func (tq *transmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { t := time.NewTicker(utils.WithJitter(promInterval)) wg := new(sync.WaitGroup) @@ -148,7 +157,7 @@ func (tq *TransmitQueue) Start(context.Context) error { }) } -func (tq *TransmitQueue) Close() error { +func (tq *transmitQueue) Close() error { return tq.StopOnce("TransmitQueue", func() error { tq.cond.L.Lock() tq.closed = true @@ -159,7 +168,7 @@ func (tq *TransmitQueue) Close() error { }) } -func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { +func (tq *transmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { @@ -172,25 +181,25 @@ func (tq *TransmitQueue) monitorLoop(c <-chan time.Time, chStop <-chan struct{}, } } -func (tq *TransmitQueue) report() { +func (tq *transmitQueue) report() { tq.mu.RLock() length := tq.pq.Len() tq.mu.RUnlock() tq.transmitQueueLoad.Set(float64(length)) } -func (tq *TransmitQueue) Ready() error { +func (tq *transmitQueue) Ready() error { return nil } -func (tq *TransmitQueue) Name() string { return tq.lggr.Name() } -func (tq *TransmitQueue) HealthReport() map[string]error { +func (tq *transmitQueue) Name() string { return tq.lggr.Name() } +func (tq *transmitQueue) HealthReport() map[string]error { report := map[string]error{tq.Name(): errors.Join( tq.status(), )} return report } -func (tq *TransmitQueue) status() (merr error) { +func (tq *transmitQueue) status() (merr error) { tq.mu.RLock() length := tq.pq.Len() closed := tq.closed @@ -206,7 +215,7 @@ func (tq *TransmitQueue) status() (merr error) { // pop latest Transmission from the heap // Not thread-safe -func (tq *TransmitQueue) pop() *Transmission { +func (tq *transmitQueue) pop() *Transmission { if tq.pq.Len() == 0 { return nil } diff --git a/core/services/relay/evm/mercury/transmitter.go b/core/services/relay/evm/mercury/transmitter.go index 6f49ca91bfc..3e4eaf699dc 100644 --- a/core/services/relay/evm/mercury/transmitter.go +++ b/core/services/relay/evm/mercury/transmitter.go @@ -144,10 +144,12 @@ type server struct { c wsrpc.Client pm *PersistenceManager - q *TransmitQueue + q TransmitQueue deleteQueue chan *pb.TransmitRequest + url string + transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter transmitConnectionErrorCount prometheus.Counter @@ -259,7 +261,7 @@ func (s *server) runQueueLoop(stopCh services.StopChan, wg *sync.WaitGroup, feed s.transmitDuplicateCount.Inc() s.lggr.Debugw("Transmit report success; duplicate report", "payload", hexutil.Encode(t.Req.Payload), "response", res, "reportCtx", t.ReportCtx) default: - transmitServerErrorCount.WithLabelValues(feedIDHex, fmt.Sprintf("%d", res.Code)).Inc() + transmitServerErrorCount.WithLabelValues(feedIDHex, s.url, fmt.Sprintf("%d", res.Code)).Inc() s.lggr.Errorw("Transmit report failed; mercury server returned error", "response", res, "reportCtx", t.ReportCtx, "err", res.Error, "code", res.Code) } } @@ -284,6 +286,7 @@ func NewTransmitter(lggr logger.Logger, clients map[string]wsrpc.Client, fromAcc pm, NewTransmitQueue(cLggr, serverURL, feedIDHex, maxTransmitQueueSize, pm), make(chan *pb.TransmitRequest, maxDeleteQueueSize), + serverURL, transmitSuccessCount.WithLabelValues(feedIDHex, serverURL), transmitDuplicateCount.WithLabelValues(feedIDHex, serverURL), transmitConnectionErrorCount.WithLabelValues(feedIDHex, serverURL), diff --git a/core/services/relay/evm/mercury/transmitter_test.go b/core/services/relay/evm/mercury/transmitter_test.go index d7d62a9f422..ef6629e22cf 100644 --- a/core/services/relay/evm/mercury/transmitter_test.go +++ b/core/services/relay/evm/mercury/transmitter_test.go @@ -3,7 +3,9 @@ package mercury import ( "context" "math/big" + "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -12,6 +14,7 @@ import ( ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -43,8 +46,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v2 report transmission successfully enqueued", func(t *testing.T) { report := sampleV2Report @@ -57,8 +60,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) t.Run("v3 report transmission successfully enqueued", func(t *testing.T) { report := sampleV3Report @@ -71,8 +74,8 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) }) @@ -93,12 +96,12 @@ func Test_MercuryTransmitter_Transmit(t *testing.T) { require.NoError(t, err) // ensure it was added to the queue - require.Equal(t, mt.servers[sURL].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL2].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL2].q.pq.Pop().(*Transmission).Req.Payload, report) - require.Equal(t, mt.servers[sURL3].q.pq.Len(), 1) - assert.Subset(t, mt.servers[sURL3].q.pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL2].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL2].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) + require.Equal(t, mt.servers[sURL3].q.(*transmitQueue).pq.Len(), 1) + assert.Subset(t, mt.servers[sURL3].q.(*transmitQueue).pq.Pop().(*Transmission).Req.Payload, report) }) } @@ -383,3 +386,166 @@ func Test_sortReportsLatestFirst(t *testing.T) { assert.Nil(t, reports[6]) assert.Nil(t, reports[7]) } + +type mockQ struct { + ch chan *Transmission +} + +func newMockQ() *mockQ { + return &mockQ{make(chan *Transmission, 100)} +} + +func (m *mockQ) Start(context.Context) error { return nil } +func (m *mockQ) Close() error { + m.ch <- nil + return nil +} +func (m *mockQ) Ready() error { return nil } +func (m *mockQ) HealthReport() map[string]error { return nil } +func (m *mockQ) Name() string { return "" } +func (m *mockQ) BlockingPop() (t *Transmission) { + val := <-m.ch + return val +} +func (m *mockQ) Push(req *pb.TransmitRequest, reportCtx ocrtypes.ReportContext) (ok bool) { + m.ch <- &Transmission{Req: req, ReportCtx: reportCtx} + return true +} +func (m *mockQ) Init(transmissions []*Transmission) {} +func (m *mockQ) IsEmpty() bool { return false } + +func Test_MercuryTransmitter_runQueueLoop(t *testing.T) { + feedIDHex := utils.NewHash().Hex() + lggr := logger.TestLogger(t) + c := &mocks.MockWSRPCClient{} + db := pgtest.NewSqlxDB(t) + orm := NewORM(db) + pm := NewPersistenceManager(lggr, sURL, orm, 0, 0, 0, 0) + cfg := mockCfg{} + + s := newServer(lggr, cfg, c, pm, sURL, feedIDHex) + + req := &pb.TransmitRequest{ + Payload: []byte{1, 2, 3}, + ReportFormat: 32, + } + + t.Run("pulls from queue and transmits successfully", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: 0, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + + t.Run("on duplicate, success", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on server-side error, does not retry", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{Code: DuplicateReport, Error: ""}, nil + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + + go s.runQueueLoop(nil, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected a transmit request to be sent") + } + + q.Close() + wg.Wait() + }) + t.Run("on transmit error, retries", func(t *testing.T) { + transmit := make(chan *pb.TransmitRequest, 1) + c.TransmitF = func(ctx context.Context, in *pb.TransmitRequest) (*pb.TransmitResponse, error) { + transmit <- in + return &pb.TransmitResponse{}, errors.New("transmission error") + } + q := newMockQ() + s.q = q + wg := &sync.WaitGroup{} + wg.Add(1) + stopCh := make(chan struct{}, 1) + + go s.runQueueLoop(stopCh, wg, feedIDHex) + + q.Push(req, sampleReportContext) + + cnt := 0 + Loop: + for { + select { + case tr := <-transmit: + assert.Equal(t, []byte{1, 2, 3}, tr.Payload) + assert.Equal(t, 32, int(tr.ReportFormat)) + if cnt > 2 { + break Loop + } + cnt++ + // case <-time.After(testutils.WaitTimeout(t)): + case <-time.After(1 * time.Second): + t.Fatal("expected 3 transmit requests to be sent") + } + } + + close(stopCh) + wg.Wait() + }) +}