From f8857bd83db1100978bab67f4fe85d64adecba7b Mon Sep 17 00:00:00 2001
From: Anur Ijuokarukas
Date: Wed, 19 Apr 2023 15:09:26 +0800
Subject: [PATCH] feat(#27): support waiting for deployments to start

Signed-off-by: Anur Ijuokarukas
---
 pkg/chunkserver/chunkserver.go |  8 ++-
 pkg/chunkserver/spec.go        | 63 ++++++++++++-----------
 pkg/k8sutil/deployment.go      | 94 ++++++++++++++++++++++++++++++++++
 3 files changed, 134 insertions(+), 31 deletions(-)
 create mode 100644 pkg/k8sutil/deployment.go

diff --git a/pkg/chunkserver/chunkserver.go b/pkg/chunkserver/chunkserver.go
index 5a1986df..c9de0e31 100644
--- a/pkg/chunkserver/chunkserver.go
+++ b/pkg/chunkserver/chunkserver.go
@@ -6,6 +6,7 @@ import (
 	"github.com/coreos/pkg/capnslog"
 	"github.com/pkg/errors"
+	v1 "k8s.io/api/apps/v1"
 	"k8s.io/apimachinery/pkg/types"
 
 	curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
 	logger.Info("create physical pool successed")
 
 	// 3. startChunkServers start all chunkservers for each device of every node
-	err = c.startChunkServers()
+	var chunkServers []*v1.Deployment
+	chunkServers, err = c.startChunkServers()
 	if err != nil {
 		return errors.Wrap(err, "failed to start chunkserver")
 	}
 
 	// 4. wait all chunkservers online before create logical pool
 	logger.Info("starting all chunkserver")
-	time.Sleep(30 * time.Second)
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+	<-k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...) // block until ready or timeout
 
 	// 5. create logical pool
 	_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
diff --git a/pkg/chunkserver/spec.go b/pkg/chunkserver/spec.go
index aae3dc58..c9e8ac5a 100644
--- a/pkg/chunkserver/spec.go
+++ b/pkg/chunkserver/spec.go
@@ -6,7 +6,8 @@ import (
 	"github.com/pkg/errors"
 	apps "k8s.io/api/apps/v1"
-	v1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
@@ -15,20 +16,21 @@ import (
 )
 
 // startChunkServers start all chunkservers for each device of every node
-func (c *Cluster) startChunkServers() error {
+func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
+	results := make([]*v1.Deployment, 0)
 	if len(job2DeviceInfos) == 0 {
 		logger.Errorf("no job to format device and provision chunk file")
-		return nil
+		return results, nil
 	}
 
 	if len(chunkserverConfigs) == 0 {
 		logger.Errorf("no device need to start chunkserver")
-		return nil
+		return results, nil
 	}
 
 	if len(job2DeviceInfos) != len(chunkserverConfigs) {
 		logger.Errorf("no device need to start chunkserver")
-		return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
+		return results, errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
 	}
 
 	_ = c.createStartCSConfigMap()
@@ -41,18 +43,19 @@
 
 		err := c.createConfigMap(csConfig)
 		if err != nil {
-			return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
+			return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v",
+				config.ChunkserverConfigMapName)
 		}
 
 		d, err := c.makeDeployment(&csConfig)
 		if err != nil {
-			return errors.Wrap(err, "failed to create chunkserver Deployment")
+			return results, errors.Wrap(err, "failed to create chunkserver Deployment")
 		}
 
 		newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
 		if err != nil {
 			if !kerrors.IsAlreadyExists(err) {
-				return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
+				return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
 			}
 			logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)
@@ -63,12 +66,11 @@
 		} else {
 			logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
 			// TODO:wait for the new deployment
-			// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
+			results = append(results, newDeployment)
 		}
 		// update condition type and phase etc.
 	}
-
-	return nil
+	return results, nil
 }
 
 // createCSClientConfigMap create cs_client configmap
@@ -88,14 +90,15 @@
 	// 3. replace ${} to specific parameters
 	replacedCsClientData, err := config.ReplaceConfigVars(csClientCMData, &chunkserverConfigs[0])
 	if err != nil {
-		return errors.Wrap(err, "failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
+		return errors.Wrap(err,
+			"failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
 	}
 
 	csClientConfigMap := map[string]string{
 		config.CSClientConfigMapDataKey: replacedCsClientData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.CSClientConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -146,7 +149,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
 		config.S3ConfigMapDataKey: configMapData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      config.S3ConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -175,7 +178,7 @@ func (c *Cluster) createStartCSConfigMap() error {
 		startChunkserverScriptFileDataKey: script.START,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      startChunkserverConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -217,7 +220,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 	// 3. replace ${} to specific parameters
 	replacedChunkServerData, err := config.ReplaceConfigVars(chunkserverData, &csConfig)
 	if err != nil {
-		return errors.Wrap(err, "failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
+		return errors.Wrap(err,
+			"failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
 	}
 
 	// for debug
@@ -227,7 +231,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 		config.ChunkserverConfigMapDataKey: replacedChunkServerData,
 	}
 
-	cm := &v1.ConfigMap{
+	cm := &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      csConfig.CurrentConfigMapName,
 			Namespace: c.namespacedName.Namespace,
@@ -237,7 +241,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
 
 	err = c.ownerInfo.SetControllerReference(cm)
 	if err != nil {
-		return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q", config.ChunkserverConfigMapName)
+		return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q",
+			config.ChunkserverConfigMapName)
 	}
 
 	// Create chunkserver config in cluster
@@ -254,19 +259,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
 	vols, _ := c.createTopoAndToolVolumeAndMount()
 	volumes = append(volumes, vols...)
 
-	podSpec := v1.PodTemplateSpec{
+	podSpec := corev1.PodTemplateSpec{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:   csConfig.ResourceName,
 			Labels: c.getChunkServerPodLabels(csConfig),
 		},
-		Spec: v1.PodSpec{
-			Containers: []v1.Container{
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
 				c.makeCSDaemonContainer(csConfig),
 			},
 			NodeName:      csConfig.NodeName,
-			RestartPolicy: v1.RestartPolicyAlways,
+			RestartPolicy: corev1.RestartPolicyAlways,
 			HostNetwork:   true,
-			DNSPolicy:     v1.DNSClusterFirstWithHostNet,
+			DNSPolicy:     corev1.DNSClusterFirstWithHostNet,
 			Volumes:       volumes,
 		},
 	}
@@ -301,7 +306,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
 }
 
 // makeCSDaemonContainer create chunkserver container
-func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
+func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {
 	privileged := true
 	runAsUser := int64(0)
@@ -321,7 +326,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
 	argsChunkserverPort := strconv.Itoa(csConfig.Port)
 	argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)
 
-	container := v1.Container{
+	container := corev1.Container{
 		Name: "chunkserver",
 		Command: []string{
 			"/bin/bash",
@@ -339,16 +344,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
 		Image:           c.spec.CurveVersion.Image,
 		ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
 		VolumeMounts:    volMounts,
-		Ports: []v1.ContainerPort{
+		Ports: []corev1.ContainerPort{
 			{
 				Name:          "listen-port",
 				ContainerPort: int32(csConfig.Port),
 				HostPort:      int32(csConfig.Port),
-				Protocol:      v1.ProtocolTCP,
+				Protocol:      corev1.ProtocolTCP,
 			},
 		},
-		Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
-		SecurityContext: &v1.SecurityContext{
+		Env:             []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
+		SecurityContext: &corev1.SecurityContext{
 			Privileged:   &privileged,
 			RunAsUser:    &runAsUser,
 			RunAsNonRoot: &runAsNonRoot,
diff --git a/pkg/k8sutil/deployment.go b/pkg/k8sutil/deployment.go
new file mode 100644
index 00000000..a70935f5
--- /dev/null
+++ b/pkg/k8sutil/deployment.go
@@ -0,0 +1,94 @@
+package k8sutil
+
+import (
+	"context"
+	"time"
+
+	v1 "k8s.io/api/apps/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+)
+
+// WaitForDeploymentsToStart waits for all the given deployments to start and returns a channel
+// that reports whether every deployment became ready
+//
+// tickDuration is the interval used to poll the deployment status
+// objectMetas are the deployments to wait for
+//
+// each deployment's result is collected through the buffered hub channel, and true is sent on the
+// returned channel only when all deployments have started; this design lets WaitForDeploymentToStart
+// and WaitForDeploymentsToStart be consumed in the same way
+func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	objectMetas ...*v1.Deployment) chan bool {
+	length := len(objectMetas)
+	// the hub is buffered so every goroutine can report its result without blocking
+	hub := make(chan bool, length)
+	// poll each deployment in its own goroutine so that they are watched concurrently
+	for i := range objectMetas {
+		objectMeta := objectMetas[i]
+		go func() {
+			// block until this deployment reports a result, then forward it to the hub
+			succeed := <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, objectMeta)
+			hub <- succeed
+		}()
+	}
+
+	// collect the results: fail fast on the first failure, otherwise report success
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		for i := 0; i < length; i++ {
+			if succeed := <-hub; !succeed {
+				chn <- false
+				return
+			}
+		}
+		chn <- true
+	}()
+	return chn
+}
+
+// WaitForDeploymentToStart waits for the deployment to start and returns a channel that reports
+// whether the deployment became ready
+//
+// tickDuration is the interval used to poll the deployment status
+// objectMeta is the deployment to wait for
+func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
+	objectMeta *v1.Deployment) chan bool {
+	chn := make(chan bool)
+	go func() {
+		defer close(chn)
+		// the ticker must be created and stopped inside this goroutine; deferring Stop in the
+		// outer function would stop it as soon as WaitForDeploymentToStart returns
+		ticker := time.NewTicker(tickDuration)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				deployment, err := clientSet.AppsV1().Deployments(objectMeta.GetNamespace()).Get(objectMeta.GetName(),
+					metav1.GetOptions{})
+				if err != nil {
+					// TODO: should the failure reason be returned?
+					logger.Errorf("failed to get deployment %s in cluster", objectMeta.GetName())
+					chn <- false
+					return
+				}
+				logger.Infof("waiting for deployment %s to start", deployment.Name)
+				if deployment.Status.ObservedGeneration != objectMeta.Status.ObservedGeneration &&
+					deployment.Status.UpdatedReplicas > 0 &&
+					deployment.Status.ReadyReplicas > 0 {
+					logger.Infof("deployment %s has been started", deployment.Name)
+					chn <- true
+					return
+				}
+
+				// TODO: log the unready reason, e.g. conditions, to help debugging
+			case <-ctx.Done():
+				chn <- false
+				logger.Infof("stop waiting for deployment %s to start because the context is done", objectMeta.GetName())
+				return
+			}
+		}
+	}()
+	return chn
+}
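
Reviewer note (not part of the patch): below is a minimal, hypothetical sketch of how a standalone caller could consume the new k8sutil.WaitForDeploymentsToStart helper. The kubeconfig path, namespace, and deployment name are made-up placeholders, and the client-go calls use the same pre-0.18 Get signature already used elsewhere in this patch.

package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/opencurve/curve-operator/pkg/k8sutil"
)

func main() {
	// build a clientset from a local kubeconfig; the path is illustrative only
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	// look up an existing deployment (hypothetical name and namespace)
	d, err := clientset.AppsV1().Deployments("curve").Get("chunkserver-node1-vdb", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// give the deployment up to 30 seconds to become ready, polling every 3 seconds
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if started := <-k8sutil.WaitForDeploymentsToStart(ctx, clientset, 3*time.Second, d); started {
		fmt.Println("all deployments are ready")
	} else {
		fmt.Println("timed out waiting for deployments")
	}
}

Because the helper returns a channel, callers can either block on it directly (as chunkserver.go now does) or select on it alongside other events.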