diff --git a/apiserver/podgroup.go b/apiserver/podgroup.go index a6c7db8..d347c15 100644 --- a/apiserver/podgroup.go +++ b/apiserver/podgroup.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/laincloud/deployd/engine" + "github.com/laincloud/deployd/utils/util" "github.com/mijia/sweb/form" "github.com/mijia/sweb/log" "github.com/mijia/sweb/server" @@ -26,12 +27,25 @@ func (rpg RestfulPodGroups) Post(ctx context.Context, r *http.Request) (int, int } orcEngine := getEngine(ctx) - if err := orcEngine.NewPodGroup(pgSpec); err != nil { - switch err { + var deployErr error + if util.PodGroupType(pgSpec.Name) == engine.PGCanaryType { + var canary engine.Canary + if err := form.ParamBodyJson(r, &canary); err != nil { + log.Warnf("Failed to decode canary, %s", err) + return http.StatusBadRequest, fmt.Sprintf("Invalid Canary params format: %s", err) + } + canaryPgSpec := engine.CanaryPodsWithSpec{pgSpec, &canary} + deployErr = orcEngine.NewCanary(canaryPgSpec) + + } else { + deployErr = orcEngine.NewPodGroup(pgSpec) + } + if deployErr != nil { + switch deployErr { case engine.ErrNotEnoughResources, engine.ErrPodGroupExists, engine.ErrDependencyPodNotExists: - return http.StatusMethodNotAllowed, err.Error() + return http.StatusMethodNotAllowed, deployErr.Error() default: - return http.StatusInternalServerError, err.Error() + return http.StatusInternalServerError, deployErr.Error() } } diff --git a/engine/canaries.go b/engine/canaries.go new file mode 100644 index 0000000..83b21ec --- /dev/null +++ b/engine/canaries.go @@ -0,0 +1,71 @@ +package engine + +import ( + "strings" + + "github.com/laincloud/deployd/storage" + "github.com/mijia/sweb/log" +) + +type Strategy struct { + DivType string `json:"Type"` + DivDatas []interface{} `json:"Rule"` +} + +type Canary struct { + Strategies []*Strategy +} + +func (canary *Canary) Equal(nCanary *Canary) bool { + if len(canary.Strategies) != len(nCanary.Strategies) { + return false + } + for i, strategy := range canary.Strategies { + if strategy.DivType != nCanary.Strategies[i].DivType { + return false + } + } + return true +} + +type CanaryPodsWithSpec struct { + PodGroupSpec + Canary *Canary +} + +func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Set(key, spec.Canary, true); err != nil { + log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err) + return err + } + return nil +} + +func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Remove(key); err != nil { + log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err) + return err + } + return nil +} + +func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + var canary Canary + if err := store.Get(key, &canary); err != nil { + log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err) + return nil, err + } + return &canary, nil +} + +func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error { + key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/") + if err := store.Set(key, canary, true); err != nil { + log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err) + return err + } + return nil +} diff --git a/engine/canaries_test.go b/engine/canaries_test.go new file mode 100644 index 0000000..99cbbe8 --- /dev/null +++ b/engine/canaries_test.go @@ -0,0 +1,92 @@ +package engine + +import ( + "encoding/json" + "testing" + "time" +) + +func TestEngineCanaries(t *testing.T) { + etcdAddr := "http://127.0.0.1:2379" + ConfigPortsManager(etcdAddr) + c, store, err := initClusterAndStore() + if err != nil { + t.Fatalf("Cannot create the cluster and storage, %s", err) + } + + engine, err := New(c, store) + if err != nil { + t.Fatalf("Cannot create the orc engine, %s", err) + } + + namespace := "hello" + name := "hello.canary.web" + pgSpec := createPodGroupSpec(namespace, name, 1) + data := map[string]interface{}{"suffix": 1, "upstream": "beta1"} + strategies := []*Strategy{&Strategy{"uidsuffix", []interface{}{data}}} + canary := &Canary{strategies} + canarySpec := CanaryPodsWithSpec{pgSpec, canary} + if err := engine.NewCanary(canarySpec); err != nil { + t.Fatalf("Should not return error, %s", err) + } + if err := engine.NewCanary(canarySpec); err == nil { + t.Errorf("Should return exists error, but we got no problem") + } + + time.Sleep(20 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if pg.State != RunStateSuccess { + t.Errorf("We should have the pod deployed and running") + } + + engine.RescheduleInstance(name, 3) + time.Sleep(20 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if len(pg.Pods) != 3 { + t.Errorf("We should have 3 instance of the pods") + } + + engine.RescheduleInstance(name, 1) + time.Sleep(30 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if len(pg.Pods) != 1 { + bytes, err := json.Marshal(pg.Pods) + pods := "" + if err == nil { + pods = string(bytes) + } + t.Errorf("We should have 1 instance of the pods : %v", pods) + } + + podSpec := createPodSpec(namespace, name) + podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024 + engine.RescheduleSpec(name, podSpec) + time.Sleep(40 * time.Second) + if pg, ok := engine.InspectPodGroup(name); !ok { + t.Errorf("We should have the pod group, but we don't get it") + } else if pg.Spec.Version != 1 { + t.Errorf("We should have version 1 of the pods") + } + + if nCanary := engine.FetchCanaryInfos(name); nCanary == nil { + t.Fatalf("Should return canary!") + } else { + if !nCanary.Equal(canary) { + t.Fatalf("Canary info should not changed") + } + } + + if err := engine.RemovePodGroup(name); err != nil { + t.Errorf("We should be able to remove the pod group, %s", err) + } else if err := engine.NewPodGroup(pgSpec); err == nil { + t.Errorf("We should not be able to deploy pod group again in short time we remove it") + } + time.Sleep(20 * time.Second) + + if nCanary := engine.FetchCanaryInfos(name); nCanary != nil { + t.Fatalf("Should not return nCanary") + } +} diff --git a/engine/engine.go b/engine/engine.go index 8ef683d..074770f 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -10,6 +10,7 @@ import ( "github.com/laincloud/deployd/cluster" "github.com/laincloud/deployd/storage" + "github.com/laincloud/deployd/utils/util" "github.com/mijia/adoc" "github.com/mijia/sweb/log" ) @@ -43,18 +44,18 @@ type EngineConfig struct { type OrcEngine struct { sync.RWMutex - config EngineConfig - cluster cluster.Cluster - store storage.Store - eagleView *RuntimeEagleView - pgCtrls map[string]*podGroupController - rmPgCtrls map[string]*podGroupController - dependsCtrls map[string]*dependsController - rmDepCtrls map[string]*dependsController - opsChan chan orcOperation + config EngineConfig + cluster cluster.Cluster + store storage.Store + eagleView *RuntimeEagleView + pgCtrls map[string]*podGroupController + rmPgCtrls map[string]*podGroupController + dependsCtrls map[string]*dependsController + rmDepCtrls map[string]*dependsController + opsChan chan orcOperation refreshAllChan chan bool - stop chan struct{} - clstrFailCnt int + stop chan struct{} + clstrFailCnt int } const ( @@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error { return err } log.Infof("start delete %v\n", name) + // if is canary podgroup remove canary strategy + if util.PodGroupType(name) == PGCanaryType { + // remove key + pgCtrl.spec.RemoveCanary(engine.store) + } engine.opsChan <- orcOperRemove{pgCtrl} delete(engine.pgCtrls, name) engine.rmPgCtrls[name] = pgCtrl go engine.checkPodGroupRemoveResult(name, pgCtrl) + return nil } } @@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error { pgCtrl.opsChan <- pgOperSaveStore{true} pgCtrl.opsChan <- pgOperOver{} } - return nil } } @@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error { return nil } +func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error { + if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil { + return err + } + // gen canary strategy + spec.SaveCanary(engine.store) + return nil +} + +func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary { + engine.RLock() + defer engine.RUnlock() + if pgCtrl, ok := engine.pgCtrls[name]; !ok { + return nil + } else { + if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil { + return nil + } else { + return canary + } + } + return nil +} + +func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error { + engine.RLock() + defer engine.RUnlock() + if pgCtrl, ok := engine.pgCtrls[name]; !ok { + return ErrPodGroupNotExists + } else { + return pgCtrl.spec.UpdateCanary(engine.store, canary) + } +} + func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool { if resources, err := engine.cluster.GetResources(); err != nil { return false diff --git a/engine/podgroup.go b/engine/podgroup.go index 712d6f1..1e8ee8a 100644 --- a/engine/podgroup.go +++ b/engine/podgroup.go @@ -24,6 +24,16 @@ type PodGroupWithSpec struct { PodGroup } +// type PodGroupController interface { +// Deploy() +// Remove() +// Inspect() PodGroupWithSpec +// RescheduleSpec(podSpec PodSpec) +// RescheduleInstance(numInstances int, restartPolicy ...RestartPolicy) +// RescheduleDrift(fromNode, toNode string, instanceNo int, force bool) +// ChangeState(op string, instance int) +// } + type podGroupController struct { Publisher diff --git a/engine/specs.go b/engine/specs.go index 3a3e83c..6bacbd2 100644 --- a/engine/specs.go +++ b/engine/specs.go @@ -22,6 +22,7 @@ const ( kLainNodesKey = "nodes" kLainLastPodSpecKey = "last_spec" kLainPgOpingKey = "operating" + kLainCanaryKey = "canaries" kLainLabelPrefix = "cc.bdp.lain.deployd" kLainLogVolumePath = "/lain/logs" @@ -31,6 +32,8 @@ const ( MinPodKillTimeout = 10 MaxPodKillTimeout = 120 + + PGCanaryType = "canary" ) var ( diff --git a/utils/util/util.go b/utils/util/util.go index 2536bde..16497b4 100644 --- a/utils/util/util.go +++ b/utils/util/util.go @@ -48,6 +48,15 @@ func ParseContainerName(containerName string) (string, int, int, int, error) { return g.Group(1), version, instance, driftCount, nil } +func PodGroupType(procName string) string { + p := regex.MustCompile(`.*\.(.*)\..*`) + g := p.Match(procName) + if g == nil { + return "" + } + return g.Group(1) +} + func IpConflictErrorMatch(err string) string { p := regex.MustCompile(`IP assignment error, data: {IP:([0-9.]+) HandleID:(.*)}: Address already assigned in block`) g := p.Match(err) diff --git a/utils/util/util_test.go b/utils/util/util_test.go index 54e39f1..d3bb4fa 100644 --- a/utils/util/util_test.go +++ b/utils/util/util_test.go @@ -54,3 +54,10 @@ func Test_timeMarshal(t *testing.T) { data, _ := json.Marshal(tt) fmt.Println("t:", string(data)) } + +func Test_PGType(t *testing.T) { + pgName := "hello.canary.web" + if pgType := PodGroupType(pgName); pgType != "canary" { + t.Fatalf("pgtype %s should be canary", pgType) + } +}