Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Introduce action radio to broadcast #4497

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type ActPool interface {
ReceiveBlock(*block.Block) error

AddActionEnvelopeValidators(...action.SealedEnvelopeValidator)
AddSubscriber(sub Subscriber)
}

// Subscriber is the interface for actpool subscriber
Expand Down
93 changes: 93 additions & 0 deletions api/action_radio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package api

import (
"context"
"encoding/hex"

"github.com/iotexproject/iotex-proto/golang/iotextypes"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/pkg/log"
batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher"
)

// ActionRadioOption is the option to create ActionRadio
type ActionRadioOption func(*ActionRadio)

// WithMessageBatch enables message batching
func WithMessageBatch() ActionRadioOption {
return func(ar *ActionRadio) {
ar.messageBatcher = batch.NewManager(func(msg *batch.Message) error {
return ar.broadcastHandler(context.Background(), ar.chainID, msg.Data)
})
}
}

// ActionRadio broadcasts actions to the network
type ActionRadio struct {
broadcastHandler BroadcastOutbound
messageBatcher *batch.Manager
chainID uint32
}

// NewActionRadio creates a new ActionRadio
func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ...ActionRadioOption) *ActionRadio {
ar := &ActionRadio{
broadcastHandler: broadcastHandler,
chainID: chainID,
}
for _, opt := range opts {
opt(ar)
}
return ar
}

// Start starts the action radio
func (ar *ActionRadio) Start() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Start()
}
return nil
}

// Stop stops the action radio
func (ar *ActionRadio) Stop() error {
if ar.messageBatcher != nil {
return ar.messageBatcher.Stop()
}
return nil
}

// OnAdded broadcasts the action to the network
func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) {
var (
hasSidecar = selp.BlobTxSidecar() != nil
hash, _ = selp.Hash()
out proto.Message
err error
)
if hasSidecar {
out = &iotextypes.ActionHash{
Hash: hash[:],
}
} else {
out = selp.Proto()
}
if ar.messageBatcher != nil && !hasSidecar { // TODO: batch blobTx
err = ar.messageBatcher.Put(&batch.Message{
ChainID: ar.chainID,
Target: nil,
Data: out,
})
} else {
err = ar.broadcastHandler(context.Background(), ar.chainID, out)
}
if err != nil {
log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:])))
}
Comment on lines +87 to +89
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return err

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, it is an interface implementation

}

// OnRemoved does nothing
func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {}
35 changes: 35 additions & 0 deletions api/action_radio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package api

import (
"context"
"math/big"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/iotexproject/iotex-core/v2/action"
"github.com/iotexproject/iotex-core/v2/test/identityset"
)

func TestActionRadio(t *testing.T) {
r := require.New(t)
broadcastCount := uint64(0)
radio := NewActionRadio(
func(_ context.Context, _ uint32, _ proto.Message) error {
atomic.AddUint64(&broadcastCount, 1)
return nil
},
0)
r.NoError(radio.Start())
defer radio.Stop()
Copy link
Member

@dustinxie dustinxie Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer func() {
    r.NoError(radio.Stop())
}()


gas := uint64(100000)
gasPrice := big.NewInt(10)
selp, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, gas, gasPrice)
r.NoError(err)

radio.OnAdded(selp)
r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount))
}
42 changes: 9 additions & 33 deletions api/coreservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"github.com/iotexproject/iotex-core/v2/db"
"github.com/iotexproject/iotex-core/v2/gasstation"
"github.com/iotexproject/iotex-core/v2/pkg/log"
batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher"
"github.com/iotexproject/iotex-core/v2/pkg/tracer"
"github.com/iotexproject/iotex-core/v2/pkg/unit"
"github.com/iotexproject/iotex-core/v2/pkg/version"
Expand Down Expand Up @@ -205,7 +204,7 @@ type (
chainListener apitypes.Listener
electionCommittee committee.Committee
readCache *ReadCache
messageBatcher *batch.Manager
actionRadio *ActionRadio
apiStats *nodestats.APILocalStats
getBlockTime evm.GetBlockTime
}
Expand Down Expand Up @@ -297,9 +296,8 @@ func newCoreService(
}

