From 6ba6aa7d16836c0ac63cfe95cfee72bf02029550 Mon Sep 17 00:00:00 2001 From: Chen Chen Date: Tue, 26 Nov 2024 17:29:16 +0800 Subject: [PATCH] [refactor] Introduce action radio to broadcast (#4497) Co-authored-by: CoderZhi --- actpool/actpool.go | 1 + api/action_radio.go | 93 ++++++++++++++++++ api/action_radio_test.go | 37 ++++++++ api/coreservice.go | 42 ++------- api/grpcserver_integrity_test.go | 5 +- misc/scripts/mockgen.sh | 5 +- test/mock/mock_actpool/mock_actpool.go | 94 ++++++------------- .../internal/client/client_test.go | 1 + 8 files changed, 175 insertions(+), 103 deletions(-) create mode 100644 api/action_radio.go create mode 100644 api/action_radio_test.go diff --git a/actpool/actpool.go b/actpool/actpool.go index 91354c9c05..5716435a2b 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -82,6 +82,7 @@ type ActPool interface { ReceiveBlock(*block.Block) error AddActionEnvelopeValidators(...action.SealedEnvelopeValidator) + AddSubscriber(sub Subscriber) } // Subscriber is the interface for actpool subscriber diff --git a/api/action_radio.go b/api/action_radio.go new file mode 100644 index 0000000000..6beda0ea06 --- /dev/null +++ b/api/action_radio.go @@ -0,0 +1,93 @@ +package api + +import ( + "context" + "encoding/hex" + + "github.com/iotexproject/iotex-proto/golang/iotextypes" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/v2/action" + "github.com/iotexproject/iotex-core/v2/pkg/log" + batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher" +) + +// ActionRadioOption is the option to create ActionRadio +type ActionRadioOption func(*ActionRadio) + +// WithMessageBatch enables message batching +func WithMessageBatch() ActionRadioOption { + return func(ar *ActionRadio) { + ar.messageBatcher = batch.NewManager(func(msg *batch.Message) error { + return ar.broadcastHandler(context.Background(), ar.chainID, msg.Data) + }) + } +} + +// ActionRadio broadcasts actions to the network +type ActionRadio struct { + broadcastHandler BroadcastOutbound + messageBatcher *batch.Manager + chainID uint32 +} + +// NewActionRadio creates a new ActionRadio +func NewActionRadio(broadcastHandler BroadcastOutbound, chainID uint32, opts ...ActionRadioOption) *ActionRadio { + ar := &ActionRadio{ + broadcastHandler: broadcastHandler, + chainID: chainID, + } + for _, opt := range opts { + opt(ar) + } + return ar +} + +// Start starts the action radio +func (ar *ActionRadio) Start() error { + if ar.messageBatcher != nil { + return ar.messageBatcher.Start() + } + return nil +} + +// Stop stops the action radio +func (ar *ActionRadio) Stop() error { + if ar.messageBatcher != nil { + return ar.messageBatcher.Stop() + } + return nil +} + +// OnAdded broadcasts the action to the network +func (ar *ActionRadio) OnAdded(selp *action.SealedEnvelope) { + var ( + hasSidecar = selp.BlobTxSidecar() != nil + hash, _ = selp.Hash() + out proto.Message + err error + ) + if hasSidecar { + out = &iotextypes.ActionHash{ + Hash: hash[:], + } + } else { + out = selp.Proto() + } + if ar.messageBatcher != nil && !hasSidecar { // TODO: batch blobTx + err = ar.messageBatcher.Put(&batch.Message{ + ChainID: ar.chainID, + Target: nil, + Data: out, + }) + } else { + err = ar.broadcastHandler(context.Background(), ar.chainID, out) + } + if err != nil { + log.L().Warn("Failed to broadcast SendAction request.", zap.Error(err), zap.String("actionHash", hex.EncodeToString(hash[:]))) + } +} + +// OnRemoved does nothing +func (ar *ActionRadio) OnRemoved(act *action.SealedEnvelope) {} diff --git a/api/action_radio_test.go b/api/action_radio_test.go new file mode 100644 index 0000000000..1fec4274d3 --- /dev/null +++ b/api/action_radio_test.go @@ -0,0 +1,37 @@ +package api + +import ( + "context" + "math/big" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/iotexproject/iotex-core/v2/action" + "github.com/iotexproject/iotex-core/v2/test/identityset" +) + +func TestActionRadio(t *testing.T) { + r := require.New(t) + broadcastCount := uint64(0) + radio := NewActionRadio( + func(_ context.Context, _ uint32, _ proto.Message) error { + atomic.AddUint64(&broadcastCount, 1) + return nil + }, + 0) + r.NoError(radio.Start()) + defer func() { + r.NoError(radio.Stop()) + }() + + gas := uint64(100000) + gasPrice := big.NewInt(10) + selp, err := action.SignedTransfer(identityset.Address(1).String(), identityset.PrivateKey(1), 1, big.NewInt(1), nil, gas, gasPrice) + r.NoError(err) + + radio.OnAdded(selp) + r.Equal(uint64(1), atomic.LoadUint64(&broadcastCount)) +} diff --git a/api/coreservice.go b/api/coreservice.go index 87341ca957..e8203aae6b 100644 --- a/api/coreservice.go +++ b/api/coreservice.go @@ -63,7 +63,6 @@ import ( "github.com/iotexproject/iotex-core/v2/db" "github.com/iotexproject/iotex-core/v2/gasstation" "github.com/iotexproject/iotex-core/v2/pkg/log" - batch "github.com/iotexproject/iotex-core/v2/pkg/messagebatcher" "github.com/iotexproject/iotex-core/v2/pkg/tracer" "github.com/iotexproject/iotex-core/v2/pkg/unit" "github.com/iotexproject/iotex-core/v2/pkg/version" @@ -205,7 +204,7 @@ type ( chainListener apitypes.Listener electionCommittee committee.Committee readCache *ReadCache - messageBatcher *batch.Manager + actionRadio *ActionRadio apiStats *nodestats.APILocalStats getBlockTime evm.GetBlockTime } @@ -297,9 +296,8 @@ func newCoreService( } if core.broadcastHandler != nil { - core.messageBatcher = batch.NewManager(func(msg *batch.Message) error { - return core.broadcastHandler(context.Background(), core.bc.ChainID(), msg.Data) - }) + core.actionRadio = NewActionRadio(core.broadcastHandler, core.bc.ChainID(), WithMessageBatch()) + actPool.AddSubscriber(core.actionRadio) } return &core, nil @@ -495,28 +493,6 @@ func (core *coreService) SendAction(ctx context.Context, in *iotextypes.Action) } return "", st.Err() } - // If there is no error putting into local actpool, broadcast it to the network - // broadcast action hash if it's blobTx - hasSidecar := selp.BlobTxSidecar() != nil - out := proto.Message(in) - if hasSidecar { - out = &iotextypes.ActionHash{ - Hash: hash[:], - } - } - if core.messageBatcher != nil && !hasSidecar { - // TODO: batch blobTx - err = core.messageBatcher.Put(&batch.Message{ - ChainID: core.bc.ChainID(), - Target: nil, - Data: out, - }) - } else { - err = core.broadcastHandler(ctx, core.bc.ChainID(), out) - } - if err != nil { - l.Warn("Failed to broadcast SendAction request.", zap.Error(err)) - } return hex.EncodeToString(hash[:]), nil } @@ -899,9 +875,9 @@ func (core *coreService) Start(_ context.Context) error { if err := core.chainListener.Start(); err != nil { return errors.Wrap(err, "failed to start blockchain listener") } - if core.messageBatcher != nil { - if err := core.messageBatcher.Start(); err != nil { - return errors.Wrap(err, "failed to start message batcher") + if core.actionRadio != nil { + if err := core.actionRadio.Start(); err != nil { + return errors.Wrap(err, "failed to start action radio") } } return nil @@ -909,9 +885,9 @@ func (core *coreService) Start(_ context.Context) error { // Stop stops the API server func (core *coreService) Stop(_ context.Context) error { - if core.messageBatcher != nil { - if err := core.messageBatcher.Stop(); err != nil { - return errors.Wrap(err, "failed to stop message batcher") + if core.actionRadio != nil { + if err := core.actionRadio.Stop(); err != nil { + return errors.Wrap(err, "failed to stop action radio") } } return core.chainListener.Stop() diff --git a/api/grpcserver_integrity_test.go b/api/grpcserver_integrity_test.go index b415ac9783..e650f96286 100644 --- a/api/grpcserver_integrity_test.go +++ b/api/grpcserver_integrity_test.go @@ -1292,7 +1292,7 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) { cfg := newConfig() cfg.api.GRPCPort = testutil.RandomPort() cfg.genesis.MidwayBlockHeight = 10 - svr, _, _, _, _, _, bfIndexFile, err := createServerV2(cfg, true) + svr, _, _, _, _, ap, bfIndexFile, err := createServerV2(cfg, true) require.NoError(err) grpcHandler := newGRPCHandler(svr.core) defer func() { @@ -1306,7 +1306,8 @@ func TestGrpcServer_SendActionIntegrity(t *testing.T) { broadcastHandlerCount++ return nil } - coreService.messageBatcher = nil + coreService.actionRadio = NewActionRadio(coreService.broadcastHandler, 0) + ap.AddSubscriber(coreService.actionRadio) for i, test := range _sendActionTests { request := &iotexapi.SendActionRequest{Action: test.actionPb} diff --git a/misc/scripts/mockgen.sh b/misc/scripts/mockgen.sh index f372dd8ce9..c2f5deb9d7 100755 --- a/misc/scripts/mockgen.sh +++ b/misc/scripts/mockgen.sh @@ -52,10 +52,7 @@ mockgen -destination=./test/mock/mock_lifecycle/mock_lifecycle.go \ mkdir -p ./test/mock/mock_actpool mockgen -destination=./test/mock/mock_actpool/mock_actpool.go \ - -source=./actpool/actpool.go \ - -self_package=github.com/iotexproject/iotex-core/v2/actpool \ - -package=mock_actpool \ - ActPool + github.com/iotexproject/iotex-core/v2/actpool ActPool mkdir -p ./test/mock/mock_actioniterator mockgen -destination=./test/mock/mock_actioniterator/mock_actioniterator.go \ diff --git a/test/mock/mock_actpool/mock_actpool.go b/test/mock/mock_actpool/mock_actpool.go index 015dbf911b..02c59975fd 100644 --- a/test/mock/mock_actpool/mock_actpool.go +++ b/test/mock/mock_actpool/mock_actpool.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: ./actpool/actpool.go +// Source: github.com/iotexproject/iotex-core/v2/actpool (interfaces: ActPool) // Package mock_actpool is a generated GoMock package. package mock_actpool @@ -12,6 +12,7 @@ import ( hash "github.com/iotexproject/go-pkgs/hash" address "github.com/iotexproject/iotex-address/address" action "github.com/iotexproject/iotex-core/v2/action" + actpool "github.com/iotexproject/iotex-core/v2/actpool" block "github.com/iotexproject/iotex-core/v2/blockchain/block" ) @@ -39,17 +40,17 @@ func (m *MockActPool) EXPECT() *MockActPoolMockRecorder { } // Add mocks base method. -func (m *MockActPool) Add(ctx context.Context, act *action.SealedEnvelope) error { +func (m *MockActPool) Add(arg0 context.Context, arg1 *action.SealedEnvelope) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Add", ctx, act) + ret := m.ctrl.Call(m, "Add", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Add indicates an expected call of Add. -func (mr *MockActPoolMockRecorder) Add(ctx, act interface{}) *gomock.Call { +func (mr *MockActPoolMockRecorder) Add(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockActPool)(nil).Add), ctx, act) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Add", reflect.TypeOf((*MockActPool)(nil).Add), arg0, arg1) } // AddActionEnvelopeValidators mocks base method. @@ -68,6 +69,18 @@ func (mr *MockActPoolMockRecorder) AddActionEnvelopeValidators(arg0 ...interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddActionEnvelopeValidators", reflect.TypeOf((*MockActPool)(nil).AddActionEnvelopeValidators), arg0...) } +// AddSubscriber mocks base method. +func (m *MockActPool) AddSubscriber(arg0 actpool.Subscriber) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "AddSubscriber", arg0) +} + +// AddSubscriber indicates an expected call of AddSubscriber. +func (mr *MockActPoolMockRecorder) AddSubscriber(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSubscriber", reflect.TypeOf((*MockActPool)(nil).AddSubscriber), arg0) +} + // DeleteAction mocks base method. func (m *MockActPool) DeleteAction(arg0 address.Address) { m.ctrl.T.Helper() @@ -81,18 +94,18 @@ func (mr *MockActPoolMockRecorder) DeleteAction(arg0 interface{}) *gomock.Call { } // GetActionByHash mocks base method. -func (m *MockActPool) GetActionByHash(hash hash.Hash256) (*action.SealedEnvelope, error) { +func (m *MockActPool) GetActionByHash(arg0 hash.Hash256) (*action.SealedEnvelope, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetActionByHash", hash) + ret := m.ctrl.Call(m, "GetActionByHash", arg0) ret0, _ := ret[0].(*action.SealedEnvelope) ret1, _ := ret[1].(error) return ret0, ret1 } // GetActionByHash indicates an expected call of GetActionByHash. -func (mr *MockActPoolMockRecorder) GetActionByHash(hash interface{}) *gomock.Call { +func (mr *MockActPoolMockRecorder) GetActionByHash(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActionByHash", reflect.TypeOf((*MockActPool)(nil).GetActionByHash), hash) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActionByHash", reflect.TypeOf((*MockActPool)(nil).GetActionByHash), arg0) } // GetCapacity mocks base method. @@ -138,18 +151,18 @@ func (mr *MockActPoolMockRecorder) GetGasSize() *gomock.Call { } // GetPendingNonce mocks base method. -func (m *MockActPool) GetPendingNonce(addr string) (uint64, error) { +func (m *MockActPool) GetPendingNonce(arg0 string) (uint64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPendingNonce", addr) + ret := m.ctrl.Call(m, "GetPendingNonce", arg0) ret0, _ := ret[0].(uint64) ret1, _ := ret[1].(error) return ret0, ret1 } // GetPendingNonce indicates an expected call of GetPendingNonce. -func (mr *MockActPoolMockRecorder) GetPendingNonce(addr interface{}) *gomock.Call { +func (mr *MockActPoolMockRecorder) GetPendingNonce(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPendingNonce", reflect.TypeOf((*MockActPool)(nil).GetPendingNonce), addr) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPendingNonce", reflect.TypeOf((*MockActPool)(nil).GetPendingNonce), arg0) } // GetSize mocks base method. @@ -167,17 +180,17 @@ func (mr *MockActPoolMockRecorder) GetSize() *gomock.Call { } // GetUnconfirmedActs mocks base method. -func (m *MockActPool) GetUnconfirmedActs(addr string) []*action.SealedEnvelope { +func (m *MockActPool) GetUnconfirmedActs(arg0 string) []*action.SealedEnvelope { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetUnconfirmedActs", addr) + ret := m.ctrl.Call(m, "GetUnconfirmedActs", arg0) ret0, _ := ret[0].([]*action.SealedEnvelope) return ret0 } // GetUnconfirmedActs indicates an expected call of GetUnconfirmedActs. -func (mr *MockActPoolMockRecorder) GetUnconfirmedActs(addr interface{}) *gomock.Call { +func (mr *MockActPoolMockRecorder) GetUnconfirmedActs(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnconfirmedActs", reflect.TypeOf((*MockActPool)(nil).GetUnconfirmedActs), addr) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnconfirmedActs", reflect.TypeOf((*MockActPool)(nil).GetUnconfirmedActs), arg0) } // PendingActionMap mocks base method. @@ -261,50 +274,3 @@ func (mr *MockActPoolMockRecorder) Validate(arg0, arg1 interface{}) *gomock.Call mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Validate", reflect.TypeOf((*MockActPool)(nil).Validate), arg0, arg1) } - -// MockSubscriber is a mock of Subscriber interface. -type MockSubscriber struct { - ctrl *gomock.Controller - recorder *MockSubscriberMockRecorder -} - -// MockSubscriberMockRecorder is the mock recorder for MockSubscriber. -type MockSubscriberMockRecorder struct { - mock *MockSubscriber -} - -// NewMockSubscriber creates a new mock instance. -func NewMockSubscriber(ctrl *gomock.Controller) *MockSubscriber { - mock := &MockSubscriber{ctrl: ctrl} - mock.recorder = &MockSubscriberMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockSubscriber) EXPECT() *MockSubscriberMockRecorder { - return m.recorder -} - -// OnAdded mocks base method. -func (m *MockSubscriber) OnAdded(arg0 *action.SealedEnvelope) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "OnAdded", arg0) -} - -// OnAdded indicates an expected call of OnAdded. -func (mr *MockSubscriberMockRecorder) OnAdded(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAdded", reflect.TypeOf((*MockSubscriber)(nil).OnAdded), arg0) -} - -// OnRemoved mocks base method. -func (m *MockSubscriber) OnRemoved(arg0 *action.SealedEnvelope) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "OnRemoved", arg0) -} - -// OnRemoved indicates an expected call of OnRemoved. -func (mr *MockSubscriberMockRecorder) OnRemoved(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRemoved", reflect.TypeOf((*MockSubscriber)(nil).OnRemoved), arg0) -} diff --git a/tools/actioninjector.v2/internal/client/client_test.go b/tools/actioninjector.v2/internal/client/client_test.go index 6a499a540b..28eeab7697 100644 --- a/tools/actioninjector.v2/internal/client/client_test.go +++ b/tools/actioninjector.v2/internal/client/client_test.go @@ -76,6 +76,7 @@ func TestClient(t *testing.T) { bc.EXPECT().BlockHeaderByHeight(gomock.Any()).Return(&blh, nil).AnyTimes() ap.EXPECT().GetPendingNonce(gomock.Any()).Return(uint64(1), nil).AnyTimes() ap.EXPECT().Add(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ap.EXPECT().AddSubscriber(gomock.Any()).AnyTimes() newOption := api.WithBroadcastOutbound(func(_ context.Context, _ uint32, _ proto.Message) error { return nil })