Skip to content
This repository has been archived by the owner on Apr 2, 2020. It is now read-only.

Commit

Permalink
Merge pull request #28 from booster-proj/issue/26
Browse files Browse the repository at this point in the history
Issue/26
  • Loading branch information
dmorn authored Jan 24, 2019
2 parents cdde760 + dceda24 commit 3b14402
Show file tree
Hide file tree
Showing 18 changed files with 449 additions and 501 deletions.
19 changes: 13 additions & 6 deletions booster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,15 @@ import (
"upspin.io/log"
)

// Balancer describes which functionalities must be provided in order
// to allow booster to get sources.
type Balancer interface {
Get(ctx context.Context, target string, blacklisted ...core.Source) (core.Source, error)
Len() int
}

// New returns an instance of a booster dialer.
func New(b *core.Balancer) *Dialer {
func New(b Balancer) *Dialer {
return &Dialer{b: b}
}

Expand All @@ -39,7 +46,7 @@ type MetricsExporter interface {
// instance to to retrieve a source to use when it comes to dial a network
// connection.
type Dialer struct {
b *core.Balancer
b Balancer

metrics struct {
sync.Mutex
Expand All @@ -58,21 +65,21 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (conn
// If the dialing fails, keep on trying with the other sources until exaustion.
for i := 0; len(bl) < d.Len(); i++ {
var src core.Source
src, err = d.b.Get(ctx, bl...)
src, err = d.b.Get(ctx, address, bl...)
if err != nil {
// Fail directly if the balancer returns an error, as
// we do not have any source to use.
return
}

d.sendMetrics(src.Name(), address)
d.sendMetrics(src.ID(), address)

log.Debug.Printf("DialContext: Attempt #%d to connect to %v (source %v)", i, address, src.Name())
log.Debug.Printf("DialContext: Attempt #%d to connect to %v (source %v)", i, address, src.ID())

conn, err = src.DialContext(ctx, "tcp4", address)
if err != nil {
// Log this error, otherwise it will be silently skipped.
log.Error.Printf("Unable to dial connection to %v using source %v. Error: %v", address, src.Name(), err)
log.Error.Printf("Unable to dial connection to %v using source %v. Error: %v", address, src.ID(), err)
bl = append(bl, src)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/booster/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
log.Fatal(err)
}

remote.StaticConf = remote.Config{
remote.Info = remote.BoosterInfo{
Version: version,
Commit: commit,
BuildTime: buildTime,
Expand All @@ -96,7 +96,7 @@ func main() {
Store: rs,
MetricsExporter: exp,
})
d := booster.New(b)
d := booster.New(rs)
d.SetMetricsExporter(exp)

router := remote.NewRouter()
Expand Down
22 changes: 9 additions & 13 deletions core/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type Dialer interface {
type Source interface {
Dialer

// Name uniquely identifies a source.
Name() string
// ID uniquely identifies a source.
ID() string

// Tells the receiver to cleanup and close all connections.
Close() error
Expand Down Expand Up @@ -80,27 +80,23 @@ func (b *Balancer) Get(ctx context.Context, blacklist ...Source) (Source, error)

bl := make(map[string]interface{})
for _, v := range blacklist {
bl[v.Name()] = nil
bl[v.ID()] = nil
}

var s Source
var err error
for i := 0; i < b.r.Len(); i++ {
s, err = b.Strategy(ctx, b.r)
s, err := b.Strategy(ctx, b.r)
if err != nil {
// Avoid retring if the strategy returns an error.
return nil, err
}

// Check if the source is contained in the blacklist.
if _, ok := bl[s.Name()]; ok {
// Skip this source
continue
if _, ok := bl[s.ID()]; !ok {
return s, nil
}
break
}

return s, nil
return nil, errors.New("balancer: unable to find any suitable source")
}

// Put adds ss as sources to the current balancer ring. If ss.len() == 0, Put silently returns,
Expand Down Expand Up @@ -144,7 +140,7 @@ func (b *Balancer) Del(ss ...Source) {
// Create a map of sources that have to be deleted (lookup O(1))
m := make(map[string]Source)
for _, v := range ss {
m[v.Name()] = v
m[v.ID()] = v
}

b.mux.Lock()
Expand All @@ -154,7 +150,7 @@ func (b *Balancer) Del(ss ...Source) {
b.r.Do(func(s Source) {
// Check if the identifier of this stored source is contained in the map
// of sources that have to be removed.
if _, ok := m[s.Name()]; !ok {
if _, ok := m[s.ID()]; !ok {
// If this source is not contained in the map, add it to the
// list of accepted sources.
l = append(l, s)
Expand Down
34 changes: 21 additions & 13 deletions core/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newMock(id string) *mock {
return &mock{id: id}
}

func (s *mock) Name() string {
func (s *mock) ID() string {
return s.id
}

Expand Down Expand Up @@ -69,8 +69,8 @@ func TestPut(t *testing.T) {
}

b.Do(func(s core.Source) {
if s.Name() != "s0" {
t.Fatalf("Unexpected source Identifier: wanted s0, found %s", s.Name())
if s.ID() != "s0" {
t.Fatalf("Unexpected source Identifier: wanted s0, found %s", s.ID())
}
})
}
Expand Down Expand Up @@ -114,8 +114,8 @@ func TestGet_roundRobin(t *testing.T) {
t.Fatalf("Unexpected error while getting source: %v. %v", i, err)
}

if s.Name() != v.out {
t.Fatalf("Unexpected source Name: iteration(%v): wanted %v, found %v", i, v.out, s.Name())
if s.ID() != v.out {
t.Fatalf("Unexpected source ID: iteration(%v): wanted %v, found %v", i, v.out, s.ID())
}
}
}
Expand All @@ -128,17 +128,25 @@ func TestGetBlacklist_roundRobin(t *testing.T) {

b.Put(s0, s1)

s, _ := b.Get(context.TODO())
if s.Name() != "s0" {
t.Fatalf("Unexpected source Name: wanted %v, found %v", s0.Name(), s1.Name())
ctx := context.TODO()
s, _ := b.Get(ctx)
if s.ID() != "s0" {
t.Fatalf("Unexpected source ID: wanted %v, found %v", s0.ID(), s1.ID())
}

s2, err := b.Get(context.TODO(), s1)
s2, err := b.Get(ctx, s1)
if err != nil {
t.Fatalf("Unexpected error while getting source: %v", err)
}
if s2.Name() != "s0" {
t.Fatalf("Unexpected source Name: wanted %v, found %v, blacklisted source %v", s0.Name(), s2.Name(), s1.Name())
if s2.ID() != "s0" {
t.Fatalf("Unexpected source ID: wanted %v, found %v, blacklisted source %v", s0.ID(), s2.ID(), s1.ID())
}

// Test with one source blacklisted
b.Del(s1)
s3, err := b.Get(ctx, s0)
if err == nil {
t.Fatalf("Unexpected source %v: wanted an error", s3.ID())
}
}

Expand Down Expand Up @@ -167,8 +175,8 @@ func TestDel(t *testing.T) {
}

b.Do(func(s core.Source) {
if s.Name() != "s1" {
t.Fatalf("Unexpected source Name: wanted s1, found %v", s.Name())
if s.ID() != "s1" {
t.Fatalf("Unexpected source ID: wanted s1, found %v", s.ID())
}
})

Expand Down
28 changes: 14 additions & 14 deletions core/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ func TestNextPrev(t *testing.T) {
i := 0
r.Do(func(s core.Source) {
v := tt[i]
if v != s.Name() {
t.Fatalf("Unexpected Name value (%d): wanted %v, found %v", i, v, s.Name())
if v != s.ID() {
t.Fatalf("Unexpected ID value (%d): wanted %v, found %v", i, v, s.ID())
}
i++
})

// We should be at element 0 at this point.
if r.Prev().Source().Name() != "3" {
t.Fatalf("Unexpected Prev Name: wanted 3, found %s", r.Source().Name())
if r.Prev().Source().ID() != "3" {
t.Fatalf("Unexpected Prev ID: wanted 3, found %s", r.Source().ID())
}
}

Expand Down Expand Up @@ -124,14 +124,14 @@ func TestLink(t *testing.T) {
// after r. The result points to the element following the
// last element of s after insertion.
//
if r.Source().Name() != "0" {
t.Fatalf("Unexpected linked r element: wanted 0, found %s", r.Source().Name())
if r.Source().ID() != "0" {
t.Fatalf("Unexpected linked r element: wanted 0, found %s", r.Source().ID())
}

for i := 0; i < n; i++ {
s := fmt.Sprintf("%d", i)
if r.Source().Name() != s {
t.Fatalf("%d: Unexpected linked r source Name: wanted %s, found %v", i, s, r.Source().Name())
if r.Source().ID() != s {
t.Fatalf("%d: Unexpected linked r source ID: wanted %s, found %v", i, s, r.Source().ID())
}
r.Next()
}
Expand All @@ -145,7 +145,7 @@ func TestUnlink(t *testing.T) {
r.Next()
}

t.Logf("r Name before unlink: %s", r.Source().Name())
t.Logf("r ID before unlink: %s", r.Source().ID())
t.Log("r content before unlink:")
r.Do(func(s core.Source) {
t.Log(s)
Expand All @@ -157,7 +157,7 @@ func TestUnlink(t *testing.T) {
t.Fatalf("Unexpected unlinked ring Len: wanted 2, found %d", r.Len())
}

t.Logf("r Name after unlink: %s", r.Source().Name())
t.Logf("r ID after unlink: %s", r.Source().ID())
t.Log("r content after unlink:")
r.Do(func(s core.Source) {
t.Log(s)
Expand All @@ -166,17 +166,17 @@ func TestUnlink(t *testing.T) {
// We should still point to element 0
tt := []string{"0", "3"}
for i, v := range tt {
if v != r.Source().Name() {
t.Fatalf("%d: Unexpected source Name: wanted %s, found %s", i, v, r.Source().Name())
if v != r.Source().ID() {
t.Fatalf("%d: Unexpected source ID: wanted %s, found %s", i, v, r.Source().ID())
}
r.Next()
}

// Check content of removed subring
tt = []string{"1", "2"}
for i, v := range tt {
if v != s.Source().Name() {
t.Fatalf("%d: Unexpected source Name: wanted %s, found %s", i, v, s.Source().Name())
if v != s.Source().ID() {
t.Fatalf("%d: Unexpected source ID: wanted %s, found %s", i, v, s.Source().ID())
}
s.Next()
}
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
module github.com/booster-proj/booster

require (
github.com/booster-proj/proxy v0.1.3
github.com/booster-proj/proxy v0.1.4
github.com/cenkalti/backoff v2.1.0+incompatible // indirect
github.com/gorilla/context v1.1.1 // indirect
github.com/gorilla/mux v1.6.2
github.com/grandcat/zeroconf v0.0.0-20180329153754-df75bb3ccae1
github.com/miekg/dns v1.1.1 // indirect
github.com/prometheus/client_golang v0.9.2
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 // indirect
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/sys v0.0.0-20181026064943-731415f00dce
upspin.io v0.0.0-20180816050821-c137ad0d6be9
upspin.io v0.0.0-20181217205605-686971a7c4ba
)
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/booster-proj/proxy v0.1.3 h1:DrXIF0u8A0I+2KO3FTpzMeN646i1/fVvvP8OiMVHLKU=
github.com/booster-proj/proxy v0.1.3/go.mod h1:le5Yiwdxl9hONszuGsdz3dJtafDZs938B2hwn7DctAc=
github.com/booster-proj/proxy v0.1.4 h1:b0hwZGltYX5Eqr7IeKWBEzYAwXh+ETAHFnXFBL0cgxM=
github.com/booster-proj/proxy v0.1.4/go.mod h1:le5Yiwdxl9hONszuGsdz3dJtafDZs938B2hwn7DctAc=
github.com/cenkalti/backoff v2.1.0+incompatible h1:FIRvWBZrzS4YC7NT5cOuZjexzFvIr+Dbi6aD1cZaNBk=
github.com/cenkalti/backoff v2.1.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
Expand Down Expand Up @@ -30,11 +32,16 @@ golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a h1:8fCF9zjAir2SP3N+axz9xs+0r
golang.org/x/net v0.0.0-20180801234040-f4c29de78a2a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc h1:a3CU5tJYVj92DY2LaA1kUkrsqD5/3mLDhx2NcNqyW+0=
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190119204137-ed066c81e75e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f h1:Bl/8QSvNqXvPGPGXa2z5xUTmV7VDcZyvRZ+QQXkXTZQ=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20181026064943-731415f00dce h1:196tugxh+2x7vxu5cHKw/TepDbiqTPsHAm+12BkDe0w=
golang.org/x/sys v0.0.0-20181026064943-731415f00dce/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
upspin.io v0.0.0-20180816050821-c137ad0d6be9 h1:cHep5ZfwbkvJ3mBXmxuq2IyaHVnOSqXDf2R58uWPJgo=
upspin.io v0.0.0-20180816050821-c137ad0d6be9/go.mod h1:4hdXTXkMPXxzbiw/sultoifpccn98hChAFvrU19V2ug=
upspin.io v0.0.0-20181217205605-686971a7c4ba h1:UPE8bF1YPv3BPJTXJLIUVnuBeyx6ExH3Tz5ttW6RYeE=
upspin.io v0.0.0-20181217205605-686971a7c4ba/go.mod h1:4hdXTXkMPXxzbiw/sultoifpccn98hChAFvrU19V2ug=
18 changes: 9 additions & 9 deletions remote/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ func healthCheckHandler(w http.ResponseWriter, r *http.Request) {

json.NewEncoder(w).Encode(struct {
Alive bool `json:"alive"`
Config
BoosterInfo
}{
Alive: true,
Config: StaticConf,
Alive: true,
BoosterInfo: Info,
})
}

Expand Down Expand Up @@ -70,14 +70,14 @@ func makePoliciesHandler(s *store.SourceStore) func(w http.ResponseWriter, r *ht
func makeBlockHandler(s *store.SourceStore) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
name := vars["name"]
id := vars["id"]

p := &store.Policy{
ID: "block_" + name,
ID: "block_" + id,
Issuer: "remote",
Code: store.PolicyBlock,
Accept: func(n string) bool {
return n != name
Accept: func(tid, target string) bool {
return tid != id
},
}

Expand All @@ -95,7 +95,7 @@ func makeBlockHandler(s *store.SourceStore) func(w http.ResponseWriter, r *http.
if payload.Reason != "" {
p.Reason = payload.Reason
}
s.AddPolicy(p)
s.AppendPolicy(p)
w.WriteHeader(http.StatusCreated)
} else {
// Only POST and DELETE are registered.
Expand All @@ -108,7 +108,7 @@ func makeBlockHandler(s *store.SourceStore) func(w http.ResponseWriter, r *http.
func metricsForwardHandler(w http.ResponseWriter, r *http.Request) {
URL, _ := url.Parse(r.URL.String())
URL.Scheme = "http"
URL.Host = fmt.Sprintf("localhost:%d", StaticConf.PromPort)
URL.Host = fmt.Sprintf("localhost:%d", Info.PromPort)
URL.Path = "api/v1/query"

req, err := http.NewRequest(r.Method, URL.String(), r.Body)
Expand Down
Loading

0 comments on commit 3b14402

Please sign in to comment.