Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-mercury server #12337

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .changeset/strange-tables-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"chainlink": patch
---

Mercury jobs can now broadcast to multiple mercury servers.

Previously, a single mercury server would be specified in a job spec as so:

```toml
[pluginConfig]
serverURL = "example.com/foo"
serverPubKey = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93"
```

You may now specify multiple mercury servers, as so:

```toml
[pluginConfig]
servers = { "example.com/foo" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "mercury2.example:1234/bar" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
```

85 changes: 65 additions & 20 deletions core/services/ocr2/plugins/mercury/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"net/url"
"regexp"
"sort"

pkgerrors "github.com/pkg/errors"

Expand All @@ -17,8 +18,18 @@ import (
)

type PluginConfig struct {
// Must either specify details for single server OR multiple servers.
// Specifying both is not valid.

// Single mercury server
// LEGACY: This is the old way of specifying a mercury server
RawServerURL string `json:"serverURL" toml:"serverURL"`
ServerPubKey utils.PlainHexBytes `json:"serverPubKey" toml:"serverPubKey"`

// Multi mercury servers
// This is the preferred way to specify mercury server(s)
Servers map[string]utils.PlainHexBytes `json:"servers" toml:"servers"`

// InitialBlockNumber allows to set a custom "validFromBlockNumber" for
// the first ever report in the case of a brand new feed, where the mercury
// server does not have any previous reports. For a brand new feed, this
Expand All @@ -29,26 +40,64 @@ type PluginConfig struct {
NativeFeedID *mercuryutils.FeedID `json:"nativeFeedID" toml:"nativeFeedID"`
}

func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) {
if config.RawServerURL == "" {
merr = errors.New("mercury: ServerURL must be specified")
func validateURL(rawServerURL string) error {
var normalizedURI string
if schemeRegexp.MatchString(rawServerURL) {
normalizedURI = rawServerURL
} else {
var normalizedURI string
if schemeRegexp.MatchString(config.RawServerURL) {
normalizedURI = config.RawServerURL
normalizedURI = fmt.Sprintf("wss://%s", rawServerURL)
}
uri, err := url.ParseRequestURI(normalizedURI)
if err != nil {
return pkgerrors.Errorf(`Mercury: invalid value for ServerURL, got: %q`, rawServerURL)
}
if uri.Scheme != "wss" {
return pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, rawServerURL, uri.Scheme)
}
return nil
}

type Server struct {
URL string
PubKey utils.PlainHexBytes
}

func (p PluginConfig) GetServers() (servers []Server) {
if p.RawServerURL != "" {
return []Server{{URL: wssRegexp.ReplaceAllString(p.RawServerURL, ""), PubKey: p.ServerPubKey}}
}
for url, pubKey := range p.Servers {
servers = append(servers, Server{URL: wssRegexp.ReplaceAllString(url, ""), PubKey: pubKey})
}
sort.Slice(servers, func(i, j int) bool {
return servers[i].URL < servers[j].URL
})
return
}

func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we protect against the same Server URL being specified multiple times? I don't think there are any technical sharp edges with allowing it (Checkout from the mercury pool will make sure a wsrpc client only has one connection to the mercury server IFIUC)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It unmarshals into a map so duplicates would be handled however the TOML library usually handles that. Either the dupe would be ignored, or it would throw an error. Either way its impossible to have dupes after unmarshalling.

if len(config.Servers) > 0 {
if config.RawServerURL != "" || len(config.ServerPubKey) != 0 {
merr = errors.Join(merr, errors.New("Mercury: Servers and RawServerURL/ServerPubKey may not be specified together"))
} else {
normalizedURI = fmt.Sprintf("wss://%s", config.RawServerURL)
for serverName, serverPubKey := range config.Servers {
if err := validateURL(serverName); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL"))
}
if len(serverPubKey) != 32 {
merr = errors.Join(merr, errors.New("Mercury: ServerPubKey must be a 32-byte hex string"))
}
}
}
uri, err := url.ParseRequestURI(normalizedURI)
if err != nil {
merr = pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL")
} else if uri.Scheme != "wss" {
merr = pkgerrors.Errorf(`Mercury: invalid scheme specified for MercuryServer, got: %q (scheme: %q) but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`, config.RawServerURL, uri.Scheme)
} else if config.RawServerURL == "" {
merr = errors.Join(merr, errors.New("Mercury: Servers must be specified"))
} else {
if err := validateURL(config.RawServerURL); err != nil {
merr = errors.Join(merr, pkgerrors.Wrap(err, "Mercury: invalid value for ServerURL"))
}
if len(config.ServerPubKey) != 32 {
merr = errors.Join(merr, errors.New("Mercury: If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string"))
}
}

if len(config.ServerPubKey) != 32 {
merr = errors.Join(merr, errors.New("mercury: ServerPubKey is required and must be a 32-byte hex string"))
}

switch feedID.Version() {
Expand Down Expand Up @@ -78,7 +127,3 @@ func ValidatePluginConfig(config PluginConfig, feedID mercuryutils.FeedID) (merr

var schemeRegexp = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9+.-]*://`)
var wssRegexp = regexp.MustCompile(`^wss://`)

func (p PluginConfig) ServerURL() string {
return wssRegexp.ReplaceAllString(p.RawServerURL, "")
}
90 changes: 80 additions & 10 deletions core/services/ocr2/plugins/mercury/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/pelletier/go-toml/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/utils"
)

var v1FeedId = [32]uint8{00, 01, 107, 74, 167, 229, 124, 167, 182, 138, 225, 191, 69, 101, 63, 86, 182, 86, 253, 58, 163, 53, 239, 127, 174, 105, 107, 102, 63, 27, 132, 114}
Expand All @@ -31,6 +33,46 @@ func Test_PluginConfig(t *testing.T) {
err = ValidatePluginConfig(mc, v1FeedId)
require.NoError(t, err)
})
t.Run("with multiple server URLs", func(t *testing.T) {
t.Run("if no ServerURL/ServerPubKey is specified", func(t *testing.T) {
rawToml := `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
`

var mc PluginConfig
err := toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

assert.Len(t, mc.Servers, 2)
assert.Equal(t, "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example.com:80"].String())
assert.Equal(t, "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", mc.Servers["example2.invalid:1234"].String())

err = ValidatePluginConfig(mc, v1FeedId)
require.NoError(t, err)
})
t.Run("if ServerURL or ServerPubKey is specified", func(t *testing.T) {
rawToml := `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
ServerURL = "example.com:80"
`
var mc PluginConfig
err := toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

err = ValidatePluginConfig(mc, v1FeedId)
require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together")

rawToml = `
Servers = { "example.com:80" = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93", "example2.invalid:1234" = "524ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93" }
ServerPubKey = "724ff6eae9e900270edfff233e16322a70ec06e1a6e62a81ef13921f398f6c93"
`
err = toml.Unmarshal([]byte(rawToml), &mc)
require.NoError(t, err)

err = ValidatePluginConfig(mc, v1FeedId)
require.EqualError(t, err, "Mercury: Servers and RawServerURL/ServerPubKey may not be specified together")
})
})

