Skip to content

Commit

Permalink
Added canaries for podgroup
Browse files Browse the repository at this point in the history
  • Loading branch information
supermeng committed Mar 9, 2018
1 parent 0dc379f commit 9c3a492
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 16 deletions.
22 changes: 18 additions & 4 deletions apiserver/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/laincloud/deployd/engine"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/sweb/form"
"github.com/mijia/sweb/log"
"github.com/mijia/sweb/server"
Expand All @@ -26,12 +27,25 @@ func (rpg RestfulPodGroups) Post(ctx context.Context, r *http.Request) (int, int
}

orcEngine := getEngine(ctx)
if err := orcEngine.NewPodGroup(pgSpec); err != nil {
switch err {
var deployErr error
if util.PodGroupType(pgSpec.Name) == engine.PGCanaryType {
var canary engine.Canary
if err := form.ParamBodyJson(r, &canary); err != nil {
log.Warnf("Failed to decode canary, %s", err)
return http.StatusBadRequest, fmt.Sprintf("Invalid Canary params format: %s", err)
}
canaryPgSpec := engine.CanaryPodsWithSpec{pgSpec, &canary}
deployErr = orcEngine.NewCanary(canaryPgSpec)

} else {
deployErr = orcEngine.NewPodGroup(pgSpec)
}
if deployErr != nil {
switch deployErr {
case engine.ErrNotEnoughResources, engine.ErrPodGroupExists, engine.ErrDependencyPodNotExists:
return http.StatusMethodNotAllowed, err.Error()
return http.StatusMethodNotAllowed, deployErr.Error()
default:
return http.StatusInternalServerError, err.Error()
return http.StatusInternalServerError, deployErr.Error()
}
}

Expand Down
71 changes: 71 additions & 0 deletions engine/canaries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package engine

import (
"strings"

"github.com/laincloud/deployd/storage"
"github.com/mijia/sweb/log"
)

type Strategy struct {
DivType string `json:"Type"`
DivDatas []interface{} `json:"Rule"`
}

type Canary struct {
Strategies []*Strategy
}

func (canary *Canary) Equal(nCanary *Canary) bool {
if len(canary.Strategies) != len(nCanary.Strategies) {
return false
}
for i, strategy := range canary.Strategies {
if strategy.DivType != nCanary.Strategies[i].DivType {
return false
}
}
return true
}

type CanaryPodsWithSpec struct {
PodGroupSpec
Canary *Canary
}

func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, spec.Canary, true); err != nil {
log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Remove(key); err != nil {
log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
var canary Canary
if err := store.Get(key, &canary); err != nil {
log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err)
return nil, err
}
return &canary, nil
}

func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, canary, true); err != nil {
log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err)
return err
}
return nil
}
92 changes: 92 additions & 0 deletions engine/canaries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package engine

import (
"encoding/json"
"testing"
"time"
)

