Skip to content

Commit

Permalink
Merge pull request hyperledger-labs#106 from siburu/get-msg-result
Browse files Browse the repository at this point in the history
Fix SendMsgs, delete Send, and introduce GetMsgResult

Signed-off-by: Jun Kimura <[email protected]>
  • Loading branch information
bluele authored Sep 23, 2023
2 parents d1346bc + 690c60e commit c440f67
Show file tree
Hide file tree
Showing 14 changed files with 639 additions and 64 deletions.
135 changes: 124 additions & 11 deletions chains/tendermint/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package tendermint

import (
"context"
"encoding/hex"
"fmt"
"os"
"path"
"strings"
"sync"
"time"

"cosmossdk.io/errors"
"github.com/avast/retry-go"
"github.com/cometbft/cometbft/libs/log"
rpcclient "github.com/cometbft/cometbft/rpc/client"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
libclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
sdkCtx "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
Expand Down Expand Up @@ -160,22 +164,41 @@ func (c *Chain) Timestamp(height ibcexported.Height) (time.Time, error) {
}
}

func (c *Chain) AverageBlockTime() time.Duration {
return time.Duration(c.config.AverageBlockTimeMsec) * time.Millisecond
}

// RegisterMsgEventListener registers a given EventListener to the chain
func (c *Chain) RegisterMsgEventListener(listener core.MsgEventListener) {
c.msgEventListener = listener
}