t.Run("with invalid values", func(t *testing.T) {
rawToml := `
Expand All @@ -53,7 +95,7 @@ func Test_PluginConfig(t *testing.T) {
err = ValidatePluginConfig(mc, v1FeedId)
require.Error(t, err)
assert.Contains(t, err.Error(), `Mercury: invalid scheme specified for MercuryServer, got: "http://example.com" (scheme: "http") but expected a websocket url e.g. "192.0.2.2:4242" or "wss://192.0.2.2:4242"`)
assert.Contains(t, err.Error(), `mercury: ServerPubKey is required and must be a 32-byte hex string`)
assert.Contains(t, err.Error(), `If RawServerURL is specified, ServerPubKey is also required and must be a 32-byte hex string`)
})

t.Run("with unnecessary values", func(t *testing.T) {
Expand Down Expand Up @@ -135,13 +177,41 @@ func Test_PluginConfig(t *testing.T) {
})
}

func Test_PluginConfig_ServerURL(t *testing.T) {
pc := PluginConfig{RawServerURL: "example.com"}
assert.Equal(t, "example.com", pc.ServerURL())
pc = PluginConfig{RawServerURL: "wss://example.com"}
assert.Equal(t, "example.com", pc.ServerURL())
pc = PluginConfig{RawServerURL: "example.com:1234/foo"}
assert.Equal(t, "example.com:1234/foo", pc.ServerURL())
pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo"}
assert.Equal(t, "example.com:1234/foo", pc.ServerURL())
func Test_PluginConfig_GetServers(t *testing.T) {
t.Run("with single server", func(t *testing.T) {
pubKey := utils.PlainHexBytes([]byte{1, 2, 3})
pc := PluginConfig{RawServerURL: "example.com", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "wss://example.com", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "example.com:1234/foo", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)

pc = PluginConfig{RawServerURL: "wss://example.com:1234/foo", ServerPubKey: pubKey}
require.Len(t, pc.GetServers(), 1)
assert.Equal(t, "example.com:1234/foo", pc.GetServers()[0].URL)
assert.Equal(t, pubKey, pc.GetServers()[0].PubKey)
})

t.Run("with multiple servers", func(t *testing.T) {
servers := map[string]utils.PlainHexBytes{
"example.com:80": utils.PlainHexBytes([]byte{1, 2, 3}),
"mercuryserver.invalid:1234/foo": utils.PlainHexBytes([]byte{4, 5, 6}),
}
pc := PluginConfig{Servers: servers}

require.Len(t, pc.GetServers(), 2)
assert.Equal(t, "example.com:80", pc.GetServers()[0].URL)
assert.Equal(t, utils.PlainHexBytes{1, 2, 3}, pc.GetServers()[0].PubKey)
assert.Equal(t, "mercuryserver.invalid:1234/foo", pc.GetServers()[1].URL)
assert.Equal(t, utils.PlainHexBytes{4, 5, 6}, pc.GetServers()[1].PubKey)
})
}
28 changes: 16 additions & 12 deletions core/services/ocr2/plugins/mercury/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math/big"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -389,31 +390,36 @@ func addV3MercuryJob(
bootstrapNodePort int,
bmBridge,
bidBridge,
askBridge,
serverURL string,
serverPubKey,
askBridge string,
servers map[string]string,
clientPubKey ed25519.PublicKey,
feedName string,
feedID [32]byte,
linkFeedID [32]byte,
nativeFeedID [32]byte,
) {
srvs := make([]string, 0, len(servers))
for u, k := range servers {
srvs = append(srvs, fmt.Sprintf("%q = %q", u, k))
}
serversStr := fmt.Sprintf("{ %s }", strings.Join(srvs, ", "))

node.AddJob(t, fmt.Sprintf(`
type = "offchainreporting2"
schemaVersion = 1
name = "mercury-%[1]d-%[12]s"
name = "mercury-%[1]d-%[11]s"
forwardingAllowed = false
maxTaskDuration = "1s"
contractID = "%[2]s"
feedID = "0x%[11]x"
feedID = "0x%[10]x"
contractConfigTrackerPollInterval = "1s"
ocrKeyBundleID = "%[3]s"
p2pv2Bootstrappers = [
"%[4]s"
]
relay = "evm"
pluginType = "mercury"
transmitterID = "%[10]x"
transmitterID = "%[9]x"
observationSource = """
// Benchmark Price
price1 [type=bridge name="%[5]s" timeout="50ms" requestData="{\\"data\\":{\\"from\\":\\"ETH\\",\\"to\\":\\"USD\\"}}"];
Expand All @@ -438,10 +444,9 @@ observationSource = """
"""

[pluginConfig]
serverURL = "%[8]s"
serverPubKey = "%[9]x"
linkFeedID = "0x%[13]x"
nativeFeedID = "0x%[14]x"
servers = %[8]s
linkFeedID = "0x%[12]x"
nativeFeedID = "0x%[13]x"

[relayConfig]
chainID = 1337
Expand All @@ -453,8 +458,7 @@ chainID = 1337
bmBridge,
bidBridge,
askBridge,
serverURL,
serverPubKey,
serversStr,
clientPubKey,
feedID,
feedName,
Expand Down
Loading
Loading