Skip to content

Commit

Permalink
Add backend health check
Browse files Browse the repository at this point in the history
  • Loading branch information
krapie committed Feb 2, 2024
1 parent 0d4af54 commit c3120dc
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 28 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,11 @@
build:
CGO_ENABLED=0 go build -o ./bin/plumber .

docker-build:
docker build --push -t krapi0314/plumber .

docker-compose-up:
docker-compose -f ./docker/docker-compose.yml up --build -d

docker-compose-down:
docker-compose -f ./docker/docker-compose.yml down
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,18 @@ make build
Plumber aims to support [Yorkie](https://github.com/yorkie-team/yorkie) as a backend for the load balancer.
The following features are planned to be implemented first:

### v0.0.1
### v0.1.0

- [ ] Support static load balancing with round-robin algorithm
- [ ] Support backends health check
- [x] Support static load balancing with round-robin algorithm
- [x] Support backends health check

### v0.1.0
### v0.2.0

- [ ] Support consistent hashing algorithm
- [ ] Support dynamic backend configuration (with K8s API)
- [ ] Support mechanism to resolve split-brain of long-lived connection

### v0.2.0
### v0.3.0

- [ ] Support interceptor to modify request/response
- [ ] TBD
38 changes: 27 additions & 11 deletions internal/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,59 @@ import (
"net/http"

"github.com/krapie/plumber/internal/backend"
"github.com/krapie/plumber/internal/health"
"github.com/krapie/plumber/internal/loadbalancer"
)

type Agent struct {
lb loadbalancer.LoadBalancer
loadBalancer loadbalancer.LoadBalancer
healthChecker *health.Checker
}

func NewAgent() (*Agent, error) {
lb, err := loadbalancer.NewRoundRobinLB()
loadBalancer, err := loadbalancer.NewRoundRobinLB()
if err != nil {
return nil, err
}

return &Agent{
lb: lb,
loadBalancer: loadBalancer,
healthChecker: health.NewHealthChecker(2),
}, nil
}

func (s *Agent) Run(backendAddresses []string) error {
err := s.addBackends(backendAddresses)
if err != nil {
return err
}

s.healthChecker.AddBackends(s.loadBalancer.GetBackends())
s.healthChecker.Run()
log.Printf("[Agent] Running health check")

http.HandleFunc("/", s.loadBalancer.ServeProxy)
log.Printf("[Agent] Starting server on :80")
err = http.ListenAndServe(":80", nil)
if err != nil {
return err
}

return nil
}

func (s *Agent) addBackends(backendAddresses []string) error {
for _, addr := range backendAddresses {
b, err := backend.NewDefaultBackend(addr)
if err != nil {
return err
}

err = s.lb.AddBackend(b)
err = s.loadBalancer.AddBackend(b)
if err != nil {
return err
}
}

http.HandleFunc("/", s.lb.ServeProxy)
log.Printf("[Plumber] Starting server on :80")
err := http.ListenAndServe(":80", nil)
if err != nil {
return err
}

return nil
}
36 changes: 33 additions & 3 deletions internal/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,19 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"sync"
)

const (
ALIVE_UP = true
ALIVE_DOWN = false
)

type Backend struct {
Addr string
Addr *url.URL
Alive bool

mutex sync.RWMutex
proxy *httputil.ReverseProxy
}

Expand All @@ -18,12 +26,34 @@ func NewDefaultBackend(addr string) (*Backend, error) {
return nil, err
}

proxy := httputil.NewSingleHostReverseProxy(parsedAddr)
proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) {
http.Error(rw, "Error occurred while processing request", http.StatusBadGateway)
}

return &Backend{
Addr: addr,
proxy: httputil.NewSingleHostReverseProxy(parsedAddr),
Addr: parsedAddr,
Alive: true,

mutex: sync.RWMutex{},
proxy: proxy,
}, nil
}

func (b *Backend) Serve(rw http.ResponseWriter, req *http.Request) {
b.proxy.ServeHTTP(rw, req)
}

func (b *Backend) SetAlive(alive bool) {
b.mutex.Lock()
b.Alive = alive
b.mutex.Unlock()
}