func (c *Chain) sendMsgs(msgs []sdk.Msg) (*sdk.TxResponse, error) {
// broadcast tx
res, _, err := c.rawSendMsgs(msgs)
if err != nil {
return nil, err
} else if res.Code != 0 {
// CheckTx failed
return nil, fmt.Errorf("CheckTx failed: %v", errors.ABCIError(res.Codespace, res.Code, res.RawLog))
}

// wait for tx being committed
if resTx, err := c.waitForCommit(res.TxHash); err != nil {
return nil, err
} else if resTx.TxResult.IsErr() {
// DeliverTx failed
return nil, fmt.Errorf("DeliverTx failed: %v", errors.ABCIError(res.Codespace, res.Code, res.RawLog))
}
if res.Code == 0 && c.msgEventListener != nil {

// call msgEventListener if needed
if c.msgEventListener != nil {
if err := c.msgEventListener.OnSentMsg(msgs); err != nil {
c.logger.Error("failed to OnSendMsg call", "msgs", msgs, "err", err)
return res, nil
}
}

return res, nil
}

Expand Down Expand Up @@ -236,6 +259,55 @@ func (c *Chain) rawSendMsgs(msgs []sdk.Msg) (*sdk.TxResponse, bool, error) {
return res, true, nil
}

func (c *Chain) waitForCommit(txHash string) (*coretypes.ResultTx, error) {
var resTx *coretypes.ResultTx

retryInterval := c.AverageBlockTime()
maxRetry := uint(c.config.MaxRetryForCommit)

if err := retry.Do(func() error {
var err error
var recoverable bool
resTx, recoverable, err = c.rawQueryTx(txHash)
if err != nil {
if recoverable {
return err
} else {
return retry.Unrecoverable(err)
}
}
return nil
}, retry.Attempts(maxRetry), retry.Delay(retryInterval), rtyErr); err != nil {
return resTx, fmt.Errorf("failed to make sure that tx is committed: %v", err)
}

return resTx, nil
}

// rawQueryTx returns a tx of which hash equals to `hexTxHash`.
// If the RPC is successful but the tx is not found, this returns nil with nil error.
func (c *Chain) rawQueryTx(hexTxHash string) (*coretypes.ResultTx, bool, error) {
ctx := c.CLIContext(0)

txHash, err := hex.DecodeString(hexTxHash)
if err != nil {
return nil, false, fmt.Errorf("failed to decode the hex string of tx hash: %v", err)
}

node, err := ctx.GetNode()
if err != nil {
return nil, false, fmt.Errorf("failed to get node: %v", err)
}

resTx, err := node.Tx(context.TODO(), txHash, false)
if err != nil {
recoverable := !strings.Contains(err.Error(), "transaction indexing is disabled")
return nil, recoverable, fmt.Errorf("failed to retrieve tx: %v", err)
}

return resTx, false, nil
}

func prepareFactory(clientCtx sdkCtx.Context, txf tx.Factory) (tx.Factory, error) {
from := clientCtx.GetFromAddress()

Expand Down Expand Up @@ -323,25 +395,66 @@ func CalculateGas(
return simRes, uint64(txf.GasAdjustment() * float64(simRes.GasInfo.GasUsed)), nil
}

func (c *Chain) SendMsgs(msgs []sdk.Msg) ([]byte, error) {
func (c *Chain) SendMsgs(msgs []sdk.Msg) ([]core.MsgID, error) {
// Broadcast those bytes
res, err := c.sendMsgs(msgs)
if err != nil {
return nil, err
}
return []byte(res.Logs.String()), nil
var msgIDs []core.MsgID
for msgIndex := range msgs {
msgIDs = append(msgIDs, &MsgID{
txHash: res.TxHash,
msgIndex: uint32(msgIndex),
})
}
return msgIDs, nil
}

func (c *Chain) Send(msgs []sdk.Msg) bool {
res, err := c.sendMsgs(msgs)
if err != nil || res.Code != 0 {
c.LogFailedTx(res, err, msgs)
return false
func (c *Chain) GetMsgResult(id core.MsgID) (core.MsgResult, error) {
msgID, ok := id.(*MsgID)
if !ok {
return nil, fmt.Errorf("unexpected message id type: %T", id)
}

// find tx
resTx, err := c.waitForCommit(msgID.txHash)
if err != nil {
return nil, fmt.Errorf("failed to query tx: %v", err)
}

// check height of the delivered tx
version := clienttypes.ParseChainID(c.ChainID())
height := clienttypes.NewHeight(version, uint64(resTx.Height))

// check if the tx execution succeeded
if resTx.TxResult.IsErr() {
err := errors.ABCIError(resTx.TxResult.Codespace, resTx.TxResult.Code, resTx.TxResult.Log)
txFailureReason := err.Error()
return &MsgResult{
height: height,
txStatus: false,
txFailureReason: txFailureReason,
}, nil
}

// parse the log into ABCI logs
abciLogs, err := sdk.ParseABCILogs(resTx.TxResult.Log)
if err != nil {
return nil, fmt.Errorf("failed to parse ABCI logs: %v", err)
}

// parse the ABCI logs into core.MsgEventLog's
events, err := parseMsgEventLogs(abciLogs, msgID.msgIndex)
if err != nil {
return nil, fmt.Errorf("failed to parse msg event log: %v", err)
}
// NOTE: Add more data to this such as identifiers
c.LogSuccessTx(res, msgs)

return true
return &MsgResult{
height: height,
txStatus: true,
events: events,
}, nil
}

// ------------------------------- //
Expand Down
117 changes: 89 additions & 28 deletions chains/tendermint/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 0 additions & 11 deletions chains/tendermint/log-chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (

sdk "github.com/cosmos/cosmos-sdk/types"
proto "github.com/cosmos/gogoproto/proto"
clienttypes "github.com/cosmos/ibc-go/v7/modules/core/02-client/types"
ibcexported "github.com/cosmos/ibc-go/v7/modules/core/exported"
)

// LogFailedTx takes the transaction and the messages to create it and logs the appropriate data
Expand Down Expand Up @@ -75,15 +73,6 @@ func (c *Chain) Print(toPrint proto.Message, text, indent bool) error {
return nil
}

// MustGetHeight takes the height inteface and returns the actual height
func MustGetHeight(h ibcexported.Height) uint64 {
height, ok := h.(clienttypes.Height)
if !ok {
panic("height is not an instance of height! wtf")
}
return height.GetRevisionHeight()
}

func getMsgAction(msgs []sdk.Msg) string {
var out string
for i, msg := range msgs {
Expand Down
Loading

0 comments on commit c440f67

Please sign in to comment.