Skip to content

Commit

Permalink
Merge pull request #245 from grycap/dev-slangarita
Browse files Browse the repository at this point in the history
Added support to mount a storage provider folder on OSCAR containers
  • Loading branch information
catttam authored Jun 21, 2024
2 parents 8b130e9 + 84b5f92 commit 527b70a
Show file tree
Hide file tree
Showing 11 changed files with 314 additions and 28 deletions.
12 changes: 11 additions & 1 deletion docs/fdl.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ functions:
cpu: '1.0'
image: grycap/imagemagick
script: grayify.sh
interlink_node_name: vega-new-vk
expose:
min_scale: 3
max_scale: 7
port: 5000
cpu_threshold: 70
nodePort: 30500
set_auth: true
rewrite_target: true
default_command: true
input:
- storage_provider: minio.default
path: example-workflow/med
Expand Down Expand Up @@ -104,6 +109,7 @@ storage_providers:
| `environment` </br> *[EnvVarsMap](#envvarsmap)* | The user-defined environment variables assigned to the service. Optional |
| `annotations` </br> *map[string]string* | User-defined Kubernetes [annotations](https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/) to be set in job's definition. Optional |
| `labels` </br> *map[string]string* | User-defined Kubernetes [labels](https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/) to be set in job's definition. Optional |
| `interlink_node_name` </br> *string* | Name of the virtual kubelet node (if you are using InterLink nodes) Optional

## SynchronousSettings

Expand All @@ -119,7 +125,11 @@ storage_providers:
| `min_scale` </br> *integer* | Minimum number of active replicas (pods) for the service. Optional. (default: 1) |
| `max_scale` </br> *integer* | Maximum number of active replicas (pods) for the service. Optional. (default: 10 (Unlimited)) |
| `port` </br> *integer* | Port inside the container where the API is exposed. (value: 0 , the service wont be exposed.) |
| `cpu_threshold` </br> *integer* | Percent of use of CPU before creating other pod (default: 80 max:100) |
| `cpu_threshold` </br> *integer* | Percent of use of CPU before creating other pod (default: 80 max:100). Optional. |
| `nodePort` </br> *integer* | Change the access method from the domain name to the public ip. Optional. |
| `set_auth` </br> *bool* | Create credentials for the service, composed of the service name as the user and the service token as the password. (default: false). Optional. |
| `rewrite_target` </br> *bool* | Target the URI where the traffic is redirected. (default: false). Optional. |
| `default_command` </br> *bool* | Select between executing the container's default command and executing the script inside the container. (default: false). Optional. |

## Replica

Expand Down
Binary file modified docs/images/interlink.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion examples/cowsay/cowsay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ functions:
oscar:
- oscar-cluster:
name: cowsay
cpu: 1.0
cpu: '1.0'
memory: 1Gi
image: ghcr.io/grycap/cowsay
script: script.sh
Expand Down
22 changes: 22 additions & 0 deletions examples/expose_services/jupyter/jupyter_expose_mount.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
functions:
oscar:
- oscar-cluster:
name: jupyter
memory: 2Gi
cpu: '1.0'
image: jupyter/base-notebook
script: jupyterscript2.sh
environment:
Variables:
JUPYTER_TOKEN: "root"
JHUB_BASE_URL: "/system/services/jupyter/exposed"
JUPYTER_DIRECTORY: "/mnt"
mount:
storage_provider: minio.default
path: /prueba
expose:
min_scale: 1
max_scale: 1
api_port: 8888
cpu_threshold: 90
rewrite_target: true
3 changes: 2 additions & 1 deletion examples/expose_services/jupyter/jupyterscript2.sh
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
start-notebook.sh --NotebookApp.token=$JUPYTER_TOKEN
sleep 10
start-notebook.sh --NotebookApp.allow_root=True --Session.username=root --NotebookApp.base_url=$JHUB_BASE_URL --NotebookApp.token=$JUPYTER_TOKEN --NotebookApp.notebook_dir=$JUPYTER_DIRECTORY
91 changes: 69 additions & 22 deletions pkg/handlers/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *

// Create input buckets
for _, in := range service.Input {
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(in.Provider), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}
provID, provName = getProviderInfo(in.Provider)

// Only allow input from MinIO and dCache
if provName != types.MinIOName && provName != types.WebDavName {
Expand Down Expand Up @@ -327,17 +318,7 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *

// Create output buckets
for _, out := range service.Output {
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(out.Provider), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}

provID, provName = getProviderInfo(out.Provider)
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
disableInputNotifications(service.GetMinIOWebhookARN(), service.Input, cfg.MinIOProvider)
Expand Down Expand Up @@ -401,6 +382,57 @@ func createBuckets(service *types.Service, cfg *types.Config, minIOAdminClient *
}
}

if service.Mount.Provider != "" {
provID, provName = getProviderInfo(service.Mount.Provider)
if provName == types.MinIOName {
// Check if the provider identifier is defined in StorageProviders
if !isStorageProviderDefined(provName, provID, service.StorageProviders) {
return fmt.Errorf("the StorageProvider \"%s.%s\" is not defined", provName, provID)
}

path := strings.Trim(service.Mount.Path, " /")
// Split buckets and folders from path
splitPath := strings.SplitN(path, "/", 2)

// Currently only MinIO/S3 are supported
// Use the appropriate client
if provName == types.MinIOName {
s3Client = service.StorageProviders.MinIO[provID].GetS3Client()
} else {
s3Client = service.StorageProviders.S3[provID].GetS3Client()
}
// Create bucket
_, err := s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(splitPath[0]),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
// Check if the error is caused because the bucket already exists
if aerr.Code() == s3.ErrCodeBucketAlreadyExists || aerr.Code() == s3.ErrCodeBucketAlreadyOwnedByYou {
log.Printf("The bucket \"%s\" already exists\n", splitPath[0])
} else {
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
} else {
return fmt.Errorf("error creating bucket %s: %v", splitPath[0], err)
}
}
// Create folder(s)
if len(splitPath) == 2 {
// Add "/" to the end of the key in order to create a folder
folderKey := fmt.Sprintf("%s/", splitPath[1])
_, err := s3Client.PutObject(&s3.PutObjectInput{
Bucket: aws.String(splitPath[0]),
Key: aws.String(folderKey),
})
if err != nil {
return fmt.Errorf("error creating folder \"%s\" in bucket \"%s\": %v", folderKey, splitPath[0], err)
}
}
}

}

