Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Canaries #54

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions apiserver/podgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"

"github.com/laincloud/deployd/engine"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/sweb/form"
"github.com/mijia/sweb/log"
"github.com/mijia/sweb/server"
Expand All @@ -26,12 +27,25 @@ func (rpg RestfulPodGroups) Post(ctx context.Context, r *http.Request) (int, int
}

orcEngine := getEngine(ctx)
if err := orcEngine.NewPodGroup(pgSpec); err != nil {
switch err {
var deployErr error
if util.PodGroupType(pgSpec.Name) == engine.PGCanaryType {
var canary engine.Canary
if err := form.ParamBodyJson(r, &canary); err != nil {
log.Warnf("Failed to decode canary, %s", err)
return http.StatusBadRequest, fmt.Sprintf("Invalid Canary params format: %s", err)
}
canaryPgSpec := engine.CanaryPodsWithSpec{pgSpec, &canary}
deployErr = orcEngine.NewCanary(canaryPgSpec)

} else {
deployErr = orcEngine.NewPodGroup(pgSpec)
}
if deployErr != nil {
switch deployErr {
case engine.ErrNotEnoughResources, engine.ErrPodGroupExists, engine.ErrDependencyPodNotExists:
return http.StatusMethodNotAllowed, err.Error()
return http.StatusMethodNotAllowed, deployErr.Error()
default:
return http.StatusInternalServerError, err.Error()
return http.StatusInternalServerError, deployErr.Error()
}
}

Expand Down
1 change: 1 addition & 0 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Cluster interface {
InspectContainer(id string) (adoc.ContainerDetail, error)
RemoveContainer(id string, force bool, volumes bool) error
RenameContainer(id string, name string) error
UpdateContainer(id string, config interface{}) error

MonitorEvents(filter string, callback adoc.EventCallback) int64
StopMonitor(monitorId int64)
Expand Down
71 changes: 71 additions & 0 deletions engine/canaries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package engine

import (
"strings"

"github.com/laincloud/deployd/storage"
"github.com/mijia/sweb/log"
)

type Strategy struct {
DivType string `json:"Type"`
DivDatas []interface{} `json:"Rule"`
}

type Canary struct {
Strategies []*Strategy
}

func (canary *Canary) Equal(nCanary *Canary) bool {
if len(canary.Strategies) != len(nCanary.Strategies) {
return false
}
for i, strategy := range canary.Strategies {
if strategy.DivType != nCanary.Strategies[i].DivType {
return false
}
}
return true
}

type CanaryPodsWithSpec struct {
PodGroupSpec
Canary *Canary
}

func (spec *CanaryPodsWithSpec) SaveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, spec.Canary, true); err != nil {
log.Warnf("[Store] Failed to save pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) RemoveCanary(store storage.Store) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Remove(key); err != nil {
log.Warnf("[Store] Failed to remove pod group canary info %s, %s", key, err)
return err
}
return nil
}

func (spec *PodGroupSpec) FetchCanary(store storage.Store) (*Canary, error) {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
var canary Canary
if err := store.Get(key, &canary); err != nil {
log.Warnf("[Store] Failed to fetch pod group canary info %s, %s", key, err)
return nil, err
}
return &canary, nil
}

func (spec *PodGroupSpec) UpdateCanary(store storage.Store, canary *Canary) error {
key := strings.Join([]string{kLainDeploydRootKey, kLainCanaryKey, spec.Namespace, spec.Name}, "/")
if err := store.Set(key, canary, true); err != nil {
log.Warnf("[Store] Failed to update pod group canary info %s, %s", key, err)
return err
}
return nil
}
92 changes: 92 additions & 0 deletions engine/canaries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package engine

import (
"encoding/json"
"testing"
"time"
)

func TestEngineCanaries(t *testing.T) {
etcdAddr := "http://127.0.0.1:2379"
ConfigPortsManager(etcdAddr)
c, store, err := initClusterAndStore()
if err != nil {
t.Fatalf("Cannot create the cluster and storage, %s", err)
}

engine, err := New(c, store)
if err != nil {
t.Fatalf("Cannot create the orc engine, %s", err)
}

namespace := "hello"
name := "hello.canary.web"
pgSpec := createPodGroupSpec(namespace, name, 1)
data := map[string]interface{}{"suffix": 1, "upstream": "beta1"}
strategies := []*Strategy{&Strategy{"uidsuffix", []interface{}{data}}}
canary := &Canary{strategies}
canarySpec := CanaryPodsWithSpec{pgSpec, canary}
if err := engine.NewCanary(canarySpec); err != nil {
t.Fatalf("Should not return error, %s", err)
}
if err := engine.NewCanary(canarySpec); err == nil {
t.Errorf("Should return exists error, but we got no problem")
}

time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.State != RunStateSuccess {
t.Errorf("We should have the pod deployed and running")
}

engine.RescheduleInstance(name, 3)
time.Sleep(20 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 3 {
t.Errorf("We should have 3 instance of the pods")
}

engine.RescheduleInstance(name, 1)
time.Sleep(30 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if len(pg.Pods) != 1 {
bytes, err := json.Marshal(pg.Pods)
pods := ""
if err == nil {
pods = string(bytes)
}
t.Errorf("We should have 1 instance of the pods : %v", pods)
}

podSpec := createPodSpec(namespace, name)
podSpec.Containers[0].MemoryLimit = 24 * 1024 * 1024
engine.RescheduleSpec(name, podSpec)
time.Sleep(40 * time.Second)
if pg, ok := engine.InspectPodGroup(name); !ok {
t.Errorf("We should have the pod group, but we don't get it")
} else if pg.Spec.Version != 1 {
t.Errorf("We should have version 1 of the pods")
}

if nCanary := engine.FetchCanaryInfos(name); nCanary == nil {
t.Fatalf("Should return canary!")
} else {
if !nCanary.Equal(canary) {
t.Fatalf("Canary info should not changed")
}
}

if err := engine.RemovePodGroup(name); err != nil {
t.Errorf("We should be able to remove the pod group, %s", err)
} else if err := engine.NewPodGroup(pgSpec); err == nil {
t.Errorf("We should not be able to deploy pod group again in short time we remove it")
}
time.Sleep(20 * time.Second)

if nCanary := engine.FetchCanaryInfos(name); nCanary != nil {
t.Fatalf("Should not return nCanary")
}
}
11 changes: 9 additions & 2 deletions engine/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ type Guard struct {
Working bool `json:"Working"`
}

type CUpdateConfig struct {
CPUPeriod int64 `json:"CpuPeriod,omitempty"` // CPU CFS (Completely Fair Scheduler) period
CPUQuota int64 `json:"CpuQuota,omitempty"` // CPU CFS (Completely Fair Scheduler) quota
Memory int64 `json:"Memory,omitempty"` // Memory limit (in bytes)
MemorySwap int64 `json:"MemorySwap,omitempty"` // Total memory usage (memory + swap); set `-1` to enable unlimited swap
}

const (
EtcdResourcesKey = "/lain/config/resources"
EtcdGuardSwitchKey = "/lain/config/guardswitch"
EtcdResourcesKey = "/lain/config/resources"
EtcdGuardSwitchKey = "/lain/config/guardswitch"
EtcdCloudVolumeRootKey = "/lain/config/cloud_volumes_root"
EtcdVolumeRootKey = "/lain/config/volumes_root"

Expand Down
64 changes: 52 additions & 12 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/laincloud/deployd/cluster"
"github.com/laincloud/deployd/storage"
"github.com/laincloud/deployd/utils/util"
"github.com/mijia/adoc"
"github.com/mijia/sweb/log"
)
Expand Down Expand Up @@ -43,18 +44,18 @@ type EngineConfig struct {
type OrcEngine struct {
sync.RWMutex

config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
config EngineConfig
cluster cluster.Cluster
store storage.Store
eagleView *RuntimeEagleView
pgCtrls map[string]*podGroupController
rmPgCtrls map[string]*podGroupController
dependsCtrls map[string]*dependsController
rmDepCtrls map[string]*dependsController
opsChan chan orcOperation
refreshAllChan chan bool
stop chan struct{}
clstrFailCnt int
stop chan struct{}
clstrFailCnt int
}

const (
Expand Down Expand Up @@ -236,10 +237,16 @@ func (engine *OrcEngine) RemovePodGroup(name string) error {
return err
}
log.Infof("start delete %v\n", name)
// if is canary podgroup remove canary strategy
if util.PodGroupType(name) == PGCanaryType {
// remove key
pgCtrl.spec.RemoveCanary(engine.store)
}
engine.opsChan <- orcOperRemove{pgCtrl}
delete(engine.pgCtrls, name)
engine.rmPgCtrls[name] = pgCtrl
go engine.checkPodGroupRemoveResult(name, pgCtrl)

return nil
}
}
Expand Down Expand Up @@ -284,7 +291,6 @@ func (engine *OrcEngine) RescheduleSpec(name string, podSpec PodSpec) error {
pgCtrl.opsChan <- pgOperSaveStore{true}
pgCtrl.opsChan <- pgOperOver{}
}

return nil
}
}
Expand Down Expand Up @@ -329,6 +335,40 @@ func (engine *OrcEngine) ChangeState(pgName, op string, instance int) error {
return nil
}

func (engine *OrcEngine) NewCanary(spec CanaryPodsWithSpec) error {
if err := engine.NewPodGroup(spec.PodGroupSpec); err != nil {
return err
}
// gen canary strategy
spec.SaveCanary(engine.store)
return nil
}

func (engine *OrcEngine) FetchCanaryInfos(name string) *Canary {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return nil
} else {
if canary, err := pgCtrl.spec.FetchCanary(engine.store); err != nil {
return nil
} else {
return canary
}
}
return nil
}

func (engine *OrcEngine) UpdateCanaryInfos(name string, canary *Canary) error {
engine.RLock()
defer engine.RUnlock()
if pgCtrl, ok := engine.pgCtrls[name]; !ok {
return ErrPodGroupNotExists
} else {
return pgCtrl.spec.UpdateCanary(engine.store, canary)
}
}

func (engine *OrcEngine) hasEnoughResource(pgCtrl *podGroupController, podSpec PodSpec) bool {
if resources, err := engine.cluster.GetResources(); err != nil {
return false
Expand Down
Loading