diff --git a/compute/kubernetes/backend.go b/compute/kubernetes/backend.go index bc14b7a3..334f2abb 100644 --- a/compute/kubernetes/backend.go +++ b/compute/kubernetes/backend.go @@ -14,14 +14,12 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" k8errors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/utils/ptr" "github.com/ohsu-comp-bio/funnel/config" "github.com/ohsu-comp-bio/funnel/events" @@ -69,13 +67,17 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly } b := &Backend{ - client: clientset.BatchV1().Jobs(conf.Namespace), - namespace: conf.Namespace, - template: conf.Template, - event: writer, - database: reader, - log: log, - config: kubeconfig, + bucket: conf.Bucket, + region: conf.Region, + client: clientset.BatchV1().Jobs(conf.Namespace), + namespace: conf.Namespace, + template: conf.Template, + pvTemplate: conf.PVTemplate, + pvcTemplate: conf.PVCTemplate, + event: writer, + database: reader, + log: log, + config: kubeconfig, } if !conf.DisableReconciler { @@ -88,9 +90,13 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly // Backend represents the local backend. type Backend struct { + bucket string + region string client batchv1.JobInterface namespace string template string + pvTemplate string + pvcTemplate string event events.Writer database tes.ReadOnlyServer log *logger.Logger @@ -134,7 +140,7 @@ func (b *Backend) Close() { //TODO: close database? } -// Create the Funnel Worker job +// Create the Funnel Worker job from kubernetes-template.yaml // Executor job is created in worker/kubernetes.go#Run func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) { submitTpl, err := template.New(task.Id).Parse(b.template) @@ -156,7 +162,7 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) { "DiskGb": res.GetDiskGb(), }) if err != nil { - return nil, fmt.Errorf("executing template: %v", err) + return nil, fmt.Errorf("executing Worker template: %v", err) } decode := scheme.Codecs.UniversalDeserializer().Decode @@ -172,133 +178,74 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) { return job, nil } -func (b *Backend) createPVC(ctx context.Context, taskID string, resources *tes.Resources) error { - clientset, err := kubernetes.NewForConfig(b.config) +// Create the Worker/Executor PVC from config/kubernetes-pvc.yaml +// TODO: Move this config file to Helm Charts so users can see/customize it +func (b *Backend) createPVC(task *tes.Task) (*corev1.PersistentVolumeClaim, error) { + // Load templates + pvcTpl, err := template.New(task.Id).Parse(b.pvcTemplate) if err != nil { - return fmt.Errorf("getting kubernetes client: %v", err) + return nil, fmt.Errorf("parsing template: %v", err) } - // Define storage size (ignored by S3 CSI driver but required by the API) - storageSize := resource.NewQuantity(1024*1024*1024, resource.BinarySI) // Default 1Gi - if resources != nil && resources.DiskGb > 0 { - storageSize = resource.NewQuantity(int64(resources.DiskGb*1024*1024*1024), resource.BinarySI) + // Template parameters + var buf bytes.Buffer + err = pvcTpl.Execute(&buf, map[string]interface{}{ + "TaskId": task.Id, + "Namespace": b.namespace, + "Bucket": b.bucket, + "Region": b.region, + }) + if err != nil { + return nil, fmt.Errorf("executing PVC template: %v", err) } - pvName := fmt.Sprintf("funnel-pv-%s", taskID) - pvcName := fmt.Sprintf("funnel-pvc-%s", taskID) - bucketName := "funnel-testing" - - // Step 1: Create the PersistentVolume - pv := &corev1.PersistentVolume{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvName, - Labels: map[string]string{ - "app": "funnel", - "taskId": taskID, - }, - }, - Spec: corev1.PersistentVolumeSpec{ - Capacity: corev1.ResourceList{ - corev1.ResourceStorage: *storageSize, - }, - AccessModes: []corev1.PersistentVolumeAccessMode{ - corev1.ReadWriteMany, // S3 CSI supports RWX - }, - PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain, - StorageClassName: "", - ClaimRef: &corev1.ObjectReference{ - Namespace: b.namespace, - Name: pvcName, - }, - MountOptions: []string{ - "allow-delete", - "region=us-west-2", - }, - PersistentVolumeSource: corev1.PersistentVolumeSource{ - CSI: &corev1.CSIPersistentVolumeSource{ - Driver: "s3.csi.aws.com", - VolumeHandle: fmt.Sprintf("s3-csi-%s", taskID), - VolumeAttributes: map[string]string{ - "bucketName": bucketName, - }, - }, - }, - }, - } - - if _, err := clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("creating PersistentVolume: %v", err) - } - - // Step 2: Create the PersistentVolumeClaim - pvc := &corev1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Name: pvcName, - Labels: map[string]string{ - "app": "funnel", - "taskId": taskID, - }, - }, - Spec: corev1.PersistentVolumeClaimSpec{ - AccessModes: []corev1.PersistentVolumeAccessMode{ - corev1.ReadWriteMany, - }, - Resources: corev1.VolumeResourceRequirements{ - Requests: corev1.ResourceList{ - corev1.ResourceStorage: *storageSize, - }, - }, - StorageClassName: func() *string { s := ""; return &s }(), - VolumeName: pvName, - }, - } - - if _, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(ctx, pvc, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("creating PersistentVolumeClaim: %v", err) + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(buf.Bytes(), nil, nil) + if err != nil { + return nil, fmt.Errorf("decoding PVC spec: %v", err) } - return nil + fmt.Println("PVC spec: ", string(buf.Bytes())) + pvc, ok := obj.(*corev1.PersistentVolumeClaim) + if !ok { + return nil, fmt.Errorf("failed to decode PVC spec") + } + return pvc, nil } -func (b *Backend) addOwnerReference(ctx context.Context, taskID string, job *v1.Job) error { - // Fetch the Job that will own the PVC and PV - jobName := fmt.Sprintf("funnel-job-%s", taskID) - _, err := b.client.Get(ctx, jobName, metav1.GetOptions{}) +// Create the Worker/Executor PV from config/kubernetes-pv.yaml +// TODO: Move this config file to Helm Charts so users can see/customize it +func (b *Backend) createPV(task *tes.Task) (*corev1.PersistentVolume, error) { + // Load templates + pvTpl, err := template.New(task.Id).Parse(b.pvTemplate) if err != nil { - return fmt.Errorf("fetching Job for owner reference: %v", err) + return nil, fmt.Errorf("parsing template: %v", err) } - // Fetch the PVC - pvcName := fmt.Sprintf("funnel-pvc-%s", taskID) - clientset, err := kubernetes.NewForConfig(b.config) - if err != nil { - return fmt.Errorf("getting kubernetes client: %v", err) - } - pvc, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Get(ctx, pvcName, metav1.GetOptions{}) + // Template parameters + var buf bytes.Buffer + err = pvTpl.Execute(&buf, map[string]interface{}{ + "TaskId": task.Id, + "Namespace": b.namespace, + "Bucket": b.bucket, + "Region": b.region, + }) if err != nil { - return fmt.Errorf("fetching PVC: %v", err) + return nil, fmt.Errorf("executing PV template: %v", err) } - // Add OwnerReference for the Job - ownerRef := metav1.OwnerReference{ - APIVersion: "batch/v1", - Kind: "Job", - Name: job.Name, - UID: job.UID, - BlockOwnerDeletion: ptr.To(true), - Controller: ptr.To(true), - } - - // Append the OwnerReference to the PVC - pvc.OwnerReferences = append(pvc.OwnerReferences, ownerRef) - - // Update the PVC - _, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Update(ctx, pvc, metav1.UpdateOptions{}) + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode(buf.Bytes(), nil, nil) if err != nil { - return fmt.Errorf("updating PVC: %v", err) + return nil, fmt.Errorf("decoding PV spec: %v", err) } - return nil + fmt.Println("PV spec: ", string(buf.Bytes())) + pv, ok := obj.(*corev1.PersistentVolume) + if !ok { + return nil, fmt.Errorf("failed to decode PV spec") + } + return pv, nil } // Add this helper function for PVC cleanup @@ -326,13 +273,35 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error { // Create a new background context instead of inheriting from the potentially canceled one submitCtx := context.Background() - // TODO: Update this so that a PVC is only created if the task has inputs or outputs + // TODO: Update this so that a PVC/PV is only created if the task has inputs or outputs // If the task has either inputs or outputs, then create a PVC // shared between the Funnel Worker and the Executor // e.g. `if len(task.Inputs) > 0 || len(task.Outputs) > 0 {}` - err := b.createPVC(submitCtx, task.Id, task.GetResources()) + pvc, err := b.createPVC(task) + if err != nil { + return fmt.Errorf("creating shared storage PVC: %v", err) + } + + pv, err := b.createPV(task) if err != nil { - return fmt.Errorf("creating shared storage: %v", err) + return fmt.Errorf("creating shared storage PV: %v", err) + } + + clientset, err := kubernetes.NewForConfig(b.config) + if err != nil { + return fmt.Errorf("getting kubernetes client: %v", err) + } + + // Create PVC + pvc, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(context.Background(), pvc, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("creating PVC: %v", err) + } + + // Create PV + pv, err = clientset.CoreV1().PersistentVolumes().Create(context.Background(), pv, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("creating PV: %v", err) } // Create the worker job @@ -348,11 +317,6 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error { return fmt.Errorf("creating job in backend: %v", err) } - err = b.addOwnerReference(submitCtx, task.Id, job) - if err != nil { - return fmt.Errorf("updating PVC with OwnerReference: %v", err) - } - return nil } diff --git a/config/config.go b/config/config.go index 978aba8d..32ed8370 100644 --- a/config/config.go +++ b/config/config.go @@ -409,6 +409,10 @@ func (h FTPStorage) Valid() bool { // Kubernetes describes the configuration for the Kubernetes compute backend. type Kubernetes struct { + // The bucket to use for the task's Working Directory + Bucket string + // The region to use for the task's Bucket + Region string // The executor used to execute tasks. Available executors: docker, kubernetes Executor string // Turn off task state reconciler. When enabled, Funnel communicates with Kuberenetes @@ -428,6 +432,10 @@ type Kubernetes struct { ExecutorTemplate string // ExecutorTemplateFile is the path to the executor template. ExecutorTemplateFile string + // Worker/Executor PV job template. + PVTemplate string + // Worker/Executor PVC job template. + PVCTemplate string // Path to the Kubernetes configuration file, otherwise assumes the Funnel server is running in a pod and // attempts to use https://godoc.org/k8s.io/client-go/rest#InClusterConfig to infer configuration. ConfigFile string diff --git a/config/default.go b/config/default.go index 2dfde8f1..13fbb47e 100644 --- a/config/default.go +++ b/config/default.go @@ -164,11 +164,17 @@ func DefaultConfig() Config { kubernetesTemplate := intern.MustAsset("config/kubernetes-template.yaml") executorTemplate := intern.MustAsset("config/kubernetes-executor-template.yaml") - c.Kubernetes.Executor = "docker" + pvTemplate := intern.MustAsset("config/kubernetes-pv.yaml") + pvcTemplate := intern.MustAsset("config/kubernetes-pvc.yaml") + c.Kubernetes.Executor = "kubernetes" c.Kubernetes.Namespace = "default" c.Kubernetes.ServiceAccount = "funnel-sa" c.Kubernetes.Template = string(kubernetesTemplate) c.Kubernetes.ExecutorTemplate = string(executorTemplate) + c.Kubernetes.Bucket = "" + c.Kubernetes.Region = "" + c.Kubernetes.PVTemplate = string(pvTemplate) + c.Kubernetes.PVCTemplate = string(pvcTemplate) c.Kubernetes.ReconcileRate = reconcile return c diff --git a/config/internal/bundle.go b/config/internal/bundle.go index 59e082e6..0710d7ba 100644 --- a/config/internal/bundle.go +++ b/config/internal/bundle.go @@ -1,8 +1,10 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: +// config/kubernetes-pvc.yaml (309B) // config/gridengine-template.txt (346B) // config/pbs-template.txt (361B) // config/slurm-template.txt (415B) +// config/kubernetes-pv.yaml (517B) // config/kubernetes-executor-template.yaml (1.232kB) // config/default-config.yaml (11.655kB) // config/htcondor-template.txt (505B) @@ -74,6 +76,26 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } +var _configKubernetesPvcYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x8f\xc1\x6a\xf3\x30\x10\x84\xef\x7a\x8a\x81\xff\xec\xfc\xe4\xaa\x6b\x28\xa5\x87\x84\x50\x8a\x73\xde\xca\xd3\x22\x6c\x4b\xaa\x56\x32\x2d\x26\xef\x5e\x6c\x6c\x4a\x8e\xb3\x3b\xf3\xc1\xf7\x0f\xb7\x98\x7b\xe6\xff\x4f\xdf\x74\xb5\xc4\x8c\x6b\x7b\x32\x92\x7c\xcb\xac\x3e\x06\x8b\xe9\x68\x7a\x1f\x3a\x8b\xeb\x72\xd1\xc2\x50\xda\x38\xd4\x91\xa7\x41\xfc\x68\x46\x16\xe9\xa4\x88\x35\x40\x90\x91\x16\x1f\x35\x04\x0e\x4d\x9a\x5c\x33\xcf\x87\x37\xd1\xfe\xa5\xbb\xdf\xb7\xb7\x26\x71\xb4\x98\x67\x1c\x2e\x7b\xc4\xfa\x1d\xe4\x9d\x83\x2e\x18\x40\x52\xda\x39\x6b\x2e\x2b\x64\x99\xfd\xf1\x34\xd1\x2d\x6d\x71\x8e\xaa\xe7\xd8\x71\x1b\x37\x78\xa5\x74\xb7\xec\x0b\xcf\x12\x7e\x0c\x90\xa9\xb1\x66\xb7\x17\x32\xbf\x2a\xb5\x6c\x09\xd0\x12\xb3\x7c\xd2\xe2\xf8\xec\x0d\x30\xad\x76\x97\x47\x97\x07\x95\xdf\x00\x00\x00\xff\xff\x01\x27\x69\xed\x35\x01\x00\x00") + +func configKubernetesPvcYamlBytes() ([]byte, error) { + return bindataRead( + _configKubernetesPvcYaml, + "config/kubernetes-pvc.yaml", + ) +} + +func configKubernetesPvcYaml() (*asset, error) { + bytes, err := configKubernetesPvcYamlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "config/kubernetes-pvc.yaml", size: 309, mode: os.FileMode(0644), modTime: time.Unix(1732326881, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe9, 0xc, 0x1f, 0x92, 0xa1, 0xb3, 0x7, 0x8, 0xb0, 0x29, 0x2f, 0x8e, 0x77, 0x9d, 0x10, 0x18, 0xc6, 0xcd, 0x90, 0xbf, 0xdf, 0x92, 0xf6, 0xf5, 0x81, 0x70, 0x47, 0xb6, 0x35, 0x85, 0x1a, 0x9d}} + return a, nil +} + var _configGridengineTemplateTxt = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x7c\x90\xcd\x4a\xc4\x30\x14\x85\xf7\x79\x8a\x6b\xc7\x59\x26\xed\x0b\xb8\xb2\x30\xb8\x71\x21\x82\x4b\x69\xc9\x0d\x13\x32\xf9\xe1\x26\x51\x30\xe4\xdd\xa5\x69\x11\x0a\x75\x76\x97\xc3\x77\x3e\xb8\xe7\xf4\xd0\xcf\xda\xf5\xf3\x14\xaf\xec\xf4\x08\xfc\x15\x4a\x11\xef\x53\x34\x2f\xb2\xd6\x96\xf8\x25\xf9\xf0\x64\x46\x4d\xb5\xf6\x2a\x3b\x87\x37\x1e\x93\xf4\x39\x35\x00\xff\x03\x90\x88\x95\xa2\x15\x38\x04\xf1\x1c\x72\x84\x01\x78\xad\xac\x94\x40\xda\x25\x05\xdd\x52\x0f\x08\x36\x68\x38\xcb\x6e\x85\x1a\xc0\x01\x9d\x6c\xd7\x56\x7f\x9b\xec\x65\x86\x41\x1c\x19\x6e\x70\xfd\xfc\xb2\x68\x9f\xce\x62\x50\x97\x6e\x83\x8f\x3d\xa3\x8e\xe6\xae\x48\x45\xfd\x83\x7f\xa6\x15\xdf\xa9\xd8\xfa\x20\x7c\x7b\x32\x48\x40\xd9\x01\xe7\x69\x59\x6c\xdc\x6d\xf7\x1b\x00\x00\xff\xff\xcf\x92\x30\x7f\x5a\x01\x00\x00") func configGridengineTemplateTxtBytes() ([]byte, error) { @@ -134,6 +156,26 @@ func configSlurmTemplateTxt() (*asset, error) { return a, nil } +var _configKubernetesPvYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x5c\x91\x41\x6b\xfb\x30\x0c\xc5\xef\xfe\x14\x82\xff\x39\xf9\x53\x76\x33\xec\xb0\xc1\xd8\x76\x68\x57\xc2\x68\xcf\xaa\xad\x16\x11\xc7\x36\xb6\x92\xae\x84\x7e\xf7\x11\xa7\x19\x5b\x6f\x91\xf2\xd3\x7b\x7e\xd2\x3f\xd8\x87\xd4\x52\xfa\xff\xf2\x45\xa6\x97\x90\x60\xbb\x53\x18\x79\x47\x29\x73\xf0\x1a\x86\x95\x6a\xd9\x5b\x0d\xdb\xa9\x93\x85\xbc\xec\x82\xeb\x3b\x52\x1d\x09\x5a\x14\xd4\x0a\xc0\x63\x47\x1a\x8e\xbd\xf7\xe4\xaa\x38\x54\xe3\x58\x7f\x62\x6e\xdf\xed\xf5\xaa\x00\x1c\x1e\xc8\xe5\x89\x03\xc0\x18\x17\xb0\xd4\x52\x30\x0d\xbf\x27\x72\x24\x33\xd1\x06\x23\x1a\x96\xcb\x3c\x99\x25\x24\x3c\x91\x86\xd5\x2b\x2b\x00\x34\x86\x72\x5e\x07\x4b\x37\xe5\x0a\x1a\x42\xbb\x4f\x2c\xb4\x46\x7f\x51\x00\xf1\xee\xc9\x0d\x19\x87\xdc\x6d\x83\x63\x73\xd1\xd0\x90\x20\x7b\x05\xd0\x85\xde\xcb\x47\x14\x0e\xfe\x47\x0b\x9d\x0b\xe7\xca\x92\x23\xa1\x5b\x2b\xd1\x89\x83\x7f\x1c\xc7\xba\x29\x5f\x25\x9b\xc9\x3c\x8f\xd8\xc4\x03\x25\x0d\xf9\xa1\x36\x99\x6b\x3c\xe7\xda\x84\xae\xfc\x1a\x8a\xfb\x1b\x7a\xeb\x68\x02\x2a\x93\xf9\x6e\x45\x0b\xf4\x24\x92\xf8\xd0\xcb\x12\x0a\xe0\xd0\x9b\x96\x64\x53\x16\x3c\x8e\xf5\x73\x29\x67\xeb\x29\x4c\x43\xc7\x99\x9c\x4e\x90\x23\x9a\x19\xdb\x2c\xd5\x4d\xfd\xee\x40\xe6\x8f\xfd\x77\x00\x00\x00\xff\xff\xbf\x6a\x00\x6b\x05\x02\x00\x00") + +func configKubernetesPvYamlBytes() ([]byte, error) { + return bindataRead( + _configKubernetesPvYaml, + "config/kubernetes-pv.yaml", + ) +} + +func configKubernetesPvYaml() (*asset, error) { + bytes, err := configKubernetesPvYamlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "config/kubernetes-pv.yaml", size: 517, mode: os.FileMode(0644), modTime: time.Unix(1732327506, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x62, 0xa5, 0x30, 0x54, 0x11, 0xf3, 0x98, 0xe3, 0xc3, 0x75, 0xf8, 0x72, 0x29, 0x3a, 0x5c, 0x73, 0x31, 0x5b, 0x86, 0xc4, 0x99, 0x1d, 0x1b, 0xe0, 0xb4, 0xde, 0xf1, 0x3a, 0x28, 0xfe, 0xba, 0xea}} + return a, nil +} + var _configKubernetesExecutorTemplateYaml = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x84\x54\xcd\x4f\xdb\x4e\x10\xbd\xfb\xaf\x18\x39\xfc\x6e\xd8\x98\xcb\xef\x60\xa9\x07\x14\x10\x1f\x2a\x01\xb5\x88\x1e\xaa\x1e\xc6\x9b\x49\xb2\xcd\x7e\x75\x67\x1d\x40\x56\xfe\xf7\x6a\xfd\x55\x07\x28\xf5\x69\xb2\xef\xf9\xbd\x37\x93\x59\xcf\xe0\x01\x79\x0b\x17\xcf\x24\xea\x60\x7d\x82\x4e\x3e\x92\x67\x69\x4d\x09\x15\x06\xb1\x39\xd9\x9d\x26\x5b\x69\x96\x25\xdc\xd8\x2a\xd1\x14\x70\x89\x01\xcb\x04\xc0\xa0\xa6\x12\x9a\x26\x8f\x0a\xd7\xcb\xfd\x3e\x6b\x9a\xfc\xc6\x56\xb1\xec\x61\x76\x28\x3a\xce\x62\xf8\xd5\x62\x0a\x2b\x52\x1c\x45\x00\xd0\xb9\x12\x56\xb5\x31\xa4\x32\x1a\x62\x44\xe0\xa7\xad\xb2\x0f\x3d\xd8\x91\x88\x1a\x15\x8a\xad\x5d\xad\x3e\x4b\x2d\x43\x09\x45\x02\x20\xac\x76\x8a\x82\xb4\x86\x4b\x38\x4d\x00\x02\x69\xa7\x30\x50\x67\x39\xbc\x18\x1f\x4f\x1c\xd0\x87\x7b\xab\xa4\x78\x29\x61\x41\x3b\xf2\x3d\xc4\xe4\x77\x52\xd0\x99\x10\xb6\x36\x61\xd1\x46\xe9\x83\x32\xf6\x1c\x61\x4d\x40\x69\xc8\xf3\x20\x98\xf5\x83\xe9\x99\x4f\xd6\x6f\xc9\x67\x93\x16\x7a\x1e\x80\xd4\xb8\xee\xba\xbb\x8e\xd5\x6b\xe4\xbe\x56\x6a\x88\x75\xa6\x9e\xf0\x85\x47\x5c\x58\xad\x31\xfe\x27\xdf\xd3\x93\x4a\x9a\x13\xde\xa4\xc7\x90\x66\x22\xfd\x31\x52\xd0\xaf\xb9\xd5\x9e\x77\xdc\x89\x7a\x8c\x24\xcd\xfa\x5c\xfa\x96\xf0\xcd\xfa\xed\x52\xfa\x09\xc1\x13\xdb\xda\x0b\x1a\x7b\xea\x0e\x7f\xd5\xc4\xe1\xe0\x0c\x40\xb8\x3a\x8a\xc8\x15\x18\x82\x7c\xee\x6a\x86\x02\xb2\xfd\x3e\x1a\xbb\x9a\x63\x01\xa4\x98\x20\x56\xe9\x69\x51\xe8\x34\x56\x74\x10\x28\x3e\x9a\xb4\xf5\x2f\x13\xad\x2f\xa8\x2f\x2b\x28\xf2\x5e\xce\x79\x69\xc2\x0a\xd2\xff\xf2\x62\x75\x99\xf6\x70\x2b\xa5\x98\x3a\xf1\xff\x6f\xff\xa2\x4d\x6e\x43\x9a\x3c\xaa\x8c\x83\xf5\xfd\xd0\x7b\x9b\x73\xc9\xdb\x8f\x7c\x3a\xfc\xd0\xa8\x28\xde\x73\x1a\x8b\x9d\x55\xb5\xa6\xdb\xb8\x34\x93\x69\xcd\x66\x33\x38\xbf\x83\xc5\xdd\x03\xcc\xaf\xce\x16\x97\x17\xf0\x70\x75\xfd\x75\x84\x9b\xc6\xa3\x59\x13\x1c\xc9\xe5\xf3\x31\x1c\xc9\x40\x1a\xca\x4f\x90\x3f\xb6\x62\x3c\xf1\x79\xb5\x60\x7d\x4b\x59\xd3\x1c\xbd\x5d\x31\x00\x1d\x63\xdc\x63\xd8\xc4\x9e\x5b\xd9\x7c\x3e\xec\x6c\x3c\x3e\x20\x73\x5d\x8d\xd4\x51\xec\x5f\xaf\x0d\x73\x48\xa6\xdd\x73\x09\xef\xdf\x87\x3f\x71\xdf\xa6\x75\xf1\xcb\xc3\x81\x4c\xe8\xba\x9e\x2b\x94\x7a\xba\x6f\x22\x1e\x1c\x5c\x44\xb7\x13\x07\x52\xbf\x03\x00\x00\xff\xff\x00\x13\xe6\xe4\xd0\x04\x00\x00") func configKubernetesExecutorTemplateYamlBytes() ([]byte, error) { @@ -169,7 +211,7 @@ func configDefaultConfigYaml() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "config/default-config.yaml", size: 11655, mode: os.FileMode(0644), modTime: time.Unix(1732064227, 0)} + info := bindataFileInfo{name: "config/default-config.yaml", size: 11655, mode: os.FileMode(0644), modTime: time.Unix(1732326201, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x61, 0x5a, 0x7d, 0xb0, 0x9b, 0x9d, 0x13, 0x7, 0x49, 0xd0, 0x4c, 0xab, 0xc2, 0xe5, 0x37, 0xfa, 0x4a, 0x5a, 0x9a, 0x68, 0x89, 0x2b, 0x4b, 0x3e, 0xf3, 0x73, 0x35, 0x82, 0xe8, 0xf1, 0xbf, 0x3c}} return a, nil } @@ -305,9 +347,11 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ + "config/kubernetes-pvc.yaml": configKubernetesPvcYaml, "config/gridengine-template.txt": configGridengineTemplateTxt, "config/pbs-template.txt": configPbsTemplateTxt, "config/slurm-template.txt": configSlurmTemplateTxt, + "config/kubernetes-pv.yaml": configKubernetesPvYaml, "config/kubernetes-executor-template.yaml": configKubernetesExecutorTemplateYaml, "config/default-config.yaml": configDefaultConfigYaml, "config/htcondor-template.txt": configHtcondorTemplateTxt, @@ -365,6 +409,8 @@ var _bintree = &bintree{nil, map[string]*bintree{ "gridengine-template.txt": {configGridengineTemplateTxt, map[string]*bintree{}}, "htcondor-template.txt": {configHtcondorTemplateTxt, map[string]*bintree{}}, "kubernetes-executor-template.yaml": {configKubernetesExecutorTemplateYaml, map[string]*bintree{}}, + "kubernetes-pv.yaml": {configKubernetesPvYaml, map[string]*bintree{}}, + "kubernetes-pvc.yaml": {configKubernetesPvcYaml, map[string]*bintree{}}, "kubernetes-template.yaml": {configKubernetesTemplateYaml, map[string]*bintree{}}, "pbs-template.txt": {configPbsTemplateTxt, map[string]*bintree{}}, "slurm-template.txt": {configSlurmTemplateTxt, map[string]*bintree{}}, diff --git a/deployments/kubernetes/helm/Chart.yaml b/deployments/kubernetes/helm/Chart.yaml index c93d104c..87938faf 100644 --- a/deployments/kubernetes/helm/Chart.yaml +++ b/deployments/kubernetes/helm/Chart.yaml @@ -17,7 +17,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.12 +version: 0.1.13 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to diff --git a/deployments/kubernetes/helm/config/funnel-server.yaml b/deployments/kubernetes/helm/config/funnel-server.yaml index 29812b53..5deeb38f 100644 --- a/deployments/kubernetes/helm/config/funnel-server.yaml +++ b/deployments/kubernetes/helm/config/funnel-server.yaml @@ -1,8 +1,15 @@ Compute: {{ .Values.Compute }} Kubernetes: - Executor: "kubernetes" + Executor: {{ .Values.Kubernetes.Executor }} Namespace: {{ .Release.Namespace }} + DisableReconciler: {{ .Values.Kubernetes.DisableReconciler }} + ReconcileRate: {{ .Values.Kubernetes.ReconcileRate }} + ServiceAccount: {{ .Values.Kubernetes.ServiceAccount }} + Template: {{ .Values.Kubernetes.Template }} + TemplateFile: {{ .Values.Kubernetes.TemplateFile }} + Bucket: {{ .Values.Kubernetes.Bucket }} + Region: {{ .Values.Kubernetes.Region }} Database: {{ .Values.Database }} diff --git a/deployments/kubernetes/helm/config/funnel-worker.yaml b/deployments/kubernetes/helm/config/funnel-worker.yaml index 5f2a3cb6..d8abf37d 100644 --- a/deployments/kubernetes/helm/config/funnel-worker.yaml +++ b/deployments/kubernetes/helm/config/funnel-worker.yaml @@ -3,8 +3,15 @@ Database: {{ .Values.Database }} Compute: {{ .Values.Compute }} Kubernetes: - Executor: "kubernetes" + Executor: {{ .Values.Kubernetes.Executor }} Namespace: {{ .Release.Namespace }} + DisableReconciler: {{ .Values.Kubernetes.DisableReconciler }} + ReconcileRate: {{ .Values.Kubernetes.ReconcileRate }} + ServiceAccount: {{ .Values.Kubernetes.ServiceAccount }} + Template: {{ .Values.Kubernetes.Template }} + TemplateFile: {{ .Values.Kubernetes.TemplateFile }} + Bucket: {{ .Values.Kubernetes.Bucket }} + Region: {{ .Values.Kubernetes.Region }} Logger: Level: {{ .Values.Logger.level }} diff --git a/deployments/kubernetes/helm/values.yaml b/deployments/kubernetes/helm/values.yaml index 55ae99bf..1860b2c6 100644 --- a/deployments/kubernetes/helm/values.yaml +++ b/deployments/kubernetes/helm/values.yaml @@ -349,7 +349,7 @@ AWSBatch: # Kubernetes describes the configuration for the Kubernetes compute backend. Kubernetes: # The executor used to execute tasks. Available executors: docker, kubernetes - Executor: "kubernetes" + Executor: "kubernetes" # Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes # to find tasks that are stuck in a queued state or errored and # updates the task state accordingly. @@ -365,6 +365,10 @@ Kubernetes: Template: "" # TemplateFile is the path to the master job template. TemplateFile: "" + # The bucket to use for the task's Working Directory + Bucket: "" + # The region to use for the task's Bucket + Region: "" # Configuration of the Kubernetes executor. diff --git a/worker/kubernetes.go b/worker/kubernetes.go index 2ad7ca6b..c68a7d1e 100644 --- a/worker/kubernetes.go +++ b/worker/kubernetes.go @@ -29,7 +29,7 @@ type KubernetesCommand struct { Command } -// Create the Executor K8s job +// Create the Executor K8s job from kubernetes-executor-template.yaml // Funnel Worker job is created in compute/kubernetes/backend.go#createJob func (kcmd KubernetesCommand) Run(ctx context.Context) error { var taskId = kcmd.TaskId diff --git a/worker/worker.go b/worker/worker.go index ab421020..93683ddf 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -34,6 +34,10 @@ type Executor struct { Backend string // Kubernetes executor template Template string + // Kubernetes persistent volume template + PVTemplate string + // Kubernetes persistent volume claim template + PVCTemplate string // Kubernetes namespace Namespace string // Kubernetes service account name