Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process missed blocks on startup #20

Merged
merged 52 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c74ef48
add json database
cam-schultz Aug 29, 2023
1280b4a
initialize subscriber from stored block height
cam-schultz Aug 29, 2023
551b90e
subscriber initialization failure is non fatal
cam-schultz Aug 29, 2023
5ca0bd0
Merge branch 'main' into catch-up-blocks
cam-schultz Aug 29, 2023
ba3f2fb
add storage location cfg option
cam-schultz Aug 29, 2023
39fdc21
better read error handling
cam-schultz Aug 30, 2023
a1aab00
add comments
cam-schultz Aug 30, 2023
6913fb9
properly set default storage location
cam-schultz Aug 30, 2023
505b337
fix update json db vals bug
cam-schultz Aug 30, 2023
de7a347
add initialization log message
cam-schultz Aug 30, 2023
349ac1e
typo
cam-schultz Aug 30, 2023
162ca0d
validate rpc endpoint in config
cam-schultz Aug 30, 2023
7d07d82
create subscription before processing back events
cam-schultz Aug 30, 2023
9a0baa0
rename type
cam-schultz Aug 30, 2023
dc7ae3c
rename latestSeenBlock
cam-schultz Aug 30, 2023
860f595
json storage unit tests
cam-schultz Aug 30, 2023
aa8dd8b
correct comment
cam-schultz Aug 30, 2023
ac5d57d
better error messages
cam-schultz Aug 30, 2023
9a51727
more unit tests
cam-schultz Aug 31, 2023
16af742
error on invalid log
cam-schultz Sep 1, 2023
2bb9605
save constructed urls
cam-schultz Sep 1, 2023
1727f90
make json db writes atomic
cam-schultz Sep 1, 2023
e49395b
add license header
cam-schultz Sep 1, 2023
ebdfc79
properly handle json file missing case
cam-schultz Sep 5, 2023
532094c
cleanup
cam-schultz Sep 5, 2023
f4d2c83
write latest block to db on initialization
cam-schultz Sep 5, 2023
17879c0
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 5, 2023
4e5419d
consolidate endpoint methods
cam-schultz Sep 5, 2023
cf9a0e7
clean up comments
cam-schultz Sep 5, 2023
9fdd275
rename var
cam-schultz Sep 5, 2023
32146a4
cap the number of blocks to process on startup
cam-schultz Sep 5, 2023
ce66b82
cleanup
cam-schultz Sep 5, 2023
e012c4e
refactor subscriber interface
cam-schultz Sep 5, 2023
44384ca
fix test
cam-schultz Sep 5, 2023
e9c0b01
use Int:Sub
cam-schultz Sep 6, 2023
878d4c6
process logs in serial for each source chain
cam-schultz Sep 6, 2023
85cf76c
return nil error in success case
cam-schultz Sep 6, 2023
813a8e9
remove irrelevant comment
cam-schultz Sep 6, 2023
fde1675
improve logs
cam-schultz Sep 7, 2023
c4e36bd
sort logs from eth_getLogs
cam-schultz Sep 7, 2023
b8902c5
fix logs
cam-schultz Sep 7, 2023
9bda673
sort by log index
cam-schultz Sep 11, 2023
48ec689
cleanup
cam-schultz Sep 11, 2023
4ed7671
cache current state
cam-schultz Sep 11, 2023
4cdc487
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
55bc03c
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
232f6fe
fix test
cam-schultz Sep 11, 2023
5b0cc63
rename latestProcessedBlock
cam-schultz Sep 12, 2023
995d204
rename SetProcessedBlockHeightToLatest
cam-schultz Sep 13, 2023
e30a4ac
update comment
cam-schultz Sep 13, 2023
1e9891d
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 13, 2023
8870d8b
appease linter
cam-schultz Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 61 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type SourceSubnet struct {
APINodeHost string `mapstructure:"api-node-host" json:"api-node-host"`
APINodePort uint32 `mapstructure:"api-node-port" json:"api-node-port"`
EncryptConnection bool `mapstructure:"encrypt-connection" json:"encrypt-connection"`
RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"`
WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"`
MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"`
}
Expand All @@ -62,6 +63,7 @@ type Config struct {
NetworkID uint32 `mapstructure:"network-id" json:"network-id"`
PChainAPIURL string `mapstructure:"p-chain-api-url" json:"p-chain-api-url"`
EncryptConnection bool `mapstructure:"encrypt-connection" json:"encrypt-connection"`
StorageLocation string `mapstructure:"storage-location" json:"storage-location"`
SourceSubnets []SourceSubnet `mapstructure:"source-subnets" json:"source-subnets"`
DestinationSubnets []DestinationSubnet `mapstructure:"destination-subnets" json:"destination-subnets"`
}
Expand All @@ -71,6 +73,7 @@ func SetDefaultConfigValues(v *viper.Viper) {
v.SetDefault(NetworkIDKey, constants.MainnetID)
v.SetDefault(PChainAPIURLKey, "https://api.avax.network")
v.SetDefault(EncryptConnectionKey, true)
v.SetDefault(StorageLocationKey, "./.awm-relayer-storage")
}

