Skip to content

Commit

Permalink
fix: track
Browse files Browse the repository at this point in the history
  • Loading branch information
garrettladley committed Oct 23, 2024
1 parent 2fff550 commit d2a2968
Show file tree
Hide file tree
Showing 4 changed files with 377 additions and 0 deletions.
71 changes: 71 additions & 0 deletions internal/loadbalance/picker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package loadbalance

import (
"strings"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
)

var _ base.PickerBuilder = (*Picker)(nil)

type Picker struct {
mu sync.RWMutex
leader balancer.SubConn
followers []balancer.SubConn
current uint64
}

func (p *Picker) Build(buildInfo base.PickerBuildInfo) balancer.Picker {
p.mu.Lock()
defer p.mu.Unlock()
var followers []balancer.SubConn
for sc, scInfo := range buildInfo.ReadySCs {
isLeader := scInfo.
Address.
Attributes.
Value("is_leader").(bool)
if isLeader {
p.leader = sc
continue
}
followers = append(followers, sc)
}
p.followers = followers
return p
}

var _ balancer.Picker = (*Picker)(nil)

func (p *Picker) Pick(info balancer.PickInfo) (
balancer.PickResult, error,
) {
p.mu.RLock()
defer p.mu.RUnlock()
var result balancer.PickResult
if strings.Contains(info.FullMethodName, "Produce") ||
len(p.followers) == 0 {
result.SubConn = p.leader
} else if strings.Contains(info.FullMethodName, "Consume") {
result.SubConn = p.nextFollower()
}
if result.SubConn == nil {
return result, balancer.ErrNoSubConnAvailable
}
return result, nil
}

func (p *Picker) nextFollower() balancer.SubConn {
cur := atomic.AddUint64(&p.current, uint64(1))
len := uint64(len(p.followers))
idx := int(cur % len)
return p.followers[idx]
}

func init() {
balancer.Register(
base.NewBalancerBuilder(Name, &Picker{}, base.Config{}),
)
}
89 changes: 89 additions & 0 deletions internal/loadbalance/picker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package loadbalance_test

import (
"testing"

"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/resolver"

loadbalance "github.com/garrettladley/dclog/internal/loadbalance"
"github.com/stretchr/testify/require"
)

func TestPickerNoSubConnAvailable(t *testing.T) {
picker := &loadbalance.Picker{}
for _, method := range []string{
"/log.vX.Log/Produce",
"/log.vX.Log/Consume",
} {
info := balancer.PickInfo{
FullMethodName: method,
}
result, err := picker.Pick(info)
require.Equal(t, balancer.ErrNoSubConnAvailable, err)
require.Nil(t, result.SubConn)
}
}

func TestPickerProducesToLeader(t *testing.T) {
picker, subConns := setupTest()
info := balancer.PickInfo{
FullMethodName: "/log.vX.Log/Produce",
}
for i := 0; i < 5; i++ {
gotPick, err := picker.Pick(info)
require.NoError(t, err)
require.Equal(t, subConns[0], gotPick.SubConn)
}
}

func TestPickerConsumesFromFollowers(t *testing.T) {
picker, subConns := setupTest()
info := balancer.PickInfo{
FullMethodName: "/log.vX.Log/Consume",
}
for i := 0; i < 5; i++ {
pick, err := picker.Pick(info)
require.NoError(t, err)
require.Equal(t, subConns[i%2+1], pick.SubConn)
}
}

func setupTest() (*loadbalance.Picker, []*subConn) {
var subConns []*subConn
buildInfo := base.PickerBuildInfo{
ReadySCs: make(map[balancer.SubConn]base.SubConnInfo),
}
for i := 0; i < 3; i++ {
sc := &subConn{}
addr := resolver.Address{
Attributes: attributes.New("is_leader", i == 0),
}
// 0th sub conn is the leader
sc.UpdateAddresses([]resolver.Address{addr})
buildInfo.ReadySCs[sc] = base.SubConnInfo{Address: addr}
subConns = append(subConns, sc)
}
picker := &loadbalance.Picker{}
picker.Build(buildInfo)
return picker, subConns
}

// subConn implements balancer.SubConn.
type subConn struct {
addrs []resolver.Address
}

func (s *subConn) UpdateAddresses(addrs []resolver.Address) {
s.addrs = addrs
}

func (s *subConn) Connect() {}

func (s *subConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
}

func (s *subConn) Shutdown() {}
103 changes: 103 additions & 0 deletions internal/loadbalance/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package loadbalance

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"

api "github.com/garrettladley/dclog/api/v1"
)

type Resolver struct {
mu sync.Mutex
clientConn resolver.ClientConn
resolverConn *grpc.ClientConn
serviceConfig *serviceconfig.ParseResult
logger *zap.Logger
}

var _ resolver.Builder = (*Resolver)(nil)