func (b *Backend) IsAlive() bool {
b.mutex.RLock()
alive := b.Alive
b.mutex.RUnlock()

return alive
}
68 changes: 68 additions & 0 deletions internal/health/health_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package health

import (
"net"
"net/url"
"time"

"github.com/krapie/plumber/internal/backend"
)

const TCP = "tcp"

type Checker struct {
backends []*backend.Backend
interval int
}

func NewHealthChecker(interval int) *Checker {
return &Checker{
backends: nil,
interval: interval,
}
}

func (c *Checker) Run() {
go c.healthCheck()
}

func (c *Checker) AddBackends(backends []*backend.Backend) {
c.backends = backends
}

func (c *Checker) healthCheck() {
t := time.NewTicker(time.Duration(c.interval) * time.Second)
for {
select {
case <-t.C:
// log.Printf("[Health] Running health check")
c.checkBackendLiveness()
}
}
}

func (c *Checker) checkBackendLiveness() {
for _, b := range c.backends {
isAlive := c.checkTCPConnection(b.Addr, c.interval)
if isAlive {
b.SetAlive(backend.ALIVE_UP)
} else {
b.SetAlive(backend.ALIVE_DOWN)
}
// log.Printf("[Health] Backend %s is %v", b.Addr, b.IsAlive())
}
}

func (c *Checker) checkTCPConnection(addr *url.URL, interval int) bool {
conn, err := net.DialTimeout(TCP, addr.Host, time.Duration(interval)*time.Second)
if err != nil {
return false
}

err = conn.Close()
if err != nil {
return false
}

return true
}
1 change: 1 addition & 0 deletions internal/loadbalancer/load_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import (
// LoadBalancer is an interface for a load balancer.
type LoadBalancer interface {
AddBackend(b *backend.Backend) error
GetBackends() []*backend.Backend
ServeProxy(rw http.ResponseWriter, req *http.Request)
}
36 changes: 27 additions & 9 deletions internal/loadbalancer/round_robin_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ import (
)

type RoundRobinLB struct {
backends []*backend.Backend
index int
backends []*backend.Backend
index int
healthCheckInterval int
}

func NewRoundRobinLB() (*RoundRobinLB, error) {
return &RoundRobinLB{
backends: []*backend.Backend{},
index: 0,
backends: []*backend.Backend{},
index: 0,
healthCheckInterval: 2,
}, nil
}

Expand All @@ -24,13 +26,29 @@ func (lb *RoundRobinLB) AddBackend(b *backend.Backend) error {
return nil
}

func (lb *RoundRobinLB) GetBackends() []*backend.Backend {
return lb.backends
}

// ServeProxy serves the request to the next backend in the list
// keep in mind that this function and its sub functions need to be thread safe
func (lb *RoundRobinLB) ServeProxy(rw http.ResponseWriter, req *http.Request) {
if len(lb.backends) == 0 {
panic("No backends")
if b := lb.getNextBackend(); b != nil {
log.Printf("[LoadBalancer] Serving request to backend %s", lb.backends[lb.index].Addr)
b.Serve(rw, req)
return
}

log.Printf("[LoadBalancer] Serving request to backend %s", lb.backends[lb.index].Addr)
http.Error(rw, "No backends available", http.StatusServiceUnavailable)
}

func (lb *RoundRobinLB) getNextBackend() *backend.Backend {
for i := 0; i < len(lb.backends); i++ {
lb.index = (lb.index + 1) % len(lb.backends)
if lb.backends[lb.index].IsAlive() {
return lb.backends[lb.index]
}
}

lb.index = (lb.index + 1) % len(lb.backends)
lb.backends[lb.index].Serve(rw, req)
return nil
}
2 changes: 2 additions & 0 deletions scripts/lb_distribution_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
for i in {1..100}; do curl -s http://localhost | grep Hostname; done | sort | uniq -c | sort -nr
2 changes: 2 additions & 0 deletions scripts/lb_loop_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
for i in {1..1000}; do curl -s http://localhost | grep Hostname; sleep 0.5; done

0 comments on commit c3120dc

Please sign in to comment.