Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if message already delivered #23

Merged
merged 13 commits into from
Sep 15, 2023
1 change: 1 addition & 0 deletions messages/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
// for each message protocol, and performs the sending to the destination chain.
type MessageManager interface {
// ShouldSendMessage returns true if the message should be sent to the destination chain
// If an error is returned, the boolean should be ignored by the caller.
ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error)
// SendMessage sends the signed message to the destination chain. The payload parsed according to
// the VM rules is also passed in, since MessageManager does not assume any particular VM
Expand Down
18 changes: 18 additions & 0 deletions messages/teleporter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/big"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/subnet-evm/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)
Expand Down Expand Up @@ -34,6 +35,13 @@ type ReceiveCrossChainMessageInput struct {
RelayerRewardAddress common.Address `json:"relayerRewardAddress"`
}

// MessageReceivedInput is the input to the MessageReceived
// in the contract deployed on the receiving chain
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
type MessageReceivedInput struct {
OriginChainID ids.ID `json:"relayerRewardAddress"`
MessageID *big.Int `json:"messageID"`
}

// unpack Teleporter message bytes according to EVM ABI encoding rules
func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
args := abi.Arguments{
Expand All @@ -60,3 +68,13 @@ func unpackTeleporterMessage(messageBytes []byte) (*TeleporterMessage, error) {
func packReceiverMessage(inputStruct ReceiveCrossChainMessageInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("receiveCrossChainMessage", inputStruct.RelayerRewardAddress)
}

func packMessageReceivedMessage(inputStruct MessageReceivedInput) ([]byte, error) {
return EVMTeleporterContractABI.Pack("messageReceived", inputStruct.OriginChainID, inputStruct.MessageID)
}

func unpackMessageReceivedResult(result []byte) (bool, error) {
var success bool
err := EVMTeleporterContractABI.UnpackIntoInterface(&success, "messageReceived", result)
return success, err
}
103 changes: 101 additions & 2 deletions messages/teleporter/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
package teleporter

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/ava-labs/avalanchego/cache"
Expand All @@ -14,6 +16,8 @@ import (
"github.com/ava-labs/awm-relayer/config"
"github.com/ava-labs/awm-relayer/vms"
"github.com/ava-labs/awm-relayer/vms/vmtypes"
"github.com/ava-labs/subnet-evm/ethclient"
"github.com/ava-labs/subnet-evm/interfaces"
"github.com/ethereum/go-ethereum/common"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -70,6 +74,24 @@ func NewMessageManager(
}, nil
}

func isDestination(messageDestinationID ids.ID, allowedDestinationID ids.ID) bool {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
return messageDestinationID == allowedDestinationID
}

func isAllowedRelayer(allowedRelayers []common.Address, eoa common.Address) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should relayers be destination specific?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the code level, this particular check is ensuring that the relayer EOA that will send the tx to the destination is included in the Teleporter message's list of allowed relayers. The function above checks that the EVM client that will send the tx is sending to the destination chain ID included in the Teleporter message.

At the architectural level, DestinationClient is the entity responsible for sending the tx to the correct destination chain. MessageRelayers should be composed of exactly one DestinationClient, so in that sense, relayers are destination specific.

// If no allowed relayer addresses were set, then anyone can relay it.
if len(allowedRelayers) == 0 {
return true
}

for _, addr := range allowedRelayers {
if addr == eoa {
return true
}
}
return false
}

// ShouldSendMessage returns true if the message should be sent to the destination chain
func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageInfo, destinationChainID ids.ID) (bool, error) {
// Unpack the teleporter message and add it to the cache
Expand All @@ -87,18 +109,95 @@ func (m *messageManager) ShouldSendMessage(warpMessageInfo *vmtypes.WarpMessageI
if !ok {
return false, fmt.Errorf("relayer not configured to deliver to destination. destinationChainID=%s", destinationChainID.String())
}
if !destinationClient.Allowed(destinationChainID, teleporterMessage.AllowedRelayerAddresses) {
senderAddress := destinationClient.SenderAddress()
if !isAllowedRelayer(teleporterMessage.AllowedRelayerAddresses, senderAddress) {
m.logger.Info(
"Relayer EOA not allowed to deliver this message.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return false, nil
}

delivered, err := m.messageDelivered(
destinationClient,
warpMessageInfo,
teleporterMessage,
senderAddress,
destinationChainID,
)
if err != nil {
m.logger.Error(
"Failed to check if message has been delivered to destination chain.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
zap.Error(err),
)
return false, err
}
if delivered {
m.logger.Info(
"Relayer not allowed to deliver to chain.",
"Message already delivered to destination.",
zap.String("destinationChainID", destinationChainID.String()),
zap.String("teleporterMessageID", teleporterMessage.MessageID.String()),
)
return false, nil
}

// Cache the message so it can be reused in SendMessage
m.teleporterMessageCache.Put(warpMessageInfo.WarpUnsignedMessage.ID(), teleporterMessage)
return true, nil
}

// Helper to check if a message has been delivered to the destination chain
// Returns true if the message has been delivered, false if not
// On error, the boolean result should be ignored
func (m *messageManager) messageDelivered(
destinationClient vms.DestinationClient,
warpMessageInfo *vmtypes.WarpMessageInfo,
teleporterMessage *TeleporterMessage,
senderAddress common.Address,
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
destinationChainID ids.ID) (bool, error) {
// Check if the message has already been delivered to the destination chain
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
client, ok := destinationClient.Client().(ethclient.Client)
if !ok {
m.logger.Error("Destination client is not an Ethereum client.")
return false, errors.New("destination client is not an Ethereum client")
}

data, err := packMessageReceivedMessage(MessageReceivedInput{
OriginChainID: warpMessageInfo.WarpUnsignedMessage.SourceChainID,
MessageID: teleporterMessage.MessageID,
})
if err != nil {
m.logger.Error("Failed packing messageReceived call data.")
return false, err
}
protocolAddress := common.BytesToAddress(m.protocolAddress[:])
callMessage := interfaces.CallMsg{
gwen917 marked this conversation as resolved.
Show resolved Hide resolved
To: &protocolAddress,
From: senderAddress,
Data: data,
}
result, err := client.CallContract(context.Background(), callMessage, nil)
if err != nil {
m.logger.Error(
"Failed calling messageReceived method on destination chain.",
zap.String("destinationChainID", destinationChainID.String()),
)
return false, err
}
// check the contract call result
delivered, err := unpackMessageReceivedResult(result)
if err != nil {
return false, err
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
}
if delivered {
return true, nil
}
return false, nil
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
}

// SendMessage extracts the gasLimit and packs the call data to call the receiveCrossChainMessage method of the Teleporter contract,
// and dispatches transaction construction and broadcast to the destination client
func (m *messageManager) SendMessage(signedMessage *warp.Message, parsedVmPayload []byte, destinationChainID ids.ID) error {
cam-schultz marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
10 changes: 8 additions & 2 deletions vms/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ type DestinationClient interface {
// TODO: Make generic for any VM.
SendTx(signedMessage *warp.Message, toAddress string, gasLimit uint64, callData []byte) error

// Allowed checks if the relayer is allowed to relay the message according to the VM rules and the message metadata
Allowed(chainID ids.ID, allowedRelayers []common.Address) bool
// Client returns the underlying client for the destination chain
Client() interface{}

// SenderAddress returns the address of the relayer on the destination chain
SenderAddress() common.Address

// DestinationChainID returns the ID of the destination chain
DestinationChainID() ids.ID
}

func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSubnet) (DestinationClient, error) {
Expand Down
64 changes: 22 additions & 42 deletions vms/evm/destination_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,28 @@ func NewDestinationClient(logger logging.Logger, subnetInfo config.DestinationSu
}, nil
}

func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
func (c *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
toAddress string,
gasLimit uint64,
callData []byte) error {
// Synchronize teleporter message requests to the same destination chain so that message ordering is preserved
tdc.lock.Lock()
defer tdc.lock.Unlock()
c.lock.Lock()
defer c.lock.Unlock()
// We need the global 32-byte representation of the destination chain ID, as well as the destination's configured chainID
// Without the destination's configured chainID, transaction signature verification will fail
destinationChainIDBigInt, err := tdc.client.ChainID(context.Background())
destinationChainIDBigInt, err := c.client.ChainID(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get chain ID from destination chain endpoint",
zap.Error(err),
)
return err
}

// Get the current base fee estimation, which is based on the previous blocks gas usage.
baseFee, err := tdc.client.EstimateBaseFee(context.Background())
baseFee, err := c.client.EstimateBaseFee(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get base fee",
zap.Error(err),
)
Expand All @@ -125,9 +125,9 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Get the suggested gas tip cap of the network
// TODO: Add a configurable ceiling to this value
gasTipCap, err := tdc.client.SuggestGasTipCap(context.Background())
gasTipCap, err := c.client.SuggestGasTipCap(context.Background())
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to get gas tip cap",
zap.Error(err),
)
Expand All @@ -146,7 +146,7 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,
// Construct the actual transaction to broadcast on the destination chain
tx := types.NewTx(&types.DynamicFeeTx{
ChainID: destinationChainIDBigInt,
Nonce: tdc.currentNonce,
Nonce: c.currentNonce,
To: &to,
Gas: gasLimit,
GasFeeCap: gasFeeCap,
Expand All @@ -163,17 +163,17 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Sign and send the transaction on the destination chain
signer := types.LatestSignerForChainID(destinationChainIDBigInt)
signedTx, err := types.SignTx(tx, signer, tdc.pk)
signedTx, err := types.SignTx(tx, signer, c.pk)
if err != nil {
tdc.logger.Error(
c.logger.Error(
"Failed to sign transaction",
zap.Error(err),
)
return err
}

if err := tdc.client.SendTransaction(context.Background(), signedTx); err != nil {
tdc.logger.Error(
if err := c.client.SendTransaction(context.Background(), signedTx); err != nil {
c.logger.Error(
"Failed to send transaction",
zap.Error(err),
)
Expand All @@ -182,43 +182,23 @@ func (tdc *destinationClient) SendTx(signedMessage *avalancheWarp.Message,

// Increment the nonce to use on the destination chain now that we've sent
// a transaction using the current value.
tdc.currentNonce++
tdc.logger.Info(
c.currentNonce++
c.logger.Info(
"Sent transaction",
zap.String("txID", signedTx.Hash().String()),
)

return nil
}

func (tdc *destinationClient) isDestination(chainID ids.ID) bool {
if chainID != tdc.destinationChainID {
tdc.logger.Info(
"Destination chain ID for message not supported by relayer.",
zap.String("chainID", chainID.String()),
)
return false
}
return true
func (c *destinationClient) Client() interface{} {
return c.client
}

func (tdc *destinationClient) isAllowedRelayer(allowedRelayers []common.Address) bool {
// If no allowed relayer addresses were set, then anyone can relay it.
if len(allowedRelayers) == 0 {
return true
}

for _, addr := range allowedRelayers {
if addr == tdc.eoa {
return true
}
}

tdc.logger.Info("Relayer EOA not allowed to deliver this message.")
return false
func (c *destinationClient) SenderAddress() common.Address {
return c.eoa
}

func (tdc *destinationClient) Allowed(chainID ids.ID, allowedRelayers []common.Address) bool {
return tdc.isDestination(chainID) &&
tdc.isAllowedRelayer(allowedRelayers)
func (c *destinationClient) DestinationChainID() ids.ID {
return c.destinationChainID
}
Loading