Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCF-3052 - Job Based KV Store and juelsFeePerCoin reboot persistence … #12401

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lemon-ladybugs-doubt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add kv store tied to jobs and use it for juels fee per coin cache to store persisted values for backup
68 changes: 68 additions & 0 deletions core/services/job/kv_orm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package job

import (
"encoding/json"
"fmt"
"time"

"github.com/jmoiron/sqlx"
"github.com/jmoiron/sqlx/types"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

// KVStore is a simple KV store that can store and retrieve serializable data.
//
//go:generate mockery --quiet --name KVStore --output ./mocks/ --case=underscore
type KVStore interface {
Store(key string, val interface{}) error
Get(key string, dest interface{}) error
}

type kVStore struct {
jobID int32
q pg.Q
lggr logger.SugaredLogger
}

var _ KVStore = (*kVStore)(nil)

func NewKVStore(jobID int32, db *sqlx.DB, cfg pg.QConfig, lggr logger.Logger) kVStore {
namedLogger := logger.Sugared(lggr.Named("JobORM"))
return kVStore{
jobID: jobID,
q: pg.NewQ(db, namedLogger, cfg),
lggr: namedLogger,
}
}

// Store saves serializable value by key.
func (kv kVStore) Store(key string, val interface{}) error {
jsonVal, err := json.Marshal(val)
if err != nil {
return err
}

sql := `INSERT INTO job_kv_store (job_id, key, val)
VALUES ($1, $2, $3)
ON CONFLICT (job_id, key) DO UPDATE SET
val = EXCLUDED.val,
updated_at = $4;`

if err = kv.q.ExecQ(sql, kv.jobID, key, types.JSONText(jsonVal), time.Now()); err != nil {
return fmt.Errorf("failed to store value: %s for key: %s for jobID: %d : %w", string(jsonVal), key, kv.jobID, err)
}
return nil
}

// Get retrieves serializable value by key.
func (kv kVStore) Get(key string, dest interface{}) error {
var ret json.RawMessage
sql := "SELECT val FROM job_kv_store WHERE job_id = $1 AND key = $2"
if err := kv.q.Get(&ret, sql, kv.jobID, key); err != nil {
return fmt.Errorf("failed to get value by key: %s for jobID: %d : %w", key, kv.jobID, err)
}

return json.Unmarshal(ret, dest)
}
85 changes: 85 additions & 0 deletions core/services/job/kv_orm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package job_test

import (
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/directrequest"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
)

func TestJobKVStore(t *testing.T) {
config := configtest.NewTestGeneralConfig(t)
db := pgtest.NewSqlxDB(t)

lggr := logger.TestLogger(t)

pipelineORM := pipeline.NewORM(db, logger.TestLogger(t), config.Database(), config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db, logger.TestLogger(t), config.Database())

jobID := int32(1337)
kvStore := job.NewKVStore(jobID, db, config.Database(), lggr)
jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, cltest.NewKeyStore(t, db, config.Database()), config.Database())

jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.GetDirectRequestSpec())
require.NoError(t, err)
jb.ID = jobID
require.NoError(t, jobORM.CreateJob(&jb))

type testData struct {
Test string
}

type nested struct {
Contact testData // Nested struct
}

values := []interface{}{
42, // int
"hello", // string
3.14, // float64
true, // bool
[]int{1, 2, 3}, // slice of ints
map[string]int{"a": 1, "b": 2}, // map of string to int
testData{Test: "value1"}, // regular struct
nested{testData{"value2"}}, // nested struct
}

for i, value := range values {
testKey := "test_key_" + fmt.Sprint(i)
require.NoError(t, kvStore.Store(testKey, value))

// Get the type of the current value
valueType := reflect.TypeOf(value)
// Create a new instance of the value's type
temp := reflect.New(valueType).Interface()

require.NoError(t, kvStore.Get(testKey, &temp))

tempValue := reflect.ValueOf(temp).Elem().Interface()
require.Equal(t, value, tempValue)
}

key := "test_key_updating"
td1 := testData{Test: "value1"}
td2 := testData{Test: "value2"}

var retData testData
require.NoError(t, kvStore.Store(key, td1))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td1, retData)

require.NoError(t, kvStore.Store(key, td2))
require.NoError(t, kvStore.Get(key, &retData))
require.Equal(t, td2, retData)
}
60 changes: 60 additions & 0 deletions core/services/job/mocks/kv_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
}
lggr := logger.Sugared(d.lggr.Named(jb.ExternalJobID.String()).With(lggrCtx.Args()...))

kvStore := job.NewKVStore(jb.ID, d.db, d.cfg.Database(), lggr)

rid, err := spec.RelayID()
if err != nil {
return nil, ErrJobSpecNoRelayer{Err: err, PluginName: string(spec.PluginType)}
Expand Down Expand Up @@ -448,7 +450,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) ([]job.Servi
return d.newServicesLLO(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)

case types.Median:
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
return d.newServicesMedian(ctx, lggr, jb, bootstrapPeers, kb, kvStore, ocrDB, lc, ocrLogger)

case types.DKG:
return d.newServicesDKG(lggr, jb, bootstrapPeers, kb, ocrDB, lc, ocrLogger)
Expand Down Expand Up @@ -927,6 +929,7 @@ func (d *Delegate) newServicesMedian(
jb job.Job,
bootstrapPeers []commontypes.BootstrapperLocator,
kb ocr2key.KeyBundle,
kvStore job.KVStore,
ocrDB *db,
lc ocrtypes.LocalConfig,
ocrLogger commontypes.Logger,
Expand Down Expand Up @@ -962,7 +965,7 @@ func (d *Delegate) newServicesMedian(
return nil, ErrRelayNotEnabled{Err: err, PluginName: "median", Relay: spec.Relay}
}

medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)
medianServices, err2 := median.NewMedianServices(ctx, jb, d.isNewlyCreatedJob, relayer, kvStore, d.pipelineRunner, lggr, oracleArgsNoPlugin, mConfig, enhancedTelemChan, errorLog)

if ocrcommon.ShouldCollectEnhancedTelemetry(&jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, enhancedTelemChan, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.ContractID, synchronization.EnhancedEA), lggr.Named("EnhancedTelemetry"))
Expand Down
3 changes: 2 additions & 1 deletion core/services/ocr2/plugins/median/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func NewMedianServices(ctx context.Context,
jb job.Job,
isNewlyCreatedJob bool,
relayer loop.Relayer,
kvStore job.KVStore,
pipelineRunner pipeline.Runner,
lggr logger.Logger,
argsNoPlugin libocr.OCR2OracleArgs,
Expand Down Expand Up @@ -128,7 +129,7 @@ func NewMedianServices(ctx context.Context,

if !pluginConfig.JuelsPerFeeCoinCacheDisabled {
lggr.Infof("juelsPerFeeCoin data source caching is enabled")
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
if juelsPerFeeCoinSource, err = ocrcommon.NewInMemoryDataSourceCache(juelsPerFeeCoinSource, kvStore, pluginConfig.JuelsPerFeeCoinCacheDuration.Duration()); err != nil {
return nil, err
}
}
Expand Down
Loading
Loading