func (r *Resolver) Build(
target resolver.Target,
cc resolver.ClientConn,
opts resolver.BuildOptions,
) (resolver.Resolver, error) {
r.logger = zap.L().Named("resolver")
r.clientConn = cc
var dialOpts []grpc.DialOption
if opts.DialCreds != nil {
dialOpts = append(
dialOpts,
grpc.WithTransportCredentials(opts.DialCreds),
)
}
r.serviceConfig = r.clientConn.ParseServiceConfig(
fmt.Sprintf(`{"loadBalancingConfig":[{"%s":{}}]}`, Name),
)
var err error
r.resolverConn, err = grpc.NewClient(target.URL.Host, dialOpts...)
if err != nil {
return nil, err
}
r.ResolveNow(resolver.ResolveNowOptions{})

return r, nil
}

const Name = "dclog"

func (r *Resolver) Scheme() string {
return Name
}

func init() {
resolver.Register(&Resolver{})
}

var _ resolver.Resolver = (*Resolver)(nil)

func (r *Resolver) ResolveNow(resolver.ResolveNowOptions) {
r.mu.Lock()
defer r.mu.Unlock()
client := api.NewLogClient(r.resolverConn)
// get cluster and then set on cc attributes
ctx := context.Background()
res, err := client.GetServers(ctx, &api.GetServersRequest{})
if err != nil {
r.logger.Error(
"failed to resolve server",
zap.Error(err),
)
return
}
var addrs []resolver.Address
for _, server := range res.Servers {
addrs = append(addrs, resolver.Address{
Addr: server.RpcAddr,
Attributes: attributes.New(
"is_leader",
server.IsLeader,
),
})
}
r.clientConn.UpdateState(resolver.State{

Check failure on line 90 in internal/loadbalance/resolver.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `r.clientConn.UpdateState` is not checked (errcheck)
Addresses: addrs,
ServiceConfig: r.serviceConfig,
})
}

func (r *Resolver) Close() {
if err := r.resolverConn.Close(); err != nil {
r.logger.Error(
"failed to close conn",
zap.Error(err),
)
}
}
114 changes: 114 additions & 0 deletions internal/loadbalance/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package loadbalance_test

import (
"net"
"net/url"
"testing"

"github.com/garrettladley/dclog/internal/config"
"github.com/garrettladley/dclog/internal/loadbalance"
"github.com/garrettladley/dclog/internal/server"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"

api "github.com/garrettladley/dclog/api/v1"
)

func TestResolver(t *testing.T) {
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{
CertFile: config.ServerCertFile,
KeyFile: config.ServerKeyFile,
CAFile: config.CAFile,
Server: true,
ServerAddress: "127.0.0.1",
})
require.NoError(t, err)
serverCreds := credentials.NewTLS(tlsConfig)

srv, err := server.NewGRPCServer(&server.Config{
GetServerer: &getServers{},
}, grpc.Creds(serverCreds))
require.NoError(t, err)

go srv.Serve(l)

Check failure on line 40 in internal/loadbalance/resolver_test.go

View workflow job for this annotation

GitHub Actions / Lint

Error return value of `srv.Serve` is not checked (errcheck)

conn := &clientConn{}
tlsConfig, err = config.SetupTLSConfig(config.TLSConfig{
CertFile: config.RootClientCertFile,
KeyFile: config.RootClientKeyFile,
CAFile: config.CAFile,
Server: false,
ServerAddress: "127.0.0.1",
})
require.NoError(t, err)
clientCreds := credentials.NewTLS(tlsConfig)
opts := resolver.BuildOptions{
DialCreds: clientCreds,
}
r := &loadbalance.Resolver{}
_, err = r.Build(
resolver.Target{
URL: url.URL{Host: l.Addr().String()},
},
conn,
opts,
)
require.NoError(t, err)

wantState := resolver.State{
Addresses: []resolver.Address{{
Addr: "localhost:9001",
Attributes: attributes.New("is_leader", true),
}, {
Addr: "localhost:9002",
Attributes: attributes.New("is_leader", false),
}},
}
require.Equal(t, wantState, conn.state)

conn.state.Addresses = nil
r.ResolveNow(resolver.ResolveNowOptions{})
require.Equal(t, wantState, conn.state)
}

type getServers struct{}

func (s *getServers) GetServers() ([]*api.Server, error) {
return []*api.Server{{
Id: "leader",
RpcAddr: "localhost:9001",
IsLeader: true,
}, {
Id: "follower",
RpcAddr: "localhost:9002",
}}, nil
}

type clientConn struct {
resolver.ClientConn
state resolver.State
}

func (c *clientConn) UpdateState(state resolver.State) error {
c.state = state
return nil
}

func (c *clientConn) ReportError(err error) {}

func (c *clientConn) NewAddress(addrs []resolver.Address) {}

func (c *clientConn) NewServiceConfig(config string) {}

func (c *clientConn) ParseServiceConfig(
config string,
) *serviceconfig.ParseResult {
return nil
}

0 comments on commit d2a2968

Please sign in to comment.