From 69920979f7181d13bf1e61a588cea6ff1e6270b2 Mon Sep 17 00:00:00 2001 From: Chin Huang Date: Wed, 1 Mar 2023 01:47:50 -0800 Subject: [PATCH] feat: Add pullman PVC storage provider (#36) * Add pullman PVC storage provider Add pullman PVC storage provider to support models in PVC Signed-off-by: Chin Huang * use symlink, restructure, cleanup based on review, temporarily use join instead of SecureJoin due to use of symlinks Signed-off-by: Chin Huang * add more error handling Signed-off-by: Chin Huang --------- Signed-off-by: Chin Huang --- model-serving-puller/puller/puller.go | 12 +- pullman/storageproviders/pvc/provider.go | 128 ++++++++++++++++++ pullman/storageproviders/pvc/provider_test.go | 109 +++++++++++++++ 3 files changed, 245 insertions(+), 4 deletions(-) create mode 100644 pullman/storageproviders/pvc/provider.go create mode 100644 pullman/storageproviders/pvc/provider_test.go diff --git a/model-serving-puller/puller/puller.go b/model-serving-puller/puller/puller.go index e32097f8..785ef1dc 100644 --- a/model-serving-puller/puller/puller.go +++ b/model-serving-puller/puller/puller.go @@ -30,6 +30,7 @@ import ( _ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/azure" _ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/gcs" _ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/http" + _ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/pvc" _ "github.com/kserve/modelmesh-runtime-adapter/pullman/storageproviders/s3" ) @@ -183,10 +184,13 @@ func (s *Puller) ProcessLoadModelRequest(ctx context.Context, req *mmesh.LoadMod } // update model path to an absolute path in the local filesystem - modelFullPath, joinErr := util.SecureJoin(modelDir, modelPathFilename) - if joinErr != nil { - return nil, fmt.Errorf("Error joining paths '%s' and '%s': %w", modelDir, modelPathFilename, joinErr) - } + // comment out SecureJoin since it doesn't handle symlinks well + // modelFullPath, joinErr :=
util.SecureJoin(modelDir, modelPathFilename) + // if joinErr != nil { + // return nil, fmt.Errorf("Error joining paths '%s' and '%s': %w", modelDir, modelPathFilename, joinErr) + // } + modelFullPath := modelDir + string(filepath.Separator) + modelPathFilename + req.ModelPath = modelFullPath // if it was included, update schema path to an absolute path in the local filesystem diff --git a/pullman/storageproviders/pvc/provider.go b/pullman/storageproviders/pvc/provider.go new file mode 100644 index 00000000..68139b50 --- /dev/null +++ b/pullman/storageproviders/pvc/provider.go @@ -0,0 +1,128 @@ +// Copyright 2022 IBM Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package pvcprovider + +import ( + "context" + "fmt" + "os" + "path/filepath" + + "github.com/go-logr/logr" + + "github.com/kserve/modelmesh-runtime-adapter/internal/util" + "github.com/kserve/modelmesh-runtime-adapter/pullman" +) + +const ( + // config fields + configPVCName = "name" + + // defaults + defaultPVCMountBase = "/pvc_mounts" +) + +type pvcProvider struct { + pvcMountBase string +} + +// pvcProvider implements StorageProvider +var _ pullman.StorageProvider = (*pvcProvider)(nil) + +func (p pvcProvider) GetKey(config pullman.Config) string { + // Since the same instance of the repository can be used for all PVCs, there is no need to distinguish them here. 
+ return "" +} + +func (p pvcProvider) NewRepository(config pullman.Config, log logr.Logger) (pullman.RepositoryClient, error) { + if p.pvcMountBase == "" { + p.pvcMountBase = defaultPVCMountBase + } + + fileInfo, err := os.Stat(p.pvcMountBase) + if err != nil { + if os.IsNotExist(err) { + return nil, fmt.Errorf("the PVC mount base '%s' doesn't exist: %w", p.pvcMountBase, err) + } + return nil, fmt.Errorf("failed to access the PVC mount base '%s': %w", p.pvcMountBase, err) + } + + if !fileInfo.IsDir() { + return nil, fmt.Errorf("the PVC mount base '%s' is not a directory", p.pvcMountBase) + } + + return &pvcRepositoryClient{ + pvcProvider: p, + log: log, + }, nil +} + +type pvcRepositoryClient struct { + pvcProvider pvcProvider + log logr.Logger +} + +// pvcRepositoryClient implements RepositoryClient +var _ pullman.RepositoryClient = (*pvcRepositoryClient)(nil) + +func (r *pvcRepositoryClient) Pull(ctx context.Context, pc pullman.PullCommand) error { + targets := pc.Targets + destDir := pc.Directory + + // Process per-command configuration + pvcName, ok := pullman.GetString(pc.RepositoryConfig, configPVCName) + if !ok { + return fmt.Errorf("required configuration '%s' missing from command", configPVCName) + } + + // create destination directories + if mkdirErr := os.MkdirAll(destDir, 0755); mkdirErr != nil { + return fmt.Errorf("unable to create directories '%s': %w", destDir, mkdirErr) + } + + pvcDir, joinErr := util.SecureJoin(r.pvcProvider.pvcMountBase, pvcName) + if joinErr != nil { + return fmt.Errorf("unable to join paths '%s' and '%s': %v", r.pvcProvider.pvcMountBase, pvcName, joinErr) + } + r.log.V(1).Info("The PVC directory is set", "pvcDir", pvcDir) + + for _, pt := range targets { + fullModelPath, joinErr := util.SecureJoin(pvcDir, pt.RemotePath) + if joinErr != nil { + return fmt.Errorf("unable to join paths '%s' and '%s': %v", pvcDir, pt.RemotePath, joinErr) + } + r.log.V(1).Info("The model path is set", "fullModelPath", fullModelPath) + + // check 
the local model path /pvcMountBase/pvcName/modelPath exists + if _, err := os.Stat(fullModelPath); err != nil { + return fmt.Errorf("unable to access model local path '%s': %w", fullModelPath, err) + } + + // create symlink + linkPath, err := util.SecureJoin(destDir, filepath.Base(fullModelPath)) + if err != nil { + return fmt.Errorf("unable to join paths '%s' and '%s': %v", destDir, filepath.Base(fullModelPath), err) + } + if err := os.Symlink(fullModelPath, linkPath); err != nil { + return fmt.Errorf("unable to create symlink '%s': %w", linkPath, err) + } + } + + return nil +} + +func init() { + p := pvcProvider{} + pullman.RegisterProvider("pvc", p) +} diff --git a/pullman/storageproviders/pvc/provider_test.go b/pullman/storageproviders/pvc/provider_test.go new file mode 100644 index 00000000..66685257 --- /dev/null +++ b/pullman/storageproviders/pvc/provider_test.go @@ -0,0 +1,109 @@ +// Copyright 2022 IBM Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package pvcprovider + +import ( + "context" + "os" + "testing" + + "github.com/go-logr/logr" + "github.com/kserve/modelmesh-runtime-adapter/pullman" + "github.com/stretchr/testify/assert" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func newPVCProvider(t *testing.T) (pvcProvider, logr.Logger) { + p := pvcProvider{} + log := zap.New() + return p, log +} + +func newPVCRepositoryClient(t *testing.T) *pvcRepositoryClient { + pvcMountBase, _ := os.Getwd() + p, log := newPVCProvider(t) + p.pvcMountBase = pvcMountBase + pvcrc := pvcRepositoryClient{ + pvcProvider: p, + log: log, + } + return &pvcrc +} + +func Test_NewRepository(t *testing.T) { + pvcMountBase, _ := os.Getwd() + p, log := newPVCProvider(t) + p.pvcMountBase = pvcMountBase + c := pullman.NewRepositoryConfig("pvc", nil) + c.Set("name", "pvcName") + _, err := p.NewRepository(c, log) + assert.NoError(t, err) +} + +func Test_Verify_Directory(t *testing.T) { + pvcName := "pvcName" + modelDir := "modelDir" + modelID := "testId" + pvcRc := newPVCRepositoryClient(t) + c := pullman.NewRepositoryConfig("pvc", nil) + c.Set("name", pvcName) + + pvcNameDir := pvcRc.pvcProvider.pvcMountBase + "/" + pvcName + pvcModelDir := pvcNameDir + "/" + modelDir + serveModelDir := pvcRc.pvcProvider.pvcMountBase + "/" + modelID + + inputPullCommand := pullman.PullCommand{ + RepositoryConfig: c, + Directory: serveModelDir, + Targets: []pullman.Target{ + { + RemotePath: modelDir, + }, + }, + } + + // should return error because the directory doesn't exist + err := pvcRc.Pull(context.Background(), inputPullCommand) + assert.Error(t, err) + + err = os.MkdirAll(pvcModelDir, os.ModePerm) + assert.NoError(t, err) + + // should not return error because the directory exists + err = pvcRc.Pull(context.Background(), inputPullCommand) + assert.NoError(t, err) + + err = os.RemoveAll(pvcNameDir) + assert.NoError(t, err) + + err = os.RemoveAll(serveModelDir) + assert.NoError(t, err) +} + +func Test_GetKey(t *testing.T) { + provider := 
pvcProvider{} + + createTestConfig := func(pvcName string) *pullman.RepositoryConfig { + config := pullman.NewRepositoryConfig("pvc", nil) + config.Set(configPVCName, pvcName) + return config + } + + // different pvc names should have the same key + t.Run("shouldChangeForTokenUri", func(t *testing.T) { + config1 := createTestConfig("pvcName1") + config2 := createTestConfig("pvcName2") + assert.Equal(t, provider.GetKey(config1), provider.GetKey(config2)) + }) +}