Skip to content

Commit

Permalink
core/services/functions: switch to sqlutil.DataStore (#12811)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored Apr 16, 2024
1 parent d97f14b commit 6b0a7af
Show file tree
Hide file tree
Showing 36 changed files with 652 additions and 743 deletions.
5 changes: 5 additions & 0 deletions .changeset/real-numbers-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

core/services/functions: switch to sqlutil.DataStore #internal
1 change: 1 addition & 0 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

delegates[job.OffchainReporting2] = ocr2.NewDelegate(
sqlxDB,
opts.DB,
jobORM,
bridgeORM,
mercuryORM,
Expand Down
15 changes: 7 additions & 8 deletions core/services/functions/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
evmrelayTypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/s4"
"github.com/smartcontractkit/chainlink/v2/core/services/synchronization/telem"
Expand Down Expand Up @@ -270,7 +269,7 @@ func (l *functionsListener) setError(ctx context.Context, requestId RequestID, e
promRequestComputationError.WithLabelValues(l.contractAddressHex).Inc()
}
readyForProcessing := errType != INTERNAL_ERROR
if err := l.pluginORM.SetError(requestId, errType, errBytes, time.Now(), readyForProcessing, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.SetError(ctx, requestId, errType, errBytes, time.Now(), readyForProcessing); err != nil {
l.logger.Errorw("call to SetError failed", "requestID", formatRequestId(requestId), "err", err)
}
}
Expand Down Expand Up @@ -321,7 +320,7 @@ func (l *functionsListener) HandleOffchainRequest(ctx context.Context, request *
CoordinatorContractAddress: &senderAddr,
OnchainMetadata: []byte(OffchainRequestMarker),
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.CreateRequest(ctx, newReq); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("HandleOffchainRequest: received duplicate request ID", "requestID", formatRequestId(requestId), "err", err)
} else {
Expand All @@ -348,7 +347,7 @@ func (l *functionsListener) handleOracleRequestV1(request *evmrelayTypes.OracleR
CoordinatorContractAddress: &request.CoordinatorContract,
OnchainMetadata: request.OnchainMetadata,
}
if err := l.pluginORM.CreateRequest(newReq, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.CreateRequest(ctx, newReq); err != nil {
if errors.Is(err, ErrDuplicateRequestID) {
l.logger.Warnw("handleOracleRequestV1: received a log with duplicate request ID", "requestID", formatRequestId(request.RequestId), "err", err)
} else {
Expand Down Expand Up @@ -450,7 +449,7 @@ func (l *functionsListener) handleRequest(ctx context.Context, requestID Request
promRequestComputationSuccess.WithLabelValues(l.contractAddressHex).Inc()
promComputationResultSize.WithLabelValues(l.contractAddressHex).Set(float64(len(computationResult)))
l.logger.Debugw("saving computation result", "requestID", requestIDStr)
if err2 := l.pluginORM.SetResult(requestID, computationResult, time.Now(), pg.WithParentCtx(ctx)); err2 != nil {
if err2 := l.pluginORM.SetResult(ctx, requestID, computationResult, time.Now()); err2 != nil {
l.logger.Errorw("call to SetResult failed", "requestID", requestIDStr, "err", err2)
return err2
}
Expand All @@ -464,7 +463,7 @@ func (l *functionsListener) handleOracleResponseV1(response *evmrelayTypes.Oracl

ctx, cancel := l.getNewHandlerContext()
defer cancel()
if err := l.pluginORM.SetConfirmed(response.RequestId, pg.WithParentCtx(ctx)); err != nil {
if err := l.pluginORM.SetConfirmed(ctx, response.RequestId); err != nil {
l.logger.Errorw("setting CONFIRMED state failed", "requestID", formatRequestId(response.RequestId), "err", err)
}
promRequestConfirmed.WithLabelValues(l.contractAddressHex).Inc()
Expand All @@ -486,7 +485,7 @@ func (l *functionsListener) timeoutRequests() {
case <-ticker.C:
cutoff := time.Now().Add(-(time.Duration(timeoutSec) * time.Second))
ctx, cancel := l.getNewHandlerContext()
ids, err := l.pluginORM.TimeoutExpiredResults(cutoff, batchSize, pg.WithParentCtx(ctx))
ids, err := l.pluginORM.TimeoutExpiredResults(ctx, cutoff, batchSize)
cancel()
if err != nil {
l.logger.Errorw("error when calling FindExpiredResults", "err", err)
Expand Down Expand Up @@ -531,7 +530,7 @@ func (l *functionsListener) pruneRequests() {
case <-ticker.C:
ctx, cancel := l.getNewHandlerContext()
startTime := time.Now()
nTotal, nPruned, err := l.pluginORM.PruneOldestRequests(maxStoredRequests, batchSize, pg.WithParentCtx(ctx))
nTotal, nPruned, err := l.pluginORM.PruneOldestRequests(ctx, maxStoredRequests, batchSize)
cancel()
elapsedMillis := time.Since(startTime).Milliseconds()
if err != nil {
Expand Down
24 changes: 11 additions & 13 deletions core/services/functions/listener_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package functions_test

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -35,7 +36,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config"
threshold_mocks "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/threshold/mocks"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestFunctionsListener_HandleOracleRequestV1_Success(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand All @@ -189,7 +189,7 @@ func TestFunctionsListener_HandleOffchainRequest_Success(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestFunctionsListener_HandleOffchainRequest_InternalError(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, nil, errors.New("error"))
uni.pluginORM.On("SetError", RequestID, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetError", mock.Anything, RequestID, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)

request := &functions_service.OffchainRequest{
RequestId: RequestID[:],
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestFunctionsListener_HandleOracleRequestV1_ComputationError(t *testing.T)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(nil, ErrorBytes, nil, nil)
uni.pluginORM.On("SetError", RequestID, mock.Anything, ErrorBytes, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetError", mock.Anything, RequestID, mock.Anything, ErrorBytes, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand Down Expand Up @@ -307,7 +307,7 @@ func TestFunctionsListener_HandleOracleRequestV1_ThresholdDecryptedSecrets(t *te
uni.eaClient.On("FetchEncryptedSecrets", mock.Anything, mock.Anything, RequestIDStr, mock.Anything, mock.Anything).Return(EncryptedSecrets, nil, nil)
uni.decryptor.On("Decrypt", mock.Anything, decryptionPlugin.CiphertextId(RequestID[:]), EncryptedSecrets).Return(DecryptedSecrets, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, nil, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand All @@ -333,7 +333,7 @@ func TestFunctionsListener_HandleOracleRequestV1_CBORTooBig(t *testing.T) {
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return([]types.OracleRequest{request}, nil, nil).Once()
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.pluginORM.On("SetError", RequestID, functions_service.USER_ERROR, []byte("request too big (max 10 bytes)"), mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetError", mock.Anything, RequestID, functions_service.USER_ERROR, []byte("request too big (max 10 bytes)"), mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)

Expand Down Expand Up @@ -361,7 +361,7 @@ func TestFunctionsListener_ReportSourceCodeDomains(t *testing.T) {
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Return(nil)
uni.bridgeAccessor.On("NewExternalAdapterClient", mock.Anything).Return(uni.eaClient, nil)
uni.eaClient.On("RunComputation", mock.Anything, RequestIDStr, mock.Anything, SubscriptionOwner.Hex(), SubscriptionID, mock.Anything, mock.Anything, mock.Anything).Return(ResultBytes, nil, Domains, nil)
uni.pluginORM.On("SetResult", RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
uni.pluginORM.On("SetResult", mock.Anything, RequestID, ResultBytes, mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
close(doneCh)
}).Return(nil)
var sentMessage []byte
Expand All @@ -388,7 +388,7 @@ func TestFunctionsListener_PruneRequests(t *testing.T) {
uni := NewFunctionsListenerUniverse(t, 0, 1)
doneCh := make(chan bool)
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("PruneOldestRequests", functions_service.DefaultPruneMaxStoredRequests, functions_service.DefaultPruneBatchSize, mock.Anything).Return(uint32(0), uint32(0), nil).Run(func(args mock.Arguments) {
uni.pluginORM.On("PruneOldestRequests", mock.Anything, functions_service.DefaultPruneMaxStoredRequests, functions_service.DefaultPruneBatchSize, mock.Anything).Return(uint32(0), uint32(0), nil).Run(func(args mock.Arguments) {
doneCh <- true
})

Expand All @@ -403,7 +403,7 @@ func TestFunctionsListener_TimeoutRequests(t *testing.T) {
uni := NewFunctionsListenerUniverse(t, 1, 0)
doneCh := make(chan bool)
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("TimeoutExpiredResults", mock.Anything, uint32(1), mock.Anything).Return([]functions_service.RequestID{}, nil).Run(func(args mock.Arguments) {
uni.pluginORM.On("TimeoutExpiredResults", mock.Anything, mock.Anything, uint32(1), mock.Anything).Return([]functions_service.RequestID{}, nil).Run(func(args mock.Arguments) {
doneCh <- true
})

Expand All @@ -423,9 +423,7 @@ func TestFunctionsListener_ORMDoesNotFreezeHandlersForever(t *testing.T) {
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return([]types.OracleRequest{request}, nil, nil).Once()
uni.logPollerWrapper.On("LatestEvents", mock.Anything).Return(nil, nil, nil)
uni.pluginORM.On("CreateRequest", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
var queryerWrapper pg.Q
args.Get(1).(pg.QOpt)(&queryerWrapper)
<-queryerWrapper.ParentCtx.Done()
<-args.Get(0).(context.Context).Done()
ormCallExited.Done()
}).Return(errors.New("timeout"))

Expand Down
Loading

0 comments on commit 6b0a7af

Please sign in to comment.