Skip to content

Commit

Permalink
Merge branch 'main' of github.com:vanus-labs/vanus into client
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Mar 26, 2023
2 parents b777d52 + 9c695ff commit 23dcb26
Show file tree
Hide file tree
Showing 86 changed files with 9,951 additions and 692 deletions.
2 changes: 1 addition & 1 deletion client/internal/eventlog/name_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func toSegment(segment *metapb.Segment) *record.Segment {
StartOffset: segment.GetStartOffsetInLog(),
EndOffset: segment.GetEndOffsetInLog(),
FirstEventBornAt: time.UnixMilli(segment.FirstEventBornAtByUnixMs),
LastEventBornAt: time.UnixMilli(segment.LastEvnetBornAtByUnixMs),
LastEventBornAt: time.UnixMilli(segment.LastEventBornAtByUnixMs),
Writable: segment.State == "working", // TODO: writable
Blocks: blocks,
LeaderBlockID: segment.GetLeaderBlockId(),
Expand Down
7 changes: 4 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func main() {
os.Exit(-1)
}

namespaceCtrlStv := tenant.NewController(cfg.GetTenantConfig(), mem)
tenantCtrlStv := tenant.NewController(cfg.GetTenantConfig(), mem)
segmentCtrl := eventbus.NewController(cfg.GetEventbusCtrlConfig(), mem)
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), mem)

Expand Down Expand Up @@ -121,7 +121,8 @@ func main() {
}

ctrlpb.RegisterPingServerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterNamespaceControllerServer(grpcServer, namespaceCtrlStv)
ctrlpb.RegisterNamespaceControllerServer(grpcServer, tenantCtrlStv)
ctrlpb.RegisterAuthControllerServer(grpcServer, tenantCtrlStv)
ctrlpb.RegisterEventbusControllerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterEventlogControllerServer(grpcServer, segmentCtrl)
ctrlpb.RegisterSegmentControllerServer(grpcServer, segmentCtrl)
Expand All @@ -139,7 +140,7 @@ func main() {
wg.Done()
}()

