Skip to content

Commit

Permalink
Loadbalancer cleanup before integration with new controlplane (cluste…
Browse files Browse the repository at this point in the history
…rlink-net#114)

* Remove http receivers (old controlplane)
* SetPolicy and DeletePolicy take policy+return err
* Change map name from Policy to Scheme
* Do not work with pointers to slices
* Safer handling of ServiceStateMap
* Ensure there is always a default policy
* Testing load-balancer
* Test PolicyHandler interface to the load balancer

Signed-off-by: Ziv Nevo <[email protected]>
  • Loading branch information
zivnevo authored Oct 26, 2023
1 parent 01f89cc commit 1fa8b6e
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 103 deletions.
6 changes: 2 additions & 4 deletions pkg/policyengine/PolicyDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,11 @@ func connPolicyFromBlob(blob io.Reader) (*policytypes.ConnectivityPolicy, error)
}

func (pH *PolicyHandler) AddLBPolicy(lbPolicy *LBPolicy) error {
pH.loadBalancer.SetPolicy(lbPolicy.ServiceSrc, lbPolicy.ServiceDst, lbPolicy.Scheme, lbPolicy.DefaultPeer)
return nil
return pH.loadBalancer.SetPolicy(lbPolicy)
}

func (pH *PolicyHandler) DeleteLBPolicy(lbPolicy *LBPolicy) error {
pH.loadBalancer.deletePolicy(lbPolicy.ServiceSrc, lbPolicy.ServiceDst, lbPolicy.Scheme, lbPolicy.DefaultPeer)
return nil
return pH.loadBalancer.DeletePolicy(lbPolicy)
}

