From 04494881baad5e90da3ccf9ca0ebdf1891675e61 Mon Sep 17 00:00:00 2001 From: supermeng Date: Fri, 9 Mar 2018 16:19:47 +0800 Subject: [PATCH] Added canaries for podgroup --- engine/canaries.go | 84 +++++++++++++++++++++++++++++++++++++ engine/canaries_test.go | 92 +++++++++++++++++++++++++++++++++++++++++ engine/engine.go | 64 ++++++++++++++++++++++------ engine/podgroup.go | 10 +++++ engine/specs.go | 3 ++ utils/util/util.go | 9 ++++ 6 files changed, 250 insertions(+), 12 deletions(-) create mode 100644 engine/canaries.go create mode 100644 engine/canaries_test.go diff --git a/engine/canaries.go b/engine/canaries.go new file mode 100644 index 0000000..ae044d2 --- /dev/null +++ b/engine/canaries.go @@ -0,0 +1,84 @@ +package engine + +import ( + "strings" + + "github.com/laincloud/deployd/storage" + "github.com/mijia/sweb/log" +) + +type IStrategy interface { + Type() string + Rules() []interface{} +} + +type DefaultStrategy struct { + DivType string `json:"Type"` + DivDatas []interface{} `json:"Rule"` +} + +func (ds *DefaultStrategy) Type() string { + return ds.DivType +} + +func (ds *DefaultStrategy) Rules() []interface{} { + return ds.DivDatas +} + +type Canary struct { + Strategies []IStrategy +} + +func (canary *Canary) Equal(nCanary *Canary) bool { + if len(canary.Strategies) != len(nCanary.Strategies) { + return false + } + for i, strategy := range canary.Strategies { + if strategy.Type() != nCanary.Strategies[i].Type() { + return false + } + } + return true +} + +type CanaryPodsWithSpec struct { + PodGroupSpec + canary *Canary +} + +func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Set(key, spec.canary, true); err != nil { + log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err) + return err + } + return nil +} + +func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Remove(key); err != nil { + log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err) + return err + } + return nil +} + +func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + var canary Canary + if err := store.Get(key, &canary); err != nil { + log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err) + return nil, err + } + return &canary, nil +} + +func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Set(key, canary, true); err != nil { + log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err) + return err + } + return nil +} diff --git a/engine/canaries_test.go b/engine/canaries_test.go new file mode 100644 index 0000000..17e7151 --- /dev/null +++ b/engine/canaries_test.go @@ -0,0 +1,92 @@ +package engine + +import ( + "encoding/json" + "testing" + "time" +) + +func TestEngineCanaries(t *testing.T) { + etcdAddr := "http://127.0.0.1:2379" + ConfigPortsManager(etcdAddr) + c, store, err := initClusterAndStore() + if err != nil { + t.Fatalf("Cannot create the cluster and storage, %s", err) + } + + engine, err := New(c, store) + if err != nil { + t.Fatalf("Cannot create the orc engine, %s", err) + } + + namespace := "hello" + name := "hello.canary.web" + pgSpec := createPodGroupSpec(namespace, name, 1) + data := map[string]interface{}{"suffix": 1, "upstream": "beta1"} + strategies := []IStrategy{&DefaultStrategy{"uidsuffix", []interface{}{data}}} + canary := &Canary{strategies} + canarySpec := CanaryPodsWithSpec{pgSpec, canary} + if err := engine.NewCanary(canarySpec); err != nil { + t.Fatalf("Should not return error, %s", err) + } + if err := engine.NewCanary(canarySpec); err == nil { + t.Errorf("Should return exists error, but we got no problem") + } + + time.Sleep(20 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if pg.State != RunStateSuccess { + t.Errorf("We should have the pod deployed and running") + } + + engine.RescheduleInstance(name, 3) + time.Sleep(20 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if len(pg.Pods) != 3 { + t.Errorf("We should have 3 instance of the pods") + } + + engine.RescheduleInstance(name, 1) + time.Sleep(30 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if len(pg.Pods) != 1 { + bytes, err := json.Marshal(pg.Pods) + pods := "" + if err == nil { + pods = string(bytes) + } + t.Errorf("We should have 1 instance of the pods : %v", pods) + } + + podSpec := createPodSpec(namespace, name) + podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024 + engine.RescheduleSpec(name, podSpec) + time.Sleep(40 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if pg.Spec.Version != 1 { + t.Errorf("We should have version 1 of the pods") + } + + if nCanary := engine.FetchCanaryInfos(name); nCanary == nil { + t.Fatalf("Should return canary!") + } else { + if !nCanary.Equal(canary) { + t.Fatalf("Canary info should not changed") + } + } + + if err := engine.RemovePodGroup(name); err != nil { + t.Errorf("We should be able to remove the pod group, %s", err) + } else if err := engine.NewPodGroup(pgSpec); err == nil { + t.Errorf("We should not be able to deploy pod group again in short time we remove it") + } + time.Sleep(20 * time.Second) + + if nCanary := engine.FetchCanaryInfos(name); nCanary != nil { + t.Fatalf("Should not return nCanary") + } +} diff --git a/engine/engine.go b/engine/engine.go index 8ef683d..074770f 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -10,6 +10,7 @@ import ( "github.com/laincloud/deployd/cluster" "github.com/laincloud/deployd/storage" + "github.com/laincloud/deployd/utils/util" "github.com/mijia/adoc" "github.com/mijia/sweb/log" ) @@ -43,18 +44,18 @@ type EngineConfig struct { type OrcEngine struct { sync.RWMutex - config EngineConfig - cluster cluster.Cluster - store storage.Store - eagleView *RuntimeEagleView - pgCtrls map[string]*podGroupController - rmPgCtrls map[string]*podGroupController - dependsCtrls map[string]*dependsController - rmDepCtrls map[string]*dependsController - opsChan chan orcOperation + config EngineConfig + cluster cluster.Cluster + store storage.Store + eagleView *RuntimeEagleView + pgCtrls map[string]*podGroupController + rmPgCtrls map[string]*podGroupController + dependsCtrls map[string]*dependsController + rmDepCtrls map[string]*dependsController + opsChan chan orcOperation refreshAllChan chan bool - stop chan struct{} - clstrFailCnt int + stop chan struct{} + clstrFailCnt int } const ( @@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error { return err } log.Infof("start delete %v\n", name) + // if is canary podgroup remove canary strategy + if util.PodGroupType(name) == PGCanaryType { + // remove key + pgCtrl.spec.RemoveCanary(engine.store) + } engine.opsChan <- orcOperRemove{pgCtrl} delete(engine.pgCtrls, name) engine.rmPgCtrls[name] = pgCtrl go engine.checkPodGroupRemoveResult(name, pgCtrl) + return nil } } @@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error { pgCtrl.opsChan <- pgOperSaveStore{true} pgCtrl.opsChan <- pgOperOver{} } - return nil } } @@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error { return nil } +func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error { + if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil { + return err + } + // gen canary strategy + spec.SaveCanary(engine.store) + return nil +} + +func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary { + engine.RLock() + defer engine.RUnlock() + if pgCtrl, ok := engine.pgCtrls[name]; !ok { + return nil + } else { + if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil { + return nil + } else { + return canary + } + } + return nil +} + +func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error { + engine.RLock() + defer engine.RUnlock() + if pgCtrl, ok := engine.pgCtrls[name]; !ok { + return ErrPodGroupNotExists + } else { + return pgCtrl.spec.UpdateCanary(engine.store, canary) + } +} + func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool { if resources, err := engine.cluster.GetResources(); err != nil { return false diff --git a/engine/podgroup.go b/engine/podgroup.go index 712d6f1..1e8ee8a 100644 --- a/engine/podgroup.go +++ b/engine/podgroup.go @@ -24,6 +24,16 @@ type PodGroupWithSpec struct { PodGroup } +// type PodGroupController interface { +// Deploy() +// Remove() +// Inspect() PodGroupWithSpec +// RescheduleSpec(podSpec PodSpec) +// RescheduleInstance(numInstances int, restartPolicy ...RestartPolicy) +// RescheduleDrift(fromNode, toNode string, instanceNo int, force bool) +// ChangeState(op string, instance int) +// } + type podGroupController struct { Publisher diff --git a/engine/specs.go b/engine/specs.go index 3a3e83c..6bacbd2 100644 --- a/engine/specs.go +++ b/engine/specs.go @@ -22,6 +22,7 @@ const ( kLainNodesKey = "nodes" kLainLastPodSpecKey = "last_spec" kLainPgOpingKey = "operating" + kLainCanaryKey = "canaries" kLainLabelPrefix = "cc.bdp.lain.deployd" kLainLogVolumePath = "/lain/logs" @@ -31,6 +32,8 @@ const ( MinPodKillTimeout = 10 MaxPodKillTimeout = 120 + + PGCanaryType = "canary" ) var ( diff --git a/utils/util/util.go b/utils/util/util.go index 2536bde..16497b4 100644 --- a/utils/util/util.go +++ b/utils/util/util.go @@ -48,6 +48,15 @@ func ParseContainerName(containerName string) (string, int, int, int, error) { return g.Group(1), version, instance, driftCount, nil } +func PodGroupType(procName string) string { + p := regex.MustCompile(`.*\.(.*)\..*`) + g := p.Match(procName) + if g == nil { + return "" + } + return g.Group(1) +} + func IpConflictErrorMatch(err string) string { p := regex.MustCompile(`IP assignment error, data: {IP:([0-9.]+) HandleID:(.*)}: Address already assigned in block`) g := p.Match(err)