Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if message already delivered #23

Merged
merged 13 commits into from
Sep 15, 2023
1 change: 1 addition & 0 deletions messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// for each message protocol, and performs the sending to the destination chain.
type MessageManager interface {
// ShouldSendMessage returns true if the message should be sent to the destination chain
// If an error is returned, the boolean should be ignored by the caller.
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error)
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
Expand Down
18 changes: 18 additions & 0 deletions messages/teleporter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/ava-labs/subnet-evm/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ava-labs/avalanchego/ids"
)

// TeleporterMessage contains the Teleporter message, including
Expand All @@ -34,6 +35,13 @@ type ReceiveCrossChainMessageInput struct {
RelayerRewardAddress common.Address `json:"relayerRewardAddress"`
}

// MessageReceivedInput is the input to the MessageReceived
// in the contract deployed on the receiving chain
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
type MessageReceivedInput struct {
OriginChainID ids.ID `json:"relayerRewardAddress"`
MessageID *big.Int `json:"messageID"`
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
}

// unpack Teleporter message bytes according to EVM ABI encoding rules
func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
args := abi.Arguments{
Expand All @@ -60,3 +68,13 @@ func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
func packReceiverMessage(inputStruct ReceiveCrossChainMessageInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("receiveCrossChainMessage", inputStruct.RelayerRewardAddress)
}

func packMessageReceivedMessage(inputStruct MessageReceivedInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("messageReceived", inputStruct.OriginChainID, inputStruct.MessageID)
}

func unpackMessageReceivedResult(result []byte) (bool, error) {
var success bool
err := EVMTeleporterContractABI.UnpackIntoInterface(&success, "messageReceived", result)
return success, err
}
72 changes: 70 additions & 2 deletions messages/teleporter/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package teleporter

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/cache"
Expand All @@ -14,6 +16,8 @@ import (
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -70,6 +74,24 @@ func NewMessageManager(
}, nil
}

func isDestination(messageDestinationID ids.ID, allowedDestinationID ids.ID) bool {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
return messageDestinationID == allowedDestinationID
}

func isAllowedRelayer(allowedRelayers []common.Address, eoa common.Address) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should relayers be destination specific?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the code level, this particular check is ensuring that the relayer EOA that will send the tx to the destination is included in the Teleporter message's list of allowed relayers. The function above checks that the EVM client that will send the tx is sending to the destination chain ID included in the Teleporter message.

At the architectural level, DestinationClient is the entity responsible for sending the tx to the correct destination chain. MessageRelayers should be composed of exactly one DestinationClient, so in that sense, relayers are destination specific.

// If no allowed relayer addresses were set, then anyone can relay it.
if len(allowedRelayers) == 0 {
return true
}

for _, addr := range allowedRelayers {
if addr == eoa {
return true
}
}
return false
}

// ShouldSendMessage returns true if the message should be sent to the destination chain
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) {
// Unpack the teleporter message and add it to the cache
Expand All @@ -87,13 +109,59 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI
if !ok {
return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID.String())
}
if !destinationClient.Allowed(destinationChainID, teleporterMessage.AllowedRelayerAddresses) {
senderAddress := destinationClient.SenderAddress()
if !isAllowedRelayer(teleporterMessage.AllowedRelayerAddresses, senderAddress) {
m.logger.Info("Relayer EOA not allowed to deliver this message.")
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
return false, nil
}
if !isDestination(destinationChainID, destinationClient.DestinationChainID()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check seems redundant given that we look up the destination client by the chain ID already. If we choose to keep it, could we move this to right after the destinationClient is looked up from the destinationClients map?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I've removed it altogether, but left the corresponding method in DestinationClient, as it makes sense as a getter for that interface.

m.logger.Info(
"Relayer not allowed to deliver to chain.",
"Destination chain ID for message not supported by relayer.",
zap.String("destinationChainID", destinationChainID.String()),
)
return false, nil
}

// Check if the message has already been delivered to the destination chain
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
client, ok := destinationClient.Client().(ethclient.Client)
if !ok {
m.logger.Info("Destination client is not an Ethereum client.")
return false, errors.New("destination client is not an Ethereum client")
}

data, err := packMessageReceivedMessage(MessageReceivedInput{
OriginChainID: warpMessageInfo.WarpUnsignedMessage.SourceChainID,
MessageID: teleporterMessage.MessageID,
})
if err != nil {
m.logger.Info("Failed packing messageReceived call data.")
return false, err
}
protocolAddress := common.BytesToAddress(m.protocolAddress[:])
callMessage := interfaces.CallMsg{
gwen917 marked this conversation as resolved.
Show resolved Hide resolved
To: &protocolAddress,
From: senderAddress,
Data: data,
}
result, err := client.CallContract(context.Background(), callMessage, nil)
if err != nil {
m.logger.Info("Failed calling messageReceived method on destination chain.")
return false, err
}
// check the contract call result
received, err := unpackMessageReceivedResult(result)
if err != nil {
return false, err
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
}
if received {
m.logger.Info(
"Message already deliverd to destination.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return false, nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the caller should look at the err to distinguished between the message has been received and error conditions. Do the callers do that currently? How should the caller handle errors?

Copy link
Collaborator Author

@cam-schultz cam-schultz Sep 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to ShouldSendMessage is valid regardless of the value of the boolean, as long as the error is nil. We perform that check here: https://github.com/ava-labs/awm-relayer/blob/2060ab680cbd1a2501a87ac3ec01d6cd9da79f37/relayer/message_relayer.go#L80-L95

I added a comment to the MessageManager interface noting that the boolean should be ignored if a error is returned.

}

// Cache the message so it can be reused in SendMessage
m.teleporterMessageCache.Put(warpMessageInfo.WarpUnsignedMessage.ID(), teleporterMessage)
return true, nil
Expand Down
10 changes: 8 additions & 2 deletions vms/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ type DestinationClient interface {
// TODO: Make generic for any VM.
SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) error

// Allowed checks if the relayer is allowed to relay the message according to the VM rules and the message metadata
Allowed(chainID ids.ID, allowedRelayers []common.Address) bool
// Client returns the underlying client for the destination chain
Client() interface{}

// SenderAddress returns the address of the relayer on the destination chain
SenderAddress() common.Address

// DestinationChainID returns the ID of the destination chain
DestinationChainID() ids.ID
}

func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSubnet) (DestinationClient, error) {
Expand Down
64 changes: 22 additions & 42 deletions vms/evm/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,28 @@ func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSu
}, nil
}

