From 30814f82791119231b594ab50629a786200caa04 Mon Sep 17 00:00:00 2001 From: xdlbdy Date: Tue, 28 Mar 2023 21:03:11 +0800 Subject: [PATCH] feat: trigger controller wait eventbus ready Signed-off-by: xdlbdy --- go.mod | 8 +- go.sum | 16 ++-- internal/controller/trigger/controller.go | 84 ++++++++++--------- .../controller/trigger/controller_test.go | 16 ++-- pkg/cluster/controller.go | 1 + pkg/cluster/eventbus.go | 13 ++- pkg/cluster/mock_controller.go | 15 ++++ pkg/go.mod | 8 +- pkg/go.sum | 16 ++-- pkg/util/wait.go | 20 +++++ 10 files changed, 123 insertions(+), 74 deletions(-) diff --git a/go.mod b/go.mod index 216f13c92..5490ba673 100644 --- a/go.mod +++ b/go.mod @@ -119,11 +119,11 @@ require ( go.uber.org/multierr v1.7.0 // indirect go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect - golang.org/x/net v0.4.0 // indirect - golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/oauth2 v0.6.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.3.0 // indirect - golang.org/x/text v0.5.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect google.golang.org/appengine v1.6.7 // indirect ) diff --git a/go.sum b/go.sum index 65e22ce1a..e3094f115 100644 --- a/go.sum +++ b/go.sum @@ -535,8 +535,8 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -545,8 +545,8 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= +golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= +golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -607,8 +607,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -619,8 +619,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/controller/trigger/controller.go b/internal/controller/trigger/controller.go index 73fd0458a..50ac7dbbf 100644 --- a/internal/controller/trigger/controller.go +++ b/internal/controller/trigger/controller.go @@ -16,7 +16,7 @@ package trigger import ( "context" - stdErr "errors" + stderr "errors" "fmt" "io" "os" @@ -24,6 +24,9 @@ import ( "sync" "time" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/emptypb" + eb "github.com/vanus-labs/vanus/client" "github.com/vanus-labs/vanus/internal/controller/member" "github.com/vanus-labs/vanus/internal/controller/trigger/metadata" @@ -42,14 +45,14 @@ import ( "github.com/vanus-labs/vanus/pkg/util" ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller" metapb "github.com/vanus-labs/vanus/proto/pkg/meta" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/protobuf/types/known/emptypb" ) var _ ctrlpb.TriggerControllerServer = &controller{} const ( defaultGcSubscriptionInterval = time.Second * 10 + waitEventbusReadyTime = time.Minute * 3 + waitEventbusCheckPeriod = time.Second * 2 ) func NewController(config Config, mem member.Member) *controller { @@ -57,7 +60,6 @@ func NewController(config Config, mem member.Member) *controller { config: config, member: mem, needCleanSubscription: map[vanus.ID]string{}, - state: primitive.ServerStateCreated, cl: cluster.NewClusterController(config.ControllerAddr, insecure.NewCredentials()), ebClient: eb.Connect(config.ControllerAddr), } @@ -79,7 +81,6 @@ type controller struct { isLeader bool ctx context.Context stopFunc context.CancelFunc - state primitive.ServerState cl cluster.Cluster ebClient eb.Client } @@ -87,9 +88,6 @@ type controller struct { func (ctrl *controller) SetDeadLetterEventOffset( ctx context.Context, request *ctrlpb.SetDeadLetterEventOffsetRequest, ) (*emptypb.Empty, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.SubscriptionId) err := ctrl.subscriptionManager.SaveDeadLetterOffset(ctx, subID, request.GetOffset()) if err != nil { @@ -101,9 +99,6 @@ func (ctrl *controller) SetDeadLetterEventOffset( func (ctrl *controller) GetDeadLetterEventOffset( ctx context.Context, request *ctrlpb.GetDeadLetterEventOffsetRequest, ) (*ctrlpb.GetDeadLetterEventOffsetResponse, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.SubscriptionId) offset, err := ctrl.subscriptionManager.GetDeadLetterOffset(ctx, subID) if err != nil { @@ -115,9 +110,6 @@ func (ctrl *controller) GetDeadLetterEventOffset( func (ctrl *controller) CommitOffset( ctx context.Context, request *ctrlpb.CommitOffsetRequest, ) (*ctrlpb.CommitOffsetResponse, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } resp := new(ctrlpb.CommitOffsetResponse) for _, subInfo := range request.SubscriptionInfo { if len(subInfo.Offsets) == 0 { @@ -139,9 +131,6 @@ func (ctrl *controller) CommitOffset( func (ctrl *controller) ResetOffsetToTimestamp( ctx context.Context, request *ctrlpb.ResetOffsetToTimestampRequest, ) (*ctrlpb.ResetOffsetToTimestampResponse, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } if request.Timestamp == 0 { return nil, errors.ErrInvalidRequest.WithMessage("timestamp is invalid") } @@ -166,15 +155,20 @@ func (ctrl *controller) ResetOffsetToTimestamp( func (ctrl *controller) CreateSubscription( ctx context.Context, request *ctrlpb.CreateSubscriptionRequest, ) (*metapb.Subscription, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } err := validation.ValidateSubscriptionRequest(ctx, request.Subscription) if err != nil { log.Info(ctx).Err(err).Msg("create subscription validate fail") return nil, err } sub := convert.FromPbSubscriptionRequest(request.Subscription) + _, err = ctrl.cl.NamespaceService().GetNamespace(ctx, sub.NamespaceID.Uint64()) + if err != nil { + return nil, err + } + _, err = ctrl.cl.EventbusService().GetEventbus(ctx, sub.EventbusID.Uint64()) + if err != nil { + return nil, err + } sub.ID, err = vanus.NewID() sub.CreatedAt = time.Now() sub.UpdatedAt = time.Now() @@ -200,9 +194,6 @@ func (ctrl *controller) CreateSubscription( func (ctrl *controller) UpdateSubscription( ctx context.Context, request *ctrlpb.UpdateSubscriptionRequest, ) (*metapb.Subscription, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.Id) sub := ctrl.subscriptionManager.GetSubscription(ctx, subID) if sub == nil { @@ -243,9 +234,6 @@ func (ctrl *controller) UpdateSubscription( func (ctrl *controller) DeleteSubscription( ctx context.Context, request *ctrlpb.DeleteSubscriptionRequest, ) (*emptypb.Empty, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.Id) sub := ctrl.subscriptionManager.GetSubscription(ctx, subID) if sub != nil { @@ -269,9 +257,6 @@ func (ctrl *controller) DeleteSubscription( func (ctrl *controller) DisableSubscription( ctx context.Context, request *ctrlpb.DisableSubscriptionRequest, ) (*emptypb.Empty, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.Id) sub := ctrl.subscriptionManager.GetSubscription(ctx, subID) if sub == nil { @@ -302,9 +287,6 @@ func (ctrl *controller) DisableSubscription( func (ctrl *controller) ResumeSubscription( ctx context.Context, request *ctrlpb.ResumeSubscriptionRequest, ) (*emptypb.Empty, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } subID := vanus.ID(request.Id) sub := ctrl.subscriptionManager.GetSubscription(ctx, subID) if sub == nil { @@ -326,9 +308,6 @@ func (ctrl *controller) ResumeSubscription( func (ctrl *controller) GetSubscription( ctx context.Context, request *ctrlpb.GetSubscriptionRequest, ) (*metapb.Subscription, error) { - if ctrl.state != primitive.ServerStateRunning { - return nil, errors.ErrServerNotStart - } sub := ctrl.subscriptionManager.GetSubscription(ctx, vanus.ID(request.Id)) if sub == nil { return nil, errors.ErrResourceNotFound.WithMessage("subscription not exist") @@ -355,7 +334,7 @@ func (ctrl *controller) TriggerWorkerHeartbeat( } req, err := heartbeat.Recv() if err != nil { - if !stdErr.Is(err, io.EOF) { + if !stderr.Is(err, io.EOF) { log.Warn(ctx).Err(err).Msg("heartbeat recv error") } log.Info(ctx).Msg("heartbeat close") @@ -562,7 +541,6 @@ func (ctrl *controller) membershipChangedProcessor( ctrl.subscriptionManager.Start() ctrl.scheduler.Run() go ctrl.gcSubscriptions(ctx) - ctrl.state = primitive.ServerStateRunning ctrl.isLeader = true case member.EventBecomeFollower: if !ctrl.isLeader { @@ -579,13 +557,11 @@ func (ctrl *controller) membershipChangedProcessor( func (ctrl *controller) stop(_ context.Context) error { ctrl.member.ResignIfLeader() - ctrl.state = primitive.ServerStateStopping ctrl.stopFunc() ctrl.scheduler.Stop() ctrl.workerManager.Stop() ctrl.subscriptionManager.Stop() ctrl.storage.Close() - ctrl.state = primitive.ServerStateStopped return nil } @@ -621,12 +597,40 @@ func (ctrl *controller) initTriggerSystemEventbus() { go func() { ctx := context.Background() log.Info(ctx).Msg("trigger controller is ready to check system eventbus") + if err := ctrl.cl.WaitForControllerReady(false); err != nil { + log.Error().Err(err).Msg("trigger controller check system eventbus, " + + "but Vanus cluster hasn't ready, exit") + os.Exit(-1) + } + ready := util.WaitReady(func() bool { + exist, err := ctrl.cl.EventbusService().IsSystemEventbusExistByName(ctx, primitive.TimerEventbusName) + if err != nil { + log.Error().Err(err).Msg("check TimerEventbus exist has error") + return false + } + return exist + }, waitEventbusReadyTime, waitEventbusCheckPeriod) + if !ready { + log.Error().Msg("check TimerEventbus timeout no exist, will exist") + os.Exit(-1) + } + + // wait TimerEventbus + exist, err := ctrl.cl.EventbusService().IsSystemEventbusExistByName(ctx, primitive.RetryEventbusName) + if err != nil { + log.Error().Err(err).Msg("failed to check RetryEventbus exist, exit") + os.Exit(-1) + } + if exist { + log.Info().Msg("trigger controller check RetryEventbus exist") + return + } + log.Info().Msg("trigger controller check RetryEventbus no exist, will create") if err := ctrl.cl.WaitForControllerReady(true); err != nil { log.Error(ctx).Err(err). Msg("trigger controller try to create system eventbus, but Vanus cluster hasn't ready, exit") os.Exit(-1) } - if _, err := ctrl.cl.EventbusService().CreateSystemEventbusIfNotExist(ctx, primitive.RetryEventbusName, "System Eventbus For Trigger Service"); err != nil { log.Error(ctx).Err(err).Msg("failed to create RetryEventbus, exit") diff --git a/internal/controller/trigger/controller_test.go b/internal/controller/trigger/controller_test.go index 179296c45..bee225e98 100644 --- a/internal/controller/trigger/controller_test.go +++ b/internal/controller/trigger/controller_test.go @@ -24,6 +24,7 @@ import ( "github.com/golang/mock/gomock" . "github.com/smartystreets/goconvey/convey" + "github.com/vanus-labs/vanus/pkg/cluster" ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller" metapb "github.com/vanus-labs/vanus/proto/pkg/meta" @@ -47,7 +48,6 @@ func TestController_CommitOffset(t *testing.T) { ctrl.subscriptionManager = subManager subID := vanus.NewTestID() - ctrl.state = primitive.ServerStateRunning request := &ctrlpb.CommitOffsetRequest{ ForceCommit: true, SubscriptionInfo: []*metapb.SubscriptionInfo{{ @@ -87,7 +87,6 @@ func TestController_ResetOffsetToTimestamp(t *testing.T) { ctrl.subscriptionManager = subManager subID := vanus.NewTestID() - ctrl.state = primitive.ServerStateRunning Convey("reset offset subscription not exist", func() { subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).AnyTimes().Return(nil) _, err := ctrl.ResetOffsetToTimestamp(ctx, &ctrlpb.ResetOffsetToTimestampRequest{ @@ -123,11 +122,17 @@ func TestController_CreateSubscription(t *testing.T) { subManager := subscription.NewMockManager(mockCtrl) ctrl.subscriptionManager = subManager ctrl.scheduler = worker.NewSubscriptionScheduler(ctrl.workerManager, ctrl.subscriptionManager) - - ctrl.state = primitive.ServerStateRunning + mockCluster := cluster.NewMockCluster(mockCtrl) + ctrl.cl = mockCluster Convey("create subscription", func() { subManager.EXPECT().GetSubscriptionByName(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) subManager.EXPECT().AddSubscription(gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + mockNsSvc := cluster.NewMockNamespaceService(mockCtrl) + mockEbSvc := cluster.NewMockEventbusService(mockCtrl) + mockCluster.EXPECT().NamespaceService().AnyTimes().Return(mockNsSvc) + mockCluster.EXPECT().EventbusService().AnyTimes().Return(mockEbSvc) + mockNsSvc.EXPECT().GetNamespace(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil) + mockEbSvc.EXPECT().GetEventbus(gomock.Any(), gomock.Any()).AnyTimes().Return(nil, nil) create := &ctrlpb.CreateSubscriptionRequest{ Subscription: &ctrlpb.SubscriptionRequest{ NamespaceId: vanus.NewTestID().Uint64(), @@ -168,7 +173,6 @@ func TestController_UpdateSubscription(t *testing.T) { subID := vanus.NewTestID() eventbusID := vanus.NewTestID() namespaceID := vanus.NewTestID() - ctrl.state = primitive.ServerStateRunning Convey("update subscription not exist", func() { subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil) request := &ctrlpb.UpdateSubscriptionRequest{ @@ -387,7 +391,6 @@ func TestController_DeleteSubscription(t *testing.T) { request := &ctrlpb.DeleteSubscriptionRequest{ Id: subID.Uint64(), } - ctrl.state = primitive.ServerStateRunning Convey("delete subscription no exist", func() { subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil) _, err := ctrl.DeleteSubscription(ctx, request) @@ -447,7 +450,6 @@ func TestController_GetSubscription(t *testing.T) { request := &ctrlpb.GetSubscriptionRequest{ Id: subID.Uint64(), } - ctrl.state = primitive.ServerStateRunning Convey("get subscription no exist", func() { subManager.EXPECT().GetSubscription(gomock.Any(), gomock.Eq(subID)).Return(nil) _, err := ctrl.GetSubscription(ctx, request) diff --git a/pkg/cluster/controller.go b/pkg/cluster/controller.go index d4e7bc8fc..872808b2b 100644 --- a/pkg/cluster/controller.go +++ b/pkg/cluster/controller.go @@ -63,6 +63,7 @@ type NamespaceService interface { } type EventbusService interface { + IsSystemEventbusExistByName(ctx context.Context, name string) (bool, error) CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) (*metapb.Eventbus, error) Delete(ctx context.Context, id uint64) error GetSystemEventbusByName(ctx context.Context, name string) (*metapb.Eventbus, error) diff --git a/pkg/cluster/eventbus.go b/pkg/cluster/eventbus.go index cdb912f06..8a72b0295 100644 --- a/pkg/cluster/eventbus.go +++ b/pkg/cluster/eventbus.go @@ -6,11 +6,12 @@ import ( "strings" "sync" + "google.golang.org/protobuf/types/known/wrapperspb" + "github.com/vanus-labs/vanus/pkg/cluster/raw_client" "github.com/vanus-labs/vanus/pkg/errors" ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller" "github.com/vanus-labs/vanus/proto/pkg/meta" - "google.golang.org/protobuf/types/known/wrapperspb" ) var ( @@ -75,7 +76,13 @@ func (es *eventbusService) GetEventbus(ctx context.Context, id uint64) (*meta.Ev func (es *eventbusService) IsSystemEventbusExistByName(ctx context.Context, name string) (bool, error) { ebPb, err := es.GetSystemEventbusByName(ctx, name) - return ebPb != nil, err + if err != nil { + if errors.Is(err, errors.ErrResourceNotFound) { + return false, nil + } + return false, err + } + return ebPb != nil, nil } func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, name string, desc string) (*meta.Eventbus, error) { @@ -83,7 +90,7 @@ func (es *eventbusService) CreateSystemEventbusIfNotExist(ctx context.Context, n return nil, errors.New("invalid system eventbus name") } exist, err := es.IsSystemEventbusExistByName(ctx, name) - if err != nil && !errors.Is(err, errors.ErrResourceNotFound) { + if err != nil { return nil, err } diff --git a/pkg/cluster/mock_controller.go b/pkg/cluster/mock_controller.go index 0b050a0a5..961bc43d0 100644 --- a/pkg/cluster/mock_controller.go +++ b/pkg/cluster/mock_controller.go @@ -371,6 +371,21 @@ func (mr *MockEventbusServiceMockRecorder) GetSystemEventbusByName(ctx, name int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSystemEventbusByName", reflect.TypeOf((*MockEventbusService)(nil).GetSystemEventbusByName), ctx, name) } +// IsSystemEventbusExistByName mocks base method. +func (m *MockEventbusService) IsSystemEventbusExistByName(ctx context.Context, name string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsSystemEventbusExistByName", ctx, name) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsSystemEventbusExistByName indicates an expected call of IsSystemEventbusExistByName. +func (mr *MockEventbusServiceMockRecorder) IsSystemEventbusExistByName(ctx, name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSystemEventbusExistByName", reflect.TypeOf((*MockEventbusService)(nil).IsSystemEventbusExistByName), ctx, name) +} + // RawClient mocks base method. func (m *MockEventbusService) RawClient() controller.EventbusControllerClient { m.ctrl.T.Helper() diff --git a/pkg/go.mod b/pkg/go.mod index e006cb09e..233d9df71 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -8,7 +8,7 @@ require ( github.com/smartystreets/goconvey v1.7.2 github.com/vanus-labs/vanus/observability v0.5.7 github.com/vanus-labs/vanus/proto v0.5.7 - golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 + golang.org/x/oauth2 v0.6.0 google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 ) @@ -21,9 +21,9 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/rs/zerolog v1.29.0 // indirect github.com/smartystreets/assertions v1.2.0 // indirect - golang.org/x/net v0.4.0 // indirect - golang.org/x/sys v0.3.0 // indirect - golang.org/x/text v0.5.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect ) diff --git a/pkg/go.sum b/pkg/go.sum index 55e27f162..ab39db777 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -34,10 +34,10 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= -golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783/go.mod h1:h4gKUeWbJ4rQPri7E0u6Gs4e9Ri2zaLxzw5DI5XGrYg= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/oauth2 v0.6.0 h1:Lh8GPgSKBfWSwFvtuWOfeI3aAAnbXTSutYxJiOJFgIw= +golang.org/x/oauth2 v0.6.0/go.mod h1:ycmewcwgD4Rpr3eZJLSB4Kyyljb3qDh40vJ8STE5HKw= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -47,14 +47,14 @@ golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/pkg/util/wait.go b/pkg/util/wait.go index b83b77520..b22a6c7ef 100644 --- a/pkg/util/wait.go +++ b/pkg/util/wait.go @@ -65,3 +65,23 @@ func UntilWithContext(ctx context.Context, f func(context.Context), period time. } } } + +func WaitReady(f func() bool, timeout, period time.Duration) bool { + tk := time.NewTicker(period) + t := time.NewTicker(timeout) + defer func() { + tk.Stop() + t.Stop() + }() + for { + select { + case <-t.C: + return false + case <-tk.C: + b := f() + if b { + return true + } + } + } +}