Skip to content

Commit

Permalink
refactor: add eventbus state check
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Mar 26, 2023
1 parent 19445de commit b777d52
Show file tree
Hide file tree
Showing 38 changed files with 286 additions and 150 deletions.
49 changes: 24 additions & 25 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,20 @@ import (
"google.golang.org/grpc/credentials/insecure"

// this project.
eb "github.com/vanus-labs/vanus/client/internal/vanus/eventbus"
eb "github.com/vanus-labs/vanus/client/internal/eventbus"
"github.com/vanus-labs/vanus/client/pkg/api"
"github.com/vanus-labs/vanus/client/pkg/eventbus"
)

type Client interface {
Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus
Disconnect(ctx context.Context)
}

type client struct {
// Endpoints is a list of URLs.
Endpoints []string
eventbusCache sync.Map
endpoints []string
cache sync.Map

mu sync.RWMutex
tracer *tracing.Tracer
}

func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.Eventbus {
func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) (api.Eventbus, error) {
_, span := c.tracer.Start(ctx, "EventbusService")
defer span.End()

Expand All @@ -56,20 +50,20 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E
apply(defaultOpts)
}

err := GetEventbusIDIfNotSet(ctx, c.Endpoints, defaultOpts)
err := GetEventbusIDIfNotSet(ctx, c.endpoints, defaultOpts)
if err != nil {
log.Error(ctx, "get eventbus id failed", map[string]interface{}{
log.KeyError: err,
"eventbus_name": defaultOpts.Name,
"eventbus_id": defaultOpts.ID,
})
return nil
return nil, err
}

bus := func() api.Eventbus {
c.mu.RLock()
defer c.mu.RUnlock()
if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok {
if value, ok := c.cache.Load(defaultOpts.ID); ok {
return value.(api.Eventbus)
} else {
return nil
Expand All @@ -79,37 +73,43 @@ func (c *client) Eventbus(ctx context.Context, opts ...api.EventbusOption) api.E
if bus == nil {
c.mu.Lock()
defer c.mu.Unlock()
if value, ok := c.eventbusCache.Load(defaultOpts.ID); ok { // double check
return value.(api.Eventbus)
if value, ok := c.cache.Load(defaultOpts.ID); ok { // double check
return value.(api.Eventbus), nil
} else {
cfg := &eb.Config{
Endpoints: c.Endpoints,
Endpoints: c.endpoints,
ID: defaultOpts.ID,
}
newEventbus := eventbus.NewEventbus(cfg)
c.eventbusCache.Store(defaultOpts.ID, newEventbus)
return newEventbus
newEventbus := eventbus.NewEventbus(cfg, c.close)
c.cache.Store(defaultOpts.ID, newEventbus)
return newEventbus, nil
}
}
return bus
return bus, nil
}

func (c *client) Disconnect(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()
c.eventbusCache.Range(func(key, value interface{}) bool {
c.cache.Range(func(key, value interface{}) bool {
value.(api.Eventbus).Close(ctx)
c.eventbusCache.Delete(key)
c.cache.Delete(key)
return true
})
}

func Connect(endpoints []string) Client {
func (c *client) close(id uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.cache.Delete(id)
}

func Connect(endpoints []string) api.Client {
if len(endpoints) == 0 {
return nil
}
return &client{
Endpoints: endpoints,
endpoints: endpoints,
}
}

Expand All @@ -129,4 +129,3 @@ func GetEventbusIDIfNotSet(ctx context.Context, endpoints []string, opts *api.Ev
}
return nil
}

6 changes: 4 additions & 2 deletions client/examples/eventbus/append/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ func main() {
if err != nil {
panic("invalid id")
}
bus := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64()))
bus, err := c.Eventbus(ctx, api.WithName("quick-start"), api.WithID(eventbusID.Uint64()))
if err != nil {
panic(err.Error())
}
w := bus.Writer()
// Create an Event.
event := ce.NewEvent()
Expand All @@ -64,4 +67,3 @@ func main() {
log.Printf("success! eventID:%s\n", eventID)
}
}

5 changes: 4 additions & 1 deletion client/examples/eventbus/read/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ func main() {
if err != nil {
panic("invalid id")
}
eb := c.Eventbus(ctx, api.WithID(eventbusID.Uint64()))
eb, err := c.Eventbus(ctx, api.WithID(eventbusID.Uint64()))
if err != nil {
panic(err.Error())
}
ls, err := eb.ListLog(ctx)
if err != nil {
log.Print(err.Error())
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@

package net

import "github.com/vanus-labs/vanus/client/internal/vanus/net/connection"
import "github.com/vanus-labs/vanus/client/internal/net/connection"

var Connect = connection.Connect
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"github.com/vanus-labs/vanus/pkg/errors"

// this project.
"github.com/vanus-labs/vanus/client/internal/vanus/net/connection"
"github.com/vanus-labs/vanus/client/internal/vanus/net/rpc"
"github.com/vanus-labs/vanus/client/internal/net/connection"
"github.com/vanus-labs/vanus/client/internal/net/rpc"
)

const (
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
segpb "github.com/vanus-labs/vanus/proto/pkg/segment"

// this project.
"github.com/vanus-labs/vanus/client/internal/vanus/net/rpc"
"github.com/vanus-labs/vanus/client/internal/vanus/net/rpc/bare"
"github.com/vanus-labs/vanus/client/internal/net/rpc"
"github.com/vanus-labs/vanus/client/internal/net/rpc/bare"
"github.com/vanus-labs/vanus/client/pkg/primitive"
)

Expand Down
7 changes: 7 additions & 0 deletions client/pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ import (
"github.com/vanus-labs/vanus/proto/pkg/codec"
)

type CloseFunc func(id uint64)

type Client interface {
Eventbus(ctx context.Context, opts ...EventbusOption) (Eventbus, error)
Disconnect(ctx context.Context)
}

type Eventbus interface {
Writer(opts ...WriteOption) BusWriter
Reader(opts ...ReadOption) BusReader
Expand Down
59 changes: 57 additions & 2 deletions client/pkg/api/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit b777d52

Please sign in to comment.