Skip to content

Commit

Permalink
change DeliverTx to take typed tx
Browse files Browse the repository at this point in the history
  • Loading branch information
codchen committed Nov 22, 2023
1 parent a8f569a commit 0284942
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 41 deletions.
11 changes: 8 additions & 3 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,12 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}

sdkCtx := app.getContextForTx(mode, req.Tx)
gInfo, result, _, priority, err := app.runTx(sdkCtx, mode, req.Tx)
tx, err := app.txDecoder(req.Tx)
if err != nil {
res := sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
return &res, err
}
gInfo, result, _, priority, err := app.runTx(sdkCtx, mode, tx, sha256.Sum256(req.Tx))
if err != nil {
res := sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
return &res, err
Expand All @@ -239,7 +244,7 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, tx sdk.Tx, checksum [32]byte) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
for _, streamingListener := range app.abciListeners {
Expand All @@ -259,7 +264,7 @@ func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res a
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
}()

gInfo, result, anteEvents, _, err := app.runTx(ctx.WithTxBytes(req.Tx).WithVoteInfos(app.voteInfos), runTxModeDeliver, req.Tx)
gInfo, result, anteEvents, _, err := app.runTx(ctx.WithTxBytes(req.Tx).WithVoteInfos(app.voteInfos), runTxModeDeliver, tx, checksum)
if err != nil {
resultStr = "failed"
// if we have a result, use those events instead of just the anteEvents
Expand Down
21 changes: 7 additions & 14 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package baseapp

import (
"context"
"crypto/sha256"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -821,15 +820,15 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
func (app *BaseApp) cacheTxContext(ctx sdk.Context, checksum [32]byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
sdk.TraceContext(
map[string]interface{}{
"txHash": fmt.Sprintf("%X", sha256.Sum256(txBytes)),
"txHash": fmt.Sprintf("%X", checksum),
},
),
).(sdk.CacheMultiStore)
Expand All @@ -845,8 +844,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// Note, gas execution info is always returned. A reference to a Result is
// returned if the tx does not run out of gas and if all the messages are valid
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {

func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, tx sdk.Tx, checksum [32]byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {
defer telemetry.MeasureThroughputSinceWithLabels(
telemetry.TxCount,
[]metrics.Label{
Expand All @@ -869,7 +867,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
spanCtx, span := app.TracingInfo.StartWithContext("RunTx", ctx.TraceSpanContext())
defer span.End()
ctx = ctx.WithTraceSpanContext(spanCtx)
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", sha256.Sum256(txBytes))))
span.SetAttributes(attribute.String("txHash", fmt.Sprintf("%X", checksum)))

// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
// determined by the GasMeter. We need access to the context to get the gas
Expand Down Expand Up @@ -916,11 +914,6 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
defer consumeBlockGas()
}

tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, nil, 0, err
}

msgs := tx.GetMsgs()

if err := validateBasicTxMsgs(msgs); err != nil {
Expand All @@ -942,7 +935,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx, msCache = app.cacheTxContext(ctx, checksum)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

Expand Down Expand Up @@ -994,7 +987,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)
runMsgCtx, msCache := app.cacheTxContext(ctx, checksum)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
Expand Down Expand Up @@ -1057,7 +1050,7 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
err error
)

msgCtx, msgMsCache := app.cacheTxContext(ctx, []byte{})
msgCtx, msgMsCache := app.cacheTxContext(ctx, [32]byte{})
msgCtx = msgCtx.WithMessageIndex(i)

startTime := time.Now()
Expand Down
23 changes: 12 additions & 11 deletions baseapp/deliver_tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package baseapp
import (
"bytes"
"context"
"crypto/sha256"
"encoding/binary"
"fmt"
"math/rand"
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestWithRouter(t *testing.T) {
txBytes, err := codec.Marshal(tx)
require.NoError(t, err)

res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))
}

Expand Down Expand Up @@ -439,7 +440,7 @@ func TestMultiMsgDeliverTx(t *testing.T) {
tx := newTxCounter(0, 0, 1, 2)
txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))

store := app.deliverState.ctx.KVStore(capKey1)
Expand All @@ -459,7 +460,7 @@ func TestMultiMsgDeliverTx(t *testing.T) {
tx.Msgs = append(tx.Msgs, msgCounter2{1})
txBytes, err = codec.Marshal(tx)
require.NoError(t, err)
res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))

