Skip to content

Commit

Permalink
[refactor] Introduce action radio to broadcast (#4497)
Browse files Browse the repository at this point in the history
Co-authored-by: CoderZhi <[email protected]>
  • Loading branch information
envestcc and CoderZhi authored Nov 26, 2024
1 parent 669fae5 commit 6ba6aa7
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 103 deletions.
1 change: 1 addition & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ActPool interface {
ReceiveBlock(*block.Block) error

AddActionEnvelopeValidators(...action.SealedEnvelopeValidator)
AddSubscriber(sub Subscriber)
}

// Subscriber is the interface for actpool subscriber
Expand Down
93 changes: 93 additions & 0 deletions api/action_radio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package api

import (
"context"
"encoding/hex"

"github.com/iotexproject/iotex-proto/golang/iotextypes"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/pkg/log"
batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher"
)

// ActionRadioOption is the option to create ActionRadio
type ActionRadioOption func(*ActionRadio)

// WithMessageBatch enables message batching
func WithMessageBatch() ActionRadioOption {
return func(ar *ActionRadio) {
ar.messageBatcher = batch.NewManager(func(msg *batch.Message) error {
return ar.broadcastHandler(context.Background(), ar.chainID, msg.Data)
})
}
}

// ActionRadio broadcasts actions to the network
type ActionRadio struct {
broadcastHandler BroadcastOutbound
messageBatcher *batch.Manager
chainID uint32
}

// NewActionRadio creates a new ActionRadio
func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ...ActionRadioOption) *ActionRadio {
ar := &ActionRadio{
broadcastHandler: broadcastHandler,
chainID: chainID,
}
for _, opt := range opts {
opt(ar)
}
return ar
}

// Start starts the action radio
func (ar *ActionRadio) Start() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Start()
}
return nil
}

// Stop stops the action radio
func (ar *ActionRadio) Stop() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Stop()
}
return nil
}

// OnAdded broadcasts the action to the network
func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) {
var (
hasSidecar = selp.BlobTxSidecar() != nil
hash, _ = selp.Hash()
out proto.Message
err error
)
if hasSidecar {
out = &iotextypes.ActionHash{
Hash: hash[:],
}
} else {
out = selp.Proto()
}
if ar.messageBatcher != nil && !hasSidecar { // TODO: batch blobTx
err = ar.messageBatcher.Put(&batch.Message{
ChainID: ar.chainID,
Target: nil,
Data: out,
})
} else {
err = ar.broadcastHandler(context.Background(), ar.chainID, out)
}
if err != nil {
log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:])))
}
}

// OnRemoved does nothing
func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {}
37 changes: 37 additions & 0 deletions api/action_radio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package api

import (
"context"
"math/big"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/test/identityset"
)

func TestActionRadio(t *testing.T) {
r := require.New(t)
broadcastCount := uint64(0)
radio := NewActionRadio(
func(_ context.Context, _ uint32, _ proto.Message) error {
atomic.AddUint64(&broadcastCount, 1)
return nil
},
0)
r.NoError(radio.Start())
defer func() {
r.NoError(radio.Stop())
}()

gas := uint64(100000)
gasPrice := big.NewInt(10)
selp, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, gas, gasPrice)
r.NoError(err)

radio.OnAdded(selp)
r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount))
}
42 changes: 9 additions & 33 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/gasstation"
"github.com/iotexproject/iotex-core/v2/pkg/log"
batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher"
"github.com/iotexproject/iotex-core/v2/pkg/tracer"
"github.com/iotexproject/iotex-core/v2/pkg/unit"
"github.com/iotexproject/iotex-core/v2/pkg/version"
Expand Down Expand Up @@ -205,7 +204,7 @@ type (
chainListener apitypes.Listener
electionCommittee committee.Committee
readCache *ReadCache
messageBatcher *batch.Manager
actionRadio *ActionRadio
apiStats *nodestats.APILocalStats
getBlockTime evm.GetBlockTime
}
Expand Down Expand Up @@ -297,9 +296,8 @@ func newCoreService(
}

