Skip to content

Commit

Permalink
Add support for S3 Mountpoint S3 Driver
Browse files Browse the repository at this point in the history
- Thanks for this @jawadqur and @paulineribeyre!
  • Loading branch information
lbeckman314 committed Nov 23, 2024
1 parent 5b38ccb commit 6fe944e
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 135 deletions.
220 changes: 92 additions & 128 deletions compute/kubernetes/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"

"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/events"
Expand Down Expand Up @@ -69,13 +67,17 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly
}

b := &Backend{
client: clientset.BatchV1().Jobs(conf.Namespace),
namespace: conf.Namespace,
template: conf.Template,
event: writer,
database: reader,
log: log,
config: kubeconfig,
bucket: conf.Bucket,
region: conf.Region,
client: clientset.BatchV1().Jobs(conf.Namespace),
namespace: conf.Namespace,
template: conf.Template,
pvTemplate: conf.PVTemplate,
pvcTemplate: conf.PVCTemplate,
event: writer,
database: reader,
log: log,
config: kubeconfig,
}

if !conf.DisableReconciler {
Expand All @@ -88,9 +90,13 @@ func NewBackend(ctx context.Context, conf config.Kubernetes, reader tes.ReadOnly

// Backend represents the Kubernetes backend.
type Backend struct {
bucket string
region string
client batchv1.JobInterface
namespace string
template string
pvTemplate string
pvcTemplate string
event events.Writer
database tes.ReadOnlyServer
log *logger.Logger
Expand Down Expand Up @@ -134,7 +140,7 @@ func (b *Backend) Close() {
//TODO: close database?
}

// Create the Funnel Worker job
// Create the Funnel Worker job from kubernetes-template.yaml
// Executor job is created in worker/kubernetes.go#Run
func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
submitTpl, err := template.New(task.Id).Parse(b.template)
Expand All @@ -156,7 +162,7 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
"DiskGb": res.GetDiskGb(),
})
if err != nil {
return nil, fmt.Errorf("executing template: %v", err)
return nil, fmt.Errorf("executing Worker template: %v", err)
}

decode := scheme.Codecs.UniversalDeserializer().Decode
Expand All @@ -172,133 +178,74 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
return job, nil
}

func (b *Backend) createPVC(ctx context.Context, taskID string, resources *tes.Resources) error {
clientset, err := kubernetes.NewForConfig(b.config)
// Create the Worker/Executor PVC from config/kubernetes-pvc.yaml
// TODO: Move this config file to Helm Charts so users can see/customize it
func (b *Backend) createPVC(task *tes.Task) (*corev1.PersistentVolumeClaim, error) {
// Load templates
pvcTpl, err := template.New(task.Id).Parse(b.pvcTemplate)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
return nil, fmt.Errorf("parsing template: %v", err)
}

// Define storage size (ignored by S3 CSI driver but required by the API)
storageSize := resource.NewQuantity(1024*1024*1024, resource.BinarySI) // Default 1Gi
if resources != nil && resources.DiskGb > 0 {
storageSize = resource.NewQuantity(int64(resources.DiskGb*1024*1024*1024), resource.BinarySI)
// Template parameters
var buf bytes.Buffer
err = pvcTpl.Execute(&buf, map[string]interface{}{
"TaskId": task.Id,
"Namespace": b.namespace,
"Bucket": b.bucket,
"Region": b.region,
})
if err != nil {
return nil, fmt.Errorf("executing PVC template: %v", err)
}

pvName := fmt.Sprintf("funnel-pv-%s", taskID)
pvcName := fmt.Sprintf("funnel-pvc-%s", taskID)
bucketName := "funnel-testing"

// Step 1: Create the PersistentVolume
pv := &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
Labels: map[string]string{
"app": "funnel",
"taskId": taskID,
},
},
Spec: corev1.PersistentVolumeSpec{
Capacity: corev1.ResourceList{
corev1.ResourceStorage: *storageSize,
},
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteMany, // S3 CSI supports RWX
},
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
StorageClassName: "",
ClaimRef: &corev1.ObjectReference{
Namespace: b.namespace,
Name: pvcName,
},
MountOptions: []string{
"allow-delete",
"region=us-west-2",
},
PersistentVolumeSource: corev1.PersistentVolumeSource{
CSI: &corev1.CSIPersistentVolumeSource{
Driver: "s3.csi.aws.com",
VolumeHandle: fmt.Sprintf("s3-csi-%s", taskID),
VolumeAttributes: map[string]string{
"bucketName": bucketName,
},
},
},
},
}

if _, err := clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating PersistentVolume: %v", err)
}

// Step 2: Create the PersistentVolumeClaim
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: pvcName,
Labels: map[string]string{
"app": "funnel",
"taskId": taskID,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteMany,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: *storageSize,
},
},
StorageClassName: func() *string { s := ""; return &s }(),
VolumeName: pvName,
},
}

if _, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(ctx, pvc, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating PersistentVolumeClaim: %v", err)
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(buf.Bytes(), nil, nil)
if err != nil {
return nil, fmt.Errorf("decoding PVC spec: %v", err)
}

return nil
fmt.Println("PVC spec: ", string(buf.Bytes()))
pvc, ok := obj.(*corev1.PersistentVolumeClaim)
if !ok {
return nil, fmt.Errorf("failed to decode PVC spec")
}
return pvc, nil
}