store = app.deliverState.ctx.KVStore(capKey1)
Expand Down Expand Up @@ -658,7 +659,7 @@ func TestRunInvalidTransaction(t *testing.T) {
txBytes, err := newCdc.Marshal(tx)
require.NoError(t, err)

res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.EqualValues(t, sdkerrors.ErrTxDecode.ABCICode(), res.Code)
require.EqualValues(t, sdkerrors.ErrTxDecode.Codespace(), res.Codespace)
}
Expand Down Expand Up @@ -932,7 +933,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
tx.setFailOnAnte(true)
txBytes, err := cdc.Marshal(tx)
require.NoError(t, err)
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.Empty(t, res.Events)
require.False(t, res.IsOK(), fmt.Sprintf("%v", res))

Expand All @@ -948,7 +949,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
txBytes, err = cdc.Marshal(tx)
require.NoError(t, err)

res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
// should emit ante event
require.NotEmpty(t, res.Events)
require.False(t, res.IsOK(), fmt.Sprintf("%v", res))
Expand All @@ -965,7 +966,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
txBytes, err = cdc.Marshal(tx)
require.NoError(t, err)

res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.NotEmpty(t, res.Events)
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))

Expand Down Expand Up @@ -1043,15 +1044,15 @@ func TestGasConsumptionBadTx(t *testing.T) {
txBytes, err := cdc.Marshal(tx)
require.NoError(t, err)

res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.False(t, res.IsOK(), fmt.Sprintf("%v", res))

// require next tx to fail due to black gas limit
tx = newTxCounter(5, 0)
txBytes, err = cdc.Marshal(tx)
require.NoError(t, err)

res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res = app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.False(t, res.IsOK(), fmt.Sprintf("%v", res))
}

Expand Down Expand Up @@ -1520,7 +1521,7 @@ func TestDeliverTx(t *testing.T) {
txBytes, err := codec.Marshal(tx)
require.NoError(t, err)

res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
res := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.True(t, res.IsOK(), fmt.Sprintf("%v", res))
events := res.GetEvents()
require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively")
Expand Down Expand Up @@ -1729,7 +1730,7 @@ func setupBaseAppWithSnapshots(t *testing.T, blocks uint, blockTxs int, options
}
txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
resp := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes})
resp := app.DeliverTx(app.deliverState.ctx, abci.RequestDeliverTx{Tx: txBytes}, tx, sha256.Sum256(txBytes))
require.True(t, resp.IsOK(), "%v", resp.String())
}
app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{Height: height})
Expand Down
15 changes: 11 additions & 4 deletions baseapp/msg_service_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp_test

import (
"context"
"crypto/sha256"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -73,18 +74,24 @@ func TestMsgService(t *testing.T) {
encCfg := simapp.MakeTestEncodingConfig()
testdata.RegisterInterfaces(encCfg.InterfaceRegistry)
db := dbm.NewMemDB()
app := baseapp.NewBaseApp("test", log.NewTestingLogger(t), db, encCfg.TxConfig.TxDecoder(), nil, &testutil.TestAppOpts{})
decoder := encCfg.TxConfig.TxDecoder()
app := baseapp.NewBaseApp("test", log.NewTestingLogger(t), db, decoder, nil, &testutil.TestAppOpts{})
app.SetInterfaceRegistry(encCfg.InterfaceRegistry)
testdata.RegisterMsgServer(
app.MsgServiceRouter(),
testdata.MsgServerImpl{},
)
app.SetFinalizeBlocker(func(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
txResults := []*abci.ExecTxResult{}
for _, tx := range req.Txs {
for _, txbz := range req.Txs {
tx, err := decoder(txbz)
if err != nil {
txResults = append(txResults, &abci.ExecTxResult{})
continue
}
deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
})
Tx: txbz,
}, tx, sha256.Sum256(txbz))
txResults = append(txResults, &abci.ExecTxResult{
Code: deliverTxResp.Code,
Data: deliverTxResp.Data,
Expand Down
12 changes: 9 additions & 3 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package baseapp

import (
"crypto/sha256"

sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
Expand All @@ -15,7 +17,7 @@ func (app *BaseApp) Check(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, *sdk
return sdk.GasInfo{}, nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "%s", err)
}
ctx := app.checkState.ctx.WithTxBytes(bz).WithVoteInfos(app.voteInfos).WithConsensusParams(app.GetConsensusParams(app.checkState.ctx))
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeCheck, bz)
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeCheck, tx, sha256.Sum256(bz))
if len(ctx.MultiStore().GetEvents()) > 0 {
panic("Expected checkTx events to be empty")
}
Expand All @@ -25,7 +27,11 @@ func (app *BaseApp) Check(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, *sdk
func (app *BaseApp) Simulate(txBytes []byte) (sdk.GasInfo, *sdk.Result, error) {
ctx := app.checkState.ctx.WithTxBytes(txBytes).WithVoteInfos(app.voteInfos).WithConsensusParams(app.GetConsensusParams(app.checkState.ctx))
ctx, _ = ctx.CacheContext()
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeSimulate, txBytes)
tx, err := app.txDecoder(txBytes)
if err != nil {
return sdk.GasInfo{}, nil, err
}
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeSimulate, tx, sha256.Sum256(txBytes))
if len(ctx.MultiStore().GetEvents()) > 0 {
panic("Expected simulate events to be empty")
}
Expand All @@ -39,7 +45,7 @@ func (app *BaseApp) Deliver(txEncoder sdk.TxEncoder, tx sdk.Tx) (sdk.GasInfo, *s
return sdk.GasInfo{}, nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "%s", err)
}
ctx := app.deliverState.ctx.WithTxBytes(bz).WithVoteInfos(app.voteInfos).WithConsensusParams(app.GetConsensusParams(app.deliverState.ctx))
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeDeliver, bz)
gasInfo, result, _, _, err := app.runTx(ctx, runTxModeDeliver, tx, sha256.Sum256(bz))
return gasInfo, result, err
}

