diff --git a/.changeset/rich-melons-sin.md b/.changeset/rich-melons-sin.md new file mode 100644 index 00000000000..38fb0b0051f --- /dev/null +++ b/.changeset/rich-melons-sin.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#db_update add persistence for DON-2-DON discovery announcements diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 76e1472192f..59e22f5b2c9 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -206,7 +206,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { - externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) + externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger) signer := externalPeer externalPeerWrapper = externalPeer diff --git a/core/services/ocrcommon/discoverer_database.go b/core/services/ocrcommon/discoverer_database.go index ea75f9e6d21..051105b746d 100644 --- a/core/services/ocrcommon/discoverer_database.go +++ b/core/services/ocrcommon/discoverer_database.go @@ -2,6 +2,7 @@ package ocrcommon import ( "context" + "fmt" "github.com/lib/pq" "github.com/pkg/errors" @@ -14,35 +15,60 @@ import ( var _ ocrnetworking.DiscovererDatabase = &DiscovererDatabase{} +const ( + // ocrDiscovererTable is the name of the table used to store OCR announcements + ocrDiscovererTable = "ocr_discoverer_announcements" + // don2donDiscovererTable is the name of the table used to store DON2DON announcements + don2donDiscovererTable = "don2don_discoverer_announcements" +) + +// DiscovererDatabase is a key-value store for p2p announcements +// that are based on the RageP2P library and bootstrap nodes type DiscovererDatabase struct { - ds sqlutil.DataSource - peerID string + ds sqlutil.DataSource + peerID string + tableName string +} + +// NewOCRDiscovererDatabase creates a new DiscovererDatabase for OCR announcements +func NewOCRDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase { + return &DiscovererDatabase{ + ds: ds, + peerID: peerID, + tableName: ocrDiscovererTable, + } } -func NewDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase { +// NewDON2DONDiscovererDatabase creates a new DiscovererDatabase for DON2DON announcements +func NewDON2DONDiscovererDatabase(ds sqlutil.DataSource, peerID string) *DiscovererDatabase { return &DiscovererDatabase{ - ds, - peerID, + ds: ds, + peerID: peerID, + tableName: don2donDiscovererTable, } } // StoreAnnouncement has key-value-store semantics and stores a peerID (key) and an associated serialized // announcement (value). func (d *DiscovererDatabase) StoreAnnouncement(ctx context.Context, peerID string, ann []byte) error { - _, err := d.ds.ExecContext(ctx, ` -INSERT INTO ocr_discoverer_announcements (local_peer_id, remote_peer_id, ann, created_at, updated_at) -VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET + q := fmt.Sprintf(` +INSERT INTO %s (local_peer_id, remote_peer_id, ann, created_at, updated_at) +VALUES ($1,$2,$3,NOW(),NOW()) ON CONFLICT (local_peer_id, remote_peer_id) DO UPDATE SET ann = EXCLUDED.ann, updated_at = EXCLUDED.updated_at -;`, d.peerID, peerID, ann) +;`, d.tableName) + + _, err := d.ds.ExecContext(ctx, + q, d.peerID, peerID, ann) return errors.Wrap(err, "DiscovererDatabase failed to StoreAnnouncement") } // ReadAnnouncements returns one serialized announcement (if available) for each of the peerIDs in the form of a map // keyed by each announcement's corresponding peer ID. func (d *DiscovererDatabase) ReadAnnouncements(ctx context.Context, peerIDs []string) (results map[string][]byte, err error) { - rows, err := d.ds.QueryContext(ctx, ` -SELECT remote_peer_id, ann FROM ocr_discoverer_announcements WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, pq.Array(peerIDs), d.peerID) + q := fmt.Sprintf(`SELECT remote_peer_id, ann FROM %s WHERE remote_peer_id = ANY($1) AND local_peer_id = $2`, d.tableName) + + rows, err := d.ds.QueryContext(ctx, q, pq.Array(peerIDs), d.peerID) if err != nil { return nil, errors.Wrap(err, "DiscovererDatabase failed to ReadAnnouncements") } diff --git a/core/services/ocrcommon/discoverer_database_test.go b/core/services/ocrcommon/discoverer_database_test.go index 30fb02a8265..16c6d26a42d 100644 --- a/core/services/ocrcommon/discoverer_database_test.go +++ b/core/services/ocrcommon/discoverer_database_test.go @@ -3,6 +3,7 @@ package ocrcommon_test import ( "crypto/ed25519" "crypto/rand" + "fmt" "testing" ragep2ptypes "github.com/smartcontractkit/libocr/ragep2p/types" @@ -21,66 +22,87 @@ func Test_DiscovererDatabase(t *testing.T) { localPeerID1 := mustRandomP2PPeerID(t) localPeerID2 := mustRandomP2PPeerID(t) - dd1 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw()) - dd2 := ocrcommon.NewDiscovererDatabase(db, localPeerID2.Raw()) - - ctx := testutils.Context(t) - - t.Run("StoreAnnouncement writes a value", func(t *testing.T) { - ann := []byte{1, 2, 3} - err := dd1.StoreAnnouncement(ctx, "remote1", ann) - assert.NoError(t, err) - - // test upsert - ann = []byte{4, 5, 6} - err = dd1.StoreAnnouncement(ctx, "remote1", ann) - assert.NoError(t, err) - - // write a different value - ann = []byte{7, 8, 9} - err = dd1.StoreAnnouncement(ctx, "remote2", ann) - assert.NoError(t, err) - }) - - t.Run("ReadAnnouncements reads values filtered by given peerIDs", func(t *testing.T) { - announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"}) - require.NoError(t, err) - - assert.Len(t, announcements, 2) - assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) - assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"]) - - announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"}) - require.NoError(t, err) - - assert.Len(t, announcements, 1) - assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) - }) - - t.Run("is scoped to local peer ID", func(t *testing.T) { - ann := []byte{10, 11, 12} - err := dd2.StoreAnnouncement(ctx, "remote1", ann) - assert.NoError(t, err) - - announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"}) - require.NoError(t, err) - assert.Len(t, announcements, 1) - assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"]) - - announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"}) - require.NoError(t, err) - assert.Len(t, announcements, 1) - assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) - }) - - t.Run("persists data across restarts", func(t *testing.T) { - dd3 := ocrcommon.NewDiscovererDatabase(db, localPeerID1.Raw()) - - announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"}) - require.NoError(t, err) - assert.Len(t, announcements, 1) - assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) - }) + type test struct { + name string + dd1 *ocrcommon.DiscovererDatabase + dd2 *ocrcommon.DiscovererDatabase + } + + tests := []test{ + { + name: "ocr discoverer database", + dd1: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw()), + dd2: ocrcommon.NewOCRDiscovererDatabase(db, localPeerID2.Raw()), + }, + { + name: "don2don discoverer database", + dd1: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID1.Raw()), + dd2: ocrcommon.NewDON2DONDiscovererDatabase(db, localPeerID2.Raw()), + }, + } + + for _, tt := range tests { + dd1 := tt.dd1 + dd2 := tt.dd2 + + ctx := testutils.Context(t) + + t.Run(fmt.Sprintf("%s StoreAnnouncement writes a value", tt.name), func(t *testing.T) { + ann := []byte{1, 2, 3} + err := dd1.StoreAnnouncement(ctx, "remote1", ann) + assert.NoError(t, err) + + // test upsert + ann = []byte{4, 5, 6} + err = dd1.StoreAnnouncement(ctx, "remote1", ann) + assert.NoError(t, err) + + // write a different value + ann = []byte{7, 8, 9} + err = dd1.StoreAnnouncement(ctx, "remote2", ann) + assert.NoError(t, err) + }) + + t.Run(fmt.Sprintf("%s ReadAnnouncements reads values filtered by given peerIDs", tt.name), func(t *testing.T) { + announcements, err := dd1.ReadAnnouncements(ctx, []string{"remote1", "remote2"}) + require.NoError(t, err) + + assert.Len(t, announcements, 2) + assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) + assert.Equal(t, []byte{7, 8, 9}, announcements["remote2"]) + + announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"}) + require.NoError(t, err) + + assert.Len(t, announcements, 1) + assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) + }) + + t.Run(fmt.Sprintf("%s is scoped to local peer ID", tt.name), func(t *testing.T) { + ann := []byte{10, 11, 12} + err := dd2.StoreAnnouncement(ctx, "remote1", ann) + assert.NoError(t, err) + + announcements, err := dd2.ReadAnnouncements(ctx, []string{"remote1"}) + require.NoError(t, err) + assert.Len(t, announcements, 1) + assert.Equal(t, []byte{10, 11, 12}, announcements["remote1"]) + + announcements, err = dd1.ReadAnnouncements(ctx, []string{"remote1"}) + require.NoError(t, err) + assert.Len(t, announcements, 1) + assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) + }) + + t.Run(fmt.Sprintf("%s persists data across restarts", tt.name), func(t *testing.T) { + dd3 := ocrcommon.NewOCRDiscovererDatabase(db, localPeerID1.Raw()) + + announcements, err := dd3.ReadAnnouncements(ctx, []string{"remote1"}) + require.NoError(t, err) + assert.Len(t, announcements, 1) + assert.Equal(t, []byte{4, 5, 6}, announcements["remote1"]) + }) + } } func mustRandomP2PPeerID(t *testing.T) p2pkey.PeerID { diff --git a/core/services/ocrcommon/peer_wrapper.go b/core/services/ocrcommon/peer_wrapper.go index 97c429f9a5f..762a3d05aa3 100644 --- a/core/services/ocrcommon/peer_wrapper.go +++ b/core/services/ocrcommon/peer_wrapper.go @@ -117,7 +117,7 @@ func (p *SingletonPeerWrapper) peerConfig() (ocrnetworking.PeerConfig, error) { } p.PeerID = key.PeerID() - discovererDB := NewDiscovererDatabase(p.ds, p.PeerID.Raw()) + discovererDB := NewOCRDiscovererDatabase(p.ds, p.PeerID.Raw()) config := p.p2pCfg peerConfig := ocrnetworking.PeerConfig{ diff --git a/core/services/p2p/wrapper/wrapper.go b/core/services/p2p/wrapper/wrapper.go index acb6694b5a3..7b5b92af72e 100644 --- a/core/services/p2p/wrapper/wrapper.go +++ b/core/services/p2p/wrapper/wrapper.go @@ -10,9 +10,12 @@ import ( "github.com/smartcontractkit/libocr/commontypes" ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" "github.com/smartcontractkit/chainlink/v2/core/services/p2p" "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) @@ -23,16 +26,18 @@ type peerWrapper struct { p2pConfig config.P2P privateKey ed25519.PrivateKey lggr logger.Logger + ds sqlutil.DataSource } var _ types.PeerWrapper = &peerWrapper{} var _ types.Signer = &peerWrapper{} -func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, lggr logger.Logger) *peerWrapper { +func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, ds sqlutil.DataSource, lggr logger.Logger) *peerWrapper { return &peerWrapper{ keystoreP2P: keystoreP2P, p2pConfig: p2pConfig, lggr: lggr, + ds: ds, } } @@ -42,15 +47,14 @@ func (e *peerWrapper) GetPeer() types.Peer { // convert to "external" P2P PeerConfig, which is independent of OCR // this has to be done in Start() because keystore is not unlocked at construction time -func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.PeerConfig, error) { - key, err := keystoreP2P.GetOrFirst(p2pConfig.PeerID()) +func (e *peerWrapper) convertPeerConfig() (p2p.PeerConfig, error) { + key, err := e.keystoreP2P.GetOrFirst(e.p2pConfig.PeerID()) if err != nil { return p2p.PeerConfig{}, err } - // TODO(KS-106): use real DB - discovererDB := p2p.NewInMemoryDiscovererDatabase() - bootstrappers, err := convertBootstrapperLocators(p2pConfig.V2().DefaultBootstrappers()) + discovererDB := ocrcommon.NewDON2DONDiscovererDatabase(e.ds, key.PeerID().Raw()) + bootstrappers, err := convertBootstrapperLocators(e.p2pConfig.V2().DefaultBootstrappers()) if err != nil { return p2p.PeerConfig{}, err } @@ -58,12 +62,12 @@ func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.Peer peerConfig := p2p.PeerConfig{ PrivateKey: key.PrivKey, - ListenAddresses: p2pConfig.V2().ListenAddresses(), - AnnounceAddresses: p2pConfig.V2().AnnounceAddresses(), + ListenAddresses: e.p2pConfig.V2().ListenAddresses(), + AnnounceAddresses: e.p2pConfig.V2().AnnounceAddresses(), Bootstrappers: bootstrappers, - DeltaReconcile: p2pConfig.V2().DeltaReconcile().Duration(), - DeltaDial: p2pConfig.V2().DeltaDial().Duration(), + DeltaReconcile: e.p2pConfig.V2().DeltaReconcile().Duration(), + DeltaDial: e.p2pConfig.V2().DeltaDial().Duration(), DiscovererDatabase: discovererDB, // NOTE: this is equivalent to prometheus.DefaultRegisterer, but we need to use a separate @@ -95,7 +99,7 @@ func convertBootstrapperLocators(bootstrappers []commontypes.BootstrapperLocator } func (e *peerWrapper) Start(ctx context.Context) error { - cfg, err := convertPeerConfig(e.keystoreP2P, e.p2pConfig) + cfg, err := e.convertPeerConfig() if err != nil { return err } diff --git a/core/services/p2p/wrapper/wrapper_test.go b/core/services/p2p/wrapper/wrapper_test.go index dd91ecaee47..3c164b5c711 100644 --- a/core/services/p2p/wrapper/wrapper_test.go +++ b/core/services/p2p/wrapper/wrapper_test.go @@ -10,6 +10,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" @@ -18,6 +19,8 @@ import ( ) func TestPeerWrapper_CleanStartClose(t *testing.T) { + db := pgtest.NewSqlxDB(t) + lggr := logger.TestLogger(t) port := freeport.GetOne(t) cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { @@ -30,7 +33,7 @@ func TestPeerWrapper_CleanStartClose(t *testing.T) { require.NoError(t, err) keystoreP2P.On("GetOrFirst", mock.Anything).Return(key, nil) - wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), lggr) + wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), db, lggr) require.NotNil(t, wrapper) require.NoError(t, wrapper.Start(testutils.Context(t))) require.NoError(t, wrapper.Close()) diff --git a/core/store/migrate/migrations/0240_don2don_discoverer.sql b/core/store/migrate/migrations/0240_don2don_discoverer.sql new file mode 100644 index 00000000000..c08b8cd7b11 --- /dev/null +++ b/core/store/migrate/migrations/0240_don2don_discoverer.sql @@ -0,0 +1,14 @@ +-- +goose Up +-- this migration is for the don2don_discoverer_announcements table +-- it is essentially the same as ocr_discoverer_announcements but scoped to the don2don use case +-- both cases are based on RageP2P library and bootstrap nodes. for now but we want to keep their addresses separate to avoid accidental cross-communication +CREATE TABLE don2don_discoverer_announcements ( + local_peer_id text NOT NULL, + remote_peer_id text NOT NULL, + ann bytea NOT NULL, + created_at timestamptz not null, + updated_at timestamptz not null, + PRIMARY KEY(local_peer_id, remote_peer_id) +); +-- +goose Down +DROP TABLE don2don_discoverer_announcements;