feat(opencurve#27): support for deployment waiting
Signed-off-by: Anur Ijuokarukas <[email protected]>
anurnomeru committed Apr 19, 2023
1 parent ebde98e commit fcb5240
Showing 3 changed files with 142 additions and 35 deletions.
8 changes: 6 additions & 2 deletions pkg/chunkserver/chunkserver.go
@@ -6,6 +6,7 @@ import (

"github.com/coreos/pkg/capnslog"
"github.com/pkg/errors"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"

curvev1 "github.com/opencurve/curve-operator/api/v1"
@@ -106,14 +107,17 @@ func (c *Cluster) Start(nodeNameIP map[string]string) error {
logger.Info("create physical pool successed")

// 3. startChunkServers starts all chunkservers for each device of every node
err = c.startChunkServers()
var chunkServers []*v1.Deployment
chunkServers, err = c.startChunkServers()
if err != nil {
return errors.Wrap(err, "failed to start chunkserver")
}

// 4. wait for all chunkservers to be online before creating the logical pool
logger.Info("starting all chunkservers")
time.Sleep(30 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if ok := <-k8sutil.WaitForDeploymentsToStart(ctx, c.context.Clientset, 3*time.Second, chunkServers...); !ok {
	logger.Error("failed to wait for all chunkserver deployments to start")
}

// 5. create logical pool
_, err = c.runCreatePoolJob(nodeNameIP, "logical_pool")
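Step 4 above replaces the fixed 30-second sleep with a bounded wait on the deployments created in step 3. A minimal standalone sketch of the calling pattern, assuming this repository's k8sutil import path; waitBounded and its arguments are illustrative names, not part of the patch:

package example

import (
	"context"
	"time"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/opencurve/curve-operator/pkg/k8sutil"
)

// waitBounded blocks until every deployment reports started or the 30-second
// budget expires, and returns the aggregated result.
func waitBounded(clientset kubernetes.Interface, deployments []*v1.Deployment) bool {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// the returned channel yields exactly one aggregated result; each
	// deployment is polled every 3 seconds
	return <-k8sutil.WaitForDeploymentsToStart(ctx, clientset, 3*time.Second, deployments...)
}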
75 changes: 42 additions & 33 deletions pkg/chunkserver/spec.go
@@ -6,7 +6,8 @@ import (

"github.com/pkg/errors"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -15,20 +16,21 @@ import (
)

// startChunkServers starts all chunkservers for each device of every node
func (c *Cluster) startChunkServers() error {
func (c *Cluster) startChunkServers() ([]*v1.Deployment, error) {
results := make([]*v1.Deployment, 0)
if len(job2DeviceInfos) == 0 {
logger.Errorf("no job to format device and provision chunk file")
return nil
return results, nil
}

if len(chunkserverConfigs) == 0 {
logger.Errorf("no device need to start chunkserver")
return nil
return results, nil
}

if len(job2DeviceInfos) != len(chunkserverConfigs) {
logger.Errorf("no device need to start chunkserver")
return errors.New("failed to start chunkserver because of job numbers is not equal with chunkserver config")
return results, errors.New("failed to start chunkservers: the number of format jobs is not equal to the number of chunkserver configs")
}

_ = c.createStartCSConfigMap()
@@ -41,18 +43,19 @@ func (c *Cluster) startChunkServers() error {

err := c.createConfigMap(csConfig)
if err != nil {
return errors.Wrapf(err, "failed to create chunkserver configmap for %v", config.ChunkserverConfigMapName)
return results, errors.Wrapf(err, "failed to create chunkserver configmap for %v",
config.ChunkserverConfigMapName)
}

d, err := c.makeDeployment(&csConfig)
if err != nil {
return errors.Wrap(err, "failed to create chunkserver Deployment")
return results, errors.Wrap(err, "failed to create chunkserver Deployment")
}

newDeployment, err := c.context.Clientset.AppsV1().Deployments(c.namespacedName.Namespace).Create(d)
if err != nil {
if !kerrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
return results, errors.Wrapf(err, "failed to create chunkserver deployment %s", csConfig.ResourceName)
}
logger.Infof("deployment for chunkserver %s already exists. updating if needed", csConfig.ResourceName)

@@ -63,18 +66,18 @@ func (c *Cluster) startChunkServers() error {
} else {
logger.Infof("Deployment %s has been created , waiting for startup", newDeployment.GetName())
// TODO:wait for the new deployment
// deploymentsToWaitFor = append(deploymentsToWaitFor, newDeployment)
results = append(results, newDeployment)
}
// update condition type and phase etc.
}

return nil
return results, nil
}

// createCSClientConfigMap create cs_client configmap
func (c *Cluster) createCSClientConfigMap() error {
// 1. get mds-conf-template from cluster
csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp, metav1.GetOptions{})
csClientCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.CsClientConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.CsClientConfigMapTemp)
if kerrors.IsNotFound(err) {
@@ -88,14 +91,15 @@ func (c *Cluster) createCSClientConfigMap() error {
// 3. replace ${} to specific parameters
replacedCsClientData, err := config.ReplaceConfigVars(csClientCMData, &chunkserverConfigs[0])
if err != nil {
return errors.Wrap(err, "failed to Replace cs_client config template to generate a new cs_client configmap to start server.")
return errors.Wrap(err,
	"failed to replace cs_client config template to generate a new cs_client configmap to start the server")
}

csClientConfigMap := map[string]string{
config.CSClientConfigMapDataKey: replacedCsClientData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.CSClientConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -105,7 +109,8 @@ func (c *Cluster) createCSClientConfigMap() error {

err = c.ownerInfo.SetControllerReference(cm)
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q", config.CSClientConfigMapName)
return errors.Wrapf(err, "failed to set owner reference to cs_client.conf configmap %q",
config.CSClientConfigMapName)
}

// Create cs_client configmap in cluster
@@ -119,7 +124,8 @@ func (c *Cluster) createCSClientConfigMap() error {

// CreateS3ConfigMap creates s3 configmap
func (c *Cluster) CreateS3ConfigMap() error {
s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp, metav1.GetOptions{})
s3CMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.S3ConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.S3ConfigMapTemp)
if kerrors.IsNotFound(err) {
@@ -146,7 +152,7 @@ func (c *Cluster) CreateS3ConfigMap() error {
config.S3ConfigMapDataKey: configMapData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: config.S3ConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -175,7 +181,7 @@ func (c *Cluster) createStartCSConfigMap() error {
startChunkserverScriptFileDataKey: script.START,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: startChunkserverConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -199,7 +205,8 @@ func (c *Cluster) createStartCSConfigMap() error {
// createConfigMap create chunkserver configmap for chunkserver server
func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
// 1. get mds-conf-template from cluster
chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp, metav1.GetOptions{})
chunkserverCMTemplate, err := c.context.Clientset.CoreV1().ConfigMaps(c.namespacedName.Namespace).Get(config.ChunkServerConfigMapTemp,
metav1.GetOptions{})
if err != nil {
logger.Errorf("failed to get configmap %s from cluster", config.ChunkServerConfigMapTemp)
if kerrors.IsNotFound(err) {
@@ -217,7 +224,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
// 3. replace ${} to specific parameters
replacedChunkServerData, err := config.ReplaceConfigVars(chunkserverData, &csConfig)
if err != nil {
return errors.Wrap(err, "failed to Replace chunkserver config template to generate a new chunkserver configmap to start server.")
return errors.Wrap(err,
	"failed to replace chunkserver config template to generate a new chunkserver configmap to start the server")
}

// for debug
@@ -227,7 +235,7 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {
config.ChunkserverConfigMapDataKey: replacedChunkServerData,
}

cm := &v1.ConfigMap{
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: csConfig.CurrentConfigMapName,
Namespace: c.namespacedName.Namespace,
@@ -237,7 +245,8 @@ func (c *Cluster) createConfigMap(csConfig chunkserverConfig) error {

err = c.ownerInfo.SetControllerReference(cm)
if err != nil {
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q", config.ChunkserverConfigMapName)
return errors.Wrapf(err, "failed to set owner reference to chunkserverconfig configmap %q",
config.ChunkserverConfigMapName)
}

// Create chunkserver config in cluster
@@ -254,19 +263,19 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
vols, _ := c.createTopoAndToolVolumeAndMount()
volumes = append(volumes, vols...)

podSpec := v1.PodTemplateSpec{
podSpec := corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: csConfig.ResourceName,
Labels: c.getChunkServerPodLabels(csConfig),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
c.makeCSDaemonContainer(csConfig),
},
NodeName: csConfig.NodeName,
RestartPolicy: v1.RestartPolicyAlways,
RestartPolicy: corev1.RestartPolicyAlways,
HostNetwork: true,
DNSPolicy: v1.DNSClusterFirstWithHostNet,
DNSPolicy: corev1.DNSClusterFirstWithHostNet,
Volumes: volumes,
},
}
@@ -301,7 +310,7 @@ func (c *Cluster) makeDeployment(csConfig *chunkserverConfig) (*apps.Deployment,
}

// makeCSDaemonContainer create chunkserver container
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Container {
func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) corev1.Container {

privileged := true
runAsUser := int64(0)
@@ -321,7 +330,7 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
argsChunkserverPort := strconv.Itoa(csConfig.Port)
argsConfigFileMountPath := path.Join(config.ChunkserverConfigMapMountPathDir, config.ChunkserverConfigMapDataKey)

container := v1.Container{
container := corev1.Container{
Name: "chunkserver",
Command: []string{
"/bin/bash",
@@ -339,16 +348,16 @@ func (c *Cluster) makeCSDaemonContainer(csConfig *chunkserverConfig) v1.Containe
Image: c.spec.CurveVersion.Image,
ImagePullPolicy: c.spec.CurveVersion.ImagePullPolicy,
VolumeMounts: volMounts,
Ports: []v1.ContainerPort{
Ports: []corev1.ContainerPort{
{
Name: "listen-port",
ContainerPort: int32(csConfig.Port),
HostPort: int32(csConfig.Port),
Protocol: v1.ProtocolTCP,
Protocol: corev1.ProtocolTCP,
},
},
Env: []v1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
SecurityContext: &v1.SecurityContext{
Env: []corev1.EnvVar{{Name: "TZ", Value: "Asia/Hangzhou"}},
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
RunAsUser: &runAsUser,
RunAsNonRoot: &runAsNonRoot,
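startChunkServers now follows a create-or-collect pattern: create each deployment, treat an IsAlreadyExists error as a signal to update the existing one, and collect only freshly created deployments for the startup wait. A condensed sketch of that flow, using the same pre-1.18 client-go call style as the patch; createOrCollect is an illustrative helper, not a function in this change:

package example

import (
	v1 "k8s.io/api/apps/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/kubernetes"
)

// createOrCollect creates each desired deployment and returns the ones that
// were newly created, so the caller can wait for just those to start.
func createOrCollect(clientset kubernetes.Interface, namespace string,
	desired []*v1.Deployment) ([]*v1.Deployment, error) {
	created := make([]*v1.Deployment, 0, len(desired))
	for _, d := range desired {
		newDeployment, err := clientset.AppsV1().Deployments(namespace).Create(d)
		if err != nil {
			if !kerrors.IsAlreadyExists(err) {
				return created, err
			}
			// already exists: update instead of waiting for a fresh start
			continue
		}
		created = append(created, newDeployment)
	}
	return created, nil
}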
94 changes: 94 additions & 0 deletions pkg/k8sutil/deployment.go
@@ -0,0 +1,94 @@
package k8sutil

import (
"context"
"time"

v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

// WaitForDeploymentsToStart waits for the given deployments to start and returns a channel that
// reports whether all of them started successfully
//
// tickDuration is the interval at which each deployment's status is polled
// objectMetas are the deployments to wait for
//
// a buffered hub chan collects the result of each deployment; once every deployment has reported,
// a single aggregated result is sent on the returned channel. This design lets
// WaitForDeploymentToStart and WaitForDeploymentsToStart be used in the same way
func WaitForDeploymentsToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
	objectMetas ...*v1.Deployment) chan bool {
	length := len(objectMetas)
	// hub is buffered so every watcher goroutine can deliver its result even
	// after the collector below has returned early on a failure
	hub := make(chan bool, length)
	for i := range objectMetas {
		objectMeta := objectMetas[i]
		go func() {
			// forward each result, success or failure, so the collector
			// receives exactly one value per deployment
			hub <- <-WaitForDeploymentToStart(ctx, clientSet, tickDuration, objectMeta)
		}()
	}

	chn := make(chan bool)
	go func() {
		defer close(chn)
		for i := 0; i < length; i++ {
			if succeed := <-hub; !succeed {
				chn <- false
				return
			}
		}
		chn <- true
	}()
	return chn
}

// WaitForDeploymentToStart waits for the given deployment to start and returns a channel that
// reports whether it started successfully
//
// tickDuration is the interval at which the deployment's status is polled
// objectMeta identifies the deployment to wait for
func WaitForDeploymentToStart(ctx context.Context, clientSet kubernetes.Interface, tickDuration time.Duration,
	objectMeta *v1.Deployment) chan bool {
	chn := make(chan bool)
	go func() {
		defer close(chn)
		// the ticker must be created and stopped inside this goroutine:
		// deferring ticker.Stop() in the outer function would stop it before
		// the polling loop ever receives a tick
		ticker := time.NewTicker(tickDuration)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				deployment, err := clientSet.AppsV1().Deployments(objectMeta.GetNamespace()).Get(objectMeta.GetName(),
					metav1.GetOptions{})
				if err != nil {
					// TODO: is returning the failure reason required?
					logger.Errorf("failed to get deployment %s in cluster", objectMeta.GetName())
					chn <- false
					return
				}
				logger.Infof("waiting for deployment %s to start", deployment.Name)
				// started: the controller has observed the current spec
				// generation and at least one updated replica is ready
				if deployment.Status.ObservedGeneration >= deployment.Generation &&
					deployment.Status.UpdatedReplicas > 0 &&
					deployment.Status.ReadyReplicas > 0 {
					logger.Infof("deployment %s has been started", deployment.Name)
					chn <- true
					return
				}

				// TODO: should the unready reason, e.g. conditions, be logged to help debugging?
			case <-ctx.Done():
				chn <- false
				logger.Infof("stop waiting for deployment %s to start because the context is done", objectMeta.GetName())
				return
			}
		}
	}()
	return chn
}
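Because the helper only talks to the cluster through kubernetes.Interface, it can be exercised against client-go's fake clientset. A sketch of such a test, assuming the same pre-1.18 client-go vintage as the patch; this test is illustrative and not part of the commit:

package k8sutil

import (
	"context"
	"testing"
	"time"

	v1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// TestWaitForDeploymentToStart checks that an already-ready deployment is
// reported as started well before the context deadline.
func TestWaitForDeploymentToStart(t *testing.T) {
	d := &v1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "chunkserver-a", Namespace: "curve"},
		Status: v1.DeploymentStatus{
			ObservedGeneration: 1,
			UpdatedReplicas:    1,
			ReadyReplicas:      1,
		},
	}
	clientset := fake.NewSimpleClientset(d)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if ok := <-WaitForDeploymentToStart(ctx, clientset, 10*time.Millisecond, d); !ok {
		t.Fatal("expected deployment to be reported as started")
	}
}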