func (b *Backend) addOwnerReference(ctx context.Context, taskID string, job *v1.Job) error {
// Fetch the Job that will own the PVC and PV
jobName := fmt.Sprintf("funnel-job-%s", taskID)
_, err := b.client.Get(ctx, jobName, metav1.GetOptions{})
// Create the Worker/Executor PV from config/kubernetes-pv.yaml
// TODO: Move this config file to Helm Charts so users can see/customize it
func (b *Backend) createPV(task *tes.Task) (*corev1.PersistentVolume, error) {
// Load templates
pvTpl, err := template.New(task.Id).Parse(b.pvTemplate)
if err != nil {
return fmt.Errorf("fetching Job for owner reference: %v", err)
return nil, fmt.Errorf("parsing template: %v", err)
}

// Fetch the PVC
pvcName := fmt.Sprintf("funnel-pvc-%s", taskID)
clientset, err := kubernetes.NewForConfig(b.config)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
}
pvc, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Get(ctx, pvcName, metav1.GetOptions{})
// Template parameters
var buf bytes.Buffer
err = pvTpl.Execute(&buf, map[string]interface{}{
"TaskId": task.Id,
"Namespace": b.namespace,
"Bucket": b.bucket,
"Region": b.region,
})
if err != nil {
return fmt.Errorf("fetching PVC: %v", err)
return nil, fmt.Errorf("executing PV template: %v", err)
}

// Add OwnerReference for the Job
ownerRef := metav1.OwnerReference{
APIVersion: "batch/v1",
Kind: "Job",
Name: job.Name,
UID: job.UID,
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
}

// Append the OwnerReference to the PVC
pvc.OwnerReferences = append(pvc.OwnerReferences, ownerRef)

// Update the PVC
_, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Update(ctx, pvc, metav1.UpdateOptions{})
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode(buf.Bytes(), nil, nil)
if err != nil {
return fmt.Errorf("updating PVC: %v", err)
return nil, fmt.Errorf("decoding PV spec: %v", err)
}

return nil
fmt.Println("PV spec: ", string(buf.Bytes()))
pv, ok := obj.(*corev1.PersistentVolume)
if !ok {
return nil, fmt.Errorf("failed to decode PV spec")
}
return pv, nil
}

// Add this helper function for PVC cleanup
Expand Down Expand Up @@ -326,13 +273,35 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
// Create a new background context instead of inheriting from the potentially canceled one
submitCtx := context.Background()

// TODO: Update this so that a PVC is only created if the task has inputs or outputs
// TODO: Update this so that a PVC/PV is only created if the task has inputs or outputs
// If the task has either inputs or outputs, then create a PVC
// shared between the Funnel Worker and the Executor
// e.g. `if len(task.Inputs) > 0 || len(task.Outputs) > 0 {}`
err := b.createPVC(submitCtx, task.Id, task.GetResources())
pvc, err := b.createPVC(task)
if err != nil {
return fmt.Errorf("creating shared storage PVC: %v", err)
}

pv, err := b.createPV(task)
if err != nil {
return fmt.Errorf("creating shared storage: %v", err)
return fmt.Errorf("creating shared storage PV: %v", err)
}

clientset, err := kubernetes.NewForConfig(b.config)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
}

// Create PVC
pvc, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(context.Background(), pvc, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("creating PVC: %v", err)
}

// Create PV
pv, err = clientset.CoreV1().PersistentVolumes().Create(context.Background(), pv, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("creating PV: %v", err)
}

// Create the worker job
Expand All @@ -348,11 +317,6 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
return fmt.Errorf("creating job in backend: %v", err)
}

err = b.addOwnerReference(submitCtx, task.Id, job)
if err != nil {
return fmt.Errorf("updating PVC with OwnerReference: %v", err)
}

return nil
}

Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func (h FTPStorage) Valid() bool {

// Kubernetes describes the configuration for the Kubernetes compute backend.
type Kubernetes struct {
// The bucket to use for the task's Working Directory
Bucket string
// The region to use for the task's Bucket
Region string
// The executor used to execute tasks. Available executors: docker, kubernetes
Executor string
// Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes
Expand All @@ -428,6 +432,10 @@ type Kubernetes struct {
ExecutorTemplate string
// ExecutorTemplateFile is the path to the executor template.
ExecutorTemplateFile string
// Worker/Executor PersistentVolume (PV) template.
PVTemplate string
// Worker/Executor PersistentVolumeClaim (PVC) template.
PVCTemplate string
// Path to the Kubernetes configuration file, otherwise assumes the Funnel server is running in a pod and
// attempts to use https://godoc.org/k8s.io/client-go/rest#InClusterConfig to infer configuration.
ConfigFile string
Expand Down
8 changes: 7 additions & 1 deletion config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,17 @@ func DefaultConfig() Config {

kubernetesTemplate := intern.MustAsset("config/kubernetes-template.yaml")
executorTemplate := intern.MustAsset("config/kubernetes-executor-template.yaml")
c.Kubernetes.Executor = "docker"
pvTemplate := intern.MustAsset("config/kubernetes-pv.yaml")
pvcTemplate := intern.MustAsset("config/kubernetes-pvc.yaml")
c.Kubernetes.Executor = "kubernetes"
c.Kubernetes.Namespace = "default"
c.Kubernetes.ServiceAccount = "funnel-sa"
c.Kubernetes.Template = string(kubernetesTemplate)
c.Kubernetes.ExecutorTemplate = string(executorTemplate)
c.Kubernetes.Bucket = ""
c.Kubernetes.Region = ""
c.Kubernetes.PVTemplate = string(pvTemplate)
c.Kubernetes.PVCTemplate = string(pvcTemplate)
c.Kubernetes.ReconcileRate = reconcile

return c
Expand Down
Loading

0 comments on commit 6fe944e

Please sign in to comment.