if core.broadcastHandler != nil {
core.messageBatcher = batch.NewManager(func(msg *batch.Message) error {
return core.broadcastHandler(context.Background(), core.bc.ChainID(), msg.Data)
})
core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch())
actPool.AddSubscriber(core.actionRadio)
}

return &core, nil
Expand Down Expand Up @@ -495,28 +493,6 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action)
}
return "", st.Err()
}
// If there is no error putting into local actpool, broadcast it to the network
// broadcast action hash if it's blobTx
hasSidecar := selp.BlobTxSidecar() != nil
out := proto.Message(in)
if hasSidecar {
out = &iotextypes.ActionHash{
Hash: hash[:],
}
}
if core.messageBatcher != nil && !hasSidecar {
// TODO: batch blobTx
err = core.messageBatcher.Put(&batch.Message{
ChainID: core.bc.ChainID(),
Target: nil,
Data: out,
})
} else {
err = core.broadcastHandler(ctx, core.bc.ChainID(), out)
}
if err != nil {
l.Warn("Failed to broadcast SendAction request.", zap.Error(err))
}
return hex.EncodeToString(hash[:]), nil
}

Expand Down Expand Up @@ -899,19 +875,19 @@ func (core *coreService) Start(_ context.Context) error {
if err := core.chainListener.Start(); err != nil {
return errors.Wrap(err, "failed to start blockchain listener")
}
if core.messageBatcher != nil {
if err := core.messageBatcher.Start(); err != nil {
return errors.Wrap(err, "failed to start message batcher")
if core.actionRadio != nil {
if err := core.actionRadio.Start(); err != nil {
return errors.Wrap(err, "failed to start action radio")
}
}
return nil
}

// Stop stops the API server
func (core *coreService) Stop(_ context.Context) error {
if core.messageBatcher != nil {
if err := core.messageBatcher.Stop(); err != nil {
return errors.Wrap(err, "failed to stop message batcher")
if core.actionRadio != nil {
if err := core.actionRadio.Stop(); err != nil {
return errors.Wrap(err, "failed to stop action radio")
}
}
return core.chainListener.Stop()
Expand Down
5 changes: 3 additions & 2 deletions api/grpcserver_integrity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) {
cfg := newConfig()
cfg.api.GRPCPort = testutil.RandomPort()
cfg.genesis.MidwayBlockHeight = 10
svr, _, _, _, _, _, bfIndexFile, err := createServerV2(cfg, true)
svr, _, _, _, _, ap, bfIndexFile, err := createServerV2(cfg, true)
require.NoError(err)
grpcHandler := newGRPCHandler(svr.core)
defer func() {
Expand All @@ -1306,7 +1306,8 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) {
broadcastHandlerCount++
return nil
}
coreService.messageBatcher = nil
coreService.actionRadio = NewActionRadio(coreService.broadcastHandler, 0)
ap.AddSubscriber(coreService.actionRadio)

for i, test := range _sendActionTests {
request := &iotexapi.SendActionRequest{Action: test.actionPb}
Expand Down
5 changes: 1 addition & 4 deletions misc/scripts/mockgen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ mockgen -destination=./test/mock/mock_lifecycle/mock_lifecycle.go \

mkdir -p ./test/mock/mock_actpool
mockgen -destination=./test/mock/mock_actpool/mock_actpool.go \
-source=./actpool/actpool.go \
-self_package=github.com/iotexproject/iotex-core/v2/actpool \
-package=mock_actpool \
ActPool
github.com/iotexproject/iotex-core/v2/actpool ActPool

mkdir -p ./test/mock/mock_actioniterator
mockgen -destination=./test/mock/mock_actioniterator/mock_actioniterator.go \
Expand Down
Loading
Loading