if core.broadcastHandler != nil {
core.messageBatcher = batch.NewManager(func(msg *batch.Message) error {
return core.broadcastHandler(context.Background(), core.bc.ChainID(), msg.Data)
})
core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch())
actPool.AddSubscriber(core.actionRadio)
}

return &core, nil
Expand Down Expand Up @@ -495,28 +493,6 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action)
}
return "", st.Err()
}
// If there is no error putting into local actpool, broadcast it to the network
// broadcast action hash if it's blobTx
hasSidecar := selp.BlobTxSidecar() != nil
out := proto.Message(in)
if hasSidecar {
out = &iotextypes.ActionHash{
Hash: hash[:],
}
}
if core.messageBatcher != nil && !hasSidecar {
// TODO: batch blobTx
err = core.messageBatcher.Put(&batch.Message{
ChainID: core.bc.ChainID(),
Target: nil,
Data: out,
})
} else {
err = core.broadcastHandler(ctx, core.bc.ChainID(), out)
}
if err != nil {
l.Warn("Failed to broadcast SendAction request.", zap.Error(err))
}
return hex.EncodeToString(hash[:]), nil
}

Expand Down Expand Up @@ -899,19 +875,19 @@ func (core *coreService) Start(_ context.Context) error {
if err := core.chainListener.Start(); err != nil {
return errors.Wrap(err, "failed to start blockchain listener")
}
if core.messageBatcher != nil {
if err := core.messageBatcher.Start(); err != nil {
return errors.Wrap(err, "failed to start message batcher")
if core.actionRadio != nil {
if err := core.actionRadio.Start(); err != nil {
return errors.Wrap(err, "failed to start action radio")
}
}
return nil
}

// Stop stops the API server
func (core *coreService) Stop(_ context.Context) error {
if core.messageBatcher != nil {
if err := core.messageBatcher.Stop(); err != nil {
return errors.Wrap(err, "failed to stop message batcher")
if core.actionRadio != nil {
if err := core.actionRadio.Stop(); err != nil {
return errors.Wrap(err, "failed to stop action radio")
}
}
return core.chainListener.Stop()
Expand Down
5 changes: 3 additions & 2 deletions api/grpcserver_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) {
cfg := newConfig()
cfg.api.GRPCPort = testutil.RandomPort()
cfg.genesis.MidwayBlockHeight = 10
svr, _, _, _, _, _, bfIndexFile, err := createServerV2(cfg, true)
svr, _, _, _, _, ap, bfIndexFile, err := createServerV2(cfg, true)
require.NoError(err)
grpcHandler := newGRPCHandler(svr.core)
defer func() {
Expand All @@ -1306,7 +1306,8 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) {
broadcastHandlerCount++
return nil
}
coreService.messageBatcher = nil
coreService.actionRadio = NewActionRadio(coreService.broadcastHandler, 0)
ap.AddSubscriber(coreService.actionRadio)

for i, test := range _sendActionTests {
request := &iotexapi.SendActionRequest{Action: test.actionPb}
Expand Down
5 changes: 1 addition & 4 deletions misc/scripts/mockgen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ mockgen -destination=./test/mock/mock_lifecycle/mock_lifecycle.go \

mkdir -p ./test/mock/mock_actpool
mockgen -destination=./test/mock/mock_actpool/mock_actpool.go \
-source=./actpool/actpool.go \
-self_package=github.com/iotexproject/iotex-core/v2/actpool \
-package=mock_actpool \
ActPool
github.com/iotexproject/iotex-core/v2/actpool ActPool

mkdir -p ./test/mock/mock_actioniterator
mockgen -destination=./test/mock/mock_actioniterator/mock_actioniterator.go \
Expand Down
Loading

0 comments on commit 6ba6aa7

Please sign in to comment.