diff --git a/cmd/kotsadm/cli/migrate.go b/cmd/kotsadm/cli/migrate.go new file mode 100644 index 0000000000..f22feaeeb5 --- /dev/null +++ b/cmd/kotsadm/cli/migrate.go @@ -0,0 +1,106 @@ +package cli + +import ( + "os" + + "github.com/pkg/errors" + "github.com/replicatedhq/kots/pkg/filestore" + "github.com/replicatedhq/kots/pkg/persistence" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +func MigrateCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "migrate", + Short: "Trigger a migration", + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return nil + }, + } + + cmd.AddCommand(MigrateS3ToRqliteCmd()) + cmd.AddCommand(MigratePVCToRqliteCmd()) + + return cmd +} + +func MigrateS3ToRqliteCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "s3-to-rqlite", + Short: "Migrate object storage from S3 to rqlite", + Long: ``, + SilenceUsage: true, + SilenceErrors: false, + PreRun: func(cmd *cobra.Command, args []string) { + viper.BindPFlags(cmd.Flags()) + }, + RunE: func(cmd *cobra.Command, args []string) error { + // Check if required env vars are set + if os.Getenv("RQLITE_URI") == "" { + return errors.New("RQLITE_URI is not set") + } + if os.Getenv("S3_ENDPOINT") == "" { + return errors.New("S3_ENDPOINT is not set") + } + if os.Getenv("S3_BUCKET_NAME") == "" { + return errors.New("S3_BUCKET_NAME is not set") + } + if os.Getenv("S3_ACCESS_KEY_ID") == "" { + return errors.New("S3_ACCESS_KEY_ID is not set") + } + if os.Getenv("S3_SECRET_ACCESS_KEY") == "" { + return errors.New("S3_SECRET_ACCESS_KEY is not set") + } + + // Initialize the rqlite DB + persistence.InitDB(os.Getenv("RQLITE_URI")) + + // Migrate from S3 to rqlite + if err := filestore.MigrateFromS3ToRqlite(cmd.Context()); err != nil { + return err + } + + return nil + }, + } + + return cmd +} + +func MigratePVCToRqliteCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "pvc-to-rqlite", + Short: "Migrate object storage from PVC to rqlite", + Long: ``, + SilenceUsage: true, + SilenceErrors: false, + PreRun: func(cmd *cobra.Command, args []string) { + viper.BindPFlags(cmd.Flags()) + }, + RunE: func(cmd *cobra.Command, args []string) error { + // Check if required env vars are set + if os.Getenv("RQLITE_URI") == "" { + return errors.New("RQLITE_URI is not set") + } + + // Check if PVC mount and the archives dir exist + if _, err := os.Stat(filestore.ArchivesDir); err != nil { + return errors.Wrap(err, "failed to stat archives dir") + } + + // Initialize the rqlite DB + persistence.InitDB(os.Getenv("RQLITE_URI")) + + // Migrate from PVC to rqlite + if err := filestore.MigrateFromPVCToRqlite(cmd.Context()); err != nil { + return err + } + + return nil + }, + } + + return cmd +} diff --git a/cmd/kotsadm/cli/root.go b/cmd/kotsadm/cli/root.go index f31f11adfc..4ec8a4066b 100644 --- a/cmd/kotsadm/cli/root.go +++ b/cmd/kotsadm/cli/root.go @@ -28,6 +28,7 @@ func RootCmd() *cobra.Command { cmd.PersistentFlags().String("log-level", "info", "set the log level") cmd.AddCommand(APICmd()) + cmd.AddCommand(MigrateCmd()) cmd.AddCommand(CompletionCmd()) viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) diff --git a/migrations/tables/object_store.yaml b/migrations/tables/object_store.yaml index 216291e97f..206cbb41f8 100644 --- a/migrations/tables/object_store.yaml +++ b/migrations/tables/object_store.yaml @@ -20,4 +20,4 @@ spec: - name: encoded_block type: text constraints: - notNull: true + notNull: true \ No newline at end of file diff --git a/pkg/filestore/blob_store.go b/pkg/filestore/blob_store.go index ce8744ea83..830573b38c 100644 --- a/pkg/filestore/blob_store.go +++ b/pkg/filestore/blob_store.go @@ -3,7 +3,6 @@ package filestore import ( "context" "io" - "io/ioutil" "os" "path/filepath" "strings" @@ -70,7 +69,7 @@ func (s *BlobStore) readFile(path string) (string, error) { } defer fileReader.Close() - tmpDir, err := ioutil.TempDir("", "kotsadm") + tmpDir, err := os.MkdirTemp("", "kotsadm") if err != nil { return "", errors.Wrap(err, "failed to create temp dir") } diff --git a/pkg/filestore/migrate.go b/pkg/filestore/migrate.go new file mode 100644 index 0000000000..80b33aabb8 --- /dev/null +++ b/pkg/filestore/migrate.go @@ -0,0 +1,201 @@ +package filestore + +import ( + "bytes" + "context" + "fmt" + "log" + "os" + "path/filepath" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + _ "github.com/mattn/go-sqlite3" + "github.com/pkg/errors" + "github.com/replicatedhq/kots/pkg/persistence" + kotss3 "github.com/replicatedhq/kots/pkg/s3" + "github.com/rqlite/gorqlite" +) + +const ( + RQLITE_S3_MIGRATION_SUCCESS_KEY = "rqlite.s3.migration.success" + RQLITE_BLOB_MIGRATION_SUCCESS_KEY = "rqlite.blob.migration.success" + RQLITE_MIGRATION_SUCCESS_VALUE = "true" +) + +func MigrateFromS3ToRqlite(ctx context.Context) error { + // Check if already migrated + rqliteDB := persistence.MustGetDBSession() + alreadyMigrated, err := isAlreadyMigrated(rqliteDB, RQLITE_S3_MIGRATION_SUCCESS_KEY) + if err != nil { + return errors.Wrap(err, "failed to check if already migrated") + } + if alreadyMigrated { + log.Println("Already migrated from S3 to rqlite. Skipping migration...") + return nil + } + + log.Println("Migrating from S3 to rqlite...") + + // Initialize rqlite store + rqliteStore := &RqliteStore{} + if err := rqliteStore.Init(); err != nil { + return errors.Wrap(err, "failed to init rqlite store") + } + if err := rqliteStore.WaitForReady(ctx); err != nil { + return errors.Wrap(err, "failed to wait for rqlite store to become ready") + } + + // Create a new S3 session + sess, err := session.NewSession(kotss3.GetConfig()) + if err != nil { + return errors.Wrap(err, "failed to create new s3 session") + } + + // Create an S3 client + s3Client := s3.New(sess) + + // List objects in the bucket + listObjectsInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(os.Getenv("S3_BUCKET_NAME")), + } + listObjectsOutput, err := s3Client.ListObjectsV2(listObjectsInput) + if err != nil { + return errors.Wrap(err, "failed to list objects in bucket") + } + + // Initialize the S3 downloader + downloader := s3manager.NewDownloader(sess) + + // Process each object + for _, item := range listObjectsOutput.Contents { + if item == nil || item.Key == nil { + continue + } + key := *item.Key + log.Printf("Processing key: %s\n", key) + + // Download the object + buff := &aws.WriteAtBuffer{} + _, err := downloader.Download(buff, &s3.GetObjectInput{ + Bucket: aws.String(os.Getenv("S3_BUCKET_NAME")), + Key: aws.String(key), + }) + if err != nil { + return errors.Wrap(err, "failed to download object") + } + + // Write the object to rqlite + if err := rqliteStore.WriteArchive(key, bytes.NewReader(buff.Bytes())); err != nil { + return errors.Wrap(err, "failed to write archive to rqlite") + } + } + + // Record the migration success + query := `REPLACE INTO kotsadm_params (key, value) VALUES (?, ?)` + wr, err := rqliteDB.WriteOneParameterized(gorqlite.ParameterizedStatement{ + Query: query, + Arguments: []interface{}{RQLITE_S3_MIGRATION_SUCCESS_KEY, RQLITE_MIGRATION_SUCCESS_VALUE}, + }) + if err != nil { + return fmt.Errorf("failed to mark migration as successful: %v: %v", err, wr.Err) + } + + log.Println("Migrated from S3 to rqlite successfully!") + + return nil +} + +func MigrateFromPVCToRqlite(ctx context.Context) error { + // Check if already migrated + rqliteDB := persistence.MustGetDBSession() + alreadyMigrated, err := isAlreadyMigrated(rqliteDB, RQLITE_BLOB_MIGRATION_SUCCESS_KEY) + if err != nil { + return errors.Wrap(err, "failed to check if already migrated") + } + if alreadyMigrated { + log.Println("Already migrated from PVC to rqlite. Skipping migration...") + return nil + } + + log.Println("Migrating from PVC to rqlite...") + + // Initialize rqlite store + rqliteStore := &RqliteStore{} + if err := rqliteStore.Init(); err != nil { + return errors.Wrap(err, "failed to init rqlite store") + } + if err := rqliteStore.WaitForReady(ctx); err != nil { + return errors.Wrap(err, "failed to wait for rqlite store to become ready") + } + + // Process each object + err = filepath.Walk(ArchivesDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return errors.Wrap(err, "failed to walk path") + } + + if info.IsDir() { + return nil + } + + key, err := filepath.Rel(ArchivesDir, path) + if err != nil { + return errors.Wrap(err, "failed to get relative path") + } + log.Printf("Processing key: %s\n", key) + + // Open the file + file, err := os.Open(path) + if err != nil { + return errors.Wrap(err, "failed to open file") + } + defer file.Close() + + // Write the object to rqlite + if err := rqliteStore.WriteArchive(key, file); err != nil { + return errors.Wrap(err, "failed to write archive to rqlite") + } + + return nil + }) + if err != nil { + return errors.Wrap(err, "failed to walk PVC mount") + } + + // Record the migration success + query := `REPLACE INTO kotsadm_params (key, value) VALUES (?, ?)` + wr, err := rqliteDB.WriteOneParameterized(gorqlite.ParameterizedStatement{ + Query: query, + Arguments: []interface{}{RQLITE_BLOB_MIGRATION_SUCCESS_KEY, RQLITE_MIGRATION_SUCCESS_VALUE}, + }) + if err != nil { + return fmt.Errorf("failed to mark migration as successful: %v: %v", err, wr.Err) + } + + log.Println("Migrated from PVC to rqlite successfully!") + + return nil +} + +func isAlreadyMigrated(rqliteDB *gorqlite.Connection, migrationKey string) (bool, error) { + rows, err := rqliteDB.QueryOneParameterized(gorqlite.ParameterizedStatement{ + Query: `SELECT value FROM kotsadm_params WHERE key = ?`, + Arguments: []interface{}{migrationKey}, + }) + if err != nil { + return false, fmt.Errorf("failed to query: %v: %v", err, rows.Err) + } + if !rows.Next() { + return false, nil + } + + var value string + if err := rows.Scan(&value); err != nil { + return false, errors.Wrap(err, "failed to scan") + } + + return value == RQLITE_MIGRATION_SUCCESS_VALUE, nil +} diff --git a/pkg/filestore/rqlite_store.go b/pkg/filestore/rqlite_store.go new file mode 100644 index 0000000000..6f64054334 --- /dev/null +++ b/pkg/filestore/rqlite_store.go @@ -0,0 +1,114 @@ +package filestore + +import ( + "context" + "encoding/base64" + "fmt" + "io" + "os" + + "github.com/pkg/errors" + "github.com/replicatedhq/kots/pkg/persistence" + "github.com/rqlite/gorqlite" +) + +var ( + ErrNotFound = errors.New("not found") +) + +type RqliteStore struct { +} + +func (s *RqliteStore) Init() error { + // there's no initialization, the table is created by schemahero + return nil +} + +func (s *RqliteStore) WaitForReady(ctx context.Context) error { + // there's no waiting, the table must exist at this point + db := persistence.MustGetDBSession() + + query := `SELECT filepath FROM object_store LIMIT 1` + _, err := db.QueryOne(query) + if err != nil { + return errors.Wrap(err, "failed to query") + } + + return nil +} + +func (s *RqliteStore) WriteArchive(outputPath string, body io.ReadSeeker) error { + db := persistence.MustGetDBSession() + + bodyBytes, err := io.ReadAll(body) + if err != nil { + return errors.Wrap(err, "failed to read body") + } + + query := ` +INSERT INTO object_store (filepath, encoded_block) +VALUES (?, ?) +ON CONFLICT (filepath) DO UPDATE SET + encoded_block = excluded.encoded_block +` + wr, err := db.WriteOneParameterized(gorqlite.ParameterizedStatement{ + Query: query, + Arguments: []interface{}{outputPath, base64.StdEncoding.EncodeToString(bodyBytes)}, + }) + if err != nil { + return fmt.Errorf("failed to write: %v: %v", err, wr.Err) + } + + return nil +} + +func (s *RqliteStore) ReadArchive(path string) (string, error) { + db := persistence.MustGetDBSession() + + query := `SELECT encoded_block FROM object_store WHERE filepath = ?` + rows, err := db.QueryOneParameterized(gorqlite.ParameterizedStatement{ + Query: query, + Arguments: []interface{}{path}, + }) + if err != nil { + return "", fmt.Errorf("failed to read: %v: %v", err, rows.Err) + } + if !rows.Next() { + return "", ErrNotFound + } + + var encoded string + if err := rows.Scan(&encoded); err != nil { + return "", errors.Wrap(err, "failed to scan") + } + + decoded, err := base64.StdEncoding.DecodeString(encoded) + if err != nil { + return "", errors.Wrap(err, "failed to decode") + } + + tmpFile, err := os.CreateTemp("", "kotsadm") + if err != nil { + return "", errors.Wrap(err, "failed to create temp file") + } + if err := os.WriteFile(tmpFile.Name(), decoded, 0644); err != nil { + return "", errors.Wrap(err, "failed to write to temp file") + } + + return tmpFile.Name(), nil +} + +func (s *RqliteStore) DeleteArchive(path string) error { + db := persistence.MustGetDBSession() + + query := `DELETE FROM object_store WHERE filepath = ?` + wr, err := db.WriteOneParameterized(gorqlite.ParameterizedStatement{ + Query: query, + Arguments: []interface{}{path}, + }) + if err != nil { + return fmt.Errorf("failed to delete: %v: %v", err, wr.Err) + } + + return nil +} diff --git a/pkg/filestore/store.go b/pkg/filestore/store.go index 9c6908a9f3..bd8cb6ad79 100644 --- a/pkg/filestore/store.go +++ b/pkg/filestore/store.go @@ -19,8 +19,11 @@ func GetStore() FileStore { } func storeFromEnv() FileStore { - if os.Getenv("S3_ENDPOINT") == "" { + if os.Getenv("S3_ENDPOINT") != "" { + return &S3Store{} + } + if _, err := os.Stat("/kotsadmdata"); err == nil { return &BlobStore{} } - return &S3Store{} + return &RqliteStore{} } diff --git a/pkg/store/kotsstore/kots_store.go b/pkg/store/kotsstore/kots_store.go index 9492fc2d0f..6d4f234421 100644 --- a/pkg/store/kotsstore/kots_store.go +++ b/pkg/store/kotsstore/kots_store.go @@ -119,7 +119,7 @@ func (s *KOTSStore) IsNotFound(err error) bool { } cause := errors.Cause(err) - if cause == ErrNotFound { + if cause == ErrNotFound || cause == filestore.ErrNotFound { return true }