Skip to content

Commit

Permalink
Added canaries for podgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
supermeng committed Mar 9, 2018
1 parent 0dc379f commit 0449488
Show file tree
Hide file tree
Showing 6 changed files with 250 additions and 12 deletions.
84 changes: 84 additions & 0 deletions engine/canaries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package engine

import (
"strings"

"github.com/laincloud/deployd/storage"
"github.com/mijia/sweb/log"
)

type IStrategy interface {
Type() string
Rules() []interface{}
}

type DefaultStrategy struct {
DivType string `json:"Type"`
DivDatas []interface{} `json:"Rule"`
}

func (ds *DefaultStrategy) Type() string {
return ds.DivType
}

func (ds *DefaultStrategy) Rules() []interface{} {
return ds.DivDatas
}

type Canary struct {
Strategies []IStrategy
}

func (canary *Canary) Equal(nCanary *Canary) bool {
if len(canary.Strategies) != len(nCanary.Strategies) {
return false
}
for i, strategy := range canary.Strategies {
if strategy.Type() != nCanary.Strategies[i].Type() {
return false
}
}
return true
}

type CanaryPodsWithSpec struct {
PodGroupSpec
canary *Canary
}

func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, spec.canary, true); err != nil {
log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Remove(key); err != nil {
log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
var canary Canary
if err := store.Get(key, &canary); err != nil {
log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err)
return nil, err
}
return &canary, nil
}

func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, canary, true); err != nil {
log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err)
return err
}
return nil
}
92 changes: 92 additions & 0 deletions engine/canaries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package engine

import (
"encoding/json"
"testing"
"time"
)

func TestEngineCanaries(t *testing.T) {
etcdAddr := "http://127.0.0.1:2379"
ConfigPortsManager(etcdAddr)
c, store, err := initClusterAndStore()
if err != nil {
t.Fatalf("Cannot create the cluster and storage, %s", err)
}

engine, err := New(c, store)
if err != nil {
t.Fatalf("Cannot create the orc engine, %s", err)
}

namespace := "hello"
name := "hello.canary.web"
pgSpec := createPodGroupSpec(namespace, name, 1)
data := map[string]interface{}{"suffix": 1, "upstream": "beta1"}
strategies := []IStrategy{&DefaultStrategy{"uidsuffix", []interface{}{data}}}
canary := &Canary{strategies}
canarySpec := CanaryPodsWithSpec{pgSpec, canary}
if err := engine.NewCanary(canarySpec); err != nil {
t.Fatalf("Should not return error, %s", err)
}
if err := engine.NewCanary(canarySpec); err == nil {
t.Errorf("Should return exists error, but we got no problem")
}

time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.State != RunStateSuccess {
t.Errorf("We should have the pod deployed and running")
}

engine.RescheduleInstance(name, 3)
time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 3 {
t.Errorf("We should have 3 instance of the pods")
}

engine.RescheduleInstance(name, 1)
time.Sleep(30 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 1 {
bytes, err := json.Marshal(pg.Pods)
pods := ""
if err == nil {
pods = string(bytes)
}
t.Errorf("We should have 1 instance of the pods : %v", pods)
}

podSpec := createPodSpec(namespace, name)
podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024
engine.RescheduleSpec(name, podSpec)
time.Sleep(40 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.Spec.Version != 1 {
t.Errorf("We should have version 1 of the pods")
}

if nCanary := engine.FetchCanaryInfos(name); nCanary == nil {
t.Fatalf("Should return canary!")
} else {
if !nCanary.Equal(canary) {
t.Fatalf("Canary info should not changed")
}
}

if err := engine.RemovePodGroup(name); err != nil {
t.Errorf("We should be able to remove the pod group, %s", err)
} else if err := engine.NewPodGroup(pgSpec); err == nil {
t.Errorf("We should not be able to deploy pod group again in short time we remove it")
}
time.Sleep(20 * time.Second)

if nCanary := engine.FetchCanaryInfos(name); nCanary != nil {
t.Fatalf("Should not return nCanary")
}
}
64 changes: 52 additions & 12 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/laincloud/deployd/cluster"
"github.com/laincloud/deployd/storage"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/adoc"
"github.com/mijia/sweb/log"
)
Expand Down Expand Up @@ -43,18 +44,18 @@ type EngineConfig struct {
type OrcEngine struct {
sync.RWMutex

config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
refreshAllChan chan bool
stop chan struct{}
clstrFailCnt int
stop chan struct{}
clstrFailCnt int
}

const (
Expand Down Expand Up @@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error {
return err
}
log.Infof("start delete %v\n", name)
// if is canary podgroup remove canary strategy
if util.PodGroupType(name) == PGCanaryType {
// remove key
pgCtrl.spec.RemoveCanary(engine.store)
}
engine.opsChan <- orcOperRemove{pgCtrl}
delete(engine.pgCtrls, name)
engine.rmPgCtrls[name] = pgCtrl
go engine.checkPodGroupRemoveResult(name, pgCtrl)

return nil
}
}
Expand Down Expand Up @@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error {
pgCtrl.opsChan <- pgOperSaveStore{true}
pgCtrl.opsChan <- pgOperOver{}
}

return nil
}
}
Expand Down Expand Up @@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error {
return nil
}

func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error {
if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil {
return err
}
// gen canary strategy
spec.SaveCanary(engine.store)
return nil
}

func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return nil
} else {
if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil {
return nil
} else {
return canary
}
}
return nil
}

func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return ErrPodGroupNotExists
} else {
return pgCtrl.spec.UpdateCanary(engine.store, canary)
}
}

func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool {
if resources, err := engine.cluster.GetResources(); err != nil {
return false
Expand Down
10 changes: 10 additions & 0 deletions engine/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ type PodGroupWithSpec struct {
PodGroup
}

// type PodGroupController interface {
// Deploy()
// Remove()
// Inspect() PodGroupWithSpec
// RescheduleSpec(podSpec PodSpec)
// RescheduleInstance(numInstances int, restartPolicy ...RestartPolicy)
// RescheduleDrift(fromNode, toNode string, instanceNo int, force bool)
// ChangeState(op string, instance int)
// }

type podGroupController struct {
Publisher

Expand Down
3 changes: 3 additions & 0 deletions engine/specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
kLainNodesKey = "nodes"
kLainLastPodSpecKey = "last_spec"
kLainPgOpingKey = "operating"
kLainCanaryKey = "canaries"

kLainLabelPrefix = "cc.bdp.lain.deployd"
kLainLogVolumePath = "/lain/logs"
Expand All @@ -31,6 +32,8 @@ const (

MinPodKillTimeout = 10
MaxPodKillTimeout = 120

PGCanaryType = "canary"
)

var (
Expand Down
9 changes: 9 additions & 0 deletions utils/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func ParseContainerName(containerName string) (string, int, int, int, error) {
return g.Group(1), version, instance, driftCount, nil
}

func PodGroupType(procName string) string {
p := regex.MustCompile(`.*\.(.*)\..*`)
g := p.Match(procName)
if g == nil {
return ""
}
return g.Group(1)
}

func IpConflictErrorMatch(err string) string {
p := regex.MustCompile(`IP assignment error, data: {IP:([0-9.]+) HandleID:(.*)}: Address already assigned in block`)
g := p.Match(err)
Expand Down

0 comments on commit 0449488

Please sign in to comment.