Skip to content

Commit

Permalink
Add initial support for S3 backed PVCs
Browse files Browse the repository at this point in the history
  • Loading branch information
lbeckman314 committed Nov 22, 2024
1 parent 1777bf5 commit 5b38ccb
Show file tree
Hide file tree
Showing 10 changed files with 153 additions and 37 deletions.
108 changes: 102 additions & 6 deletions compute/kubernetes/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
batchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"

"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/events"
Expand Down Expand Up @@ -172,39 +173,129 @@ func (b *Backend) createJob(task *tes.Task) (*v1.Job, error) {
}

func (b *Backend) createPVC(ctx context.Context, taskID string, resources *tes.Resources) error {
clientset, err := kubernetes.NewForConfig(b.config) // You'll need to store the config during NewBackend
clientset, err := kubernetes.NewForConfig(b.config)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
}

storageSize := resource.NewQuantity(1024*1024*1024, resource.BinarySI) // 1Gi default
// Define storage size (ignored by S3 CSI driver but required by the API)
storageSize := resource.NewQuantity(1024*1024*1024, resource.BinarySI) // Default 1Gi
if resources != nil && resources.DiskGb > 0 {
storageSize = resource.NewQuantity(int64(resources.DiskGb*1024*1024*1024), resource.BinarySI)
}

pvName := fmt.Sprintf("funnel-pv-%s", taskID)
pvcName := fmt.Sprintf("funnel-pvc-%s", taskID)
bucketName := "funnel-testing"

// Step 1: Create the PersistentVolume
pv := &corev1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
Labels: map[string]string{
"app": "funnel",
"taskId": taskID,
},
},
Spec: corev1.PersistentVolumeSpec{
Capacity: corev1.ResourceList{
corev1.ResourceStorage: *storageSize,
},
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteMany, // S3 CSI supports RWX
},
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
StorageClassName: "",
ClaimRef: &corev1.ObjectReference{
Namespace: b.namespace,
Name: pvcName,
},
MountOptions: []string{
"allow-delete",
"region=us-west-2",
},
PersistentVolumeSource: corev1.PersistentVolumeSource{
CSI: &corev1.CSIPersistentVolumeSource{
Driver: "s3.csi.aws.com",
VolumeHandle: fmt.Sprintf("s3-csi-%s", taskID),
VolumeAttributes: map[string]string{
"bucketName": bucketName,
},
},
},
},
}

if _, err := clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating PersistentVolume: %v", err)
}

// Step 2: Create the PersistentVolumeClaim
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("funnel-pvc-%s", taskID),
Name: pvcName,
Labels: map[string]string{
"app": "funnel",
"taskId": taskID,
},
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{
corev1.ReadWriteOnce,
corev1.ReadWriteMany,
},
Resources: corev1.VolumeResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: *storageSize,
},
},
StorageClassName: func() *string { s := ""; return &s }(),
VolumeName: pvName,
},
}

_, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(ctx, pvc, metav1.CreateOptions{})
if _, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Create(ctx, pvc, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("creating PersistentVolumeClaim: %v", err)
}

return nil
}

func (b *Backend) addOwnerReference(ctx context.Context, taskID string, job *v1.Job) error {
// Fetch the Job that will own the PVC and PV
jobName := fmt.Sprintf("funnel-job-%s", taskID)
_, err := b.client.Get(ctx, jobName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("creating shared PVC: %v", err)
return fmt.Errorf("fetching Job for owner reference: %v", err)
}

// Fetch the PVC
pvcName := fmt.Sprintf("funnel-pvc-%s", taskID)
clientset, err := kubernetes.NewForConfig(b.config)
if err != nil {
return fmt.Errorf("getting kubernetes client: %v", err)
}
pvc, err := clientset.CoreV1().PersistentVolumeClaims(b.namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("fetching PVC: %v", err)
}

// Add OwnerReference for the Job
ownerRef := metav1.OwnerReference{
APIVersion: "batch/v1",
Kind: "Job",
Name: job.Name,
UID: job.UID,
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
}

// Append the OwnerReference to the PVC
pvc.OwnerReferences = append(pvc.OwnerReferences, ownerRef)

// Update the PVC
_, err = clientset.CoreV1().PersistentVolumeClaims(b.namespace).Update(ctx, pvc, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("updating PVC: %v", err)
}

return nil
Expand Down Expand Up @@ -257,6 +348,11 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
return fmt.Errorf("creating job in backend: %v", err)
}

err = b.addOwnerReference(submitCtx, task.Id, job)
if err != nil {
return fmt.Errorf("updating PVC with OwnerReference: %v", err)
}

return nil
}

Expand Down
16 changes: 8 additions & 8 deletions config/internal/bundle.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions config/kubernetes-executor-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
name: {{.TaskId}}-{{.JobId}}
namespace: {{.Namespace}}
labels:
app: funnel-executor
job-name: {{.TaskId}}-{{.JobId}}
spec:
backoffLimit: 0
Expand All @@ -13,19 +14,6 @@ spec:
spec:
restartPolicy: Never
serviceAccountName: funnel-sa
# Setup symlinks from work dir to target paths
initContainers:
- name: setup-dirs
image: busybox
command: ["/bin/sh", "-c"]
args:
- |
# Create a directory to bind mount the worker's files
# Create parent directories for any path specified in the task
# Create the symlink from worker dir to target path
echo "initContainer: Creating directories and symlinks"
securityContext:
runAsUser: 0 # Run as root to ensure directory creation works
containers:
- name: funnel-worker-{{.TaskId}}
image: {{.Image}}
Expand Down
9 changes: 8 additions & 1 deletion config/kubernetes-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ kind: Job
metadata:
name: {{.TaskId}}
namespace: {{.Namespace}}
labels:
app: funnel-worker
task-id: {{.TaskId}}
spec:
backoffLimit: 0
completions: 1
template:
metadata:
labels:
app: funnel-worker
task-id: {{.TaskId}}
spec:
serviceAccountName: funnel-sa
restartPolicy: Never
Expand Down Expand Up @@ -42,4 +49,4 @@ spec:

- name: funnel-storage-{{.TaskId}}
persistentVolumeClaim:
claimName: funnel-pvc-{{.TaskId}}
claimName: funnel-pvc-{{.TaskId}}
2 changes: 1 addition & 1 deletion deployments/kubernetes/helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.1.11
version: 0.1.12

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
4 changes: 1 addition & 3 deletions deployments/kubernetes/helm/templates/NOTES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
1. To access the Funnel application, use the following instructions:

To access the service locally, use:
To access the Funnel service locally, use:
kubectl --namespace {{ .Release.Namespace }} port-forward svc/{{ include "funnel.fullname" . }} 8000:8000
echo "Visit http://127.0.0.1:8000"
15 changes: 15 additions & 0 deletions deployments/kubernetes/helm/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: funnel-clusterrole
rules:
- apiGroups: [""]
resources:
- persistentvolumes
verbs:
- get
- list
- watch
- create
- update
- delete
Loading

0 comments on commit 5b38ccb

Please sign in to comment.