diff --git a/Makefile b/Makefile index eb8fbc6..f938dcf 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,11 @@ build: CGO_ENABLED=0 go build -o ./bin/plumber . + +docker-build: + docker build --push -t krapi0314/plumber . + +docker-compose-up: + docker-compose -f ./docker/docker-compose.yml up --build -d + +docker-compose-down: + docker-compose -f ./docker/docker-compose.yml down diff --git a/README.md b/README.md index e694bfa..604120a 100644 --- a/README.md +++ b/README.md @@ -25,18 +25,18 @@ make build Plumber aims to support [Yorkie](https://github.com/yorkie-team/yorkie) as a backend for the load balancer. The following features are planned to be implemented first: -### v0.0.1 +### v0.1.0 -- [ ] Support static load balancing with round-robin algorithm -- [ ] Support backends health check +- [x] Support static load balancing with round-robin algorithm +- [x] Support backends health check -### v0.1.0 +### v0.2.0 - [ ] Support consistent hashing algorithm - [ ] Support dynamic backend configuration (with K8s API) - [ ] Support mechanism to resolve split-brain of long-lived connection -### v0.2.0 +### v0.3.0 - [ ] Support interceptor to modify request/response - [ ] TBD diff --git a/internal/agent.go b/internal/agent.go index 4136835..7f38dcb 100644 --- a/internal/agent.go +++ b/internal/agent.go @@ -5,43 +5,59 @@ import ( "net/http" "github.com/krapie/plumber/internal/backend" + "github.com/krapie/plumber/internal/health" "github.com/krapie/plumber/internal/loadbalancer" ) type Agent struct { - lb loadbalancer.LoadBalancer + loadBalancer loadbalancer.LoadBalancer + healthChecker *health.Checker } func NewAgent() (*Agent, error) { - lb, err := loadbalancer.NewRoundRobinLB() + loadBalancer, err := loadbalancer.NewRoundRobinLB() if err != nil { return nil, err } return &Agent{ - lb: lb, + loadBalancer: loadBalancer, + healthChecker: health.NewHealthChecker(2), }, nil } func (s *Agent) Run(backendAddresses []string) error { + err := s.addBackends(backendAddresses) + if err != nil { + return err + } + + s.healthChecker.AddBackends(s.loadBalancer.GetBackends()) + s.healthChecker.Run() + log.Printf("[Agent] Running health check") + + http.HandleFunc("/", s.loadBalancer.ServeProxy) + log.Printf("[Agent] Starting server on :80") + err = http.ListenAndServe(":80", nil) + if err != nil { + return err + } + + return nil +} + +func (s *Agent) addBackends(backendAddresses []string) error { for _, addr := range backendAddresses { b, err := backend.NewDefaultBackend(addr) if err != nil { return err } - err = s.lb.AddBackend(b) + err = s.loadBalancer.AddBackend(b) if err != nil { return err } } - http.HandleFunc("/", s.lb.ServeProxy) - log.Printf("[Plumber] Starting server on :80") - err := http.ListenAndServe(":80", nil) - if err != nil { - return err - } - return nil } diff --git a/internal/backend/backend.go b/internal/backend/backend.go index df72993..91f72d6 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -4,11 +4,19 @@ import ( "net/http" "net/http/httputil" "net/url" + "sync" +) + +const ( + ALIVE_UP = true + ALIVE_DOWN = false ) type Backend struct { - Addr string + Addr *url.URL + Alive bool + mutex sync.RWMutex proxy *httputil.ReverseProxy } @@ -18,12 +26,34 @@ func NewDefaultBackend(addr string) (*Backend, error) { return nil, err } + proxy := httputil.NewSingleHostReverseProxy(parsedAddr) + proxy.ErrorHandler = func(rw http.ResponseWriter, req *http.Request, err error) { + http.Error(rw, "Error occurred while processing request", http.StatusBadGateway) + } + return &Backend{ - Addr: addr, - proxy: httputil.NewSingleHostReverseProxy(parsedAddr), + Addr: parsedAddr, + Alive: true, + + mutex: sync.RWMutex{}, + proxy: proxy, }, nil } func (b *Backend) Serve(rw http.ResponseWriter, req *http.Request) { b.proxy.ServeHTTP(rw, req) } + +func (b *Backend) SetAlive(alive bool) { + b.mutex.Lock() + b.Alive = alive + b.mutex.Unlock() +} + +func (b *Backend) IsAlive() bool { + b.mutex.RLock() + alive := b.Alive + b.mutex.RUnlock() + + return alive +} diff --git a/internal/health/health_check.go b/internal/health/health_check.go new file mode 100644 index 0000000..803c0b2 --- /dev/null +++ b/internal/health/health_check.go @@ -0,0 +1,68 @@ +package health + +import ( + "net" + "net/url" + "time" + + "github.com/krapie/plumber/internal/backend" +) + +const TCP = "tcp" + +type Checker struct { + backends []*backend.Backend + interval int +} + +func NewHealthChecker(interval int) *Checker { + return &Checker{ + backends: nil, + interval: interval, + } +} + +func (c *Checker) Run() { + go c.healthCheck() +} + +func (c *Checker) AddBackends(backends []*backend.Backend) { + c.backends = backends +} + +func (c *Checker) healthCheck() { + t := time.NewTicker(time.Duration(c.interval) * time.Second) + for { + select { + case <-t.C: + // log.Printf("[Health] Running health check") + c.checkBackendLiveness() + } + } +} + +func (c *Checker) checkBackendLiveness() { + for _, b := range c.backends { + isAlive := c.checkTCPConnection(b.Addr, c.interval) + if isAlive { + b.SetAlive(backend.ALIVE_UP) + } else { + b.SetAlive(backend.ALIVE_DOWN) + } + // log.Printf("[Health] Backend %s is %v", b.Addr, b.IsAlive()) + } +} + +func (c *Checker) checkTCPConnection(addr *url.URL, interval int) bool { + conn, err := net.DialTimeout(TCP, addr.Host, time.Duration(interval)*time.Second) + if err != nil { + return false + } + + err = conn.Close() + if err != nil { + return false + } + + return true +} diff --git a/internal/loadbalancer/load_balancer.go b/internal/loadbalancer/load_balancer.go index e4d240e..a922cc5 100644 --- a/internal/loadbalancer/load_balancer.go +++ b/internal/loadbalancer/load_balancer.go @@ -9,5 +9,6 @@ import ( // LoadBalancer is an interface for a load balancer. type LoadBalancer interface { AddBackend(b *backend.Backend) error + GetBackends() []*backend.Backend ServeProxy(rw http.ResponseWriter, req *http.Request) } diff --git a/internal/loadbalancer/round_robin_lb.go b/internal/loadbalancer/round_robin_lb.go index 51c2532..8a255a7 100644 --- a/internal/loadbalancer/round_robin_lb.go +++ b/internal/loadbalancer/round_robin_lb.go @@ -8,14 +8,16 @@ import ( ) type RoundRobinLB struct { - backends []*backend.Backend - index int + backends []*backend.Backend + index int + healthCheckInterval int } func NewRoundRobinLB() (*RoundRobinLB, error) { return &RoundRobinLB{ - backends: []*backend.Backend{}, - index: 0, + backends: []*backend.Backend{}, + index: 0, + healthCheckInterval: 2, }, nil } @@ -24,13 +26,29 @@ func (lb *RoundRobinLB) AddBackend(b *backend.Backend) error { return nil } +func (lb *RoundRobinLB) GetBackends() []*backend.Backend { + return lb.backends +} + +// ServeProxy serves the request to the next backend in the list +// keep in mind that this function and its sub functions need to be thread safe func (lb *RoundRobinLB) ServeProxy(rw http.ResponseWriter, req *http.Request) { - if len(lb.backends) == 0 { - panic("No backends") + if b := lb.getNextBackend(); b != nil { + log.Printf("[LoadBalancer] Serving request to backend %s", lb.backends[lb.index].Addr) + b.Serve(rw, req) + return } - log.Printf("[LoadBalancer] Serving request to backend %s", lb.backends[lb.index].Addr) + http.Error(rw, "No backends available", http.StatusServiceUnavailable) +} + +func (lb *RoundRobinLB) getNextBackend() *backend.Backend { + for i := 0; i < len(lb.backends); i++ { + lb.index = (lb.index + 1) % len(lb.backends) + if lb.backends[lb.index].IsAlive() { + return lb.backends[lb.index] + } + } - lb.index = (lb.index + 1) % len(lb.backends) - lb.backends[lb.index].Serve(rw, req) + return nil } diff --git a/scripts/lb_distribution_test.sh b/scripts/lb_distribution_test.sh new file mode 100755 index 0000000..3fe548b --- /dev/null +++ b/scripts/lb_distribution_test.sh @@ -0,0 +1,2 @@ +#!/bin/bash +for i in {1..100}; do curl -s http://localhost | grep Hostname; done | sort | uniq -c | sort -nr diff --git a/scripts/lb_loop_test.sh b/scripts/lb_loop_test.sh new file mode 100755 index 0000000..1f089a5 --- /dev/null +++ b/scripts/lb_loop_test.sh @@ -0,0 +1,2 @@ +#!/bin/bash +for i in {1..1000}; do curl -s http://localhost | grep Hostname; sleep 0.5; done