Expand Down
12 changes: 9 additions & 3 deletions server/mock/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mock

import (
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -39,10 +40,15 @@ func NewApp(rootDir string, logger log.Logger) (abci.Application, error) {
baseApp.SetInitChainer(InitChainer(capKeyMainStore))
baseApp.SetFinalizeBlocker(func(ctx sdk.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
txResults := []*abci.ExecTxResult{}
for _, tx := range req.Txs {
for _, txbz := range req.Txs {
tx, err := decodeTx(txbz)
if err != nil {
txResults = append(txResults, &abci.ExecTxResult{})
continue
}
deliverTxResp := baseApp.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
})
Tx: txbz,
}, tx, sha256.Sum256(txbz))
txResults = append(txResults, &abci.ExecTxResult{
Code: deliverTxResp.Code,
Data: deliverTxResp.Data,
Expand Down
7 changes: 6 additions & 1 deletion simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package simapp

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -515,9 +516,13 @@ func (app *SimApp) FinalizeBlocker(ctx sdk.Context, req *abci.RequestFinalizeBlo
txResults := []*abci.ExecTxResult{}
for i, tx := range req.Txs {
ctx = ctx.WithContext(context.WithValue(ctx.Context(), ante.ContextKeyTxIndexKey, i))
if typedTxs[i] == nil {
txResults = append(txResults, &abci.ExecTxResult{}) // empty result
continue
}
deliverTxResp := app.DeliverTx(ctx, abci.RequestDeliverTx{
Tx: tx,
})
}, typedTxs[i], sha256.Sum256(tx))
txResults = append(txResults, &abci.ExecTxResult{
Code: deliverTxResp.Code,
Data: deliverTxResp.Data,
Expand Down
5 changes: 3 additions & 2 deletions x/genutil/gentx.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package genutil

import (
"crypto/sha256"
"encoding/json"
"fmt"

Expand Down Expand Up @@ -87,7 +88,7 @@ func ValidateAccountInGenesis(
return nil
}

type deliverTxfn func(sdk.Context, abci.RequestDeliverTx) abci.ResponseDeliverTx
type deliverTxfn func(sdk.Context, abci.RequestDeliverTx, sdk.Tx, [32]byte) abci.ResponseDeliverTx

// DeliverGenTxs iterates over all genesis txs, decodes each into a Tx and
// invokes the provided deliverTxfn with the decoded Tx. It returns the result
Expand All @@ -109,7 +110,7 @@ func DeliverGenTxs(
panic(err)
}

res := deliverTx(ctx, abci.RequestDeliverTx{Tx: bz})
res := deliverTx(ctx, abci.RequestDeliverTx{Tx: bz}, tx, sha256.Sum256(bz))
if !res.IsOK() {
panic(res.Log)
}
Expand Down

0 comments on commit 0284942

Please sign in to comment.