Process missed blocks on startup #20

Merged: 52 commits, Sep 13, 2023

Changes from 16 commits

Commits (52)
c74ef48
add json database
cam-schultz Aug 29, 2023
1280b4a
initialize subscriber from stored block height
cam-schultz Aug 29, 2023
551b90e
subscriber initialization failure is non fatal
cam-schultz Aug 29, 2023
5ca0bd0
Merge branch 'main' into catch-up-blocks
cam-schultz Aug 29, 2023
ba3f2fb
add storage location cfg option
cam-schultz Aug 29, 2023
39fdc21
better read error handling
cam-schultz Aug 30, 2023
a1aab00
add comments
cam-schultz Aug 30, 2023
6913fb9
properly set default storage location
cam-schultz Aug 30, 2023
505b337
fix update json db vals bug
cam-schultz Aug 30, 2023
de7a347
add initialization log message
cam-schultz Aug 30, 2023
349ac1e
typo
cam-schultz Aug 30, 2023
162ca0d
validate rpc endpoint in config
cam-schultz Aug 30, 2023
7d07d82
create subscription before processing back events
cam-schultz Aug 30, 2023
9a0baa0
rename type
cam-schultz Aug 30, 2023
dc7ae3c
rename latestSeenBlock
cam-schultz Aug 30, 2023
860f595
json storage unit tests
cam-schultz Aug 30, 2023
aa8dd8b
correct comment
cam-schultz Aug 30, 2023
ac5d57d
better error messages
cam-schultz Aug 30, 2023
9a51727
more unit tests
cam-schultz Aug 31, 2023
16af742
error on invalid log
cam-schultz Sep 1, 2023
2bb9605
save constructed urls
cam-schultz Sep 1, 2023
1727f90
make json db writes atomic
cam-schultz Sep 1, 2023
e49395b
add license header
cam-schultz Sep 1, 2023
ebdfc79
properly handle json file missing case
cam-schultz Sep 5, 2023
532094c
cleanup
cam-schultz Sep 5, 2023
f4d2c83
write latest block to db on initialization
cam-schultz Sep 5, 2023
17879c0
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 5, 2023
4e5419d
consolidate endpoint methods
cam-schultz Sep 5, 2023
cf9a0e7
clean up comments
cam-schultz Sep 5, 2023
9fdd275
rename var
cam-schultz Sep 5, 2023
32146a4
cap the number of blocks to process on startup
cam-schultz Sep 5, 2023
ce66b82
cleanup
cam-schultz Sep 5, 2023
e012c4e
refactor subscriber interface
cam-schultz Sep 5, 2023
44384ca
fix test
cam-schultz Sep 5, 2023
e9c0b01
use Int:Sub
cam-schultz Sep 6, 2023
878d4c6
process logs in serial for each source chain
cam-schultz Sep 6, 2023
85cf76c
return nil error in success case
cam-schultz Sep 6, 2023
813a8e9
remove irrelevant comment
cam-schultz Sep 6, 2023
fde1675
improve logs
cam-schultz Sep 7, 2023
c4e36bd
sort logs from eth_getLogs
cam-schultz Sep 7, 2023
b8902c5
fix logs
cam-schultz Sep 7, 2023
9bda673
sort by log index
cam-schultz Sep 11, 2023
48ec689
cleanup
cam-schultz Sep 11, 2023
4ed7671
cache current state
cam-schultz Sep 11, 2023
4cdc487
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
55bc03c
clarify latestSeenBlock comments
cam-schultz Sep 11, 2023
232f6fe
fix test
cam-schultz Sep 11, 2023
5b0cc63
rename latestProcessedBlock
cam-schultz Sep 12, 2023
995d204
rename SetProcessedBlockHeightToLatest
cam-schultz Sep 13, 2023
e30a4ac
update comment
cam-schultz Sep 13, 2023
1e9891d
Merge branch 'main' into catch-up-blocks
cam-schultz Sep 13, 2023
8870d8b
appease linter
cam-schultz Sep 13, 2023
24 changes: 24 additions & 0 deletions config/config.go
@@ -42,6 +42,7 @@ type SourceSubnet struct {
APINodeHost string `mapstructure:"api-node-host" json:"api-node-host"`
APINodePort uint32 `mapstructure:"api-node-port" json:"api-node-port"`
EncryptConnection bool `mapstructure:"encrypt-connection" json:"encrypt-connection"`
RPCEndpoint string `mapstructure:"rpc-endpoint" json:"rpc-endpoint"`
WSEndpoint string `mapstructure:"ws-endpoint" json:"ws-endpoint"`
MessageContracts map[string]MessageProtocolConfig `mapstructure:"message-contracts" json:"message-contracts"`
}
@@ -62,6 +63,7 @@ type Config struct {
NetworkID uint32 `mapstructure:"network-id" json:"network-id"`
PChainAPIURL string `mapstructure:"p-chain-api-url" json:"p-chain-api-url"`
EncryptConnection bool `mapstructure:"encrypt-connection" json:"encrypt-connection"`
StorageLocation string `mapstructure:"storage-location" json:"storage-location"`
SourceSubnets []SourceSubnet `mapstructure:"source-subnets" json:"source-subnets"`
DestinationSubnets []DestinationSubnet `mapstructure:"destination-subnets" json:"destination-subnets"`
}
@@ -71,6 +73,7 @@ func SetDefaultConfigValues(v *viper.Viper) {
v.SetDefault(NetworkIDKey, constants.MainnetID)
v.SetDefault(PChainAPIURLKey, "https://api.avax.network")
v.SetDefault(EncryptConnectionKey, true)
v.SetDefault(StorageLocationKey, "./.awm-relayer-storage")
}

// BuildConfig constructs the relayer config using Viper.
@@ -99,6 +102,7 @@ func BuildConfig(v *viper.Viper) (Config, bool, error) {
cfg.NetworkID = v.GetUint32(NetworkIDKey)
cfg.PChainAPIURL = v.GetString(PChainAPIURLKey)
cfg.EncryptConnection = v.GetBool(EncryptConnectionKey)
cfg.StorageLocation = v.GetString(StorageLocationKey)
if err := v.UnmarshalKey(DestinationSubnetsKey, &cfg.DestinationSubnets); err != nil {
return Config{}, false, fmt.Errorf("failed to unmarshal destination subnets: %v", err)
}
@@ -215,6 +219,9 @@ func (s *SourceSubnet) Validate() error {
if _, err := url.ParseRequestURI(s.GetNodeWSEndpoint()); err != nil {
return fmt.Errorf("invalid relayer subscribe URL in source subnet configuration: %v", err)
}
if _, err := url.ParseRequestURI(s.GetNodeRPCEndpoint()); err != nil {
return fmt.Errorf("invalid relayer RPC URL in source subnet configuration: %v", err)
}

// Validate the VM specific settings
switch ParseVM(s.VM) {
@@ -295,6 +302,23 @@ func (s *DestinationSubnet) GetNodeRPCEndpoint() string {
return fmt.Sprintf("%s/ext/bc/%s/rpc", baseUrl, chainID)
}

// Constructs an RPC endpoint for the subnet.
// If the RPCEndpoint field is set in the configuration, returns that directly.
// Otherwise, constructs the endpoint from the APINodeHost, APINodePort, and EncryptConnection fields,
// following the /ext/bc/{chainID}/rpc format.
func (s *SourceSubnet) GetNodeRPCEndpoint() string {
if s.RPCEndpoint != "" {
return s.RPCEndpoint
}
baseUrl := constructURL("http", s.APINodeHost, s.APINodePort, s.EncryptConnection)
chainID := s.ChainID
subnetID, _ := ids.FromString(s.SubnetID) // already validated in Validate()
if subnetID == constants.PrimaryNetworkID {
chainID = cChainIdentifierString
}
return fmt.Sprintf("%s/ext/bc/%s/rpc", baseUrl, chainID)
}

// Constructs a WS endpoint for the subnet.
// If the WSEndpoint field is set in the configuration, returns that directly.
// Otherwise, constructs the endpoint from the APINodeHost, APINodePort, and EncryptConnection fields,
29 changes: 18 additions & 11 deletions config/config_test.go
@@ -124,10 +124,11 @@ func TestGetDestinationRPCEndpoint(t *testing.T) {
}
}

func TestGetSourceSubnetWSEndpoint(t *testing.T) {
func TestGetSourceSubnetEndpoints(t *testing.T) {
testCases := []struct {
s SourceSubnet
expectedResult string
s SourceSubnet
expectedWsResult string
expectedRpcResult string
}{
{
s: SourceSubnet{
@@ -137,7 +138,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("ws://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("ws://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("http://127.0.0.1:9650/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
@@ -147,7 +149,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("wss://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("wss://127.0.0.1:9650/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("https://127.0.0.1:9650/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
@@ -157,7 +160,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: testSubnetID,
},
expectedResult: fmt.Sprintf("ws://api.avax.network/ext/bc/%s/ws", testChainID),
expectedWsResult: fmt.Sprintf("ws://api.avax.network/ext/bc/%s/ws", testChainID),
expectedRpcResult: fmt.Sprintf("http://api.avax.network/ext/bc/%s/rpc", testChainID),
},
{
s: SourceSubnet{
@@ -167,7 +171,8 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
ChainID: testChainID,
SubnetID: primarySubnetID,
},
expectedResult: "ws://127.0.0.1:9650/ext/bc/C/ws",
expectedWsResult: "ws://127.0.0.1:9650/ext/bc/C/ws",
expectedRpcResult: "http://127.0.0.1:9650/ext/bc/C/rpc",
},
{
s: SourceSubnet{
@@ -176,15 +181,17 @@ func TestGetSourceSubnetWSEndpoint(t *testing.T) {
APINodePort: 9650,
ChainID: testChainID,
SubnetID: testSubnetID,
WSEndpoint: "wss://subnets.avax.network/mysubnet/ws", // overrides all other settings used to construct the endpoint
WSEndpoint: "wss://subnets.avax.network/mysubnet/ws", // overrides all other settings used to construct the endpoint
RPCEndpoint: "https://subnets.avax.network/mysubnet/rpc", // overrides all other settings used to construct the endpoint
},
expectedResult: "wss://subnets.avax.network/mysubnet/ws",
expectedWsResult: "wss://subnets.avax.network/mysubnet/ws",
expectedRpcResult: "https://subnets.avax.network/mysubnet/rpc",
},
}

for i, testCase := range testCases {
res := testCase.s.GetNodeWSEndpoint()
assert.Equal(t, testCase.expectedResult, res, fmt.Sprintf("test case %d failed", i))
assert.Equal(t, testCase.expectedWsResult, testCase.s.GetNodeWSEndpoint(), fmt.Sprintf("test case %d failed", i))
assert.Equal(t, testCase.expectedRpcResult, testCase.s.GetNodeRPCEndpoint(), fmt.Sprintf("test case %d failed", i))
}
}

1 change: 1 addition & 0 deletions config/keys.go
@@ -13,4 +13,5 @@
DestinationSubnetsKey = "destination-subnets"
EncryptConnectionKey = "encrypt-connection"
AccountPrivateKeyKey = "account-private-key"
StorageLocationKey = "storage-location"
)
13 changes: 13 additions & 0 deletions database/database.go
@@ -0,0 +1,13 @@
package database

import "github.com/ava-labs/avalanchego/ids"

const (
LatestSeenBlockKey = "latestSeenBlock"
)

// RelayerDatabase is a key-value store for relayer state, with each chainID maintaining its own state
type RelayerDatabase interface {
Get(chainID ids.ID, key []byte) ([]byte, error)
Contributor:
Why do we return []byte instead of string directly? When we write to the file, we convert the []byte to a string, and when we use the result, we convert the []byte back to a string.

Collaborator Author (cam-schultz):
We discussed this offline, but for visibility: I chose []byte in the interface so that the keys and values could be as general as possible. The underlying JSON database uses strings so that the resulting file is human readable.

Put(chainID ids.ID, key []byte, value []byte) error
}
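A minimal sketch of how a caller might round-trip a block height through this interface (illustration only, not part of this PR; the helper names and the use of strconv are assumptions). The []byte boundary keeps the interface general, while the stored value remains a human-readable decimal string, matching the discussion above.

// storeBlockHeight writes the height as a decimal string so the backing JSON
// file stays human readable, while the interface itself only deals in []byte.
func storeBlockHeight(db RelayerDatabase, chainID ids.ID, height uint64) error {
    return db.Put(chainID, []byte(LatestSeenBlockKey), []byte(strconv.FormatUint(height, 10)))
}

// loadBlockHeight parses the stored decimal string back into a uint64.
func loadBlockHeight(db RelayerDatabase, chainID ids.ID) (uint64, error) {
    raw, err := db.Get(chainID, []byte(LatestSeenBlockKey))
    if err != nil {
        return 0, err
    }
    return strconv.ParseUint(string(raw), 10, 64)
}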
172 changes: 172 additions & 0 deletions database/json_file_storage.go
@@ -0,0 +1,172 @@
package database

import (
"encoding/json"
"os"
"path/filepath"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/pkg/errors"
"go.uber.org/zap"
)

var _ RelayerDatabase = &JSONFileStorage{}

var (
fileDoesNotExistErr = errors.New("JSON database file does not exist")
)

type chainState map[string]string

// JSONFileStorage implements RelayerDatabase
type JSONFileStorage struct {
// the directory where the json files are stored
dir string

// Each network has its own mutex
// The chainIDs used to index the JSONFileStorage are created at initialization
// and are not modified afterwards, so we don't need to lock the map itself.
mutexes map[ids.ID]*sync.RWMutex
logger logging.Logger
}

// NewJSONFileStorage creates a new JSONFileStorage instance
func NewJSONFileStorage(logger logging.Logger, dir string, networks []ids.ID) (*JSONFileStorage, error) {
storage := &JSONFileStorage{
dir: filepath.Clean(dir),
mutexes: make(map[ids.ID]*sync.RWMutex),
logger: logger,
}

for _, network := range networks {
storage.mutexes[network] = &sync.RWMutex{}
}

_, err := os.Stat(dir)
if err == nil {
// dir already exist
// return the existing storage
return storage, nil
}

// 0755: The owner can read, write, execute.
// Everyone else can read and execute but not modify the file.
err = os.MkdirAll(dir, 0755)
if err != nil {
storage.logger.Error("failed to create dir",
zap.Error(err))
return nil, err
}

return storage, nil
}
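As a usage note, a hedged sketch of how the relayer might construct this storage from the new storage-location config value and the ChainID strings of the configured source subnets; openRelayerStorage is a hypothetical helper, not code from this diff.

// openRelayerStorage converts the configured source chain ID strings into ids.IDs
// and opens one JSON storage file per chain under the configured storage location.
func openRelayerStorage(logger logging.Logger, storageLocation string, sourceChainIDs []string) (*JSONFileStorage, error) {
    chainIDs := make([]ids.ID, 0, len(sourceChainIDs))
    for _, raw := range sourceChainIDs {
        chainID, err := ids.FromString(raw)
        if err != nil {
            return nil, err
        }
        chainIDs = append(chainIDs, chainID)
    }
    return NewJSONFileStorage(logger, storageLocation, chainIDs)
}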

// Get the latest chain state from the json database, and retrieve the value from the key
func (s *JSONFileStorage) Get(chainID ids.ID, key []byte) ([]byte, error) {
currentState := make(chainState)
fileExists, err := s.read(chainID, &currentState)
if err != nil {
s.logger.Error(
"failed to read file",
zap.Error(err),
)
return nil, err
}
if !fileExists {
return nil, fileDoesNotExistErr
}

return []byte(currentState[string(key)]), nil
}

// Put the value into the json database. Read the current chain state and overwrite the key, if it exists
func (s *JSONFileStorage) Put(chainID ids.ID, key []byte, value []byte) error {
currentState := make(chainState)
_, err := s.read(chainID, &currentState)
Collaborator:
It is a bit counterintuitive that Put first involves reading the whole DB file from disk. Should we consider adding an in-memory cache/copy of the current values so that we don't need to read before writing?

Collaborator Author (cam-schultz):
Good point. I added a field for the current chain state, which Put updates before writing the whole object to disk. We could of course use the cached state in Get as well, but I think we should move that to its own issue.

if err != nil {
s.logger.Error(
"failed to read file",
zap.Error(err),
)
return err
}

currentState[string(key)] = string(value)

return s.write(chainID, currentState)
Contributor (bernard-avalabs, Sep 1, 2023):
Not sure if this is a problem in our use case, but by releasing the lock between the read and the write, concurrent Puts can result in "lost updates," where one or more writes are lost (essentially overwritten by a previous snapshot).

If this can be a problem, then we should grab the lock before the read and release it after the write. We can do that in one of the following ways: move the locking out of read and write (and perform it in the functions that call them), use recursive locks (I don't think they are supported by default in Go), or add a flag to read/write indicating that a lock is not required for the call.

Collaborator Author (cam-schultz):
In our use case, only under extreme circumstances would there be concurrent writes to the same JSON file in a way that could cause issues. For that to happen, logs would have to be processed such that subsequent blocks overlap in the DB write. Even then, the worst outcome would be a log being processed multiple times, which wouldn't be an issue for correctness. #23 would guard against this as well.

That all being said, making the writes atomic is a good idea that eliminates this class of bugs altogether, so I've moved the locking behavior into Put/Get as you suggested.

}
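The two threads above suggest keeping an in-memory copy of the chain state and holding the lock across the whole read-modify-write. A rough, self-contained sketch of that shape (the cachedChainState type and its flush hook are hypothetical, not the code that was merged):

// cachedChainState illustrates the suggested direction: keep an in-memory copy
// of one chain's key/value state and hold a single mutex across the whole
// read-modify-write so concurrent Puts cannot overwrite each other's snapshots.
type cachedChainState struct {
    mu    sync.Mutex
    state chainState             // in-memory copy of the on-disk JSON contents
    flush func(chainState) error // persists the full state, e.g. via the write method below
}

func (c *cachedChainState) Put(key []byte, value []byte) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    // Update the cache and persist while still holding the lock, so this update
    // cannot be lost to a concurrent writer flushing a stale snapshot.
    c.state[string(key)] = string(value)
    return c.flush(c.state)
}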

// write value into the file
func (s *JSONFileStorage) write(network ids.ID, v interface{}) error {
mutex, ok := s.mutexes[network]
if !ok {
return errors.New("network does not exist")
}

mutex.Lock()
defer mutex.Unlock()

fnlPath := filepath.Join(s.dir, network.String()+".json")
tmpPath := fnlPath + ".tmp"

b, err := json.MarshalIndent(v, "", "\t")
if err != nil {
return err
}

// write marshaled data to the temp file
// if write failed, the original file is not affected
// 0644 Only the owner can read and write.
// Everyone else can only read. No one can execute the file.
if err := os.WriteFile(tmpPath, b, 0644); err != nil {
return errors.Wrap(err, "failed to write file")
}

// move final file into place
if err := os.Rename(tmpPath, fnlPath); err != nil {
Contributor:
Maybe for a future PR: if we want to keep a few old versions of the file, we could implement "log rotation" by renaming the existing file to a backup name (the same name with an added suffix) before overwriting it. Probably not necessary for our use case.

return errors.Wrap(err, "failed to rename file")
}

Contributor:
nit: We may want to consider performing a sync on the file if we want to be sure it is on disk (and not just in the writeback buffer of the file cache). Not sure if it is necessary in our use case.

Collaborator Author (cam-schultz):
I wasn't aware of this until you pointed it out, but Go Files do not perform buffered writes, so as soon as File.Write is called (as is the case internally in all of the os function calls here), the changes are written to disk. See here for a discussion on this: https://stackoverflow.com/a/10890222

Contributor (bernard-avalabs, Sep 6, 2023):
I suspect Go's File object is a light wrapper around a file descriptor (created using an open syscall). Writes to it are not buffered at the application level (unlike files opened with fopen), so they do not need to be flushed, but they are still written to the filesystem's buffer cache first and written back to disk later.

An explicit sync is not necessary if the system does not abruptly power down. However, if there is a power outage, data in the buffer cache that hasn't been written to disk can be lost. Most applications don't bother calling sync because it hurts performance, but databases often do, and we are building a type of database here.

With that said, a sync here might not be necessary if we assume the filesystem will write back the inode update (for the rename) after the writes to the file. That is probably a good assumption for most filesystems.

return nil
}
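On the sync and log-rotation suggestions above, a hedged sketch of what an explicit fsync plus an optional backup of the previous file could look like; writeDurably and the .bak suffix are illustrative, not part of this PR.

// writeDurably writes b to a temp file, forces it to disk with Sync, optionally
// keeps the previous version as a .bak file, and then renames the temp file into place.
func writeDurably(tmpPath, fnlPath string, b []byte) error {
    f, err := os.OpenFile(tmpPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
    if err != nil {
        return err
    }
    if _, err := f.Write(b); err != nil {
        f.Close()
        return err
    }
    // Flush the data out of the filesystem's writeback cache before the rename,
    // so a power loss cannot leave the final path pointing at incomplete contents.
    if err := f.Sync(); err != nil {
        f.Close()
        return err
    }
    if err := f.Close(); err != nil {
        return err
    }
    // Optional "log rotation": keep the previous version around as a backup.
    if _, err := os.Stat(fnlPath); err == nil {
        if err := os.Rename(fnlPath, fnlPath+".bak"); err != nil {
            return err
        }
    }
    return os.Rename(tmpPath, fnlPath)
}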

// Read from disk and unmarshal into v
// Returns a bool indicating whether the file exists, and an error.
// If an error is returned, the bool should be ignored.
func (s *JSONFileStorage) read(network ids.ID, v interface{}) (bool, error) {
mutex, ok := s.mutexes[network]
if !ok {
return false, errors.New("network does not exist")
}

mutex.RLock()
defer mutex.RUnlock()

path := filepath.Join(s.dir, network.String()+".json")

// If the file does not exist, return false, but do not return an error as this
// is an expected case
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {

Reviewer:
Would os.ErrNotExist catch this? The os.Stat documentation mentions that it returns a *PathError.

Reviewer:
And how should we handle the case where an error is returned but it's not os.ErrNotExist? Right now we continue on to read the file.

Collaborator Author (cam-schultz):
os.Stat returns a *PathError that wraps an os.ErrNotExist if the file is not found. See here for an example: https://go.dev/play/p/qFwPDKFqSZS

s.logger.Debug(
"file does not exist",
zap.String("path", path),
zap.Error(err),
)
return false, nil
}

b, err := os.ReadFile(path)
if err != nil {
return false, errors.Wrap(err, "failed to read file")
}

// unmarshal data
if err = json.Unmarshal(b, &v); err != nil {
return false, errors.Wrap(err, "failed to unmarshal json file")
}

// Return true to indicate that the file exists and we read from it successfully
return true, nil
}
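To make the *PathError point from the thread above concrete, a standalone illustration (the helper name is hypothetical; errors.Is is provided by both the standard library and github.com/pkg/errors v0.9+) that treats a missing file as the expected case and surfaces any other Stat error instead of reading on.

// statFileExists returns (false, nil) only for the expected not-exist case;
// any other Stat failure is reported to the caller instead of being ignored.
func statFileExists(path string) (bool, error) {
    _, err := os.Stat(path)
    switch {
    case err == nil:
        return true, nil
    case errors.Is(err, os.ErrNotExist):
        // os.Stat wraps the sentinel in a *PathError, so errors.Is still matches.
        return false, nil
    default:
        return false, err
    }
}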