func (pH *PolicyHandler) AddAccessPolicy(policy *api.Policy) error {
Expand Down
35 changes: 30 additions & 5 deletions pkg/policyengine/PolicyDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,6 @@ func TestIncomingConnectionRequests(t *testing.T) {
}

func TestOutgoingConnectionRequests(t *testing.T) {
const (
peer1 = "peer1"
peer2 = "peer2"
)

ph := policyengine.NewPolicyHandler()
simpleSelector2 := metav1.LabelSelector{MatchLabels: policytypes.WorkloadAttrs{
policyengine.ServiceNameLabel: svcName,
Expand Down Expand Up @@ -148,6 +143,36 @@ func TestOutgoingConnectionRequests(t *testing.T) {
require.Nil(t, err)
}

func TestLoadBalancer(t *testing.T) {
ph := policyengine.NewPolicyHandler()
addRemoteSvc(t, svcName, peer1, ph)
addRemoteSvc(t, svcName, peer2, ph)
addPolicy(t, &policy, ph)

lbPolicy := policyengine.LBPolicy{ServiceSrc: svcName, ServiceDst: svcName, Scheme: policyengine.Static, DefaultPeer: peer1}
err := ph.AddLBPolicy(&lbPolicy)
require.Nil(t, err)

requestAttr := event.ConnectionRequestAttr{SrcService: svcName, DstService: svcName, Direction: event.Outgoing}
connReqResp, err := ph.AuthorizeAndRouteConnection(&requestAttr)
require.Nil(t, err)
require.Equal(t, event.Allow, connReqResp.Action)
require.Equal(t, peer1, connReqResp.TargetMbg) // LB policy requires this request to be served by peer1

err = ph.DeleteLBPolicy(&lbPolicy) // LB policy is deleted - the random default policy now takes effect
require.Nil(t, err)
connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr)
require.Nil(t, err)
require.Equal(t, event.Allow, connReqResp.Action)
require.Contains(t, []string{peer1, peer2}, connReqResp.TargetMbg)

ph.DeletePeer(peer1) // peer1 is deleted, so all requests should go to peer2
connReqResp, err = ph.AuthorizeAndRouteConnection(&requestAttr)
require.Nil(t, err)
require.Equal(t, event.Allow, connReqResp.Action)
require.Equal(t, peer2, connReqResp.TargetMbg)
}

func addRemoteSvc(t *testing.T, svc, peer string, ph policyengine.PolicyDecider) {
ph.AddPeer(&api.Peer{Name: peer}) // just in case it was not already added
action, err := ph.AddBinding(&api.Binding{Spec: api.BindingSpec{Import: svc, Peer: peer}})
Expand Down
160 changes: 66 additions & 94 deletions pkg/policyengine/loadBalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
package policyengine

import (
"encoding/json"
"fmt"
"math/rand"
"net/http"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -47,147 +45,120 @@ type ServiceState struct {
}

type LoadBalancer struct {
ServiceMap map[string]*[]string // Service to Peers
Policy map[string](map[string]LBScheme) // PolicyMap [serviceDst][serviceSrc]Policy
ServiceMap map[string][]string // Service to Peers
Scheme map[string](map[string]LBScheme) // PolicyMap [serviceDst][serviceSrc]Policy
ServiceStateMap map[string]map[string]*ServiceState // State of policy Per destination and source
}

func NewLoadBalancer() *LoadBalancer {
lb := &LoadBalancer{
ServiceMap: make(map[string]*[]string),
Policy: make(map[string](map[string]LBScheme)),
ServiceMap: make(map[string][]string),
Scheme: make(map[string](map[string]LBScheme)),
ServiceStateMap: make(map[string](map[string]*ServiceState)),
}

lb.ServiceStateMap[event.Wildcard] = make(map[string]*ServiceState)
lb.Policy[event.Wildcard] = make(map[string]LBScheme)
lb.Policy[event.Wildcard][event.Wildcard] = Random // default policy
lb.Scheme[event.Wildcard] = map[string]LBScheme{event.Wildcard: Random} // default policy
lb.ServiceStateMap[event.Wildcard] = map[string]*ServiceState{event.Wildcard: {}}
return lb
}

/********************* HTTP functions ***************************************************/
func (lB *LoadBalancer) SetPolicyReq(w http.ResponseWriter, r *http.Request) {
var requestAttr LBPolicy
err := json.NewDecoder(r.Body).Decode(&requestAttr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
plog.Infof("Set LB Policy request : %+v", requestAttr)

lB.SetPolicy(requestAttr.ServiceSrc, requestAttr.ServiceDst, requestAttr.Scheme, requestAttr.DefaultPeer)

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

func (lB *LoadBalancer) DeletePolicyReq(w http.ResponseWriter, r *http.Request) {
var requestAttr LBPolicy
err := json.NewDecoder(r.Body).Decode(&requestAttr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
plog.Infof("Delete LB Policy request : %+v", requestAttr)

lB.deletePolicy(requestAttr.ServiceSrc, requestAttr.ServiceDst, requestAttr.Scheme, requestAttr.DefaultPeer)

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
}

func (lB *LoadBalancer) GetPolicyReq(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(lB.Policy); err != nil {
plog.Errorf("Error happened in JSON encode. Err: %s", err)
return
}
}

/********************* LodBalancer functions ***************************************************/

func (lB *LoadBalancer) AddToServiceMap(serviceDst string, peer string) {
if peers, ok := lB.ServiceMap[serviceDst]; ok {
_, exist := exists(*peers, peer)
_, exist := exists(peers, peer)
if !exist {
*peers = append(*peers, peer)
lB.ServiceMap[serviceDst] = peers
lB.ServiceMap[serviceDst] = append(peers, peer)
}
} else {
lB.ServiceMap[serviceDst] = &([]string{peer})
lB.ServiceMap[serviceDst] = []string{peer}
lB.ServiceStateMap[serviceDst] = make(map[string]*ServiceState)
lB.ServiceStateMap[serviceDst][event.Wildcard] = &ServiceState{totalConnections: 0, defaultPeer: peer}
}
llog.Infof("Remote serviceDst added %v->[%+v]", serviceDst, *(lB.ServiceMap[serviceDst]))
llog.Infof("Remote serviceDst added %v->[%+v]", serviceDst, lB.ServiceMap[serviceDst])
}

func (lB *LoadBalancer) RemovePeerFromServiceMap(peer string) {
for svc := range lB.ServiceMap {
lB.RemovePeerFromService(svc, peer)
lB.removePeerFromService(svc, peer)
}
}
func (lB *LoadBalancer) RemovePeerFromService(svc, peer string) {

func (lB *LoadBalancer) removePeerFromService(svc, peer string) {
if peers, ok := lB.ServiceMap[svc]; ok {
index, exist := exists(*peers, peer)
index, exist := exists(peers, peer)
if !exist {
return
}
*peers = append((*peers)[:index], (*peers)[index+1:]...)
llog.Infof("Peer removed from service %v->[%+v]", svc, *(lB.ServiceMap[svc]))
lB.ServiceMap[svc] = append(peers[:index], peers[index+1:]...)
llog.Infof("Peer removed from service %v->[%+v]", svc, lB.ServiceMap[svc])
}
}
func (lB *LoadBalancer) SetPolicy(serviceSrc, serviceDst string, policy LBScheme, defaultPeer string) {
plog.Infof("Set LB policy %v for serviceSrc %+v serviceDst %+v defaultPeer %+v", policy, serviceSrc, serviceDst, defaultPeer)

if policy == Static && !lB.checkPeerExist(serviceDst, defaultPeer) {
llog.Errorf("Remote service %v is not exist in [%+v]", serviceDst, defaultPeer)
defaultPeer = ""
func (lB *LoadBalancer) SetPolicy(lbPolicy *LBPolicy) error {
plog.Infof("Set LB policy %+v", lbPolicy)

defaultPeer := lbPolicy.DefaultPeer
serviceSrc := lbPolicy.ServiceSrc
serviceDst := lbPolicy.ServiceDst
scheme := lbPolicy.Scheme
if scheme == Static && !lB.checkPeerExist(serviceDst, defaultPeer) {
err := fmt.Errorf("remote service %v does not exist in [%+v]", serviceDst, defaultPeer)
llog.Errorf(err.Error())
return err
}

if _, ok := lB.Policy[serviceDst]; !ok { // Create default service if destination service is not exist
lB.Policy[serviceDst] = make(map[string]LBScheme)
if _, ok := lB.Scheme[serviceDst]; !ok { // Create default service if destination service is not exist
lB.Scheme[serviceDst] = make(map[string]LBScheme)
}
// start to update policy
lB.Policy[serviceDst][serviceSrc] = policy
lB.Scheme[serviceDst][serviceSrc] = scheme

if serviceDst != event.Wildcard { // ServiceStateMap[dst][*] is created only when the remote service is exposed
lB.ServiceStateMap[serviceDst][serviceSrc] = &ServiceState{totalConnections: 0, defaultPeer: defaultPeer}
}

if serviceDst != event.Wildcard && serviceSrc == event.Wildcard { // for [dst][*] update only defaultPeer
lB.ServiceStateMap[serviceDst][serviceSrc].defaultPeer = defaultPeer
}
return nil
}

func (lB *LoadBalancer) deletePolicy(serviceSrc, serviceDst string, policy LBScheme, defaultPeer string) {
plog.Infof("Delete LB policy %v for serviceSrc %+v serviceDst %+v defaultPeer %+v", policy, serviceSrc, serviceDst, defaultPeer)
if _, ok := lB.Policy[serviceDst][serviceSrc]; ok {
delete(lB.Policy[serviceDst], serviceSrc)
if len(lB.Policy[serviceDst]) == 0 {
delete(lB.Policy, serviceDst)
func (lB *LoadBalancer) DeletePolicy(lbPolicy *LBPolicy) error {
plog.Infof("Delete LB policy %+v", lbPolicy)

serviceSrc := lbPolicy.ServiceSrc
serviceDst := lbPolicy.ServiceDst

if serviceSrc == event.Wildcard && serviceDst == event.Wildcard {
return fmt.Errorf("default policy cannot be deleted")
}

if _, ok := lB.Scheme[serviceDst][serviceSrc]; ok {
delete(lB.Scheme[serviceDst], serviceSrc)
if len(lB.Scheme[serviceDst]) == 0 {
delete(lB.Scheme, serviceDst)
}
} else {
return fmt.Errorf("failed to delete a non-existing load-balancing policy")
}

if serviceDst != event.Wildcard && serviceSrc != event.Wildcard { // ServiceStateMap apply only we set policy for specific serviceSrc and serviceDst
delete(lB.ServiceStateMap[serviceDst], serviceSrc)
}
return nil
}

func (lB *LoadBalancer) RemoveDestService(serviceDst, peer string) {
if peer != "" {
lB.RemovePeerFromService(serviceDst, peer)
lB.removePeerFromService(serviceDst, peer)
} else {
delete(lB.ServiceMap, serviceDst)
}
}

func (lB *LoadBalancer) updateState(serviceSrc, serviceDst string) {
if _, ok := lB.Policy[serviceDst][serviceSrc]; ok {
if _, ok := lB.ServiceStateMap[serviceDst][serviceSrc]; ok {
lB.ServiceStateMap[serviceDst][serviceSrc].totalConnections++
}
if _, ok := lB.Policy[event.Wildcard][serviceSrc]; ok && serviceDst == event.Wildcard {
lB.ServiceStateMap[event.Wildcard][serviceSrc].totalConnections++
if _, ok := lB.ServiceStateMap[serviceDst][event.Wildcard]; ok {
lB.ServiceStateMap[serviceDst][event.Wildcard].totalConnections++ // may not exist if dst is not imported yet
}
lB.ServiceStateMap[serviceDst][event.Wildcard].totalConnections++ // always exist
}

/********************* Policy functions ***************************************************/
Expand Down Expand Up @@ -218,7 +189,7 @@ func (lB *LoadBalancer) LookupStatic(serviceSrc, serviceDst string, peers []stri
}

func (lB *LoadBalancer) LookupWith(serviceSrc, serviceDst string, peers []string) (string, error) {
policy := lB.getPolicy(serviceSrc, serviceDst)
policy := lB.getScheme(serviceSrc, serviceDst)

lB.updateState(serviceSrc, serviceDst)
plog.Infof("LoadBalancer lookup for serviceSrc %s serviceDst %s with policy %s with %+v", serviceSrc, serviceDst, policy, peers)
Expand All @@ -238,21 +209,22 @@ func (lB *LoadBalancer) LookupWith(serviceSrc, serviceDst string, peers []string
return lB.LookupRandom(serviceDst, peers)
}
}
func (lB *LoadBalancer) getPolicy(serviceSrc, serviceDst string) LBScheme {
if p, ok := lB.Policy[serviceDst][serviceSrc]; ok {

func (lB *LoadBalancer) getScheme(serviceSrc, serviceDst string) LBScheme {
if p, ok := lB.Scheme[serviceDst][serviceSrc]; ok {
return p
} else if p, ok := lB.Policy[event.Wildcard][serviceSrc]; ok {
} else if p, ok := lB.Scheme[event.Wildcard][serviceSrc]; ok {
return p
} else if p, ok := lB.Policy[serviceDst][event.Wildcard]; ok {
} else if p, ok := lB.Scheme[serviceDst][event.Wildcard]; ok {
return p
} else {
return lB.Policy[event.Wildcard][event.Wildcard]
return lB.Scheme[event.Wildcard][event.Wildcard]
}
}

func (lB *LoadBalancer) getDefaultPeer(serviceSrc, serviceDst string) string {
if _, ok := lB.Policy[serviceDst]; ok {
if _, ok := lB.Policy[serviceDst][serviceSrc]; ok {
if _, ok := lB.ServiceStateMap[serviceDst]; ok {
if _, ok := lB.ServiceStateMap[serviceDst][serviceSrc]; ok {
return lB.ServiceStateMap[serviceDst][serviceSrc].defaultPeer
}
return lB.ServiceStateMap[serviceDst][event.Wildcard].defaultPeer
Expand All @@ -263,17 +235,17 @@ func (lB *LoadBalancer) getDefaultPeer(serviceSrc, serviceDst string) string {

func (lB *LoadBalancer) GetTargetPeers(service string) ([]string, error) {
peerList := lB.ServiceMap[service]
if peerList == nil {
if len(peerList) == 0 {
plog.Errorf("Unable to find peer for %s", service)
return []string{}, fmt.Errorf("no available target peer")
}
return *peerList, nil
return peerList, nil
}

func (lB *LoadBalancer) checkPeerExist(service, peer string) bool {
peerList := lB.ServiceMap[service]
if peerList != nil {
for _, val := range *peerList {
for _, val := range peerList {
if val == peer {
return true
}
Expand Down
Loading

0 comments on commit 1fa8b6e

Please sign in to comment.