// BuildConfig constructs the relayer config using Viper.
Expand Down Expand Up @@ -99,6 +102,7 @@ func BuildConfig(v *viper.Viper) (Config, bool, error) {
cfg.NetworkID = v.GetUint32(NetworkIDKey)
cfg.PChainAPIURL = v.GetString(PChainAPIURLKey)
cfg.EncryptConnection = v.GetBool(EncryptConnectionKey)
cfg.StorageLocation = v.GetString(StorageLocationKey)
if err := v.UnmarshalKey(DestinationSubnetsKey, &cfg.DestinationSubnets); err != nil {
return Config{}, false, fmt.Errorf("failed to unmarshal destination subnets: %v", err)
}
Expand Down Expand Up @@ -215,6 +219,9 @@ func (s *SourceSubnet) Validate() error {
if _, err := url.ParseRequestURI(s.GetNodeWSEndpoint()); err != nil {
return fmt.Errorf("invalid relayer subscribe URL in source subnet configuration: %v", err)
}
if _, err := url.ParseRequestURI(s.GetNodeRPCEndpoint()); err != nil {
return fmt.Errorf("invalid relayer RPC URL in source subnet configuration: %v", err)
}

// Validate the VM specific settings
switch ParseVM(s.VM) {
Expand Down Expand Up @@ -267,15 +274,25 @@ func (s *DestinationSubnet) Validate() error {
return nil
}

func constructURL(protocol string, host string, port uint32, encrypt bool) string {
func constructURL(protocol string, host string, port uint32, encrypt bool, chainIDStr string, subnetIDStr string) string {
var protocolPathMap = map[string]string{
"http": "rpc",
"ws": "ws",
}
path := protocolPathMap[protocol]

if encrypt {
protocol = protocol + "s"
}
portStr := ""
if port != 0 {
portStr = fmt.Sprintf(":%d", port)
}
return fmt.Sprintf("%s://%s%s", protocol, host, portStr)
subnetID, _ := ids.FromString(subnetIDStr) // already validated in Validate()
if subnetID == constants.PrimaryNetworkID {
chainIDStr = cChainIdentifierString
}
return fmt.Sprintf("%s://%s%s/ext/bc/%s/%s", protocol, host, portStr, chainIDStr, path)
}

// Constructs an RPC endpoint for the subnet.
Expand All @@ -286,13 +303,38 @@ func (s *DestinationSubnet) GetNodeRPCEndpoint() string {
if s.RPCEndpoint != "" {
return s.RPCEndpoint
}
baseUrl := constructURL("http", s.APINodeHost, s.APINodePort, s.EncryptConnection)
chainID := s.ChainID
subnetID, _ := ids.FromString(s.SubnetID) // already validated in Validate()
if subnetID == constants.PrimaryNetworkID {
chainID = cChainIdentifierString

// Save this result for future use
s.RPCEndpoint = constructURL(
"http",
s.APINodeHost,
s.APINodePort,
s.EncryptConnection,
s.ChainID,
s.SubnetID,
)
return s.RPCEndpoint
}

// Constructs an RPC endpoint for the subnet.
// If the RPCEndpoint field is set in the configuration, returns that directly.
// Otherwise, constructs the endpoint from the APINodeHost, APINodePort, and EncryptConnection fields,
// following the /ext/bc/{chainID}/rpc format.
func (s *SourceSubnet) GetNodeRPCEndpoint() string {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
if s.RPCEndpoint != "" {
return s.RPCEndpoint
}
return fmt.Sprintf("%s/ext/bc/%s/rpc", baseUrl, chainID)

// Save this result for future use
s.RPCEndpoint = constructURL(
"http",
s.APINodeHost,
s.APINodePort,
s.EncryptConnection,
s.ChainID,
s.SubnetID,
)
return s.RPCEndpoint
}

// Constructs a WS endpoint for the subnet.
Expand All @@ -303,13 +345,17 @@ func (s *SourceSubnet) GetNodeWSEndpoint() string {
if s.WSEndpoint != "" {
return s.WSEndpoint
}
baseUrl := constructURL("ws", s.APINodeHost, s.APINodePort, s.EncryptConnection)
chainID := s.ChainID
subnetID, _ := ids.FromString(s.SubnetID) // already validated in Validate()
if subnetID == constants.PrimaryNetworkID {
chainID = cChainIdentifierString
}
return fmt.Sprintf("%s/ext/bc/%s/ws", baseUrl, chainID)

// Save this result for future use
s.WSEndpoint = constructURL(
"ws",
s.APINodeHost,
s.APINodePort,
s.EncryptConnection,
s.ChainID,
s.SubnetID,
)
return s.WSEndpoint
}

// Get the private key and derive the wallet address from a relayer's configured private key for a given destination subnet.
Expand Down
29 changes: 18 additions & 11 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,11 @@ func TestGetDestinationRPCEndpoint(t *testing.T) {
}
}

func TestGetSourceSubnetWSEndpoint(t *testing.T) {
func TestGetSourceSubnetEndpoints(t *testing.T) {
testCases := []struct {
s SourceSubnet
expectedResult string
s SourceSubnet
expectedWsResult string
expectedRpcResult string
}{
{
s: SourceSubnet{
Expand All @@ -137,7 +138,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("ws://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("ws://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("http://127.0.0.1:9650/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
Expand All @@ -147,7 +149,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("wss://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("wss://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("https://127.0.0.1:9650/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
Expand All @@ -157,7 +160,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("ws://api.avax.network/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("ws://api.avax.network/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("http://api.avax.network/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
Expand All @@ -167,7 +171,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: primarySubnetID,
},
expectedResult: "ws://127.0.0.1:9650/ext/bc/C/ws",
expectedWsResult: "ws://127.0.0.1:9650/ext/bc/C/ws",
expectedRpcResult: "http://127.0.0.1:9650/ext/bc/C/rpc",
},
{
s: SourceSubnet{
Expand All @@ -176,15 +181,17 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
APINodePort: 9650,
ChainID: testChainID,
SubnetID: testSubnetID,
WSEndpoint: "wss://subnets.avax.network/mysubnet/ws", // overrides all other settings used to construct the endpoint
WSEndpoint: "wss://subnets.avax.network/mysubnet/ws", // overrides all other settings used to construct the endpoint
RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc", // overrides all other settings used to construct the endpoint
},
expectedResult: "wss://subnets.avax.network/mysubnet/ws",
expectedWsResult: "wss://subnets.avax.network/mysubnet/ws",
expectedRpcResult: "https://subnets.avax.network/mysubnet/rpc",
},
}

for i, testCase := range testCases {
res := testCase.s.GetNodeWSEndpoint()
assert.Equal(t, testCase.expectedResult, res, fmt.Sprintf("test case %d failed", i))
assert.Equal(t, testCase.expectedWsResult, testCase.s.GetNodeWSEndpoint(), fmt.Sprintf("test case %d failed", i))
assert.Equal(t, testCase.expectedRpcResult, testCase.s.GetNodeRPCEndpoint(), fmt.Sprintf("test case %d failed", i))
}
}

Expand Down
1 change: 1 addition & 0 deletions config/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ const (
DestinationSubnetsKey = "destination-subnets"
EncryptConnectionKey = "encrypt-connection"
AccountPrivateKeyKey = "account-private-key"
StorageLocationKey = "storage-location"
)
25 changes: 25 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package database

import (
"github.com/ava-labs/avalanchego/ids"
"github.com/pkg/errors"
)

const (
LatestProcessedBlockKey = "latestProcessedBlock"
)

var (
ErrKeyNotFound = errors.New("key not found")
ErrChainNotFound = errors.New("no database for chain")
ErrDatabaseMisconfiguration = errors.New("database misconfiguration")
)

// RelayerDatabase is a key-value store for relayer state, with each chainID maintaining its own state
type RelayerDatabase interface {
Get(chainID ids.ID, key []byte) ([]byte, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we return []byte instead of string directly? When we write in to the file, we convert []byte to string. Also, when we use the result, we convert []byte to string.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We discussed this offline, but for visibility, I chose []byte in the interface so that the keys and values could be as general as possible. The underlying JSON databse uses strings so that the resulting file is human readable.

Put(chainID ids.ID, key []byte, value []byte) error
}
Loading
Loading