diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e3a6f8bf..75806c259 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,10 +10,15 @@
- Trident now allows ONTAP backends even if it cannot read the aggregate media
type, or if the media type is unknown. However, such backends will be ignored
for storage classes that require a specific media type.
+- Trident launcher supports creating the ConfigMap in a non-default namespace.

**Enhancements:**

-- Added release notes (CHANGELOG.md)
+- The improved Trident launcher offers better support for failure recovery,
+error reporting, user arguments, and unit testing.
- Enabled SVM-scoped users for ONTAP backends.
- Switched to using vserver-show-aggr-get-iter API for ONTAP 9.0 and later to
get aggregate media type.
+- Added support for E-Series.
+- Upgraded the etcd version to v3.1.3.
+- Added release notes (CHANGELOG.md).
diff --git a/Dockerfile b/Dockerfile
index 08bc0c9c4..be9d5e564 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,7 @@
FROM debian:jessie
MAINTAINER Ardalan Kangarlou

-LABEL version="1.0" \
+LABEL version="17.04.0" \
      description="Kubernetes storage orchestrator"

RUN apt-get update && apt-get install -y \
diff --git a/Makefile b/Makefile
index 92f6910fe..dd624baaf 100644
--- a/Makefile
+++ b/Makefile
@@ -13,7 +13,7 @@ BIN ?= trident_orchestrator
DIST_REGISTRY=netapp
-TRIDENT_DIST_VERSION ?= 1.1
+TRIDENT_DIST_VERSION ?= 17.04.0
TRIDENT_VERSION ?= local
TRIDENT_IMAGE ?= trident
@@ -130,7 +130,7 @@ test_core:
	@docker rm etcd-test > /dev/null

test_other:
-	@go test -cover $(shell go list ./... | grep -v /vendor/ | grep -v core | grep -v persistent_store)
+	@go test -cover -v $(shell go list ./... | grep -v /vendor/ | grep -v core | grep -v persistent_store)

vet:
	@go vet $(shell go list ./... | grep -v /vendor/)
@@ -157,14 +157,14 @@ launcher_build: check_registry launcher_retag
	docker push ${LAUNCHER_TAG}
	-docker rmi ${LAUNCHER_TAG_OLD}

-docker_launcher_build:
+docker_launcher_build: check_registry launcher_retag
	@chmod 777 ./launcher/docker-build
	@${GO} ${BUILD} -o ${TRIDENT_VOLUME_PATH}/launcher/docker-build/launcher ./launcher
	docker build -t ${LAUNCHER_TAG} ./launcher/docker-build/
	docker push ${LAUNCHER_TAG}
	-docker rmi ${LAUNCHER_TAG_OLD}

-launcher_start: check_registry
+launcher_start: prep_pod_template
ifndef LAUNCHER_BACKEND
$(error Must define LAUNCHER_BACKEND to start the launcher.)
endif
@@ -207,10 +207,10 @@ endif
	-kubectl delete --ignore-not-found=true pod trident-launcher
	@mkdir -p ${LAUNCHER_CONFIG_DIR}
	@cp ${LAUNCHER_BACKEND} ${LAUNCHER_CONFIG_DIR}/backend.json
-	@sed "s|__TRIDENT_IMAGE__|netapp/trident:1.0|g" kubernetes-yaml/trident-deployment.yaml.templ > ${LAUNCHER_CONFIG_DIR}/trident-deployment.yaml
+	@sed "s|__TRIDENT_IMAGE__|netapp/trident:latest|g" kubernetes-yaml/trident-deployment.yaml.templ > ${LAUNCHER_CONFIG_DIR}/trident-deployment.yaml
	@echo "Usable Trident pod definition available at ./launcher/kubernetes-yaml/trident-deployment.yaml"
	@kubectl create configmap trident-launcher-config --from-file=${LAUNCHER_CONFIG_DIR}
-	@sed "s|__LAUNCHER_TAG__|netapp/trident-launcher:1.0|g" ./launcher/kubernetes-yaml/launcher-pod.yaml.templ > ${LAUNCHER_POD_FILE}
+	@sed "s|__LAUNCHER_TAG__|netapp/trident-launcher:latest|g" ./launcher/kubernetes-yaml/launcher-pod.yaml.templ > ${LAUNCHER_POD_FILE}
	@kubectl create -f ${LAUNCHER_POD_FILE}
	@echo "Trident Launcher started; pod definition in ${LAUNCHER_POD_FILE}"
diff --git a/README.md b/README.md
index 7c0e05c51..11ace9de5 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ exposing users to complexities of various backends.
  * [Backends](#backends)
    * [ONTAP Configurations](#ontap-configurations)
    * [SolidFire Configurations](#solidfire-configurations)
-   * [E-Series Configurations](#e-series configurations)
+   * [E-Series Configurations](#e-series-configurations)
  * [Volume Configurations](#volume-configurations)
  * [Storage Class Configurations](#storage-class-configurations)
    * [Storage Attributes](#storage-attributes)
@@ -87,7 +87,8 @@ are not available, see the subsequent sections.
   If you plan on using E-Series, create a Host Group named `trident` and create
   a Host in that Host Group that contains the IQN of each node in the cluster.
-5. Download and untar the [Trident installer bundle](https://github.com/NetApp/trident/releases/download/v1.0/trident-installer-1.0.tar.gz).
+5. Download and untar the [Trident installer bundle](https://github.com/NetApp/trident/releases).
+   Then, change into the `trident-installer` directory created by untarring the bundle.
6. Configure a storage backend from which Trident will provision its volumes.
   This will also be used in step 8 to provision the PVC on which Trident will
@@ -199,7 +200,10 @@ them via Kubernetes are available in the [Storage Classes](#storage-classes)
section. For more details on how Trident chooses storage pools from a storage
class to provision its volumes, see [Provisioning Workflow](#provisioning-workflow).

-The following tutorial presents an in-depth overview of Trident and demonstrates some advanced use cases:
+The following tutorial presents an in-depth overview of Trident and demonstrates
+some advanced use cases (the tutorial is based on v1.0, so see the
+[CHANGELOG](https://github.com/NetApp/trident/blob/master/CHANGELOG.md) for
+the changes since the first release):

[![Trident: External Provisioner for NetApp Storage](https://img.youtube.com/vi/NDcnyGe2GFo/0.jpg)](https://www.youtube.com/watch?v=NDcnyGe2GFo)
diff --git a/config/config.go b/config/config.go
index 74ad7d450..d0e699e02 100644
--- a/config/config.go
+++ b/config/config.go
@@ -14,7 +14,8 @@ type VolumeType string
const (
	/* Misc.
orchestrator constants */ OrchestratorName = "trident" - OrchestratorVersion = "1.1" + OrchestratorVersion = "17.04.0" + OrchestratorAPIVersion = "1" PersistentStoreTimeout = 60 * time.Second MaxBootstrapAttempts = 10 @@ -55,11 +56,11 @@ var ( } /* API Server and persistent store variables */ OrchestratorMajorVersion = getMajorVersion(OrchestratorVersion) - VersionURL = "/" + OrchestratorName + "/v" + OrchestratorMajorVersion + "/version" - BackendURL = "/" + OrchestratorName + "/v" + OrchestratorMajorVersion + "/backend" - VolumeURL = "/" + OrchestratorName + "/v" + OrchestratorMajorVersion + "/volume" - TransactionURL = "/" + OrchestratorName + "/v" + OrchestratorMajorVersion + "/txn" - StorageClassURL = "/" + OrchestratorName + "/v" + OrchestratorMajorVersion + "/storageclass" + VersionURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/version" + BackendURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/backend" + VolumeURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/volume" + TransactionURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/txn" + StorageClassURL = "/" + OrchestratorName + "/v" + OrchestratorAPIVersion + "/storageclass" ) func IsValidProtocol(p Protocol) bool { diff --git a/core/orchestrator_core.go b/core/orchestrator_core.go index 159dad33b..64895f8e7 100644 --- a/core/orchestrator_core.go +++ b/core/orchestrator_core.go @@ -54,7 +54,6 @@ func (o *tridentOrchestrator) Bootstrap() error { "store! Bootstrap might have failed, persistent store might "+ "be down, or persistent store may not have any backend, "+ "volume, or storage class state: %s", err.Error()) - log.Warnf(errMsg) return fmt.Errorf(errMsg) } o.bootstrapped = true @@ -207,10 +206,10 @@ func (o *tridentOrchestrator) bootstrap() error { func (o *tridentOrchestrator) rollBackTransaction(v *persistent_store.VolumeTransaction) error { log.WithFields(log.Fields{ - "volume": v.Config.Name, - "size": v.Config.Size, - "storage_class": v.Config.StorageClass, - "op": v.Op, + "volume": v.Config.Name, + "size": v.Config.Size, + "storageClass": v.Config.StorageClass, + "op": v.Op, }).Info("Processed volume transaction log.") switch v.Op { case persistent_store.AddVolume: @@ -602,6 +601,7 @@ func (o *tridentOrchestrator) AddVolume(volumeConfig *storage.VolumeConfig) ( log.WithFields(log.Fields{ "volume": volumeConfig.Name, }).Debugf("Looking through %d backends", len(pools)) + errorMessages := make([]string, 0) for _, num := range rand.Perm(len(pools)) { backend = pools[num].Backend if vol, err = backend.AddVolume( @@ -624,14 +624,24 @@ func (o *tridentOrchestrator) AddVolume(volumeConfig *storage.VolumeConfig) ( "volume": volumeConfig.Name, "error": err, }).Warn("Failed to create the volume on this backend!") + errorMessages = append(errorMessages, + fmt.Sprintf("[Failed to create volume %s "+ + "on storage pool %s from backend %s: %s]", + volumeConfig.Name, pools[num].Name, backend.Name, + err.Error())) } } externalVol = nil - err = fmt.Errorf("No suitable %s backend with \"%s\" "+ - "storage class and %s of free space was found! Find available backends"+ - " under %s.", volumeConfig.Protocol, - volumeConfig.StorageClass, volumeConfig.Size, config.BackendURL) + if len(errorMessages) == 0 { + err = fmt.Errorf("No suitable %s backend with \"%s\" "+ + "storage class and %s of free space was found! 
Find available backends"+ + " under %s.", volumeConfig.Protocol, + volumeConfig.StorageClass, volumeConfig.Size, config.BackendURL) + } else { + err = fmt.Errorf("Encountered error(s) in creating the volume: %s", + strings.Join(errorMessages, ", ")) + } return nil, err } diff --git a/frontend/kubernetes/constants.go b/frontend/kubernetes/config.go similarity index 83% rename from frontend/kubernetes/constants.go rename to frontend/kubernetes/config.go index e70ed58ed..d389d252f 100644 --- a/frontend/kubernetes/constants.go +++ b/frontend/kubernetes/config.go @@ -10,6 +10,7 @@ import ( const ( KubernetesSyncPeriod = 60 * time.Second + // Kubernetes-defined annotations // (Based on kubernetes/pkg/controller/volume/persistentvolume/controller.go) AnnClass = "volume.beta.kubernetes.io/storage-class" @@ -27,4 +28,12 @@ const ( AnnVendor = AnnPrefix + "/vendor" AnnBackendID = AnnPrefix + "/backendID" AnnExportPolicy = AnnPrefix + "/exportPolicy" + + // Minimum and maximum supported Kubernetes versions + KubernetesVersionMin = "1.4" + KubernetesVersionMax = "1.6" + + //Minimum and maximum supported OpenShift versions + OpenShiftVersionMin = "3.4" + OpenShiftVersionMax = "3.6" ) diff --git a/frontend/kubernetes/plugin.go b/frontend/kubernetes/plugin.go index 7f588d7d5..5ebb06657 100644 --- a/frontend/kubernetes/plugin.go +++ b/frontend/kubernetes/plugin.go @@ -7,6 +7,7 @@ import ( "strings" log "github.com/Sirupsen/logrus" + version "github.com/hashicorp/go-version" dvp "github.com/netapp/netappdvp/storage_drivers" "k8s.io/client-go/kubernetes" core_v1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -16,32 +17,56 @@ import ( k8s_storage "k8s.io/client-go/pkg/apis/storage/v1beta1" "k8s.io/client-go/pkg/conversion" "k8s.io/client-go/pkg/runtime" + k8s_version "k8s.io/client-go/pkg/version" "k8s.io/client-go/pkg/watch" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" + "github.com/netapp/trident/config" "github.com/netapp/trident/core" "github.com/netapp/trident/storage" "github.com/netapp/trident/storage_attribute" "github.com/netapp/trident/storage_class" ) +func VersionSupported(versionInfo *k8s_version.Info) (bool, error) { + v, err := version.NewVersion(versionInfo.Major + "." 
+ versionInfo.Minor) + if err != nil { + return false, err + } + kubeConstraints, err := version.NewConstraint(fmt.Sprintf(">= %s, <= %s", + KubernetesVersionMin, KubernetesVersionMax)) + if err != nil { + return false, err + } + openshiftConstraints, err := version.NewConstraint( + fmt.Sprintf(">= %s, <= %s", OpenShiftVersionMin, OpenShiftVersionMax)) + if err != nil { + return false, err + } + if kubeConstraints.Check(v) || openshiftConstraints.Check(v) { + return true, nil + } + return false, nil +} + type KubernetesPlugin struct { - orchestrator core.Orchestrator - kubeClient kubernetes.Interface - eventRecorder record.EventRecorder - pendingClaimMatchMap map[string]*v1.PersistentVolume - claimController *cache.Controller - claimControllerStopChan chan struct{} - claimSource cache.ListerWatcher - volumeController *cache.Controller - volumeControllerStopChan chan struct{} - volumeSource cache.ListerWatcher - classController *cache.Controller - classControllerStopChan chan struct{} - classSource cache.ListerWatcher + orchestrator core.Orchestrator + kubeClient kubernetes.Interface + eventRecorder record.EventRecorder + pendingClaimMatchMap map[string]*v1.PersistentVolume + claimController *cache.Controller + claimControllerStopChan chan struct{} + claimSource cache.ListerWatcher + volumeController *cache.Controller + volumeControllerStopChan chan struct{} + volumeSource cache.ListerWatcher + classController *cache.Controller + classControllerStopChan chan struct{} + classSource cache.ListerWatcher + containerOrchestratorVersion *k8s_version.Info } func NewPlugin( @@ -68,7 +93,30 @@ func newForConfig( if err != nil { return nil, err } - return newKubernetesPlugin(kubeClient, o), nil + versionInfo, err := kubeClient.Discovery().ServerVersion() + if err != nil { + return nil, + fmt.Errorf("Kubernetes frontend couldn't retrieve API server's "+ + "version: %v", err) + } + if supported, err := VersionSupported(versionInfo); err != nil { + return nil, fmt.Errorf("Kubernetes frontend encountered error "+ + "in checking the version of container orchestrator: %s", err) + } else if !supported { + log.Warnf("%s v%s may not support "+ + "container orchestrator version %s.%s! "+ + "Supported versions for Kubernetes are %s-%s "+ + "and for OpenShift are %s-%s.", + config.OrchestratorName, config.OrchestratorVersion, + versionInfo.Major, versionInfo.Minor, + KubernetesVersionMin, KubernetesVersionMax, + OpenShiftVersionMin, OpenShiftVersionMax) + } + log.WithFields(log.Fields{ + "version": versionInfo.Major + "." 
+ versionInfo.Minor, + }).Info("Kubernetes frontend determined the container orchestrator ", + "version.") + return newKubernetesPlugin(kubeClient, o, versionInfo), nil } func getUniqueClaimName(claim *v1.PersistentVolumeClaim) string { @@ -84,14 +132,16 @@ func getUniqueClaimName(claim *v1.PersistentVolumeClaim) string { func newKubernetesPlugin( kubeClient kubernetes.Interface, orchestrator core.Orchestrator, + containerOrchestratorVersion *k8s_version.Info, ) *KubernetesPlugin { ret := &KubernetesPlugin{ - orchestrator: orchestrator, - kubeClient: kubeClient, - claimControllerStopChan: make(chan struct{}), - volumeControllerStopChan: make(chan struct{}), - classControllerStopChan: make(chan struct{}), - pendingClaimMatchMap: make(map[string]*v1.PersistentVolume), + orchestrator: orchestrator, + kubeClient: kubeClient, + claimControllerStopChan: make(chan struct{}), + volumeControllerStopChan: make(chan struct{}), + classControllerStopChan: make(chan struct{}), + pendingClaimMatchMap: make(map[string]*v1.PersistentVolume), + containerOrchestratorVersion: containerOrchestratorVersion, } broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink( diff --git a/frontend/rest/client.go b/frontend/rest/client.go new file mode 100644 index 000000000..7dc02181b --- /dev/null +++ b/frontend/rest/client.go @@ -0,0 +1,239 @@ +// Copyright 2016 NetApp, Inc. All Rights Reserved. + +package rest + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/netapp/trident/config" + "github.com/netapp/trident/storage" + "github.com/netapp/trident/storage_class" +) + +const ( + contentType = "application/json" +) + +type RESTInterface interface { + Get(endpoint string) (*http.Response, error) + Post(endpoint string, body io.Reader) (*http.Response, error) + Delete(endpoint string) (*http.Response, error) +} + +type Interface interface { + RESTInterface + Configure(ip string, port, timeout int) Interface + GetBackend(backendID string) (*GetBackendResponse, error) + PostBackend(backendFile string) (*AddBackendResponse, error) + ListBackends() (*ListBackendsResponse, error) + AddStorageClass(storageClassConfig *storage_class.Config) (*AddStorageClassResponse, error) + GetVolume(volName string) (*GetVolumeResponse, error) + AddVolume(volConfig *storage.VolumeConfig) (*AddVolumeResponse, error) + DeleteVolume(volName string) (*DeleteResponse, error) +} + +type TridentClient struct { + ip string + port int + client *http.Client +} + +func NewTridentClient(ip string, port, timeout int) *TridentClient { + return &TridentClient{ + ip: ip, + port: port, + client: &http.Client{ + Timeout: time.Duration(timeout) * time.Second, + }, + } +} + +func (client *TridentClient) Configure(ip string, port, timeout int) Interface { + client.ip = ip + client.port = port + client.client = &http.Client{ + Timeout: time.Duration(timeout) * time.Second, + } + return client +} + +func (client *TridentClient) Get(endpoint string) (*http.Response, error) { + return client.client.Get(fmt.Sprintf("http://%s:%d/trident/v%s/%s", + client.ip, client.port, config.OrchestratorAPIVersion, endpoint)) +} + +func (client *TridentClient) Post(endpoint string, body io.Reader) (*http.Response, error) { + return client.client.Post(fmt.Sprintf("http://%s:%d/trident/v%s/%s", + client.ip, client.port, config.OrchestratorAPIVersion, endpoint), + contentType, body) +} + +func (client *TridentClient) Delete(endpoint string) (*http.Response, error) { + req, err := http.NewRequest(http.MethodDelete, 
+ fmt.Sprintf("http://%s:%d/trident/v%s/%s", + client.ip, client.port, config.OrchestratorAPIVersion, endpoint), + nil) + if err != nil { + return &http.Response{}, err + } + return client.client.Do(req) +} + +func (client *TridentClient) GetBackend(backendID string) (*GetBackendResponse, error) { + var ( + resp *http.Response + err error + bytes []byte + getBackendResponse GetBackendResponse + ) + if resp, err = client.Get("backend/" + backendID); err != nil { + return nil, err + } + defer resp.Body.Close() + if bytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(bytes, &getBackendResponse); err != nil { + return nil, err + } + return &getBackendResponse, nil +} + +func (client *TridentClient) PostBackend(backendFile string) (*AddBackendResponse, error) { + var ( + resp *http.Response + err error + jsonBytes []byte + addBackendResponse AddBackendResponse + ) + body, err := ioutil.ReadFile(backendFile) + if err != nil { + return nil, err + } + if resp, err = client.Post("backend", bytes.NewBuffer(body)); err != nil { + return nil, err + } + defer resp.Body.Close() + if jsonBytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + //TODO: Fix the unmarshaling problem with StorageBackendExternal.Storage.Attributes + if err = json.Unmarshal(jsonBytes, &addBackendResponse); err != nil { + return nil, err + } + return &addBackendResponse, nil +} + +func (client *TridentClient) ListBackends() (*ListBackendsResponse, error) { + var ( + resp *http.Response + err error + bytes []byte + listBackendsResponse ListBackendsResponse + ) + if resp, err = client.Get("backend"); err != nil { + return nil, err + } + defer resp.Body.Close() + if bytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(bytes, &listBackendsResponse); err != nil { + return nil, err + } + return &listBackendsResponse, nil +} + +func (client *TridentClient) AddStorageClass(storageClassConfig *storage_class.Config) (*AddStorageClassResponse, error) { + var ( + resp *http.Response + err error + jsonBytes []byte + addStorageClassResponse AddStorageClassResponse + ) + jsonBytes, err = json.Marshal(storageClassConfig) + if err != nil { + return nil, err + } + if resp, err = client.Post("storageclass", bytes.NewBuffer(jsonBytes)); err != nil { + return nil, err + } + defer resp.Body.Close() + if jsonBytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(jsonBytes, &addStorageClassResponse); err != nil { + return nil, err + } + return &addStorageClassResponse, nil +} + +func (client *TridentClient) GetVolume(volName string) (*GetVolumeResponse, error) { + var ( + resp *http.Response + err error + bytes []byte + getVolResponse GetVolumeResponse + ) + if resp, err = client.Get("volume/" + volName); err != nil { + return nil, err + } + defer resp.Body.Close() + if bytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(bytes, &getVolResponse); err != nil { + return nil, err + } + return &getVolResponse, nil +} + +func (client *TridentClient) AddVolume(volConfig *storage.VolumeConfig) (*AddVolumeResponse, error) { + var ( + resp *http.Response + err error + jsonBytes []byte + addVolResponse AddVolumeResponse + ) + jsonBytes, err = json.Marshal(volConfig) + if err != nil { + return nil, err + } + if resp, err = client.Post("volume", bytes.NewBuffer(jsonBytes)); err != nil { + return nil, err + } + defer resp.Body.Close() + if jsonBytes, err = 
ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(jsonBytes, &addVolResponse); err != nil { + return nil, err + } + return &addVolResponse, nil +} + +func (client *TridentClient) DeleteVolume(volName string) (*DeleteResponse, error) { + var ( + resp *http.Response + err error + jsonBytes []byte + delResponse DeleteResponse + ) + if resp, err = client.Delete("volume/" + volName); err != nil { + return nil, err + } + defer resp.Body.Close() + if jsonBytes, err = ioutil.ReadAll(resp.Body); err != nil { + return nil, err + } + if err = json.Unmarshal(jsonBytes, &delResponse); err != nil { + return nil, err + } + return &delResponse, nil +} diff --git a/frontend/rest/client_fake.go b/frontend/rest/client_fake.go new file mode 100644 index 000000000..cf43087a8 --- /dev/null +++ b/frontend/rest/client_fake.go @@ -0,0 +1,130 @@ +// Copyright 2016 NetApp, Inc. All Rights Reserved. + +package rest + +import ( + "bytes" + "fmt" + "io" + "net/http" + + "github.com/netapp/trident/storage" + "github.com/netapp/trident/storage_class" +) + +type FakeTridentClient struct { + volumes map[string]storage.VolumeExternal + failMatrix map[string]bool +} + +func NewFakeTridentClient(failMatrix map[string]bool) *FakeTridentClient { + return &FakeTridentClient{ + volumes: make(map[string]storage.VolumeExternal, 0), + failMatrix: failMatrix, + } +} + +func (client *FakeTridentClient) Configure(ip string, port, timeout int) Interface { + return client +} + +func (client *FakeTridentClient) Get(endpoint string) (*http.Response, error) { + if fail, ok := client.failMatrix["Get"]; fail && ok { + return nil, fmt.Errorf("Get failed") + } + return nil, nil +} + +func (client *FakeTridentClient) Post(endpoint string, body io.Reader) (*http.Response, error) { + if fail, ok := client.failMatrix["Post"]; fail && ok { + return nil, fmt.Errorf("Post failed") + } + return nil, nil +} + +func (client *FakeTridentClient) Delete(endpoint string) (*http.Response, error) { + if fail, ok := client.failMatrix["Delete"]; fail && ok { + return nil, fmt.Errorf("Delete failed") + } + return nil, nil +} + +func (client *FakeTridentClient) GetBackend(backendID string) (*GetBackendResponse, error) { + return nil, nil +} + +func (client *FakeTridentClient) PostBackend(backendFile string) (*AddBackendResponse, error) { + return nil, nil +} + +func (client *FakeTridentClient) ListBackends() (*ListBackendsResponse, error) { + return nil, nil +} + +func (client *FakeTridentClient) AddStorageClass(storageClassConfig *storage_class.Config) (*AddStorageClassResponse, error) { + return nil, nil +} + +func (client *FakeTridentClient) GetVolume(volName string) (*GetVolumeResponse, error) { + var ( + err error + ok = false + vol storage.VolumeExternal + getVolumeResponse GetVolumeResponse + ) + if _, err = client.Get("volume/" + volName); err != nil { + return nil, err + } + if vol, ok = client.volumes[volName]; !ok { + getVolumeResponse.Volume = nil + getVolumeResponse.Error = "Volume wasn't found" + return &getVolumeResponse, nil + } + getVolumeResponse.Volume = &vol + if fail, ok := client.failMatrix["GetVolume"]; fail && ok { + getVolumeResponse.Error = "GetVolume failed" + return nil, fmt.Errorf("GetVolume failed.") + } + return &getVolumeResponse, nil +} + +func (client *FakeTridentClient) AddVolume(volConfig *storage.VolumeConfig) (*AddVolumeResponse, error) { + var err error = nil + if _, err = client.Post("", bytes.NewBuffer(make([]byte, 0))); err != nil { + return nil, err + } + 
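+	// Register the volume under canned backend/pool values ("ontapnas_1.1.1.1",
+	// "aggr1") so later GetVolume/DeleteVolume calls in tests can find it;
+	// entries in failMatrix simulate per-method failures.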
client.volumes[volConfig.Name] = storage.VolumeExternal{ + Config: volConfig, + Backend: "ontapnas_1.1.1.1", + Pool: "aggr1", + } + addVolumeResponse := &AddVolumeResponse{ + BackendID: "ontapnas_1.1.1.1", + } + if fail, ok := client.failMatrix["AddVolume"]; fail && ok { + addVolumeResponse.Error = "AddVolume failed" + return nil, fmt.Errorf("AddVolume failed.") + } + return addVolumeResponse, nil +} + +func (client *FakeTridentClient) DeleteVolume(volName string) (*DeleteResponse, error) { + var ( + err error + ok = false + deleteResponse DeleteResponse + ) + if _, err = client.Delete("volume/" + volName); err != nil { + return nil, err + } + if _, ok = client.volumes[volName]; !ok { + deleteResponse.Error = "Volume wasn't found" + return &deleteResponse, nil + } + delete(client.volumes, volName) + if fail, ok := client.failMatrix["DeleteVolume"]; fail && ok { + deleteResponse.Error = "DeleteVolume failed" + return nil, fmt.Errorf("DeleteVolume failed.") + } + return &deleteResponse, nil +} diff --git a/glide.lock b/glide.lock index 01c5d3c5a..6e0ec64ba 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: dd67b782a7ae132fc44420278dcf76b16e659ad88f5afa57edebec9acde4b4c6 -updated: 2017-04-04T11:31:02.172933601-04:00 +hash: 290daa05926848f034bcd490f7285ab55dd18ef43eb0df790fe88b228f059194 +updated: 2017-04-06T19:48:30.863279998-04:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 @@ -39,8 +39,6 @@ imports: subpackages: - digest - reference -- name: github.com/dustin/go-humanize - version: 259d2a102b871d17f30e3cd9881a642961a1e486 - name: github.com/emicklei/go-restful version: 89ef8af493ab468a45a42bb0d89a06fccdd2fb22 subpackages: @@ -77,6 +75,8 @@ imports: version: 215affda49addc4c8ef7e2534915df2c8c35c6cd - name: github.com/gorilla/mux version: 8096f47503459bcc74d1f4c487b7e6e42e5746b5 +- name: github.com/hashicorp/go-version + version: 03c5bf6be031b6dd45afec16b1cf94fc8938bc77 - name: github.com/howeyc/gopass version: 3ca23474a7c7203e0a0a070fd33508f6efdb9b3d - name: github.com/imdario/mergo @@ -89,10 +89,8 @@ imports: - buffer - jlexer - jwriter -- name: github.com/mattn/go-runewidth - version: 14207d285c6c197daabb5c9793d63e7af9ab2d50 - name: github.com/netapp/netappdvp - version: cf4fbb74aba04229bdd0d3db19757a4d6e4afce9 + version: fd2e77efb33a1059a11882a1b70ccbba0d133a52 subpackages: - apis/eseries - apis/ontap @@ -100,8 +98,6 @@ imports: - azgo - storage_drivers - utils -- name: github.com/olekukonko/tablewriter - version: febf2d34b54a69ce7530036c7503b1c9fbfdf0bb - name: github.com/pborman/uuid version: ca53cad383cad2479bbba7f7a1a05797ec1386e4 - name: github.com/PuerkitoBio/purell @@ -109,7 +105,7 @@ imports: - name: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - name: github.com/Sirupsen/logrus - version: 51fe59aca108dc5680109e7b2051cbdcfa5a253c + version: 10f801ebc38b33738c9d17d50860f484a0988ff5 - name: github.com/spf13/pflag version: 5ccb023bc27df288a957c5e994cd44fd19619465 - name: github.com/tylerb/graceful @@ -118,8 +114,6 @@ imports: version: f1f1a805ed361a0e078bb537e4ea78cd37dcf065 subpackages: - codec -- name: github.com/urfave/cli - version: 8ba6f23b6e36d03666a14bd9421f5e3efcb59aca - name: golang.org/x/crypto version: 1f22c0103821b9390939b6776727195525381532 subpackages: diff --git a/glide.yaml b/glide.yaml index 5a5dfcf2e..deafa63cf 100644 --- a/glide.yaml +++ b/glide.yaml @@ -10,7 +10,7 @@ import: - package: github.com/PuerkitoBio/urlesc version: 5bd2802263f21d8788851d5305584c82a5c75d7e - package: 
github.com/Sirupsen/logrus - version: 51fe59aca108dc5680109e7b2051cbdcfa5a253c + version: 10f801ebc38b33738c9d17d50860f484a0988ff5 - package: github.com/alecthomas/units version: 2efee857e7cfd4f3d0138cc3cbb1b4966962b93a - package: github.com/blang/semver @@ -93,7 +93,7 @@ import: - jlexer - jwriter - package: github.com/netapp/netappdvp - version: cf4fbb74aba04229bdd0d3db19757a4d6e4afce9 + version: fd2e77efb33a1059a11882a1b70ccbba0d133a52 subpackages: - apis/eseries - apis/ontap @@ -296,3 +296,4 @@ import: - tools/metrics - tools/record - transport +- package: github.com/hashicorp/go-version diff --git a/k8s_client/k8s_client.go b/k8s_client/k8s_client.go new file mode 100644 index 000000000..126746898 --- /dev/null +++ b/k8s_client/k8s_client.go @@ -0,0 +1,336 @@ +// Copyright 2016 NetApp, Inc. All Rights Reserved. + +package k8s_client + +import ( + "fmt" + "strings" + "time" + + log "github.com/Sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/errors" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/version" + "k8s.io/client-go/pkg/watch" + "k8s.io/client-go/rest" +) + +type Interface interface { + Version() *version.Info + GetDeployment(deploymentName string) (*v1beta1.Deployment, error) + CheckDeploymentExists(deploymentName string) (bool, error) + CreateDeployment(deployment *v1beta1.Deployment) (*v1beta1.Deployment, error) + GetPod(podName string) (*v1.Pod, error) + GetPodByLabels(listOptions *v1.ListOptions) (*v1.Pod, error) + GetPodPhase(podName string) (v1.PodPhase, error) + CheckPodExists(pod string) (bool, error) + CreatePod(pod *v1.Pod) (*v1.Pod, error) + DeletePod(podName string, options *v1.DeleteOptions) error + WatchPod(listOptions *v1.ListOptions) (watch.Interface, error) + ListPod(listOptions *v1.ListOptions) (*v1.PodList, error) + GetRunningPod(pod *v1.Pod, timeout *int64, labels map[string]string) (*v1.Pod, error) + GetPVC(pvcName string) (*v1.PersistentVolumeClaim, error) + GetPVCPhase(pvcName string) (v1.PersistentVolumeClaimPhase, error) + CheckPVCExists(pvc string) (bool, error) + CreatePVC(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) + DeletePVC(pvcName string, options *v1.DeleteOptions) error + WatchPVC(listOptions *v1.ListOptions) (watch.Interface, error) + GetBoundPVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, timeout *int64, labels map[string]string) (*v1.PersistentVolumeClaim, error) + CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) + DeletePV(pvName string, options *v1.DeleteOptions) error +} + +type KubeClient struct { + clientset *kubernetes.Clientset + namespace string + versionInfo *version.Info +} + +func NewKubeClient(config *rest.Config, namespace string) (*KubeClient, error) { + var versionInfo *version.Info + if namespace == "" { + return nil, fmt.Errorf("An empty namespace is not acceptable!") + } + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, err + } + versionInfo, err = clientset.Discovery().ServerVersion() + if err != nil { + return nil, fmt.Errorf("Couldn't retrieve API server's version: %v", err) + } + kubeClient := &KubeClient{ + clientset: clientset, + namespace: namespace, + versionInfo: versionInfo, + } + return kubeClient, nil +} + +func (k *KubeClient) Version() *version.Info { + return k.versionInfo +} + +func (k *KubeClient) GetDeployment(deploymentName string) (*v1beta1.Deployment, error) { + return 
k.clientset.ExtensionsV1beta1().Deployments(k.namespace).Get(
+		deploymentName)
+}
+
+func (k *KubeClient) CheckDeploymentExists(deploymentName string) (bool, error) {
+	if _, err := k.GetDeployment(deploymentName); err != nil {
+		if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == unversioned.StatusReasonNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (k *KubeClient) CreateDeployment(deployment *v1beta1.Deployment) (*v1beta1.Deployment, error) {
+	return k.clientset.ExtensionsV1beta1().Deployments(k.namespace).Create(
+		deployment)
+}
+
+func (k *KubeClient) GetPod(podName string) (*v1.Pod, error) {
+	return k.clientset.Core().Pods(k.namespace).Get(podName)
+}
+
+func (k *KubeClient) GetPodByLabels(listOptions *v1.ListOptions) (*v1.Pod, error) {
+	var (
+		watchedPod *v1.Pod
+		timeout    int64
+	)
+	startTime := time.Now()
+	timeout = *listOptions.TimeoutSeconds
+	listOptions.TimeoutSeconds = nil // no timeout
+	pods, err := k.ListPod(listOptions)
+	if err != nil {
+		return nil, err
+	}
+	if len(pods.Items) == 1 {
+		return &pods.Items[0], nil
+	} else if len(pods.Items) > 1 {
+		return nil, fmt.Errorf("Multiple pods have the label %s: %v",
+			listOptions.LabelSelector, pods.Items)
+	}
+	listOptions.TimeoutSeconds = &timeout
+	log.Debugf("KubeClient took %v to retrieve pods: %v.",
+		time.Since(startTime), pods.Items)
+	podWatch, err := k.WatchPod(listOptions)
+	if err != nil {
+		return nil, err
+	}
+	defer podWatch.Stop()
+	for event := range podWatch.ResultChan() {
+		switch event.Type {
+		case watch.Error:
+			//TODO: Validate error handling.
+			return nil, fmt.Errorf("Received error when watching pod: %s",
+				event.Object.GetObjectKind().GroupVersionKind().String())
+		case watch.Deleted:
+			return nil, fmt.Errorf("Pod got deleted before becoming available!")
+		case watch.Added, watch.Modified:
+			watchedPod = event.Object.(*v1.Pod)
+		default:
+			return nil, fmt.Errorf(
+				"Got unknown event type %s while watching pod!",
+				event.Type)
+		}
+	}
+	if watchedPod == nil {
+		return nil, fmt.Errorf("Watch expired before finding a pod with label %s!",
+			listOptions.LabelSelector)
+	}
+	log.Debugf("KubeClient took %v to retrieve pod %v.",
+		time.Since(startTime), watchedPod.Name)
+	return watchedPod, nil
+}
+
+func (k *KubeClient) GetPodPhase(podName string) (v1.PodPhase, error) {
+	pod, err := k.GetPod(podName)
+	if err != nil {
+		var phase v1.PodPhase = ""
+		return phase, err
+	}
+	return pod.Status.Phase, nil
+}
+
+func (k *KubeClient) CheckPodExists(pod string) (bool, error) {
+	if _, err := k.GetPod(pod); err != nil {
+		if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == unversioned.StatusReasonNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (k *KubeClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) {
+	return k.clientset.Core().Pods(k.namespace).Create(pod)
+}
+
+func (k *KubeClient) DeletePod(podName string, options *v1.DeleteOptions) error {
+	return k.clientset.Core().Pods(k.namespace).Delete(podName, options)
+}
+
+func (k *KubeClient) WatchPod(listOptions *v1.ListOptions) (watch.Interface, error) {
+	return k.clientset.Core().Pods(k.namespace).Watch(*listOptions)
+}
+
+func (k *KubeClient) ListPod(listOptions *v1.ListOptions) (*v1.PodList, error) {
+	return k.clientset.Core().Pods(k.namespace).List(*listOptions)
+}
+
+func (k *KubeClient) GetRunningPod(pod *v1.Pod, timeout *int64, labels map[string]string) (*v1.Pod, error) {
+	var watchedPod *v1.Pod
+	podWatch, err := k.WatchPod(CreateListOptions(
+		timeout, labels, pod.ResourceVersion))
+	if err != nil {
+		return pod, err
+	}
+	defer podWatch.Stop()
+	for event := range podWatch.ResultChan() {
+		switch event.Type {
+		case watch.Error:
+			//TODO: Validate error handling.
+			return pod, fmt.Errorf("Received error when watching pod %s: %s",
+				pod.Name,
+				event.Object.GetObjectKind().GroupVersionKind().String())
+		case watch.Deleted, watch.Added, watch.Modified:
+			watchedPod = event.Object.(*v1.Pod)
+		default:
+			return pod, fmt.Errorf(
+				"Got unknown event type %s while watching pod %s!",
+				event.Type, pod.Name)
+		}
+		if watchedPod.Name != pod.Name {
+			continue
+		}
+		if event.Type == watch.Deleted {
+			return pod, fmt.Errorf("Pod %s got deleted before becoming available!",
+				pod.Name)
+		}
+		switch watchedPod.Status.Phase {
+		case v1.PodPending:
+			continue
+		case v1.PodSucceeded, v1.PodFailed:
+			return pod, fmt.Errorf("Pod %s exited early with status %s!",
+				pod.Name, watchedPod.Status.Phase)
+		case v1.PodRunning:
+			return watchedPod, nil
+		case v1.PodUnknown:
+			return pod, fmt.Errorf("Couldn't obtain Pod %s's state!", pod.Name)
+		default:
+			return pod, fmt.Errorf("Pod %s has unknown status (%s)!",
+				pod.Name, watchedPod.Status.Phase)
+		}
+	}
+	return pod, fmt.Errorf("Pod %s wasn't running within %d seconds!",
+		pod.Name, *timeout)
+}
+
+func (k *KubeClient) GetPVC(pvcName string) (*v1.PersistentVolumeClaim, error) {
+	return k.clientset.Core().PersistentVolumeClaims(k.namespace).Get(
+		pvcName)
+}
+
+func (k *KubeClient) GetPVCPhase(pvcName string) (v1.PersistentVolumeClaimPhase,
+	error) {
+	pvc, err := k.GetPVC(pvcName)
+	if err != nil {
+		var phase v1.PersistentVolumeClaimPhase = ""
+		return phase, err
+	}
+	return pvc.Status.Phase, nil
+}
+
+func (k *KubeClient) CheckPVCExists(pvc string) (bool, error) {
+	if _, err := k.GetPVC(pvc); err != nil {
+		if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == unversioned.StatusReasonNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (k *KubeClient) CreatePVC(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) {
+	return k.clientset.Core().PersistentVolumeClaims(k.namespace).Create(pvc)
+}
+
+func (k *KubeClient) DeletePVC(pvcName string, options *v1.DeleteOptions) error {
+	return k.clientset.Core().PersistentVolumeClaims(k.namespace).Delete(pvcName, options)
+}
+
+func (k *KubeClient) WatchPVC(listOptions *v1.ListOptions) (watch.Interface, error) {
+	return k.clientset.Core().PersistentVolumeClaims(k.namespace).Watch(*listOptions)
+}
+
+func (k *KubeClient) GetBoundPVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, timeout *int64, labels map[string]string) (*v1.PersistentVolumeClaim, error) {
+	var watchedPVC *v1.PersistentVolumeClaim
+	pvcWatch, err := k.WatchPVC(CreateListOptions(
+		timeout, labels, pvc.ResourceVersion))
+	if err != nil {
+		return pvc, err
+	}
+	defer pvcWatch.Stop()
+	for event := range pvcWatch.ResultChan() {
+		switch event.Type {
+		case watch.Deleted:
+			return pvc, fmt.Errorf("PVC deleted before becoming bound.")
+		case watch.Error:
+			//TODO: Validate error handling.
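+			// For error events, the watch generally delivers a status object
+			// rather than a PVC, which is why only the object's kind can be
+			// reported below.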
+ return pvc, fmt.Errorf("Received error when watching PVC %s: %s", + pvc.Name, + event.Object.GetObjectKind().GroupVersionKind().String()) + case watch.Added, watch.Modified: + watchedPVC = event.Object.(*v1.PersistentVolumeClaim) + default: + return pvc, + fmt.Errorf("Got unknown event type %s while watching PVC %s!", + event.Type, pvc.Name) + } + if watchedPVC.Name != pvc.Name { + continue + } + switch watchedPVC.Status.Phase { + case v1.ClaimPending: + continue + case v1.ClaimLost: + return pvc, fmt.Errorf("PVC is in the lost phase!") + case v1.ClaimBound: + if watchedPVC.Spec.VolumeName == pv.Name { + return watchedPVC, nil + } + } + } + return pvc, fmt.Errorf("PVC %s wasn't bound to PV %s within %d seconds!", + pvc.Name, pv.Name, *timeout) +} + +func (k *KubeClient) CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { + return k.clientset.Core().PersistentVolumes().Create(pv) +} + +func (k *KubeClient) DeletePV(pvName string, options *v1.DeleteOptions) error { + return k.clientset.Core().PersistentVolumes().Delete(pvName, options) +} + +func CreateLabelSelectorString(labels map[string]string) string { + ret := "" + for k, v := range labels { + ret += k + ret += "=" + ret += v + ret += "," + } + return strings.TrimSuffix(ret, ",") +} + +func CreateListOptions(timeout *int64, labels map[string]string, resourceVersion string) *v1.ListOptions { + listOptions := &v1.ListOptions{ + TimeoutSeconds: timeout, + } + if len(labels) > 0 { + listOptions.LabelSelector = CreateLabelSelectorString(labels) + } + if resourceVersion != "" { + listOptions.ResourceVersion = resourceVersion + } + return listOptions +} diff --git a/k8s_client/k8s_client_fake.go b/k8s_client/k8s_client_fake.go new file mode 100644 index 000000000..a237b0252 --- /dev/null +++ b/k8s_client/k8s_client_fake.go @@ -0,0 +1,179 @@ +// Copyright 2016 NetApp, Inc. All Rights Reserved. 
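+
+// FakeKubeClient is an in-memory stand-in for KubeClient: deployments and
+// PVCs are tracked in maps, and failMatrix injects per-method failures for
+// unit tests.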
+ +package k8s_client + +import ( + "fmt" + "sort" + + "k8s.io/client-go/pkg/api/errors" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/version" + "k8s.io/client-go/pkg/watch" +) + +type FakeKubeClient struct { + Deployments map[string]*v1beta1.Deployment + PVCs map[string]*v1.PersistentVolumeClaim + failMatrix map[string]bool +} + +func NewFakeKubeClient(failMatrix map[string]bool) *FakeKubeClient { + return &FakeKubeClient{ + Deployments: make(map[string]*v1beta1.Deployment, 0), + PVCs: make(map[string]*v1.PersistentVolumeClaim, 0), + failMatrix: failMatrix, + } +} + +type FakeKubeClientState struct { + Deployments []string + PVCs []string +} + +func (k *FakeKubeClient) SnapshotState() *FakeKubeClientState { + state := &FakeKubeClientState{ + Deployments: make([]string, 0), + PVCs: make([]string, 0), + } + for key, _ := range k.Deployments { + state.Deployments = append(state.Deployments, key) + } + sort.Strings(state.Deployments) + for key, _ := range k.PVCs { + state.PVCs = append(state.PVCs, key) + } + sort.Strings(state.PVCs) + return state +} + +func (k *FakeKubeClient) Version() *version.Info { + return nil +} + +func (k *FakeKubeClient) GetDeployment(deploymentName string) (*v1beta1.Deployment, error) { + if fail, ok := k.failMatrix["GetDeployment"]; fail && ok { + return nil, fmt.Errorf("GetDeployment failed") + } + if deployment, ok := k.Deployments[deploymentName]; ok { + return deployment, nil + } + err := &errors.StatusError{} + err.ErrStatus.Reason = unversioned.StatusReasonNotFound + return nil, err +} + +func (k *FakeKubeClient) CheckDeploymentExists(deploymentName string) (bool, error) { + if _, err := k.GetDeployment(deploymentName); err != nil { + if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == unversioned.StatusReasonNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +func (k *FakeKubeClient) CreateDeployment(deployment *v1beta1.Deployment) (*v1beta1.Deployment, error) { + if fail, ok := k.failMatrix["CreateDeployment"]; fail && ok { + return deployment, fmt.Errorf("CreateDeployment failed") + } + k.Deployments[deployment.Name] = deployment + return deployment, nil +} + +func (k *FakeKubeClient) GetPod(podName string) (*v1.Pod, error) { + return nil, nil +} + +func (k *FakeKubeClient) GetPodByLabels(listOptions *v1.ListOptions) (*v1.Pod, error) { + return nil, nil +} + +func (k *FakeKubeClient) GetPodPhase(podName string) (v1.PodPhase, error) { + return v1.PodRunning, nil +} + +func (k *FakeKubeClient) CheckPodExists(pod string) (bool, error) { + return true, nil +} + +func (k *FakeKubeClient) CreatePod(pod *v1.Pod) (*v1.Pod, error) { + return nil, nil +} + +func (k *FakeKubeClient) DeletePod(podName string, options *v1.DeleteOptions) error { + return nil +} + +func (k *FakeKubeClient) WatchPod(listOptions *v1.ListOptions) (watch.Interface, error) { + return watch.NewEmptyWatch(), nil +} + +func (k *FakeKubeClient) ListPod(listOptions *v1.ListOptions) (*v1.PodList, error) { + return nil, nil +} + +func (k *FakeKubeClient) GetRunningPod(pod *v1.Pod, timeout *int64, labels map[string]string) (*v1.Pod, error) { + return nil, nil +} + +func (k *FakeKubeClient) GetPVC(pvcName string) (*v1.PersistentVolumeClaim, error) { + if fail, ok := k.failMatrix["GetPVC"]; fail && ok { + return nil, fmt.Errorf("GetPVC failed") + } + if pvc, ok := k.PVCs[pvcName]; ok { + return pvc, nil + } + err := &errors.StatusError{} + 
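+	// Mimic the API server's NotFound status so callers such as
+	// CheckPVCExists can distinguish a missing PVC from a real failure.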
err.ErrStatus.Reason = unversioned.StatusReasonNotFound + return nil, err +} + +func (k *FakeKubeClient) GetPVCPhase(pvcName string) (v1.PersistentVolumeClaimPhase, error) { + pvc, err := k.GetPVC(pvcName) + if err != nil { + var phase v1.PersistentVolumeClaimPhase = "" + return phase, err + } + return pvc.Status.Phase, nil +} + +func (k *FakeKubeClient) CheckPVCExists(pvc string) (bool, error) { + if _, err := k.GetPVC(pvc); err != nil { + if statusErr, ok := err.(*errors.StatusError); ok && statusErr.Status().Reason == unversioned.StatusReasonNotFound { + return false, nil + } + return false, err + } + return true, nil +} + +func (k *FakeKubeClient) CreatePVC(pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolumeClaim, error) { + if fail, ok := k.failMatrix["CreatePVC"]; fail && ok { + return pvc, fmt.Errorf("CreatePVC failed") + } + k.PVCs[pvc.Name] = pvc + return pvc, nil +} + +func (k *FakeKubeClient) DeletePVC(pvcName string, options *v1.DeleteOptions) error { + return nil +} + +func (k *FakeKubeClient) WatchPVC(listOptions *v1.ListOptions) (watch.Interface, error) { + return watch.NewEmptyWatch(), nil +} + +func (k *FakeKubeClient) GetBoundPVC(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, timeout *int64, labels map[string]string) (*v1.PersistentVolumeClaim, error) { + return nil, nil +} + +func (k *FakeKubeClient) CreatePV(pv *v1.PersistentVolume) (*v1.PersistentVolume, error) { + return nil, nil +} + +func (k *FakeKubeClient) DeletePV(pvName string, options *v1.DeleteOptions) error { + return nil +} diff --git a/kubernetes-yaml/trident-deployment.yaml.templ b/kubernetes-yaml/trident-deployment.yaml.templ index e27a1ebc2..7f94295a4 100644 --- a/kubernetes-yaml/trident-deployment.yaml.templ +++ b/kubernetes-yaml/trident-deployment.yaml.templ @@ -31,27 +31,25 @@ spec: - containerPort: 8000 - name: etcd image: quay.io/coreos/etcd:v3.1.3 - ports: - - containerPort: 8001 command: - /usr/local/bin/etcd args: - -name - etcd1 - -advertise-client-urls - - http://0.0.0.0:8001 + - http://127.0.0.1:8001 - -listen-client-urls - - http://0.0.0.0:8001 + - http://127.0.0.1:8001 - -initial-cluster - - default=http://0.0.0.0:8002 + - default=http://127.0.0.1:8002 - -initial-advertise-peer-urls - - http://0.0.0.0:8002 + - http://127.0.0.1:8002 - -listen-peer-urls - - http://0.0.0.0:8002 + - http://127.0.0.1:8002 - -data-dir - /var/etcd/data - -initial-cluster - - etcd1=http://0.0.0.0:8002 + - etcd1=http://127.0.0.1:8002 volumeMounts: - name: etcd-vol mountPath: /var/etcd/data diff --git a/kubernetes-yaml/trident-namespace.yaml b/kubernetes-yaml/trident-namespace.yaml new file mode 100644 index 000000000..4bc2cbdb7 --- /dev/null +++ b/kubernetes-yaml/trident-namespace.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: trident diff --git a/launcher/docker-build/Dockerfile b/launcher/docker-build/Dockerfile index 43ac13b5f..2929aa8ac 100644 --- a/launcher/docker-build/Dockerfile +++ b/launcher/docker-build/Dockerfile @@ -2,7 +2,7 @@ FROM debian:jessie MAINTAINER Chris Dragga -LABEL version="1.0" \ +LABEL version="17.04.0" \ description="Tool to bootstrap a PVC for Trident's etcd deployment." COPY launcher /usr/local/bin/launcher diff --git a/launcher/kube_client.go b/launcher/kube_client.go deleted file mode 100644 index 23dcf2f16..000000000 --- a/launcher/kube_client.go +++ /dev/null @@ -1,401 +0,0 @@ -// Copyright 2016 NetApp, Inc. All Rights Reserved. 
- -package main - -import ( - "bytes" - "fmt" - "io/ioutil" - "net/http" - "strings" - - k8sfrontend "github.com/netapp/trident/frontend/kubernetes" - "github.com/netapp/trident/storage" - - log "github.com/Sirupsen/logrus" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/pkg/api/errors" - "k8s.io/client-go/pkg/api/resource" - "k8s.io/client-go/pkg/api/unversioned" - "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/pkg/apis/extensions/v1beta1" - "k8s.io/client-go/pkg/types" - "k8s.io/client-go/pkg/util/yaml" - "k8s.io/client-go/pkg/watch" - "k8s.io/client-go/rest" -) - -const ( - pvcName = "trident" - pvName = "trident" - inMemoryTridentName = "trident-ephemeral" - tridentContainerName = "trident-main" - tridentLabelValue = "trident.netapp.io" - - defaultPort = 8000 - - timeout = 60 -) - -var ( - volBytes int64 = int64(volGB) * 1073741824 -) - -type KubeClient struct { - clientset *kubernetes.Clientset - namespace string - tridentName string - tridentImage string - lastRV string - tridentDeployment *v1beta1.Deployment -} - -func NewKubeClient(config *rest.Config, podDefinitionPath string) ( - *KubeClient, error) { - var tridentDeployment v1beta1.Deployment - - yamlBytes, err := ioutil.ReadFile(podDefinitionPath) - if err != nil { - return nil, err - } - err = yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer(yamlBytes), 512).Decode( - &tridentDeployment) - if err != nil { - return nil, err - } - - tridentImage := "" - for _, container := range tridentDeployment.Spec.Template.Spec.Containers { - if container.Name == tridentContainerName { - tridentImage = container.Image - } - } - if tridentImage == "" { - return nil, fmt.Errorf("Trident container not found in pod definition.") - } - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return &KubeClient{clientset: clientset, - namespace: tridentDeployment.Namespace, - tridentName: tridentDeployment.Name, - tridentDeployment: &tridentDeployment, - tridentImage: tridentImage, - }, nil -} - -func (k *KubeClient) CheckTridentRunning() (bool, error) { - _, err := k.clientset.ExtensionsV1beta1().Deployments(k.namespace).Get( - k.tridentDeployment.Name) - if err != nil { - if statusErr, ok := err.(*errors.StatusError); ok && statusErr.ErrStatus.Code != http.StatusNotFound { - return false, err - } - return false, nil - } - return true, nil -} - -func (k *KubeClient) PVCExists() (bool, error) { - _, err := k.clientset.Core().PersistentVolumeClaims(k.namespace).Get( - pvcName) - if err != nil { - if statusErr, ok := err.(*errors.StatusError); ok && statusErr.ErrStatus.Code != http.StatusNotFound { - return false, err - } - return false, nil - } - return true, nil -} - -func (k *KubeClient) getRunningTrident( - namePrefix string, -) (createdPod *v1.Pod, err error) { - var timeoutSeconds int64 = timeout - podWatch, err := k.clientset.Core().Pods(k.namespace).Watch( - v1.ListOptions{ - // TODO: I'm not sure TimeoutSeconds actually does anything - TimeoutSeconds: &timeoutSeconds, - LabelSelector: fmt.Sprintf("app=%s", tridentLabelValue), - }, - ) - if err != nil { - return nil, err - } - defer podWatch.Stop() - // TODO: It may be necessary to introduce a timeout here at some point. - for event := range podWatch.ResultChan() { - switch event.Type { - case watch.Error: - // TODO: Figure out the error handling here. 
- return nil, fmt.Errorf("Received error when running watch.") - case watch.Deleted, watch.Added, watch.Modified: - createdPod = event.Object.(*v1.Pod) - default: - return nil, fmt.Errorf( - "Got unknown event type while watching: %s", event.Type) - } - if !strings.HasPrefix(createdPod.Name, namePrefix) { - continue - } - if event.Type == watch.Deleted { - return nil, fmt.Errorf("Trident pod deleted before becoming " + - "available.") - } - - k.lastRV = createdPod.ResourceVersion - switch createdPod.Status.Phase { - case v1.PodPending: - continue - case v1.PodSucceeded, v1.PodFailed: - return nil, fmt.Errorf("Pod exited early with status %s", - createdPod.Status.Phase) - case v1.PodRunning: - return createdPod, nil - case v1.PodUnknown: - return nil, fmt.Errorf("Pod status is unknown.") - default: - return nil, fmt.Errorf("Received unknown pod status: %s", - createdPod.Status.Phase) - } - } - return nil, fmt.Errorf("Unable to start Trident within %d seconds.", - timeout) -} - -func (k *KubeClient) waitInMemoryTridentDeletion(name string) error { - var deletedPod *v1.Pod - var timeoutSeconds int64 = timeout - - podWatch, err := k.clientset.Core().Pods(k.namespace).Watch( - v1.ListOptions{ - // TODO: I'm not sure TimeoutSeconds actually does anything - TimeoutSeconds: &timeoutSeconds, - LabelSelector: fmt.Sprintf("app=%s", tridentLabelValue), - ResourceVersion: k.lastRV, - }, - ) - if err != nil { - return err - } - defer podWatch.Stop() - for event := range podWatch.ResultChan() { - switch event.Type { - case watch.Error: - // TODO: Figure out the error handling here. - return fmt.Errorf("Received error when running watch.") - case watch.Added, watch.Modified: - continue - case watch.Deleted: - deletedPod = event.Object.(*v1.Pod) - default: - return fmt.Errorf("Received unknown status for watch: %s", - event.Type) - } - if deletedPod.Name != name { - continue - } - // The Trident pod was deleted successfully. - return nil - } - return fmt.Errorf("Watch terminated early.") -} - -// StartInMemoryTrident starts a Trident pod using an in-memory store and -// waits until it comes online. Returns the pod's IP address once running -// or an error. -func (k *KubeClient) StartInMemoryTrident() (string, error) { - // Don't bother checking if Trident exists for now. In error cases, we - // can have the user manually delete. 
- podToCreate := &v1.Pod{ - TypeMeta: unversioned.TypeMeta{ - Kind: "Pod", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: inMemoryTridentName, - Namespace: k.namespace, - Labels: map[string]string{"app": tridentLabelValue}, - }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Name: tridentContainerName, - Image: k.tridentImage, - Command: []string{"/usr/local/bin/trident_orchestrator"}, - Args: []string{ - "-port", fmt.Sprintf("%d", defaultPort), - "-no_persistence", - }, - Ports: []v1.ContainerPort{ - v1.ContainerPort{ContainerPort: defaultPort}, - }, - }, - }, - }, - } - if *debug { - podToCreate.Spec.Containers[0].Args = append(podToCreate.Spec.Containers[0].Args, "-debug") - } - log.Debug("Creating Trident.") - _, err := k.clientset.Core().Pods(k.namespace).Create(podToCreate) - if err != nil { - return "", err - } - createdPod, err := k.getRunningTrident(inMemoryTridentName) - if err != nil { - return "", err - } - if createdPod.Status.PodIP == "" { - return "", fmt.Errorf("Pod IP empty.") - } - return createdPod.Status.PodIP, nil -} - -func (k *KubeClient) StartFullTrident() error { - _, err := k.clientset.ExtensionsV1beta1().Deployments(k.namespace).Create(k.tridentDeployment) - if err != nil { - return err - } - _, err = k.getRunningTrident(k.tridentName) - return err -} - -func (k *KubeClient) waitForPVCBind() (bool, error) { - var createdPVC *v1.PersistentVolumeClaim - - pvcWatch, err := k.clientset.Core().PersistentVolumeClaims( - k.namespace).Watch(v1.ListOptions{ - LabelSelector: fmt.Sprintf("app=%s", tridentLabelValue), - }, - ) - if err != nil { - return false, err - } - defer pvcWatch.Stop() - for event := range pvcWatch.ResultChan() { - switch event.Type { - case watch.Deleted: - return false, fmt.Errorf("Trident PVC deleted before becoming " + - "binding.") - case watch.Error: - return false, fmt.Errorf("Received error when running watch.") - case watch.Added, watch.Modified: - createdPVC = event.Object.(*v1.PersistentVolumeClaim) - default: - return false, fmt.Errorf("Got unknown event type while watching: "+ - "%s", event.Type) - } - if createdPVC.Name != pvcName { - continue - } - switch createdPVC.Status.Phase { - case v1.ClaimPending: - continue - case v1.ClaimLost: - return false, fmt.Errorf("Volume assigned to claim lost") - } - // We know if the PVC bound to the correct volume if the volume name - // matches - return createdPVC.Spec.VolumeName == pvName, nil - } - return false, fmt.Errorf("Watch stopped prematurely. 
Volume state " + - "unknown.") -} - -func (k *KubeClient) createPVC() (types.UID, error) { - pvcToCreate := &v1.PersistentVolumeClaim{ - TypeMeta: unversioned.TypeMeta{ - Kind: "PersistentVolumeClaim", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: pvcName, - Namespace: k.namespace, - Labels: map[string]string{"app": tridentLabelValue}, - }, - Spec: v1.PersistentVolumeClaimSpec{ - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - Resources: v1.ResourceRequirements{ - Requests: v1.ResourceList{ - v1.ResourceStorage: *resource.NewQuantity(volBytes, - resource.BinarySI), - }, - }, - Selector: &unversioned.LabelSelector{ - MatchLabels: map[string]string{"app": tridentLabelValue}, - }, - }, - } - pvc, err := k.clientset.Core().PersistentVolumeClaims(k.namespace).Create( - pvcToCreate) - if err != nil { - return "", err - } - return pvc.UID, nil -} - -func (k *KubeClient) createPV( - volConfig *storage.VolumeConfig, pvcUID types.UID, -) error { - pvToCreate := &v1.PersistentVolume{ - TypeMeta: unversioned.TypeMeta{ - Kind: "PersistentVolume", - APIVersion: "v1", - }, - ObjectMeta: v1.ObjectMeta{ - Name: pvName, - Labels: map[string]string{"app": tridentLabelValue}, - }, - Spec: v1.PersistentVolumeSpec{ - Capacity: v1.ResourceList{ - v1.ResourceStorage: *resource.NewQuantity(volBytes, - resource.BinarySI), - }, - AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, - ClaimRef: &v1.ObjectReference{ - Namespace: k.namespace, - Name: pvcName, - UID: pvcUID, - }, - PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain, - }, - } - switch { - case volConfig.AccessInfo.NfsAccessInfo.NfsServerIP != "": - pvToCreate.Spec.NFS = k8sfrontend.CreateNFSVolumeSource(volConfig) - case volConfig.AccessInfo.IscsiAccessInfo.IscsiTargetPortal != "": - pvToCreate.Spec.ISCSI = k8sfrontend.CreateISCSIVolumeSource(volConfig) - default: - return fmt.Errorf("Cannot create PV. 
Unrecognized volume type.") - } - _, err := k.clientset.Core().PersistentVolumes().Create(pvToCreate) - if err != nil { - return err - } - return err -} - -func (k *KubeClient) CreateKubeVolume(volConfig *storage.VolumeConfig) ( - bool, error) { - pvcUID, err := k.createPVC() - if err != nil { - return false, fmt.Errorf("Unable to create PVC: %v", err) - } - err = k.createPV(volConfig, pvcUID) - if err != nil { - return false, fmt.Errorf("Unable to create PV: %v", err) - } - return k.waitForPVCBind() -} - -func (k *KubeClient) RemoveInMemoryTrident() error { - err := k.clientset.Core().Pods(k.namespace).Delete(inMemoryTridentName, - &v1.DeleteOptions{}) - if err != nil { - return err - } - return k.waitInMemoryTridentDeletion(inMemoryTridentName) -} diff --git a/launcher/kubernetes-yaml/launcher-pod.yaml.templ b/launcher/kubernetes-yaml/launcher-pod.yaml.templ index f59db5de3..956207e04 100644 --- a/launcher/kubernetes-yaml/launcher-pod.yaml.templ +++ b/launcher/kubernetes-yaml/launcher-pod.yaml.templ @@ -10,9 +10,18 @@ spec: image: __LAUNCHER_TAG__ command: - /usr/local/bin/launcher - #args: + args: + - "-volume_size" + - "2" + #- "-volume_name" + #- "trident" + #- "-pvc_name + #- "trident" + #- "-pv_name" + #- "trident" #- "-apiserver" #- "__KUBERNETES_SERVER__:__KUBERNETES_PORT__" + #- "-debug" volumeMounts: - name: config-volume mountPath: /etc/config diff --git a/launcher/launcher.go b/launcher/launcher.go index 0f09b11cf..94a6978ee 100644 --- a/launcher/launcher.go +++ b/launcher/launcher.go @@ -8,92 +8,183 @@ import ( "flag" "fmt" "io/ioutil" - "net" "net/http" "strings" "time" log "github.com/Sirupsen/logrus" + "github.com/netapp/trident/k8s_client" + "k8s.io/client-go/pkg/api/resource" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/util/yaml" + "k8s.io/client-go/pkg/version" k8srest "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "github.com/netapp/trident/frontend/rest" + k8sfrontend "github.com/netapp/trident/frontend/kubernetes" + tridentrest "github.com/netapp/trident/frontend/rest" "github.com/netapp/trident/storage" - sc "github.com/netapp/trident/storage_class" + "github.com/netapp/trident/storage_class" +) + +const ( + tridentContainerName = "trident-main" + tridentEphemeralPodName = "trident-ephemeral" + tridentDefaultPort = 8000 + tridentStorageClassName = "trident-basic" + //based on the number of seconds Trident waits on etcd to bootstrap + bootstrappingTimeout int64 = 11 ) var ( - apiServerIP = flag.String("apiserver", "", "Kubernetes API server IP "+ - "address.") + apiServerIP = flag.String("apiserver", "", "Kubernetes API server IP address.") backendFile = flag.String("backend", "/etc/config/backend.json", - "Name of the configuration file for the backend that will host the "+ - "etcd volume.") + "Configuration file for the backend that will host the etcd volume.") deploymentFile = flag.String("deployment_file", - "/etc/config/trident-deployment.yaml", - "Deployment definition file for Trident") - debug = flag.Bool("debug", false, "Enable debug output.") + "/etc/config/trident-deployment.yaml", "Deployment definition file for Trident") + debug = flag.Bool("debug", false, "Enable debug output.") + k8sTimeout = flag.Int64("k8s_timeout", 60, "The number of seconds to wait before timing out on Kubernetes operations.") + tridentVolumeSize = flag.Int("volume_size", 1, "The size of the volume used by etcd in GB.") + tridentVolumeName = flag.String("volume_name", 
"trident", "The name of the volume used by etcd.") + tridentPVCName = flag.String("pvc_name", "trident", "The name of the PVC used by Trident.") + tridentPVName = flag.String("pv_name", "trident", "The name of the PV used by Trident.") + tridentTimeout = flag.Int("trident_timeout", 10, "The number of seconds to wait before timing out on a Trident connection.") + tridentImage = "" + tridentNamespace = "" + tridentLabels map[string]string + tridentEphemeralLabels = map[string]string{"app": "trident-launcher.netapp.io"} ) -const ( - scName = "trident-basic" - volName = "trident" - - volGB int = 1 - maxTries = 10 - defaultTridentPort = 8000 -) +// createTridentDeploymentFromFile creates a deployment object from a file. +func createTridentDeploymentFromFile(deploymentFile string) (*v1beta1.Deployment, error) { + var deployment v1beta1.Deployment -func WaitOnline(ip string) error { - for tries := 0; tries < maxTries; tries++ { - log.Debug("Checking that ephemeral Trident is online") - _, err := http.Get(fmt.Sprintf("http://%s:%d/trident/v1/backend", - ip, defaultTridentPort)) - if err == nil { - return nil - } - if _, ok := err.(net.Error); !ok { - return err - } - // Assume that net errors are likely to be transient (e.g., connection - // refused). Even if they aren't, retrying here won't hurt. - log.Debug("Connection to ephemeral Trident got network error; " + - "retrying.") - time.Sleep(time.Second) + yamlBytes, err := ioutil.ReadFile(deploymentFile) + if err != nil { + return nil, err + } + err = yaml.NewYAMLOrJSONDecoder(bytes.NewBuffer(yamlBytes), 512).Decode( + &deployment) + if err != nil { + return nil, err } - return fmt.Errorf("Unable to connect to ephemeral Trident after %d "+ - "seconds.", maxTries) + return &deployment, nil } -func PostBackend(ip string) (backendName string, err error) { - var backendResponse rest.AddBackendResponse - - jsonBytes, err := ioutil.ReadFile(*backendFile) +// createTridentEphemeralPod creates the ephemeral Trident pod. 
+func createTridentEphemeralPod(kubeClient k8s_client.Interface) (*v1.Pod, error) { + // Check if the pod already exists + exists, err := kubeClient.CheckPodExists(tridentEphemeralPodName) if err != nil { - return "", err + return nil, + fmt.Errorf("Launcher couldn't detect the presence of %s pod: %s", + tridentEphemeralPodName, err) } - resp, err := http.Post(fmt.Sprintf("http://%s:%d/trident/v1/backend", ip, - defaultTridentPort), "application/json", bytes.NewBuffer(jsonBytes)) - if err != nil { - return "", err + if exists { + return nil, fmt.Errorf("Please run 'kubectl delete pod %s' and try again!", + tridentEphemeralPodName) } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return "", err + + // Create the pod + pod := &v1.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: tridentEphemeralPodName, + Namespace: tridentNamespace, + Labels: tridentEphemeralLabels, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Name: tridentContainerName, + Image: tridentImage, + Command: []string{"/usr/local/bin/trident_orchestrator"}, + Args: []string{ + "-port", fmt.Sprintf("%d", tridentDefaultPort), + "-no_persistence", + }, + Ports: []v1.ContainerPort{ + v1.ContainerPort{ContainerPort: tridentDefaultPort}, + }, + }, + }, + }, + } + if *debug { + pod.Spec.Containers[0].Args = append(pod.Spec.Containers[0].Args, + "-debug") } + return kubeClient.CreatePod(pod) +} - err = json.Unmarshal(body, &backendResponse) - if err != nil { - return "", err +// stopTridentEphemeralPod stops the ephemeral Trident pod. +func stopTridentEphemeralPod(kubeClient k8s_client.Interface) error { + var gracePeriod int64 = 0 + options := &v1.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, } - if backendResponse.Error != "" { - return "", fmt.Errorf("%s", backendResponse.Error) + return kubeClient.DeletePod(tridentEphemeralPodName, options) +} + +// checkTridentAPI checks the responsiveness of a Trident pod. +func checkTridentAPI(tridentClient tridentrest.Interface, podName string) error { + var err error + startTime := time.Now() + for t := 0; time.Since(startTime) < time.Duration(bootstrappingTimeout)*time.Second; t++ { + if err != nil { + log.Debugf("Launcher validating pod %s has bootstrapped (trial #%d: %v).", + podName, t, err) + } + _, err = tridentClient.ListBackends() + if err == nil { + log.Debugf("Launcher validated pod %s has bootstrapped after %v.", + podName, time.Since(startTime)) + return nil + } + time.Sleep(time.Second) } - return backendResponse.BackendID, nil + return fmt.Errorf("Pod %s isn't running after %v (%s).", + podName, time.Since(startTime), err) +} + +// addBackend adds a backend to a Trident pod +func addBackend(tridentClient tridentrest.Interface, + fileName string) (string, error) { + addBackendResponse, err := tridentClient.PostBackend(fileName) + if err != nil { + return "", + fmt.Errorf("Launcher failed in communication with Pod %s: %s", + tridentEphemeralPodName, err) + } else if addBackendResponse.Error != "" { + return "", fmt.Errorf("Pod %s failed in adding a backend: %s", + tridentEphemeralPodName, addBackendResponse.Error) + } + log.Infof("Launcher successfully added backend %s to pod %s.", + addBackendResponse.BackendID, tridentEphemeralPodName) + return addBackendResponse.BackendID, nil } -func GetBackendStoragePools(ip, backendName string) ([]string, error) { - // Unmarshaling the entire backend is difficult, so just get what we need. 
+// getStoragePools retrieves the storage pools from a given backend added to a Trident pod. +func getStoragePools(tridentClient tridentrest.Interface, + backendID string) ([]string, error) { + /* //TODO: Fix the unmarshaling problem with StorageBackendExternal.Storage.Attributes + getBackendResponse, err := tridentClient.GetBackend(addBackendResponse.BackendID) + if err != nil { + return nil, fmt.Errorf("Launcher failed in communication with Pod %s: %s", + tridentEphemeralPodName, err) + } else if addBackendResponse.Error != "" { + return nil, fmt.Errorf("Pod %s failed in getting a backend: %s", + tridentEphemeralPodName, getBackendResponse.Error) + } + log.Infof("Launcher retrieved storage pools for backend %s: %v", + addBackendResponse.BackendID, getBackendResponse.Backend.Storage) + */ + // Workaround for the unmarshalling problem type partialBackend struct { Backend struct { Name string `json:"name"` @@ -101,240 +192,692 @@ func GetBackendStoragePools(ip, backendName string) ([]string, error) { } `json:"backend"` Error string `json:"error"` } - var backendResponse partialBackend - storagePoolNames := make([]string, 0) + var ( + resp *http.Response + backendResponse partialBackend + err error + bytes []byte + storagePools []string = make([]string, 0) + ) - resp, err := http.Get( - fmt.Sprintf("http://%s:%d/trident/v1/backend/%s", ip, defaultTridentPort, - backendName)) - if err != nil { + if resp, err = tridentClient.Get("backend/" + backendID); err != nil { return nil, err } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { + if bytes, err = ioutil.ReadAll(resp.Body); err != nil { return nil, err } - - err = json.Unmarshal(body, &backendResponse) - if err != nil { + if err = json.Unmarshal(bytes, &backendResponse); err != nil { return nil, err } if backendResponse.Error != "" { return nil, fmt.Errorf("%s", backendResponse.Error) } - for name, _ := range backendResponse.Backend.Storage { - storagePoolNames = append(storagePoolNames, name) + for pool, _ := range backendResponse.Backend.Storage { + storagePools = append(storagePools, pool) } - return storagePoolNames, nil + return storagePools, nil } -// Do this against the Trident API, rather than Kubernetes, so that we don't -// have an extra storage class lying around Kubernetes -func PostStorageClass(ip, backendName string, storagePoolList []string) error { - var scResponse rest.AddStorageClassResponse - - scConfig := sc.Config{ - Version: "v1", - Name: scName, - BackendStoragePools: map[string][]string{backendName: storagePoolList}, - } - jsonBytes, err := json.Marshal(&scConfig) +// addStorageClass adds a storage class to a Trident pod. +func addStorageClass(tridentClient tridentrest.Interface, + storageClassConfig *storage_class.Config) error { + addStorageClassResponse, + err := tridentClient.AddStorageClass(storageClassConfig) if err != nil { return err + } else if addStorageClassResponse.Error != "" { + return fmt.Errorf("Pod %s failed in adding storage class %s: %s", + tridentEphemeralPodName, storageClassConfig.Name, + addStorageClassResponse.Error) } - resp, err := http.Post(fmt.Sprintf("http://%s:%d/trident/v1/storageclass", - ip, defaultTridentPort), "application/json", bytes.NewBuffer(jsonBytes)) + log.Infof("Launcher successfully added storage class %s to pod %s.", + addStorageClassResponse.StorageClassID, tridentEphemeralPodName) + return nil +} + +// checkVolumeExists checks whether a Trident pod has created a volume with +// the given name. 
+func checkVolumeExists( + tridentClient tridentrest.Interface, + volName string) (bool, error) { + getVolResponse, err := tridentClient.GetVolume(volName) if err != nil { - return err + return false, err + } else if strings.Contains(getVolResponse.Error, "not found") { + return false, nil + } else if getVolResponse.Error == "" { + if getVolResponse.Volume.Config.Name == volName { + return true, nil + } } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + return false, fmt.Errorf(getVolResponse.Error) +} + +// getVolume retrieves a volume from a Trident pod. +func getVolume(tridentClient tridentrest.Interface, + volName string) (*storage.VolumeConfig, error) { + getVolumeResponse, err := tridentClient.GetVolume(volName) if err != nil { - return err + return nil, fmt.Errorf("Launcher failed in getting volume %s: %s", + volName, err) + } else if getVolumeResponse.Error != "" { + return nil, fmt.Errorf("Pod %s failed in getting volume %s: %s", + tridentEphemeralPodName, volName, getVolumeResponse.Error) } + return getVolumeResponse.Volume.Config, nil +} - err = json.Unmarshal(body, &scResponse) +// addVolume adds a volume to a Trident pod. +func addVolume(tridentClient tridentrest.Interface, volConfig *storage.VolumeConfig) error { + addVolumeResponse, err := tridentClient.AddVolume(volConfig) if err != nil { - return err - } - if scResponse.Error != "" { - return fmt.Errorf("%s", scResponse.Error) + return fmt.Errorf("%s", err) + } else if addVolumeResponse.Error != "" { + return fmt.Errorf("%s", addVolumeResponse.Error) } return nil } -func GetVolume(ip, name string) (*storage.VolumeConfig, error) { - var volResponse rest.GetVolumeResponse - - log.Debug("Retrieving created volume.") - resp, err := http.Get( - fmt.Sprintf("http://%s:%d/trident/v1/volume/%s", ip, - defaultTridentPort, name)) +// deleteVolume deletes a volume from a Trident pod. +func deleteVolume(tridentClient tridentrest.Interface, volName string) error { + deleteResponse, err := tridentClient.DeleteVolume(volName) if err != nil { - return nil, err + return fmt.Errorf("%s", err) + } else if deleteResponse.Error != "" { + return fmt.Errorf("%s", deleteResponse.Error) } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + return nil +} + +// provisionVolume undertakes all the steps required to provision a volume on a +// new Trident pod. +func provisionVolume(tridentClient tridentrest.Interface) error { + // Add a backend + backendID, err := addBackend(tridentClient, *backendFile) if err != nil { - log.Debug("IO error.") - return nil, err + return err } - err = json.Unmarshal(body, &volResponse) + // Get backend storage pool to define a storage class + pools, err := getStoragePools(tridentClient, backendID) if err != nil { - log.Debug("Failed to unmarshal. 
JSON: ", string(body)) - return nil, err + return err } - if volResponse.Error != "" { - log.Debug("Backend error.") - return nil, fmt.Errorf("%s", volResponse.Error) + + // Create the storage class + storageClassConfig := &storage_class.Config{ + Version: "v1", + Name: tridentStorageClassName, + BackendStoragePools: map[string][]string{backendID: pools}, } - return volResponse.Volume.Config, err -} + // Add the storage class + if err := addStorageClass(tridentClient, storageClassConfig); err != nil { + return err + } -func PostVolume(ip string) (*storage.VolumeConfig, error) { - var volResponse rest.AddVolumeResponse + // Create the volume volConfig := &storage.VolumeConfig{ - Name: volName, - Size: fmt.Sprintf("%dGB", volGB), - StorageClass: scName, - } - jsonBytes, err := json.Marshal(&volConfig) - if err != nil { - return nil, err + Name: *tridentVolumeName, + Size: fmt.Sprintf("%dGB", *tridentVolumeSize), + StorageClass: tridentStorageClassName, } + return addVolume(tridentClient, volConfig) +} - resp, err := http.Post(fmt.Sprintf("http://%s:%d/trident/v1/volume", - ip, defaultTridentPort), "application/json", - bytes.NewBuffer(jsonBytes)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } +// createPVC creates a PVC in the Kubernetes cluster. +func createPVC(kubeClient k8s_client.Interface, pvcName string) (*v1.PersistentVolumeClaim, error) { + pvc := &v1.PersistentVolumeClaim{ + TypeMeta: unversioned.TypeMeta{ + Kind: "PersistentVolumeClaim", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: pvcName, + Namespace: tridentNamespace, + Labels: tridentLabels, + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: *resource.NewQuantity( + int64(*tridentVolumeSize)*1073741824, + resource.BinarySI), + }, + }, + Selector: &unversioned.LabelSelector{ + MatchLabels: tridentLabels, + }, + }, + } + return kubeClient.CreatePVC(pvc) +} - err = json.Unmarshal(body, &volResponse) - if err != nil { - return nil, err - } - if volResponse.Error != "" { - return nil, fmt.Errorf("%s", volResponse.Error) +// createPV creates a PV in the Kubernetes cluster. 
+func createPV(kubeClient k8s_client.Interface, pvName string, + volConfig *storage.VolumeConfig, + pvc *v1.PersistentVolumeClaim) (*v1.PersistentVolume, error) { + pv := &v1.PersistentVolume{ + TypeMeta: unversioned.TypeMeta{ + Kind: "PersistentVolume", + APIVersion: "v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: pvName, + Labels: tridentLabels, + }, + Spec: v1.PersistentVolumeSpec{ + Capacity: v1.ResourceList{ + v1.ResourceStorage: *resource.NewQuantity( + int64(*tridentVolumeSize)*1073741824, + resource.BinarySI), + }, + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce}, + ClaimRef: &v1.ObjectReference{ + Namespace: pvc.Namespace, + Name: pvc.Name, + UID: pvc.UID, + }, + PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain, + }, + } + switch { + case volConfig.AccessInfo.NfsAccessInfo.NfsServerIP != "": + pv.Spec.NFS = k8sfrontend.CreateNFSVolumeSource(volConfig) + case volConfig.AccessInfo.IscsiAccessInfo.IscsiTargetPortal != "": + pv.Spec.ISCSI = k8sfrontend.CreateISCSIVolumeSource(volConfig) + default: + return nil, fmt.Errorf("Unrecognized volume type") + } + return kubeClient.CreatePV(pv) +} + +type Launcher struct { + kubeClient k8s_client.Interface + tridentClient tridentrest.Interface + tridentEphemeralClient tridentrest.Interface + tridentDeployment *v1beta1.Deployment +} + +// NewLauncher creates a new launcher object. +func NewLauncher(kubeClient k8s_client.Interface, tridentClient tridentrest.Interface, + tridentEphemeralClient tridentrest.Interface, + tridentDeployment *v1beta1.Deployment) *Launcher { + return &Launcher{ + kubeClient: kubeClient, + tridentClient: tridentClient, + tridentEphemeralClient: tridentEphemeralClient, + tridentDeployment: tridentDeployment, } - return GetVolume(ip, volName) } -func ProvisionVolume(kubeClient *KubeClient) { - tridentIP, err := kubeClient.StartInMemoryTrident() - if err != nil { - log.Fatalf("Unable to start Trident: %v\nCleanup of the Trident pod "+ - "may be necessary.", err) +// ValidateVersion checks whether the container orchestrator version is +// supported or not. +func (launcher *Launcher) ValidateVersion(versionInfo *version.Info) (bool, error) { + return k8sfrontend.VersionSupported(versionInfo) +} + +// Run runs the launcher. +func (launcher *Launcher) Run() (errors []error) { + var ( + deleteTridentEphemeral = false + deploymentExists = false + deploymentCreated = false + err error = nil + launcherErr error = nil + pv *v1.PersistentVolume = nil + pvExists = false + pvcExists = false + pvcCreated = false + pvc *v1.PersistentVolumeClaim = nil + pvCreated = false + tridentEphemeralCreated = false + tridentEphemeralPod *v1.Pod + tridentPod *v1.Pod + volConfig *storage.VolumeConfig + volumeCreated = false + ) + + errors = make([]error, 0) + + defer func() { + // Cleanup after success (err == nil) + if launcherErr == nil && tridentEphemeralCreated { + // Delete pod trident-ephemeral + if errCleanup := stopTridentEphemeralPod(launcher.kubeClient); errCleanup != nil { + log.WithFields(log.Fields{ + "error": errCleanup, + "pod": tridentEphemeralPodName, + }).Error("Launcher failed to delete the pod, so it needs " + + "to be manually deleted!") + errors = append(errors, + fmt.Errorf("Launcher failed to delete pod %s: %s. 
"+ + "Manual deletion is required!", + tridentEphemeralPodName, errCleanup)) + } else { + log.WithFields(log.Fields{ + "pod": tridentEphemeralPodName, + }).Info("Launcher successfully deleted the pod during cleanup.") + } + } else if launcherErr != nil { + log.Error(launcherErr) + errors = append(errors, launcherErr) + + // Cleanup after failure (err != nil) + var gracePeriod int64 = 0 + options := &v1.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + } + log.WithFields(log.Fields{ + "error": err, + "pvcCreated": pvcCreated, + "pvCreated": pvCreated, + "volumeCreated": volumeCreated, + "deploymentCreated": deploymentCreated, + "tridentEphemeralCreated": tridentEphemeralCreated, + }).Debug("Launcher is starting the cleanup after failure.") + if pvcCreated && !deploymentCreated { + // Delete the PVC + if errCleanup := launcher.kubeClient.DeletePVC(*tridentPVCName, options); errCleanup != nil { + log.WithFields(log.Fields{ + "error": errCleanup, + "pvc": *tridentPVCName, + }).Error("Launcher failed to delete the PVC during cleanup. " + + "Manual deletion is required!") + errors = append(errors, + fmt.Errorf("Launcher failed to delete PVC %s during cleanup: %s. "+ + "Manual deletion is required!", *tridentPVCName, errCleanup)) + } else { + log.WithFields(log.Fields{ + "pvc": *tridentPVCName, + }).Info("Launcher successfully deleted the PVC during cleanup.") + } + } + if pvCreated && !deploymentCreated { + // Delete the PV + if errCleanup := launcher.kubeClient.DeletePV(*tridentPVName, options); errCleanup != nil { + log.WithFields(log.Fields{ + "error": errCleanup, + "pv": *tridentPVName, + }).Error("Launcher failed to delete the PV during cleanup! " + + "Manual deletion is required!") + errors = append(errors, + fmt.Errorf("Launcher failed to delete PV %s during cleanup: %s. "+ + "Manual deletion is required!", *tridentPVName, errCleanup)) + } else { + log.WithFields(log.Fields{ + "pv": *tridentPVName, + }).Info("Launcher successfully deleted the PV during cleanup.") + } + } + if volumeCreated && !deploymentCreated { + // Delete the volume + if errCleanup := deleteVolume(launcher.tridentEphemeralClient, *tridentVolumeName); errCleanup != nil { + log.WithFields(log.Fields{ + "error": errCleanup, + "volume": *tridentVolumeName, + }).Error("Launcher failed to delete the volume during cleanup. "+ + "Manual deletion is required! "+ + "Run 'kubectl logs %s' for more information.", + tridentEphemeralPodName) + errors = append(errors, + fmt.Errorf("Launcher failed to delete volume %s during cleanup: %s. "+ + "Manual deletion is required! "+ + "Run 'kubectl logs %s' for more information.", + *tridentVolumeName, errCleanup)) + deleteTridentEphemeral = false + } else { + log.WithFields(log.Fields{ + "volume": *tridentVolumeName, + }).Info("Launcher successfully deleted the volume during cleanup.") + } + } + if tridentEphemeralCreated && (deploymentCreated || deleteTridentEphemeral) { + //TODO: Capture the logs for pod trident-ephemeral + /* kubectl logs trident-ephemeral --v=8 + * https://kubernetes.io/docs/api-reference/v1/operations/ + * https://kubernetes.io/docs/admin/authorization/ + */ + // Delete pod trident-ephemeral + if errCleanup := stopTridentEphemeralPod(launcher.kubeClient); errCleanup != nil { + log.WithFields(log.Fields{ + "error": errCleanup, + "pod": tridentEphemeralPodName, + }).Error("Launcher failed to delete the pod. " + + "Manual deletion is required!") + errors = append(errors, + fmt.Errorf("Launcher failed to delete pod %s: %s. 
"+ + "Manual deletion is required!", + tridentEphemeralPodName, errCleanup)) + } else { + log.Infof("Launcher successfully deleted pod %s during cleanup.", + tridentEphemeralPodName) + } + } + } + }() + + // Check for an existing Trident deployment + if deploymentExists, err = launcher.kubeClient.CheckDeploymentExists(launcher.tridentDeployment.Name); deploymentExists { + launcherErr = fmt.Errorf("Launcher detected a preexisting deployment "+ + "called %s, so it will quit!", launcher.tridentDeployment.Name) + return + } else if err != nil { + launcherErr = fmt.Errorf("Launcher couldn't establish the presence "+ + "of deployment %s: %s. Please check your service account setup.", + launcher.tridentDeployment.Name, err) + return } - log.Debugf("Started Trident at %s", tridentIP) - err = WaitOnline(tridentIP) - if err != nil { - log.Fatalf("%v", err) + + // Check for an existing PVC for Trident + if pvcExists, err = launcher.kubeClient.CheckPVCExists(*tridentPVCName); pvcExists { + var phase v1.PersistentVolumeClaimPhase + log.WithFields(log.Fields{ + "pvc": *tridentPVCName, + }).Info("Launcher detected a preexisting PVC. It assumes " + + "this PVC was created for the Trident deployment.") + phase, err = launcher.kubeClient.GetPVCPhase(*tridentPVCName) + if err != nil { + launcherErr = fmt.Errorf( + "Launcher couldn't detect the phase for PVC %s: %s", + *tridentPVCName, err) + return + } + switch phase { + case v1.ClaimPending: + log.WithFields(log.Fields{ + "pvc": *tridentPVCName, + }).Info("Launcher detected that the PVC is still pending; " + + "proceeding with the creation of the corresponding PV.") + case v1.ClaimBound: + pvExists = true + log.WithFields(log.Fields{ + "pvc": *tridentPVCName, + }).Info("Launcher detected that the PVC is bound; proceeding " + + "with the creation of the Trident deployment.") + case v1.ClaimLost: + launcherErr = fmt.Errorf("Please delete the preexisting PVC %s "+ + "and try again.", *tridentPVCName) + return + } + } else if err != nil { + launcherErr = fmt.Errorf("Launcher couldn't establish the presence "+ + "of PVC %s: %s", *tridentPVCName, err) + return } - backendName, err := PostBackend(tridentIP) - if err != nil { - log.Fatalf("Unable to post backend: %v\nCleanup of the Trident pod is"+ - " necessary", err) + + if !pvExists { + // Start ephemeral Trident + if tridentEphemeralPod, err = createTridentEphemeralPod(launcher.kubeClient); err != nil { + launcherErr = fmt.Errorf("Launcher failed to launch pod %s: %s", + tridentEphemeralPodName, err) + return + } + log.WithFields(log.Fields{ + "pod": tridentEphemeralPodName, + }).Info("Launcher created the pod.") + tridentEphemeralCreated = true + + // Wait for the pod to run + if tridentEphemeralPod, err = launcher.kubeClient.GetRunningPod( + tridentEphemeralPod, k8sTimeout, + tridentEphemeralLabels); err != nil { + launcherErr = err + return + } + + // Create Trident client + if tridentEphemeralPod.Status.PodIP == "" { + launcherErr = fmt.Errorf("Pod %s doesn't have an IP address!", + tridentEphemeralPod.Name) + return + } + log.WithFields(log.Fields{ + "pod": tridentEphemeralPod.Name, + "ipAddress": tridentEphemeralPod.Status.PodIP, + }).Infof("Launcher detected the IP address for the pod.") + launcher.tridentEphemeralClient.Configure(tridentEphemeralPod.Status.PodIP, + tridentDefaultPort, *tridentTimeout) + + // Check the pod is functional + if err = checkTridentAPI(launcher.tridentEphemeralClient, tridentEphemeralPod.Name); err != nil { + launcherErr = fmt.Errorf( + "Launcher failed to bring up a 
functional pod: %s "+ + "Try 'kubectl logs %s' to diagnose the problem.", + err, tridentEphemeralPod.Name) + return + } + + // Provision the volume + if err = provisionVolume(launcher.tridentEphemeralClient); err != nil { + launcherErr = fmt.Errorf("Launcher failed in adding volume %s: %s", + *tridentVolumeName, err.Error()) + return + } + volumeCreated = true + log.WithFields(log.Fields{ + "volume": *tridentVolumeName, + "volumeSize": fmt.Sprintf("%d%s", *tridentVolumeSize, "GB"), + }).Info("Launcher successfully created the volume.") + deleteTridentEphemeral = true } - log.Debugf("Added backend %s", backendName) - storagePoolNames, err := GetBackendStoragePools(tridentIP, backendName) - if err != nil { - log.Fatalf("Unable to retrieve storagePool names: %v\nCleanup of "+ - "the Trident pod is necessary.", err) + + if !pvcExists { + // Create the PVC + if pvc, err = createPVC(launcher.kubeClient, *tridentPVCName); err != nil { + launcherErr = fmt.Errorf("Launcher failed in creating PVC %s: %s", + *tridentPVCName, err) + return + } + pvcCreated = true + log.WithFields(log.Fields{ + "pvc": *tridentPVCName, + }).Info("Launcher successfully created the PVC.") + } else { + // Retrieve the preexisting PVC + if pvc, err = launcher.kubeClient.GetPVC(*tridentPVCName); err != nil { + launcherErr = fmt.Errorf("Launcher failed in getting PVC %s: %s", + *tridentPVCName, err) + return + } } - log.Debug("StoragePool names: ", strings.Join(storagePoolNames, ", ")) - err = PostStorageClass(tridentIP, backendName, storagePoolNames) - if err != nil { - log.Fatalf("Unable to post storage class: %v\nCleanup of the Trident "+ - "pod is necessary", err) + + // At this point, we should have created the volume and the PVC for the + // stateful Trident pod. + if !pvExists { + // Create the pre-bound PV + // Get the volume information + if volConfig, err = getVolume(launcher.tridentEphemeralClient, *tridentVolumeName); err != nil { + launcherErr = err + return + } + if pv, err = createPV(launcher.kubeClient, *tridentPVName, volConfig, pvc); err != nil { + launcherErr = fmt.Errorf("Launcher failed in creating PV %s: %s", + *tridentPVName, err) + return + } + pvCreated = true + log.WithFields(log.Fields{ + "pv": pv.Name, + "volume": volConfig.InternalName, + }).Info("Launcher successfully created the PV for the volume.") + + // Wait for the Trident PVC and PV to bind + if pvc, err = launcher.kubeClient.GetBoundPVC(pvc, + pv, k8sTimeout, tridentLabels); err != nil { + launcherErr = err + return + } } - log.Debug("Added storage class.") - volConfig, err := PostVolume(tridentIP) - if err != nil { - log.Fatalf("Unable to post volume: %v\nCleanup of the Trident pod "+ - "is necessary", err) + + // Start the stateful Trident + if launcher.tridentDeployment, err = launcher.kubeClient.CreateDeployment(launcher.tridentDeployment); err != nil { + launcherErr = fmt.Errorf("Launcher failed in creating deployment %s: %s", + launcher.tridentDeployment.Name, err) + return } - log.Debug("Provisioned volume with configuration: ", volConfig) - bound, err := kubeClient.CreateKubeVolume(volConfig) - if err != nil { - log.Fatalf("Unable to create PVC and PV: %v\nCleanup of Trident pod "+ - "and/or PVC may be necessary", err) + deploymentCreated = true + + // Get the stateful Trident pod + if tridentPod, err = launcher.kubeClient.GetPodByLabels( + k8s_client.CreateListOptions(k8sTimeout, + tridentLabels, launcher.tridentDeployment.ResourceVersion)); err != nil { + launcherErr = err + return } - if bound { - log.Infof("Provisioned volume 
%s for Trident", volConfig.Name) - } else { - log.Warnf("PVC bound to pre-existing volume; clean up volume %s.", - volConfig.Name) + log.WithFields(log.Fields{ + "pod": tridentPod.Name, + "podPhase": tridentPod.Status.Phase, + "deployment": launcher.tridentDeployment.Name, + }).Info("Launcher successfully retrieved information about the deployment.") + + // Wait for a running Pod + if tridentPod.Status.Phase != v1.PodRunning { + if tridentPod, err = launcher.kubeClient.GetRunningPod( + tridentPod, k8sTimeout, tridentLabels); err != nil { + launcherErr = fmt.Errorf("%s "+ + "Try 'kubectl describe pod %s' for more information.", + err, tridentPod.Name) + return + } else { + log.WithFields(log.Fields{ + "pod": tridentPod.Name, + "podPhase": tridentPod.Status.Phase, + "deployment": launcher.tridentDeployment.Name, + }).Info("Launcher successfully retrieved information about the deployment.") + } } - log.Debug("Provisioned volume.") - err = kubeClient.RemoveInMemoryTrident() - if err != nil { - log.Fatalf("Unable to delete ephememral Trident: %v\nRemove it "+ - "manually before starting the pod.", err) + + // Create the client for the stateful pod + if tridentPod.Status.PodIP == "" { + launcherErr = fmt.Errorf("Pod %s doesn't have an IP address!", + tridentPod.Name) + return } + log.WithFields(log.Fields{ + "pod": tridentPod.Name, + "ipAddress": tridentPod.Status.PodIP, + }).Infof("Launcher detected the IP address for the pod.") + launcher.tridentClient = launcher.tridentClient.Configure(tridentPod.Status.PodIP, + tridentDefaultPort, *tridentTimeout) + + // Check the pod is functional + if err = checkTridentAPI(launcher.tridentClient, tridentPod.Name); err != nil { + log.Warnf("%s Perhaps Trident is bootstrapping a lot of state. "+ + "Try 'kubectl logs %s -c %s' to see if Trident has bootstrapped successfully.", + err, tridentPod.Name, tridentContainerName) + } + return } func main() { var ( - config *k8srest.Config - err error + config *k8srest.Config + err error = nil + kubeClient k8s_client.Interface + tridentDeployment *v1beta1.Deployment ) + // Process command line arguments flag.Parse() if *debug { log.SetLevel(log.DebugLevel) } - if *apiServerIP != "" { config, err = clientcmd.BuildConfigFromFlags(*apiServerIP, "") if err != nil { - log.Fatal("Unable to get client config: ", err) + log.WithFields(log.Fields{ + "error": err, + }).Fatal("Launcher is unable to get the config for the API server client!") } } else { config, err = k8srest.InClusterConfig() if err != nil { - log.Fatal("Unable to get client config through service account: ", - err) + log.WithFields(log.Fields{ + "error": err, + }).Fatal("Launcher is unable to get the config for the API " + + "server client through service account!") } } - kubeClient, err := NewKubeClient(config, *deploymentFile) - if err != nil { - log.Fatal("Unable to construct client: ", err) + + // Read Trident deployment definition + if tridentDeployment, err = createTridentDeploymentFromFile(*deploymentFile); err != nil { + log.WithFields(log.Fields{ + "error": err, + "deploymentFile": *deploymentFile, + }).Fatal("Launcher failed in reading the deployment file!") } - running, err := kubeClient.CheckTridentRunning() - if err != nil { - log.Fatal("Unable to query whether Trident is running: ", err) + // Retrieve Trident pod label, image, and namespace + tridentLabels = tridentDeployment.Spec.Template.Labels + if len(tridentLabels) == 0 { + log.WithFields(log.Fields{ + "deploymentFile": *deploymentFile, + }).Fatal("Launcher requires the deployment definition 
to have labels for the Trident pod!") + } + for _, container := range tridentDeployment.Spec.Template.Spec.Containers { + if container.Name == tridentContainerName { + tridentImage = container.Image + tridentNamespace = tridentDeployment.Namespace + break + } } - if running { - log.Info("Trident already running.") - return + if tridentImage == "" { + log.WithFields(log.Fields{ + "container": tridentContainerName, + "deploymentFile": *deploymentFile, + }).Fatal("Launcher couldn't find the Trident container in the " + + "deployment definition!") } - - exists, err := kubeClient.PVCExists() - if err != nil { - log.Fatalf("Unable to query %s PVC existence: %v", pvcName, err) + if tridentNamespace == "" { + log.WithFields(log.Fields{ + "deploymentFile": *deploymentFile, + }).Fatal("Launcher requires a namespace in the deployment definition!") } - if exists { - // At some point, we may want to ensure that the PVC is bound and not - // errored. For now, though, assume that the user will delete the PVC - // if it is errored. - log.Info("Trident PVC already exists; starting the pod.") - } else { - ProvisionVolume(kubeClient) + + // Set up the Kubernetes API server client + if kubeClient, err = k8s_client.NewKubeClient(config, tridentNamespace); err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Fatal("Launcher was unable to create an API client!") + } + k8sVersion := kubeClient.Version() + log.WithFields(log.Fields{ + "major": k8sVersion.Major, + "minor": k8sVersion.Minor, + "gitCommit": k8sVersion.GitCommit, + }).Infof("Launcher successfully retrieved the version of Kubernetes.") + + // Set up clients for the ephemeral and stateful Trident pods + tridentEphemeralClient := &tridentrest.TridentClient{} + tridentClient := &tridentrest.TridentClient{} + + // Create the launcher object + launcher := NewLauncher(kubeClient, tridentClient, tridentEphemeralClient, + tridentDeployment) + + // Check whether this version of Kubernetes is supported by Trident + if supported, err := launcher.ValidateVersion(k8sVersion); err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Fatal("Launcher encountered an error in checking the version of " + + "the container orchestrator!") + } else if !supported { + log.WithFields(log.Fields{ + "yourVersion": fmt.Sprintf("%s.%s", + k8sVersion.Major, k8sVersion.Minor), + "minKubernetesVersion": k8sfrontend.KubernetesVersionMin, + "maxKubernetesVersion": k8sfrontend.KubernetesVersionMax, + "minOpenShiftVersion": k8sfrontend.OpenShiftVersionMin, + "maxOpenShiftVersion": k8sfrontend.OpenShiftVersionMax, + }).Warn("Launcher may not support this version of the container " + + "orchestrator!") } - log.Infof("Provisioned PVC %s; starting Trident.", pvcName) - err = kubeClient.StartFullTrident() - if err != nil { - log.Fatal("Unable to start full Trident deployment: ", err) + + // Run the launcher + if errors := launcher.Run(); len(errors) == 0 { + log.Info("Trident deployment was successfully launched.") } - log.Info("Trident is now running.") } diff --git a/launcher/launcher_test.go b/launcher/launcher_test.go new file mode 100644 index 000000000..93ce6bdb6 --- /dev/null +++ b/launcher/launcher_test.go @@ -0,0 +1,288 @@ +// Copyright 2016 NetApp, Inc. All Rights Reserved. 
+ +package main + +import ( + "reflect" + "strings" + "testing" + + "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/pkg/apis/extensions/v1beta1" + "k8s.io/client-go/pkg/version" + + tridentrest "github.com/netapp/trident/frontend/rest" + "github.com/netapp/trident/k8s_client" + "github.com/netapp/trident/storage" +) + +func TestValidKubeVersion(t *testing.T) { + k8sVersion := &version.Info{ + Major: "1", + Minor: "5", + } + launcher := &Launcher{} + supported, err := launcher.ValidateVersion(k8sVersion) + if !supported || err != nil { + t.Fatalf("The test for a valid Kubernetes version failed: %s", err) + } +} + +func TestInvalidKubeVersion(t *testing.T) { + k8sVersion := &version.Info{ + Major: "1", + Minor: "3", + } + launcher := &Launcher{} + supported, err := launcher.ValidateVersion(k8sVersion) + if supported || err != nil { + t.Fatalf("The test for an invalid Kubernetes version failed: %s", err) + } +} + +func TestValidOpenShiftVersion(t *testing.T) { + osVersion := &version.Info{ + Major: "3", + Minor: "5", + } + launcher := &Launcher{} + supported, err := launcher.ValidateVersion(osVersion) + if !supported || err != nil { + t.Fatalf("The test for a valid OpenShift version failed: %s", err) + } +} + +func TestInvalidOpenShiftVersion(t *testing.T) { + osVersion := &version.Info{ + Major: "3", + Minor: "3", + } + launcher := &Launcher{} + supported, err := launcher.ValidateVersion(osVersion) + if supported || err != nil { + t.Fatalf("The test for an invalid OpenShift version failed: %s", err) + } +} + +func TestTridentClientVolume(t *testing.T) { + tridentClientFailMatrix := map[string]bool{} + tridentClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + getVolResponse, err := tridentClient.GetVolume("trident") + if err != nil || getVolResponse.Error != "Volume wasn't found" { + t.Fatal("Error in retrieving a non-existent volume!") + } + _, err = + tridentClient.AddVolume(&storage.VolumeConfig{Name: "trident"}) + if err != nil { + t.Fatal("Error in creating a volume!") + } + getVolResponse, err = tridentClient.GetVolume("trident") + if err != nil || getVolResponse.Error != "" || + getVolResponse.Volume.Config.Name != "trident" { + t.Fatal("Error in retrieving a nonexistent volume!") + } + deleteResponse, err := tridentClient.DeleteVolume("trident") + if err != nil || deleteResponse.Error != "" { + t.Fatal("Error in deleting a volume!") + } + deleteResponse, err = tridentClient.DeleteVolume("trident") + if err != nil || deleteResponse.Error == "" { + t.Fatal("Deleting a volume should succeed only once!") + } +} + +func TestKubeSnapshotStateValid(t *testing.T) { + kubeClientFailMatrix := make(map[string]bool, 0) + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + snapshotBefore := kubeClient.SnapshotState() + snapshotAfter := kubeClient.SnapshotState() + if !reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Kubernetes state shouldn't have changed!") + } +} + +func TestKubeSnapshotStateInvalid(t *testing.T) { + var err error + kubeClientFailMatrix := make(map[string]bool, 0) + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + snapshotBefore := kubeClient.SnapshotState() + + tridentDeployment := &v1beta1.Deployment{} + tridentDeployment.Name = "trident" + tridentDeployment, err = kubeClient.CreateDeployment(tridentDeployment) + if err != nil { + t.Fatal(err) + } + + snapshotAfter := kubeClient.SnapshotState() + if reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Kubernetes state should have changed!") + } +} + 
+func TestExistingDeployment(t *testing.T) { + var err error + // Creating the parameters for launcher + tridentClientFailMatrix := map[string]bool{} + kubeClientFailMatrix := make(map[string]bool, 0) + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + tridentClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentEphemeralClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentDeployment := &v1beta1.Deployment{} + tridentDeployment.Name = "trident" + + // Configuring the environment + tridentDeployment, err = kubeClient.CreateDeployment(tridentDeployment) + if err != nil { + t.Fatal(err) + } + + // Get the state of the Kubernetes cluster before running launcher + snapshotBefore := kubeClient.SnapshotState() + + // Running launcher + launcher := NewLauncher(kubeClient, tridentClient, tridentEphemeralClient, + tridentDeployment) + errors := launcher.Run() + if len(errors) != 1 { + t.Fatal("Launcher should have failed with the preexisting deployment!") + } + if !strings.Contains(errors[0].Error(), + "Launcher detected a preexisting deployment") { + t.Fatal("Launcher returned an incorrect error!") + } + + // Make sure launcher didn't change the state of the Kubernetes cluster + snapshotAfter := kubeClient.SnapshotState() + if !reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Launcher didn't clean up state properly!") + } +} + +func TestExistingDeploymentFailure(t *testing.T) { + var err error + // Creating the parameters for launcher + tridentClientFailMatrix := map[string]bool{} + kubeClientFailMatrix := map[string]bool{ + "GetDeployment": true, + } + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + tridentClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentEphemeralClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentDeployment := &v1beta1.Deployment{} + tridentDeployment.Name = "trident" + + // Configuring the environment + tridentDeployment, err = kubeClient.CreateDeployment(tridentDeployment) + if err != nil { + t.Fatal(err) + } + + // Get the state of the Kubernetes cluster before running launcher + snapshotBefore := kubeClient.SnapshotState() + + // Running launcher + launcher := NewLauncher(kubeClient, tridentClient, tridentEphemeralClient, + tridentDeployment) + errors := launcher.Run() + if len(errors) != 1 { + t.Fatal("Launcher should have failed!") + } + if !strings.Contains(errors[0].Error(), + "Launcher couldn't establish the presence of deployment") { + t.Fatal("Launcher returned an incorrect error!") + } + + // Make sure launcher didn't change the state of the Kubernetes cluster + snapshotAfter := kubeClient.SnapshotState() + if !reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Launcher didn't clean up state properly!") + } +} + +func TestExistingPVCFailure(t *testing.T) { + var err error + // Creating the parameters for launcher + tridentClientFailMatrix := map[string]bool{} + kubeClientFailMatrix := map[string]bool{ + "GetPVC": true, + } + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + tridentClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentEphemeralClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentDeployment := &v1beta1.Deployment{} + tridentDeployment.Name = "trident" + + // Configuring the environment + tridentPVC := &v1.PersistentVolumeClaim{} + tridentPVC.Name = "trident" + tridentPVC, err = kubeClient.CreatePVC(tridentPVC) + if err != nil { + 
t.Fatal(err) + } + + // Get the state of the Kubernetes cluster before running launcher + snapshotBefore := kubeClient.SnapshotState() + + // Running launcher + launcher := NewLauncher(kubeClient, tridentClient, tridentEphemeralClient, + tridentDeployment) + errors := launcher.Run() + if len(errors) != 1 { + t.Fatal("Launcher should have failed!") + } + if !strings.Contains(errors[0].Error(), + "Launcher couldn't establish the presence of PVC") { + t.Fatal("Launcher returned an incorrect error!") + } + + // Make sure launcher didn't change the state of the Kubernetes cluster + snapshotAfter := kubeClient.SnapshotState() + if !reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Launcher didn't clean up state properly!") + } +} + +func TestPrexistingBoundPVCFailedDeployment(t *testing.T) { + var err error + // Creating the parameters for launcher + tridentClientFailMatrix := map[string]bool{} + kubeClientFailMatrix := map[string]bool{ + "CreateDeployment": true, + } + kubeClient := k8s_client.NewFakeKubeClient(kubeClientFailMatrix) + tridentClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentEphemeralClient := tridentrest.NewFakeTridentClient(tridentClientFailMatrix) + tridentDeployment := &v1beta1.Deployment{} + tridentDeployment.Name = "trident" + + // Configuring the environment + tridentPVC := &v1.PersistentVolumeClaim{} + tridentPVC.Name = "trident" + tridentPVC.Status.Phase = v1.ClaimBound + tridentPVC, err = kubeClient.CreatePVC(tridentPVC) + if err != nil { + t.Fatal(err) + } + + // Get the state of the Kubernetes cluster before running launcher + snapshotBefore := kubeClient.SnapshotState() + + // Running launcher + launcher := NewLauncher(kubeClient, tridentClient, tridentEphemeralClient, + tridentDeployment) + errors := launcher.Run() + if len(errors) != 1 { + t.Fatal("Launcher should have failed!") + } + if !strings.Contains(errors[0].Error(), + "CreateDeployment failed") { + t.Fatal("Launcher returned an incorrect error!") + } + + // Make sure launcher didn't change the state of the Kubernetes cluster + snapshotAfter := kubeClient.SnapshotState() + if !reflect.DeepEqual(snapshotBefore, snapshotAfter) { + t.Fatal("Launcher didn't clean up state properly!") + } +} diff --git a/storage/backend.go b/storage/backend.go index e7a8f61ce..c83326b1b 100644 --- a/storage/backend.go +++ b/storage/backend.go @@ -123,30 +123,35 @@ func (b *StorageBackend) AddVolume( } if err := b.Driver.Create(volConfig.InternalName, volSize, args); err != nil { - log.WithFields(log.Fields{ - "backend": b.Name, - "storage_pool": storagePool.Name, - "volume": volConfig.Name, - "error": err, - }).Warn("Failed to create the volume on this backend.") - } else { - if err = b.Driver.CreateFollowup(volConfig); err != nil { - errDestroy := b.Driver.Destroy(volConfig.InternalName) - if errDestroy != nil { - log.WithFields(log.Fields{ - "backend": b.Name, - "volume": volConfig.InternalName, - }).Warnf("Mapping the created volume failed "+ - "and %s wasn't able to delete it afterwards: %s. 
"+ - "Volume needs to be manually deleted.", - config.OrchestratorName, errDestroy.Error()) - } + // Implement idempotency at the Trident layer + // Ignore the error if the volume exists already + if b.Driver.Get(volConfig.InternalName) != nil { + log.WithFields(log.Fields{ + "backend": b.Name, + "storagePool": storagePool.Name, + "volume": volConfig.Name, + "error": err, + }).Warn("Failed to create the volume on this backend.") return nil, err } - vol := NewVolume(volConfig, b, storagePool) - storagePool.AddVolume(vol, false) - return vol, err } + + if err = b.Driver.CreateFollowup(volConfig); err != nil { + errDestroy := b.Driver.Destroy(volConfig.InternalName) + if errDestroy != nil { + log.WithFields(log.Fields{ + "backend": b.Name, + "volume": volConfig.InternalName, + }).Warnf("Mapping the created volume failed "+ + "and %s wasn't able to delete it afterwards: %s. "+ + "Volume needs to be manually deleted.", + config.OrchestratorName, errDestroy) + } + return nil, err + } + vol := NewVolume(volConfig, b, storagePool) + storagePool.AddVolume(vol, false) + return vol, err } else { log.WithFields(log.Fields{ "storagePoolName": storagePool.Name, diff --git a/trident-installer/install_trident.sh b/trident-installer/install_trident.sh index 0da8ab0b0..17f9b17cd 100755 --- a/trident-installer/install_trident.sh +++ b/trident-installer/install_trident.sh @@ -43,8 +43,13 @@ command -v curl > /dev/null 2>&1 || \ command -v kubectl > /dev/null || \ { echo >&2 "$0 requires kubectl present in \$PATH."; exit 1; } -kubectl delete configmap --ignore-not-found=true trident-launcher-config -kubectl delete pod --ignore-not-found=true trident-launcher +if [ -z $NAMESPACE ] +then + NAMESPACE="default" +fi + +kubectl --namespace=$NAMESPACE delete configmap --ignore-not-found=true trident-launcher-config +kubectl --namespace=$NAMESPACE delete pod --ignore-not-found=true trident-launcher if [ ! -z $NAMESPACE ] then @@ -58,11 +63,11 @@ then sed -i "s/serviceAccount: [A-Za-z-]\+/serviceAccount: $SERVICE_ACCOUNT/g" launcher-pod.yaml fi -kubectl create configmap trident-launcher-config --from-file=./setup +kubectl --namespace=$NAMESPACE create configmap trident-launcher-config --from-file=./setup if [ $? -ne 0 ]; then exit 1; fi -kubectl create -f ./launcher-pod.yaml +kubectl --namespace=$NAMESPACE create -f ./launcher-pod.yaml if [ $? -ne 0 ]; then exit 1; fi