func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
func (c *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
toAddress string,
gasLimit uint64,
callData []byte) error {
// Synchronize teleporter message requests to the same destination chain so that message ordering is preserved
tdc.lock.Lock()
defer tdc.lock.Unlock()
c.lock.Lock()
defer c.lock.Unlock()
// We need the global 32-byte representation of the destination chain ID, as well as the destination's configured chainID
// Without the destination's configured chainID, transaction signature verification will fail
destinationChainIDBigInt, err := tdc.client.ChainID(context.Background())
destinationChainIDBigInt, err := c.client.ChainID(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get chain ID from destination chain endpoint",
zap.Error(err),
)
return err
}

// Get the current base fee estimation, which is based on the previous blocks gas usage.
baseFee, err := tdc.client.EstimateBaseFee(context.Background())
baseFee, err := c.client.EstimateBaseFee(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get base fee",
zap.Error(err),
)
Expand All @@ -125,9 +125,9 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Get the suggested gas tip cap of the network
// TODO: Add a configurable ceiling to this value
gasTipCap, err := tdc.client.SuggestGasTipCap(context.Background())
gasTipCap, err := c.client.SuggestGasTipCap(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get gas tip cap",
zap.Error(err),
)
Expand All @@ -146,7 +146,7 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
// Construct the actual transaction to broadcast on the destination chain
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: destinationChainIDBigInt,
Nonce: tdc.currentNonce,
Nonce: c.currentNonce,
To: &to,
Gas: gasLimit,
GasFeeCap: gasFeeCap,
Expand All @@ -163,17 +163,17 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Sign and send the transaction on the destination chain
signer := types.LatestSignerForChainID(destinationChainIDBigInt)
signedTx, err := types.SignTx(tx, signer, tdc.pk)
signedTx, err := types.SignTx(tx, signer, c.pk)
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to sign transaction",
zap.Error(err),
)
return err
}

if err := tdc.client.SendTransaction(context.Background(), signedTx); err != nil {
tdc.logger.Error(
if err := c.client.SendTransaction(context.Background(), signedTx); err != nil {
c.logger.Error(
"Failed to send transaction",
zap.Error(err),
)
Expand All @@ -182,43 +182,23 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Increment the nonce to use on the destination chain now that we've sent
// a transaction using the current value.
tdc.currentNonce++
tdc.logger.Info(
c.currentNonce++
c.logger.Info(
"Sent transaction",
zap.String("txID", signedTx.Hash().String()),
)

return nil
}

func (tdc *destinationClient) isDestination(chainID ids.ID) bool {
if chainID != tdc.destinationChainID {
tdc.logger.Info(
"Destination chain ID for message not supported by relayer.",
zap.String("chainID", chainID.String()),
)
return false
}
return true
func (c *destinationClient) Client() interface{} {
return c.client
}

func (tdc *destinationClient) isAllowedRelayer(allowedRelayers []common.Address) bool {
// If no allowed relayer addresses were set, then anyone can relay it.
if len(allowedRelayers) == 0 {
return true
}

for _, addr := range allowedRelayers {
if addr == tdc.eoa {
return true
}
}

tdc.logger.Info("Relayer EOA not allowed to deliver this message.")
return false
func (c *destinationClient) SenderAddress() common.Address {
return c.eoa
}

func (tdc *destinationClient) Allowed(chainID ids.ID, allowedRelayers []common.Address) bool {
return tdc.isDestination(chainID) &&
tdc.isAllowedRelayer(allowedRelayers)
func (c *destinationClient) DestinationChainID() ids.ID {
return c.destinationChainID
}
Loading