func TestEngineCanaries(t *testing.T) {
etcdAddr := "http://127.0.0.1:2379"
ConfigPortsManager(etcdAddr)
c, store, err := initClusterAndStore()
if err != nil {
t.Fatalf("Cannot create the cluster and storage, %s", err)
}

engine, err := New(c, store)
if err != nil {
t.Fatalf("Cannot create the orc engine, %s", err)
}

namespace := "hello"
name := "hello.canary.web"
pgSpec := createPodGroupSpec(namespace, name, 1)
data := map[string]interface{}{"suffix": 1, "upstream": "beta1"}
strategies := []*Strategy{&Strategy{"uidsuffix", []interface{}{data}}}
canary := &Canary{strategies}
canarySpec := CanaryPodsWithSpec{pgSpec, canary}
if err := engine.NewCanary(canarySpec); err != nil {
t.Fatalf("Should not return error, %s", err)
}
if err := engine.NewCanary(canarySpec); err == nil {
t.Errorf("Should return exists error, but we got no problem")
}

time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.State != RunStateSuccess {
t.Errorf("We should have the pod deployed and running")
}

engine.RescheduleInstance(name, 3)
time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 3 {
t.Errorf("We should have 3 instance of the pods")
}

engine.RescheduleInstance(name, 1)
time.Sleep(30 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 1 {
bytes, err := json.Marshal(pg.Pods)
pods := ""
if err == nil {
pods = string(bytes)
}
t.Errorf("We should have 1 instance of the pods : %v", pods)
}

podSpec := createPodSpec(namespace, name)
podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024
engine.RescheduleSpec(name, podSpec)
time.Sleep(40 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.Spec.Version != 1 {
t.Errorf("We should have version 1 of the pods")
}

if nCanary := engine.FetchCanaryInfos(name); nCanary == nil {
t.Fatalf("Should return canary!")
} else {
if !nCanary.Equal(canary) {
t.Fatalf("Canary info should not changed")
}
}

if err := engine.RemovePodGroup(name); err != nil {
t.Errorf("We should be able to remove the pod group, %s", err)
} else if err := engine.NewPodGroup(pgSpec); err == nil {
t.Errorf("We should not be able to deploy pod group again in short time we remove it")
}
time.Sleep(20 * time.Second)

if nCanary := engine.FetchCanaryInfos(name); nCanary != nil {
t.Fatalf("Should not return nCanary")
}
}
64 changes: 52 additions & 12 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/laincloud/deployd/cluster"
"github.com/laincloud/deployd/storage"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/adoc"
"github.com/mijia/sweb/log"
)
Expand Down Expand Up @@ -43,18 +44,18 @@ type EngineConfig struct {
type OrcEngine struct {
sync.RWMutex

config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
refreshAllChan chan bool
stop chan struct{}
clstrFailCnt int
stop chan struct{}
clstrFailCnt int
}

const (
Expand Down Expand Up @@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error {
return err
}
log.Infof("start delete %v\n", name)
// if is canary podgroup remove canary strategy
if util.PodGroupType(name) == PGCanaryType {
// remove key
pgCtrl.spec.RemoveCanary(engine.store)
}
engine.opsChan <- orcOperRemove{pgCtrl}
delete(engine.pgCtrls, name)
engine.rmPgCtrls[name] = pgCtrl
go engine.checkPodGroupRemoveResult(name, pgCtrl)

return nil
}
}
Expand Down Expand Up @@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error {
pgCtrl.opsChan <- pgOperSaveStore{true}
pgCtrl.opsChan <- pgOperOver{}
}

return nil
}
}
Expand Down Expand Up @@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error {
return nil
}

func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error {
if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil {
return err
}
// gen canary strategy
spec.SaveCanary(engine.store)
return nil
}

func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return nil
} else {
if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil {
return nil
} else {
return canary
}
}
return nil
}

func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return ErrPodGroupNotExists
} else {
return pgCtrl.spec.UpdateCanary(engine.store, canary)
}
}

func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool {
if resources, err := engine.cluster.GetResources(); err != nil {
return false
Expand Down
10 changes: 10 additions & 0 deletions engine/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ type PodGroupWithSpec struct {
PodGroup
}

// type PodGroupController interface {
// Deploy()
// Remove()
// Inspect() PodGroupWithSpec
// RescheduleSpec(podSpec PodSpec)
// RescheduleInstance(numInstances int, restartPolicy ...RestartPolicy)
// RescheduleDrift(fromNode, toNode string, instanceNo int, force bool)
// ChangeState(op string, instance int)
// }

type podGroupController struct {
Publisher

Expand Down
3 changes: 3 additions & 0 deletions engine/specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
kLainNodesKey = "nodes"
kLainLastPodSpecKey = "last_spec"
kLainPgOpingKey = "operating"
kLainCanaryKey = "canaries"

kLainLabelPrefix = "cc.bdp.lain.deployd"
kLainLogVolumePath = "/lain/logs"
Expand All @@ -31,6 +32,8 @@ const (

MinPodKillTimeout = 10
MaxPodKillTimeout = 120

PGCanaryType = "canary"
)

var (
Expand Down
9 changes: 9 additions & 0 deletions utils/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func ParseContainerName(containerName string) (string, int, int, int, error) {
return g.Group(1), version, instance, driftCount, nil
}

func PodGroupType(procName string) string {
p := regex.MustCompile(`.*\.(.*)\..*`)
g := p.Match(procName)
if g == nil {
return ""
}
return g.Group(1)
}

func IpConflictErrorMatch(err string) string {
p := regex.MustCompile(`IP assignment error, data: {IP:([0-9.]+) HandleID:(.*)}: Address already assigned in block`)
g := p.Match(err)
Expand Down
7 changes: 7 additions & 0 deletions utils/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ func Test_timeMarshal(t *testing.T) {
data, _ := json.Marshal(tt)
fmt.Println("t:", string(data))
}

func Test_PGType(t *testing.T) {
pgName := "hello.canary.web"
if pgType := PodGroupType(pgName); pgType != "canary" {
t.Fatalf("pgtype %s should be canary", pgType)
}
}

0 comments on commit 9c3a492

Please sign in to comment.