Skip to content

Commit

Permalink
feat: add more fields to eventbus and subscription (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenfengwang authored Nov 25, 2022
1 parent f14ef91 commit 5fdb2bb
Show file tree
Hide file tree
Showing 14 changed files with 1,053 additions and 845 deletions.
13 changes: 7 additions & 6 deletions internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,13 @@ func (ctrl *controller) CreateEventBus(ctx context.Context,
return nil, err
}
eb := &metadata.Eventbus{
ID: id,
Name: req.Name,
LogNumber: int(logNum),
EventLogs: make([]*metadata.Eventlog, int(logNum)),
ID: id,
Name: req.Name,
LogNumber: int(logNum),
EventLogs: make([]*metadata.Eventlog, int(logNum)),
Description: req.Description,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
exist, err := ctrl.kvStore.Exists(ctx, metadata.GetEventbusMetadataKey(eb.Name))
if err != nil {
Expand Down Expand Up @@ -204,8 +207,6 @@ func (ctrl *controller) getEventbus(name string) (*metapb.EventBus, error) {
}

ebMD := metadata.Convert2ProtoEventBus(_eb)[0]
ebMD.Name = _eb.Name
ebMD.Logs = metadata.Convert2ProtoEventLog(_eb.EventLogs...)
addrs := make([]string, 0)
for _, v := range ctrl.cfg.Topology {
addrs = append(addrs, v)
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/eventbus/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestController_CreateEventBus(t *testing.T) {
})
So(err, ShouldBeNil)
So(res.Name, ShouldEqual, "test-1")
So(res.Id, ShouldEqual, el.EventbusID.Uint64())
So(res.Id, ShouldNotEqual, 0)
So(res.Logs, ShouldHaveLength, 1)
So(res.LogNumber, ShouldEqual, 1)
So(res.Logs[0].EventBusName, ShouldEqual, "test-1")
Expand Down
21 changes: 15 additions & 6 deletions internal/controller/eventbus/metadata/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,34 @@ package metadata

import (
"encoding/json"
"time"

"github.com/linkall-labs/vanus/internal/primitive/vanus"
"github.com/linkall-labs/vanus/proto/pkg/meta"
)

type Eventbus struct {
ID vanus.ID `json:"id"`
Name string `json:"name"`
LogNumber int `json:"log_number"`
EventLogs []*Eventlog `json:"event_logs"`
ID vanus.ID `json:"id"`
Name string `json:"name"`
LogNumber int `json:"log_number"`
EventLogs []*Eventlog `json:"event_logs"`
Description string `json:"description"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

func Convert2ProtoEventBus(ins ...*Eventbus) []*meta.EventBus {
pebs := make([]*meta.EventBus, len(ins))
for idx := 0; idx < len(ins); idx++ {
eb := ins[idx]
pebs[idx] = &meta.EventBus{
Name: eb.Name,
LogNumber: int32(eb.LogNumber),
Name: eb.Name,
LogNumber: int32(eb.LogNumber),
Logs: Convert2ProtoEventLog(eb.EventLogs...),
Id: eb.ID.Uint64(),
Description: eb.Description,
CreatedAt: eb.CreatedAt.UnixMilli(),
UpdatedAt: eb.UpdatedAt.UnixMilli(),
}
}
return pebs
Expand Down
3 changes: 3 additions & 0 deletions internal/controller/trigger/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ func (ctrl *controller) CreateSubscription(ctx context.Context,
}
sub := convert.FromPbSubscriptionRequest(request.Subscription)
sub.ID, err = vanus.NewID()
sub.CreatedAt = time.Now()
sub.UpdatedAt = time.Now()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -186,6 +188,7 @@ func (ctrl *controller) UpdateSubscription(ctx context.Context,
if !change {
return nil, errors.ErrInvalidRequest.WithMessage("no change")
}
sub.UpdatedAt = time.Now()
sub.Phase = metadata.SubscriptionPhasePending
if err := ctrl.subscriptionManager.UpdateSubscription(ctx, sub); err != nil {
return nil, err
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/trigger/metadata/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ type Subscription struct {
ProtocolSetting *primitive.ProtocolSetting `json:"protocol_settings,omitempty"`
EventBus string `json:"eventbus"`
Transformer *primitive.Transformer `json:"transformer,omitempty"`
Name string `json:"name"`
Disable bool `json:"disable"`
Description string `json:"description"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`

// not from api
Phase SubscriptionPhase `json:"phase"`
Expand All @@ -89,6 +94,14 @@ func (s *Subscription) Update(update *Subscription) bool {
change = true
s.Source = update.Source
}
if s.Description != update.Description {
change = true
s.Description = update.Description
}
if s.Disable != update.Disable {
change = true
s.Disable = update.Disable
}
if !reflect.DeepEqual(s.Types, update.Types) {
change = true
s.Types = update.Types
Expand Down
8 changes: 8 additions & 0 deletions internal/convert/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func FromPbSubscriptionRequest(sub *ctrl.SubscriptionRequest) *metadata.Subscrip
Filters: fromPbFilters(sub.Filters),
Transformer: fromPbTransformer(sub.Transformer),
EventBus: sub.EventBus,
Name: sub.Name,
Disable: sub.Disable,
Description: sub.Description,
}
return to
}
Expand Down Expand Up @@ -282,6 +285,11 @@ func ToPbSubscription(sub *metadata.Subscription, offsets info.ListOffsetInfo) *
Filters: toPbFilters(sub.Filters),
Transformer: ToPbTransformer(sub.Transformer),
Offsets: ToPbOffsetInfos(offsets),
Name: sub.Name,
Disable: sub.Disable,
Description: sub.Description,
CreatedAt: sub.CreatedAt.UnixMilli(),
UpdatedAt: sub.UpdatedAt.UnixMilli(),
}
return to
}
Expand Down
8 changes: 8 additions & 0 deletions internal/primitive/vanus/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package vanus

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -222,7 +223,14 @@ func NewIDFromUint64(id uint64) ID {
return ID(id)
}

var (
ErrEmptyID = errors.New("id: empty")
)

func NewIDFromString(id string) (ID, error) {
if id == "" {
return emptyID, ErrEmptyID
}
i, err := strconv.ParseUint(id, base, bitSize)
if err != nil {
return emptyID, err
Expand Down
Loading

0 comments on commit 5fdb2bb

Please sign in to comment.