return nil
}

Expand All @@ -419,6 +451,21 @@ func isStorageProviderDefined(storageName string, storageID string, providers *t
return ok
}

func getProviderInfo(rawInfo string) (string, string) {
var provID, provName string
// Split input provider
provSlice := strings.SplitN(strings.TrimSpace(rawInfo), types.ProviderSeparator, 2)
if len(provSlice) == 1 {
provName = strings.ToLower(provSlice[0])
// Set "default" provider ID
provID = types.DefaultProvider
} else {
provName = strings.ToLower(provSlice[0])
provID = provSlice[1]
}
return provID, provName
}

func checkIdentity(service *types.Service, cfg *types.Config, authHeader string) error {
oidcManager, _ := auth.NewOIDCManager(cfg.OIDCIssuer, cfg.OIDCSubject, cfg.OIDCGroups)
rawToken := strings.TrimPrefix(authHeader, "Bearer ")
Expand All @@ -430,7 +477,7 @@ func checkIdentity(service *types.Service, cfg *types.Config, authHeader string)
}

if !hasVO {
return fmt.Errorf("This user isn't enrrolled on the vo: %v", service.VO)
return fmt.Errorf("this user isn't enrrolled on the vo: %v", service.VO)
}

service.Labels["vo"] = service.VO
Expand Down
5 changes: 4 additions & 1 deletion pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, resourceIDVar)
}
}
if service.Mount.Provider != "" {
types.SetMount(podSpec, *service, cfg)
podSpec.Containers[0].Args = []string{"-c", args + ";echo \"I finish\" > /tmpfolder/finish-file;"}
}

// Delegate job if can't be scheduled and has defined replicas
if rm != nil && service.HasReplicas() {
Expand Down Expand Up @@ -207,7 +211,6 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
c.String(http.StatusInternalServerError, err.Error())
return
}

c.Status(http.StatusCreated)
}
}
19 changes: 17 additions & 2 deletions pkg/types/expose.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,24 @@ func DeleteExpose(name string, kubeClientset kubernetes.Interface, cfg *Config)

ingressType := existsIngress(name, cfg.ServicesNamespace, kubeClientset)
if ingressType {
err = deleteIngress(name, kubeClientset, cfg)
err = deleteIngress(getIngressName(name), kubeClientset, cfg)
if err != nil {
return fmt.Errorf("error deleting ingress for exposed service '%s': %v", name, err)
}
}

termination := int64(0)
back := metav1.DeletePropagationBackground
delete := metav1.DeleteOptions{
GracePeriodSeconds: &termination,
PropagationPolicy: &back,
}
listOpts := metav1.ListOptions{
LabelSelector: "app=oscar-svc-exp-" + name,
}
err = kubeClientset.CoreV1().Pods(cfg.ServicesNamespace).DeleteCollection(context.TODO(), delete, listOpts)
if err != nil {
return fmt.Errorf("error deleting pods of exposed service '%s': %v", name, err)
}
return nil
}

Expand Down Expand Up @@ -258,6 +270,9 @@ func getPodTemplateSpec(service Service, cfg *Config) v1.PodTemplateSpec {
}
var num int32 = 0777
podSpec.Volumes[0].VolumeSource.ConfigMap.DefaultMode = &num
if service.Mount.Provider != "" {
SetMount(podSpec, service, cfg)
}
template := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Expand Down
Loading

0 comments on commit 527b70a

Please sign in to comment.