if err = namespaceCtrlStv.Start(); err != nil {
if err = tenantCtrlStv.Start(); err != nil {
log.Error(ctx, "start namespace controller fail", map[string]interface{}{
log.KeyError: err,
})
Expand Down
17 changes: 11 additions & 6 deletions internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/vanus-labs/vanus/internal/controller/eventbus/eventlog"
"github.com/vanus-labs/vanus/internal/controller/eventbus/metadata"
"github.com/vanus-labs/vanus/internal/controller/eventbus/server"
Expand All @@ -42,11 +48,6 @@ import (
"github.com/vanus-labs/vanus/pkg/util"
ctrlpb "github.com/vanus-labs/vanus/proto/pkg/controller"
metapb "github.com/vanus-labs/vanus/proto/pkg/meta"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
Expand Down Expand Up @@ -332,6 +333,7 @@ func (ctrl *controller) deleteEventbus(ctx context.Context, id vanus.ID) error {
if !exist {
return errors.ErrResourceNotFound.WithMessage("the eventbus doesn't exist")
}
// todo user can't delete system eventbus, but timer need to delete system eventbus
err := ctrl.kvStore.Delete(ctx, metadata.GetEventbusMetadataKey(id))
if err != nil {
return errors.ErrInternal.WithMessage("delete eventbus metadata in kv failed").Wrap(err)
Expand Down Expand Up @@ -376,13 +378,16 @@ func (ctrl *controller) getEventbus(id vanus.ID) (*metapb.Eventbus, error) {
}

func (ctrl *controller) ListEventbus(ctx context.Context,
_ *ctrlpb.ListEventbusRequest,
req *ctrlpb.ListEventbusRequest,
) (*ctrlpb.ListEventbusResponse, error) {
eventbusList := make([]*metapb.Eventbus, 0)
for _, v := range ctrl.eventbusMap {
if strings.HasPrefix(v.Name, primitive.SystemEventbusNamePrefix) {
continue
}
if req.NamespaceId != 0 && req.NamespaceId != v.NamespaceID {
continue
}
ebMD := metadata.Convert2ProtoEventbus(v)[0]
eventbusList = append(eventbusList, ebMD)
}
Expand Down
54 changes: 34 additions & 20 deletions internal/controller/eventbus/eventlog/eventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,27 +603,17 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
executionID := uuid.NewString()
mgr.eventlogMap.Range(func(key, value interface{}) bool {
elog, _ := value.(*eventlog)
head := elog.head()
checkCtx := context.Background()
for head != nil {
for head, next := elog.headAndNext(); head != nil; head, next = elog.headAndNext() {
switch {
case head.LastEventBornTime.Second() == 0:
// TODO(wenfeng.wang) fix if set
head.LastEventBornTime = time.Now().Add(mgr.segmentExpiredTime)
elog.lock()
if err := elog.updateSegment(checkCtx, head); err != nil {
log.Warning(ctx, "update segment's metadata failed", map[string]interface{}{
log.KeyError: err,
"segment": head.String(),
"eventlog": elog.md.ID.String(),
})
head.LastEventBornTime = time.Time{}
}
elog.unlock()
case !head.isFull() || next == nil:
return true
case !head.isFull():
return true
case time.Since(head.LastEventBornTime.Add(mgr.segmentExpiredTime)) > 0:
case next.StartOffsetInLog == 0:
// StartOffsetInLog must be set when mark previous segment full.
panic("next segment has not StartOffsetInLog") // unreachable
case head.LastEventBornTime.IsZero():
// LastEventBornTime must be set when mark the segment full.
panic("full segment has not LastEventBornTime") // unreachable
case time.Since(head.LastEventBornTime) > mgr.segmentExpiredTime:
err := elog.deleteHead(ctx)
if err != nil {
log.Warning(ctx, "delete segment error", map[string]interface{}{
Expand Down Expand Up @@ -652,7 +642,6 @@ func (mgr *eventlogManager) checkSegmentExpired(ctx context.Context) {
default:
return true
}
head = elog.head()
}
return true
})
Expand Down Expand Up @@ -1079,6 +1068,24 @@ func (el *eventlog) head() *Segment {
return ptr.Value.(*Segment)
}

// headAndNext returns copies of head and next segment in the eventlog.
func (el *eventlog) headAndNext() (*Segment, *Segment) {
el.mutex.RLock()
defer el.mutex.RUnlock()

switch el.size() {
case 0:
return nil, nil
case 1:
head := *el.segmentList.Front().Value.(*Segment)
return &head, nil
default:
ptr := el.segmentList.Front()
head, next := *ptr.Value.(*Segment), *ptr.Next().Value.(*Segment)
return &head, &next
}
}

func (el *eventlog) tail() *Segment {
if el.size() == 0 {
return nil
Expand Down Expand Up @@ -1179,18 +1186,22 @@ func (el *eventlog) listOfPrevious(seg *Segment) []*Segment { //nolint:unused //
func (el *eventlog) deleteHead(ctx context.Context) error {
el.mutex.Lock()
defer el.mutex.Unlock()

if el.segmentList.Len() == 0 {
return nil
}

headV := el.segmentList.Front()
nextV := headV.Next()
head, _ := headV.Value.(*Segment)

segments := make([]vanus.ID, 0, len(el.segments)-1)
for _, v := range el.segments {
if v.Uint64() != head.ID.Uint64() {
segments = append(segments, v)
}
}

if err := el.kvClient.Delete(ctx, metadata.GetEventlogSegmentsMetadataKey(el.md.ID, head.ID)); err != nil {
log.Warning(ctx, "delete segment failed when delete head", map[string]interface{}{
log.KeyError: err,
Expand All @@ -1213,11 +1224,14 @@ func (el *eventlog) deleteHead(ctx context.Context) error {
return err
}
}

if el.writePtr == head {
el.writePtr = nil
}

_ = el.segmentList.RemoveFront()
el.segments = segments

return nil
}

Expand Down
Loading

0 comments on commit 23dcb26

Please sign in to comment.