From 7f6ff10b906fad6b10556be785bf511d0f3437d5 Mon Sep 17 00:00:00 2001 From: Peter Sabaini Date: Fri, 22 Sep 2023 18:00:48 +0200 Subject: [PATCH] WIP: Fix disk add race Get rid of the racy OSD id calculation, make the database the single source of truth for OSD ids. Signed-off-by: Peter Sabaini --- microceph/ceph/osd.go | 70 +--- microceph/database/disk.mapper.go | 388 -------------------- microceph/database/disk_extras.go | 48 ++- microceph/database/{disk.go => diskpath.go} | 48 ++- microceph/database/diskpath.mapper.go | 346 +++++++++++++++++ microceph/database/schema.go | 20 + 6 files changed, 416 insertions(+), 504 deletions(-) delete mode 100644 microceph/database/disk.mapper.go rename microceph/database/{disk.go => diskpath.go} (53%) create mode 100644 microceph/database/diskpath.mapper.go diff --git a/microceph/ceph/osd.go b/microceph/ceph/osd.go index f557d715..0483413d 100644 --- a/microceph/ceph/osd.go +++ b/microceph/ceph/osd.go @@ -12,8 +12,6 @@ import ( "os" "os/exec" "path/filepath" - "strconv" - "strings" "syscall" "time" @@ -31,56 +29,6 @@ import ( "github.com/canonical/microceph/microceph/database" ) -func nextOSD(s *state.State) (int64, error) { - // Get the used OSD ids from Ceph. - osds, err := cephRun("osd", "ls") - if err != nil { - return -1, err - } - - cephIds := []int64{} - for _, line := range strings.Split(osds, "\n") { - if line == "" { - continue - } - - id, err := strconv.ParseInt(line, 10, 64) - if err != nil { - continue - } - - cephIds = append(cephIds, id) - } - - // Get the used OSD ids from the database. - dbIds := []int64{} - err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - disks, err := database.GetDisks(ctx, tx) - if err != nil { - return fmt.Errorf("Failed to fetch disks: %w", err) - } - - for _, disk := range disks { - dbIds = append(dbIds, int64(disk.OSD)) - } - - return nil - }) - if err != nil { - return -1, err - } - - // Find next available. - nextID := int64(0) - for { - if !shared.Int64InSlice(nextID, cephIds) && !shared.Int64InSlice(nextID, dbIds) { - return nextID, nil - } - - nextID++ - } -} - func prepareDisk(disk *types.DiskParameter, suffix string, osdPath string, osdID int64) error { if disk.Wipe { err := timeoutWipe(disk.Path) @@ -362,20 +310,13 @@ func AddOSD(s *state.State, data types.DiskParameter, wal *types.DiskParameter, return fmt.Errorf("Failed to set stable disk path: %w", err) } - // Get a OSD number. - nr, err := nextOSD(s) - if err != nil { - return fmt.Errorf("Failed to find next OSD number: %w", err) - } - logger.Debugf("nextOSD number is %d for disk %s", nr, data.Path) - // Record the disk. + var nr int64 err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - _, err := database.CreateDisk(ctx, tx, database.Disk{Member: s.Name(), Path: data.Path, OSD: int(nr)}) + nr, err = database.CreateDiskPath(ctx, tx, database.DiskPath{Member: s.Name(), Path: data.Path}) if err != nil { return fmt.Errorf("Failed to record disk: %w", err) } - return nil }) if err != nil { @@ -384,9 +325,6 @@ func AddOSD(s *state.State, data types.DiskParameter, wal *types.DiskParameter, logger.Debugf("Created disk record for osd.%d", nr) - // Keep the old path in case it changes after encrypting. - oldPath := data.Path - dataPath := filepath.Join(os.Getenv("SNAP_COMMON"), "data") osdDataPath := filepath.Join(dataPath, "osd", fmt.Sprintf("ceph-%d", nr)) @@ -394,7 +332,7 @@ func AddOSD(s *state.State, data types.DiskParameter, wal *types.DiskParameter, revert.Add(func() { os.RemoveAll(osdDataPath) s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - database.DeleteDisk(ctx, tx, s.Name(), oldPath) + database.DeleteDiskPath(ctx, tx, s.Name(), data.Path) return nil }) }) @@ -514,7 +452,7 @@ func sanityCheck(s common.StateInterface, osd int64) error { return err } if !exists { - return fmt.Errorf("ods.%d not found", osd) + return fmt.Errorf("osd.%d not found", osd) } return nil } diff --git a/microceph/database/disk.mapper.go b/microceph/database/disk.mapper.go deleted file mode 100644 index 1e78e4b7..00000000 --- a/microceph/database/disk.mapper.go +++ /dev/null @@ -1,388 +0,0 @@ -package database - -// The code below was generated by lxd-generate - DO NOT EDIT! - -import ( - "context" - "database/sql" - "errors" - "fmt" - "net/http" - "strings" - - "github.com/canonical/lxd/lxd/db/query" - "github.com/canonical/lxd/shared/api" - "github.com/canonical/microcluster/cluster" -) - -var _ = api.ServerEnvironment{} - -var diskObjects = cluster.RegisterStmt(` -SELECT disks.id, internal_cluster_members.name AS member, disks.osd, disks.path - FROM disks - JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id - ORDER BY internal_cluster_members.id, disks.osd -`) - -var diskObjectsByMember = cluster.RegisterStmt(` -SELECT disks.id, internal_cluster_members.name AS member, disks.osd, disks.path - FROM disks - JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id - WHERE ( member = ? ) - ORDER BY internal_cluster_members.id, disks.osd -`) - -var diskObjectsByMemberAndPath = cluster.RegisterStmt(` -SELECT disks.id, internal_cluster_members.name AS member, disks.osd, disks.path - FROM disks - JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id - WHERE ( member = ? AND disks.path = ? ) - ORDER BY internal_cluster_members.id, disks.osd -`) - -var diskID = cluster.RegisterStmt(` -SELECT disks.id FROM disks - JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id - WHERE internal_cluster_members.name = ? AND disks.osd = ? -`) - -var diskCreate = cluster.RegisterStmt(` -INSERT INTO disks (member_id, osd, path) - VALUES ((SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), ?, ?) -`) - -var diskDeleteByMember = cluster.RegisterStmt(` -DELETE FROM disks WHERE member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) -`) - -var diskDeleteByMemberAndPath = cluster.RegisterStmt(` -DELETE FROM disks WHERE member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) AND path = ? -`) - -var diskUpdate = cluster.RegisterStmt(` -UPDATE disks - SET member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), osd = ?, path = ? - WHERE id = ? -`) - -// diskColumns returns a string of column names to be used with a SELECT statement for the entity. -// Use this function when building statements to retrieve database entries matching the Disk entity. -func diskColumns() string { - return "disks.id, internal_cluster_members.name AS member, disks.osd, disks.path" -} - -// getDisks can be used to run handwritten sql.Stmts to return a slice of objects. -func getDisks(ctx context.Context, stmt *sql.Stmt, args ...any) ([]Disk, error) { - objects := make([]Disk, 0) - - dest := func(scan func(dest ...any) error) error { - d := Disk{} - err := scan(&d.ID, &d.Member, &d.OSD, &d.Path) - if err != nil { - return err - } - - objects = append(objects, d) - - return nil - } - - err := query.SelectObjects(ctx, stmt, dest, args...) - if err != nil { - return nil, fmt.Errorf("Failed to fetch from \"disks\" table: %w", err) - } - - return objects, nil -} - -// getDisksRaw can be used to run handwritten query strings to return a slice of objects. -func getDisksRaw(ctx context.Context, tx *sql.Tx, sql string, args ...any) ([]Disk, error) { - objects := make([]Disk, 0) - - dest := func(scan func(dest ...any) error) error { - d := Disk{} - err := scan(&d.ID, &d.Member, &d.OSD, &d.Path) - if err != nil { - return err - } - - objects = append(objects, d) - - return nil - } - - err := query.Scan(ctx, tx, sql, dest, args...) - if err != nil { - return nil, fmt.Errorf("Failed to fetch from \"disks\" table: %w", err) - } - - return objects, nil -} - -// GetDisks returns all available disks. -// generator: disk GetMany -func GetDisks(ctx context.Context, tx *sql.Tx, filters ...DiskFilter) ([]Disk, error) { - var err error - - // Result slice. - objects := make([]Disk, 0) - - // Pick the prepared statement and arguments to use based on active criteria. - var sqlStmt *sql.Stmt - args := []any{} - queryParts := [2]string{} - - if len(filters) == 0 { - sqlStmt, err = cluster.Stmt(tx, diskObjects) - if err != nil { - return nil, fmt.Errorf("Failed to get \"diskObjects\" prepared statement: %w", err) - } - } - - for i, filter := range filters { - if filter.Member != nil && filter.Path != nil && filter.OSD == nil { - args = append(args, []any{filter.Member, filter.Path}...) - if len(filters) == 1 { - sqlStmt, err = cluster.Stmt(tx, diskObjectsByMemberAndPath) - if err != nil { - return nil, fmt.Errorf("Failed to get \"diskObjectsByMemberAndPath\" prepared statement: %w", err) - } - - break - } - - query, err := cluster.StmtString(diskObjectsByMemberAndPath) - if err != nil { - return nil, fmt.Errorf("Failed to get \"diskObjects\" prepared statement: %w", err) - } - - parts := strings.SplitN(query, "ORDER BY", 2) - if i == 0 { - copy(queryParts[:], parts) - continue - } - - _, where, _ := strings.Cut(parts[0], "WHERE") - queryParts[0] += "OR" + where - } else if filter.Member != nil && filter.Path == nil && filter.OSD == nil { - args = append(args, []any{filter.Member}...) - if len(filters) == 1 { - sqlStmt, err = cluster.Stmt(tx, diskObjectsByMember) - if err != nil { - return nil, fmt.Errorf("Failed to get \"diskObjectsByMember\" prepared statement: %w", err) - } - - break - } - - query, err := cluster.StmtString(diskObjectsByMember) - if err != nil { - return nil, fmt.Errorf("Failed to get \"diskObjects\" prepared statement: %w", err) - } - - parts := strings.SplitN(query, "ORDER BY", 2) - if i == 0 { - copy(queryParts[:], parts) - continue - } - - _, where, _ := strings.Cut(parts[0], "WHERE") - queryParts[0] += "OR" + where - } else if filter.Member == nil && filter.Path == nil && filter.OSD == nil { - return nil, fmt.Errorf("Cannot filter on empty DiskFilter") - } else { - return nil, fmt.Errorf("No statement exists for the given Filter") - } - } - - // Select. - if sqlStmt != nil { - objects, err = getDisks(ctx, sqlStmt, args...) - } else { - queryStr := strings.Join(queryParts[:], "ORDER BY") - objects, err = getDisksRaw(ctx, tx, queryStr, args...) - } - - if err != nil { - return nil, fmt.Errorf("Failed to fetch from \"disks\" table: %w", err) - } - - return objects, nil -} - -// GetDisk returns the disk with the given key. -// generator: disk GetOne -func GetDisk(ctx context.Context, tx *sql.Tx, member string, osd int) (*Disk, error) { - filter := DiskFilter{} - filter.Member = &member - filter.OSD = &osd - - objects, err := GetDisks(ctx, tx, filter) - if err != nil { - return nil, fmt.Errorf("Failed to fetch from \"disks\" table: %w", err) - } - - switch len(objects) { - case 0: - return nil, api.StatusErrorf(http.StatusNotFound, "Disk not found") - case 1: - return &objects[0], nil - default: - return nil, fmt.Errorf("More than one \"disks\" entry matches") - } -} - -// GetDiskID return the ID of the disk with the given key. -// generator: disk ID -func GetDiskID(ctx context.Context, tx *sql.Tx, member string, osd int) (int64, error) { - stmt, err := cluster.Stmt(tx, diskID) - if err != nil { - return -1, fmt.Errorf("Failed to get \"diskID\" prepared statement: %w", err) - } - - row := stmt.QueryRowContext(ctx, member, osd) - var id int64 - err = row.Scan(&id) - if errors.Is(err, sql.ErrNoRows) { - return -1, api.StatusErrorf(http.StatusNotFound, "Disk not found") - } - - if err != nil { - return -1, fmt.Errorf("Failed to get \"disks\" ID: %w", err) - } - - return id, nil -} - -// DiskExists checks if a disk with the given key exists. -// generator: disk Exists -func DiskExists(ctx context.Context, tx *sql.Tx, member string, osd int) (bool, error) { - _, err := GetDiskID(ctx, tx, member, osd) - if err != nil { - if api.StatusErrorCheck(err, http.StatusNotFound) { - return false, nil - } - - return false, err - } - - return true, nil -} - -// CreateDisk adds a new disk to the database. -// generator: disk Create -func CreateDisk(ctx context.Context, tx *sql.Tx, object Disk) (int64, error) { - // Check if a disk with the same key exists. - exists, err := DiskExists(ctx, tx, object.Member, object.OSD) - if err != nil { - return -1, fmt.Errorf("Failed to check for duplicates: %w", err) - } - - if exists { - return -1, api.StatusErrorf(http.StatusConflict, "This \"disks\" entry already exists") - } - - args := make([]any, 3) - - // Populate the statement arguments. - args[0] = object.Member - args[1] = object.OSD - args[2] = object.Path - - // Prepared statement to use. - stmt, err := cluster.Stmt(tx, diskCreate) - if err != nil { - return -1, fmt.Errorf("Failed to get \"diskCreate\" prepared statement: %w", err) - } - - // Execute the statement. - result, err := stmt.Exec(args...) - if err != nil { - return -1, fmt.Errorf("Failed to create \"disks\" entry: %w", err) - } - - id, err := result.LastInsertId() - if err != nil { - return -1, fmt.Errorf("Failed to fetch \"disks\" entry ID: %w", err) - } - - return id, nil -} - -// DeleteDisk deletes the disk matching the given key parameters. -// generator: disk DeleteOne-by-Member-and-Path -func DeleteDisk(ctx context.Context, tx *sql.Tx, member string, path string) error { - stmt, err := cluster.Stmt(tx, diskDeleteByMemberAndPath) - if err != nil { - return fmt.Errorf("Failed to get \"diskDeleteByMemberAndPath\" prepared statement: %w", err) - } - - result, err := stmt.Exec(member, path) - if err != nil { - return fmt.Errorf("Delete \"disks\": %w", err) - } - - n, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("Fetch affected rows: %w", err) - } - - if n == 0 { - return api.StatusErrorf(http.StatusNotFound, "Disk not found") - } else if n > 1 { - return fmt.Errorf("Query deleted %d Disk rows instead of 1", n) - } - - return nil -} - -// DeleteDisks deletes the disk matching the given key parameters. -// generator: disk DeleteMany-by-Member -func DeleteDisks(ctx context.Context, tx *sql.Tx, member string) error { - stmt, err := cluster.Stmt(tx, diskDeleteByMember) - if err != nil { - return fmt.Errorf("Failed to get \"diskDeleteByMember\" prepared statement: %w", err) - } - - result, err := stmt.Exec(member) - if err != nil { - return fmt.Errorf("Delete \"disks\": %w", err) - } - - _, err = result.RowsAffected() - if err != nil { - return fmt.Errorf("Fetch affected rows: %w", err) - } - - return nil -} - -// UpdateDisk updates the disk matching the given key parameters. -// generator: disk Update -func UpdateDisk(ctx context.Context, tx *sql.Tx, member string, osd int, object Disk) error { - id, err := GetDiskID(ctx, tx, member, osd) - if err != nil { - return err - } - - stmt, err := cluster.Stmt(tx, diskUpdate) - if err != nil { - return fmt.Errorf("Failed to get \"diskUpdate\" prepared statement: %w", err) - } - - result, err := stmt.Exec(object.Member, object.OSD, object.Path, id) - if err != nil { - return fmt.Errorf("Update \"disks\" entry failed: %w", err) - } - - n, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("Fetch affected rows: %w", err) - } - - if n != 1 { - return fmt.Errorf("Query updated %d rows instead of 1", n) - } - - return nil -} diff --git a/microceph/database/disk_extras.go b/microceph/database/disk_extras.go index dca9251e..af5d1c29 100644 --- a/microceph/database/disk_extras.go +++ b/microceph/database/disk_extras.go @@ -31,17 +31,17 @@ type MemberDisk struct { var _ = api.ServerEnvironment{} var membersDiskCnt = cluster.RegisterStmt(` -SELECT internal_cluster_members.name AS member, count(disks.id) AS num_disks - FROM disks - JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id +SELECT internal_cluster_members.name AS member, count(diskpaths.id) AS num_disks + FROM diskpaths + JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id GROUP BY internal_cluster_members.id `) var membersDiskCntExclude = cluster.RegisterStmt(` -SELECT internal_cluster_members.name AS member, count(disks.id) AS num_disks -FROM disks -JOIN internal_cluster_members ON disks.member_id = internal_cluster_members.id -WHERE disks.OSD != ? +SELECT internal_cluster_members.name AS member, count(diskpaths.id) AS num_disks +FROM diskpaths +JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id +WHERE diskpaths.id != ? GROUP BY internal_cluster_members.id `) @@ -135,38 +135,38 @@ type OSDQueryInterface interface { type OSDQueryImpl struct{} -var osdCount = cluster.RegisterStmt(` -SELECT count(disks.id) AS num_disks -FROM disks -WHERE disks.OSD = ? +var haveOsd = cluster.RegisterStmt(` +SELECT count(*) +FROM diskpaths +WHERE diskpaths.id = ? `) var osdPath = cluster.RegisterStmt(` -SELECT disks.path -FROM disks -WHERE disks.OSD = ? +SELECT diskpaths.path +FROM diskpaths +WHERE diskpaths.id = ? `) // HaveOSD returns either false or true depending on whether the given OSD is present in the cluster func (o OSDQueryImpl) HaveOSD(s *state.State, osd int64) (bool, error) { - var numDisks int + var present int err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - sqlStmt, err := cluster.Stmt(tx, osdCount) + sqlStmt, err := cluster.Stmt(tx, haveOsd) if err != nil { - return fmt.Errorf("Failed to get \"osdCount\" prepared statement: %w", err) + return fmt.Errorf("Failed to get \"haveOsd\" prepared statement: %w", err) } - err = sqlStmt.QueryRow(osd).Scan(&numDisks) + err = sqlStmt.QueryRow(osd).Scan(&present) if err != nil { - return fmt.Errorf("Failed to get \"osdCount\" objects: %w", err) + return fmt.Errorf("Failed to get \"haveOsd\" objects: %w", err) } return nil }) if err != nil { return false, err } - return numDisks > 0, nil + return present > 0, nil } // Path returns the path of the given OSD @@ -198,7 +198,7 @@ func (o OSDQueryImpl) Delete(s *state.State, osd int64) error { return err } err = s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - return DeleteDisk(ctx, tx, s.Name(), path) + return DeleteDiskPath(ctx, tx, s.Name(), path) }) return err } @@ -206,18 +206,17 @@ func (o OSDQueryImpl) Delete(s *state.State, osd int64) error { // List OSD records func (o OSDQueryImpl) List(s *state.State) (types.Disks, error) { disks := types.Disks{} - // Get the OSDs from the database. err := s.Database.Transaction(s.Context, func(ctx context.Context, tx *sql.Tx) error { - records, err := GetDisks(ctx, tx) + records, err := GetDiskPaths(ctx, tx) if err != nil { return fmt.Errorf("Failed to fetch disks: %w", err) } for _, disk := range records { disks = append(disks, types.Disk{ + OSD: int64(disk.ID), Location: disk.Member, - OSD: int64(disk.OSD), Path: disk.Path, }) } @@ -227,7 +226,6 @@ func (o OSDQueryImpl) List(s *state.State) (types.Disks, error) { if err != nil { return nil, err } - return disks, nil } diff --git a/microceph/database/disk.go b/microceph/database/diskpath.go similarity index 53% rename from microceph/database/disk.go rename to microceph/database/diskpath.go index fd9c83ce..b3ad2488 100644 --- a/microceph/database/disk.go +++ b/microceph/database/diskpath.go @@ -1,37 +1,35 @@ package database -//go:generate -command mapper lxd-generate db mapper -t disk.mapper.go +//go:generate -command mapper lxd-generate db mapper -t diskpath.mapper.go //go:generate mapper reset // -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk objects table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk objects-by-Member table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk objects-by-Member-and-Path table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk id table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk create table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk delete-by-Member table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk delete-by-Member-and-Path table=disks -//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e disk update table=disks +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath objects table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath objects-by-Member table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath objects-by-Member-and-Path table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath id table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath create table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath delete-by-Member table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath delete-by-Member-and-Path table=diskpaths +//go:generate mapper stmt -d github.com/canonical/microcluster/cluster -e DiskPath update table=diskpaths // -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk GetMany -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk GetOne -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk ID -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk Exists -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk Create -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk DeleteOne-by-Member-and-Path -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk DeleteMany-by-Member -//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e disk Update +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath GetMany +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath GetOne +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath ID +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath Exists +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath Create +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath DeleteOne-by-Member-and-Path +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath DeleteMany-by-Member +//go:generate mapper method -i -d github.com/canonical/microcluster/cluster -e DiskPath Update -// Disk is used to track the Ceph disks on a particular server. -type Disk struct { +// DiskPath is used to track the Ceph disks on a particular server. +type DiskPath struct { ID int - Member string `db:"primary=yes&join=internal_cluster_members.name&joinon=disks.member_id"` - OSD int `db:"primary=yes"` - Path string + Member string `db:"primary=yes&join=internal_cluster_members.name&joinon=diskpaths.member_id"` + Path string `db:"primary=yes"` } -// DiskFilter is a required struct for use with lxd-generate. It is used for filtering fields on database fetches. -type DiskFilter struct { +// DiskPathFilter is a required struct for use with lxd-generate. It is used for filtering fields on database fetches. +type DiskPathFilter struct { Member *string Path *string - OSD *int } diff --git a/microceph/database/diskpath.mapper.go b/microceph/database/diskpath.mapper.go new file mode 100644 index 00000000..295154ac --- /dev/null +++ b/microceph/database/diskpath.mapper.go @@ -0,0 +1,346 @@ +package database + +// The code below was generated by lxd-generate - DO NOT EDIT! + +import ( + "context" + "database/sql" + "errors" + "fmt" + "net/http" + "strings" + + "github.com/canonical/lxd/lxd/db/query" + "github.com/canonical/lxd/shared/api" + "github.com/canonical/microcluster/cluster" +) + +var _ = api.ServerEnvironment{} + +var diskPathObjects = cluster.RegisterStmt(` +SELECT diskpaths.id, internal_cluster_members.name AS member, diskpaths.path + FROM diskpaths + JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id + ORDER BY internal_cluster_members.id, diskpaths.path +`) + +var diskPathObjectsByMember = cluster.RegisterStmt(` +SELECT diskpaths.id, internal_cluster_members.name AS member, diskpaths.path + FROM diskpaths + JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id + WHERE ( member = ? ) + ORDER BY internal_cluster_members.id, diskpaths.path +`) + +var diskPathObjectsByMemberAndPath = cluster.RegisterStmt(` +SELECT diskpaths.id, internal_cluster_members.name AS member, diskpaths.path + FROM diskpaths + JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id + WHERE ( member = ? AND diskpaths.path = ? ) + ORDER BY internal_cluster_members.id, diskpaths.path +`) + +var diskPathID = cluster.RegisterStmt(` +SELECT diskpaths.id FROM diskpaths + JOIN internal_cluster_members ON diskpaths.member_id = internal_cluster_members.id + WHERE internal_cluster_members.name = ? AND diskpaths.path = ? +`) + +var diskPathCreate = cluster.RegisterStmt(` +INSERT INTO diskpaths (member_id, path) + VALUES ((SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), ?) +`) + +var diskPathDeleteByMember = cluster.RegisterStmt(` +DELETE FROM diskpaths WHERE member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) +`) + +var diskPathDeleteByMemberAndPath = cluster.RegisterStmt(` +DELETE FROM diskpaths WHERE member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?) AND path = ? +`) + +var diskPathUpdate = cluster.RegisterStmt(` +UPDATE diskpaths + SET member_id = (SELECT internal_cluster_members.id FROM internal_cluster_members WHERE internal_cluster_members.name = ?), path = ? + WHERE id = ? +`) + +// GetDiskPaths returns all available DiskPaths. +// generator: DiskPath GetMany +func GetDiskPaths(ctx context.Context, tx *sql.Tx, filters ...DiskPathFilter) ([]DiskPath, error) { + var err error + + // Result slice. + objects := make([]DiskPath, 0) + + // Pick the prepared statement and arguments to use based on active criteria. + var sqlStmt *sql.Stmt + args := []any{} + queryParts := [2]string{} + + if len(filters) == 0 { + sqlStmt, err = cluster.Stmt(tx, diskPathObjects) + if err != nil { + return nil, fmt.Errorf("Failed to get \"diskPathObjects\" prepared statement: %w", err) + } + } + + for i, filter := range filters { + if filter.Member != nil && filter.Path != nil { + args = append(args, []any{filter.Member, filter.Path}...) + if len(filters) == 1 { + sqlStmt, err = cluster.Stmt(tx, diskPathObjectsByMemberAndPath) + if err != nil { + return nil, fmt.Errorf("Failed to get \"diskPathObjectsByMemberAndPath\" prepared statement: %w", err) + } + + break + } + + query, err := cluster.StmtString(diskPathObjectsByMemberAndPath) + if err != nil { + return nil, fmt.Errorf("Failed to get \"diskPathObjects\" prepared statement: %w", err) + } + + parts := strings.SplitN(query, "ORDER BY", 2) + if i == 0 { + copy(queryParts[:], parts) + continue + } + + _, where, _ := strings.Cut(parts[0], "WHERE") + queryParts[0] += "OR" + where + } else if filter.Member != nil && filter.Path == nil { + args = append(args, []any{filter.Member}...) + if len(filters) == 1 { + sqlStmt, err = cluster.Stmt(tx, diskPathObjectsByMember) + if err != nil { + return nil, fmt.Errorf("Failed to get \"diskPathObjectsByMember\" prepared statement: %w", err) + } + + break + } + + query, err := cluster.StmtString(diskPathObjectsByMember) + if err != nil { + return nil, fmt.Errorf("Failed to get \"diskPathObjects\" prepared statement: %w", err) + } + + parts := strings.SplitN(query, "ORDER BY", 2) + if i == 0 { + copy(queryParts[:], parts) + continue + } + + _, where, _ := strings.Cut(parts[0], "WHERE") + queryParts[0] += "OR" + where + } else if filter.Member == nil && filter.Path == nil { + return nil, fmt.Errorf("Cannot filter on empty DiskPathFilter") + } else { + return nil, fmt.Errorf("No statement exists for the given Filter") + } + } + + // Dest function for scanning a row. + dest := func(scan func(dest ...any) error) error { + d := DiskPath{} + err := scan(&d.ID, &d.Member, &d.Path) + if err != nil { + return err + } + + objects = append(objects, d) + + return nil + } + + // Select. + if sqlStmt != nil { + err = query.SelectObjects(ctx, sqlStmt, dest, args...) + } else { + queryStr := strings.Join(queryParts[:], "ORDER BY") + err = query.Scan(ctx, tx, queryStr, dest, args...) + } + + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"disks_paths\" table: %w", err) + } + + return objects, nil +} + +// GetDiskPath returns the DiskPath with the given key. +// generator: DiskPath GetOne +func GetDiskPath(ctx context.Context, tx *sql.Tx, member string, path string) (*DiskPath, error) { + filter := DiskPathFilter{} + filter.Member = &member + filter.Path = &path + + objects, err := GetDiskPaths(ctx, tx, filter) + if err != nil { + return nil, fmt.Errorf("Failed to fetch from \"disks_paths\" table: %w", err) + } + + switch len(objects) { + case 0: + return nil, api.StatusErrorf(http.StatusNotFound, "DiskPath not found") + case 1: + return &objects[0], nil + default: + return nil, fmt.Errorf("More than one \"disks_paths\" entry matches") + } +} + +// GetDiskPathID return the ID of the DiskPath with the given key. +// generator: DiskPath ID +func GetDiskPathID(ctx context.Context, tx *sql.Tx, member string, path string) (int64, error) { + stmt, err := cluster.Stmt(tx, diskPathID) + if err != nil { + return -1, fmt.Errorf("Failed to get \"diskPathID\" prepared statement: %w", err) + } + + row := stmt.QueryRowContext(ctx, member, path) + var id int64 + err = row.Scan(&id) + if errors.Is(err, sql.ErrNoRows) { + return -1, api.StatusErrorf(http.StatusNotFound, "DiskPath not found") + } + + if err != nil { + return -1, fmt.Errorf("Failed to get \"disks_paths\" ID: %w", err) + } + + return id, nil +} + +// DiskPathExists checks if a DiskPath with the given key exists. +// generator: DiskPath Exists +func DiskPathExists(ctx context.Context, tx *sql.Tx, member string, path string) (bool, error) { + _, err := GetDiskPathID(ctx, tx, member, path) + if err != nil { + if api.StatusErrorCheck(err, http.StatusNotFound) { + return false, nil + } + + return false, err + } + + return true, nil +} + +// CreateDiskPath adds a new DiskPath to the database. +// generator: DiskPath Create +func CreateDiskPath(ctx context.Context, tx *sql.Tx, object DiskPath) (int64, error) { + // Check if a DiskPath with the same key exists. + exists, err := DiskPathExists(ctx, tx, object.Member, object.Path) + if err != nil { + return -1, fmt.Errorf("Failed to check for duplicates: %w", err) + } + + if exists { + return -1, api.StatusErrorf(http.StatusConflict, "This \"disks_paths\" entry already exists") + } + + args := make([]any, 2) + + // Populate the statement arguments. + args[0] = object.Member + args[1] = object.Path + + // Prepared statement to use. + stmt, err := cluster.Stmt(tx, diskPathCreate) + if err != nil { + return -1, fmt.Errorf("Failed to get \"diskPathCreate\" prepared statement: %w", err) + } + + // Execute the statement. + result, err := stmt.Exec(args...) + if err != nil { + return -1, fmt.Errorf("Failed to create \"disks_paths\" entry: %w", err) + } + + id, err := result.LastInsertId() + if err != nil { + return -1, fmt.Errorf("Failed to fetch \"disks_paths\" entry ID: %w", err) + } + + return id, nil +} + +// DeleteDiskPath deletes the DiskPath matching the given key parameters. +// generator: DiskPath DeleteOne-by-Member-and-Path +func DeleteDiskPath(ctx context.Context, tx *sql.Tx, member string, path string) error { + stmt, err := cluster.Stmt(tx, diskPathDeleteByMemberAndPath) + if err != nil { + return fmt.Errorf("Failed to get \"diskPathDeleteByMemberAndPath\" prepared statement: %w", err) + } + + result, err := stmt.Exec(member, path) + if err != nil { + return fmt.Errorf("Delete \"disks_paths\": %w", err) + } + + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + if n == 0 { + return api.StatusErrorf(http.StatusNotFound, "DiskPath not found") + } else if n > 1 { + return fmt.Errorf("Query deleted %d DiskPath rows instead of 1", n) + } + + return nil +} + +// DeleteDiskPaths deletes the DiskPath matching the given key parameters. +// generator: DiskPath DeleteMany-by-Member +func DeleteDiskPaths(ctx context.Context, tx *sql.Tx, member string) error { + stmt, err := cluster.Stmt(tx, diskPathDeleteByMember) + if err != nil { + return fmt.Errorf("Failed to get \"diskPathDeleteByMember\" prepared statement: %w", err) + } + + result, err := stmt.Exec(member) + if err != nil { + return fmt.Errorf("Delete \"disks_paths\": %w", err) + } + + _, err = result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + return nil +} + +// UpdateDiskPath updates the DiskPath matching the given key parameters. +// generator: DiskPath Update +func UpdateDiskPath(ctx context.Context, tx *sql.Tx, member string, path string, object DiskPath) error { + id, err := GetDiskPathID(ctx, tx, member, path) + if err != nil { + return err + } + + stmt, err := cluster.Stmt(tx, diskPathUpdate) + if err != nil { + return fmt.Errorf("Failed to get \"diskPathUpdate\" prepared statement: %w", err) + } + + result, err := stmt.Exec(object.Member, object.Path, id) + if err != nil { + return fmt.Errorf("Update \"disks_paths\" entry failed: %w", err) + } + + n, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("Fetch affected rows: %w", err) + } + + if n != 1 { + return fmt.Errorf("Query updated %d rows instead of 1", n) + } + + return nil +} diff --git a/microceph/database/schema.go b/microceph/database/schema.go index 9e90468b..7be114ac 100644 --- a/microceph/database/schema.go +++ b/microceph/database/schema.go @@ -13,6 +13,7 @@ import ( var SchemaExtensions = map[int]schema.Update{ 1: schemaUpdate1, 2: schemaUpdate2, + 3: schemaUpdate3, } func schemaUpdate1(ctx context.Context, tx *sql.Tx) error { @@ -66,3 +67,22 @@ CREATE UNIQUE INDEX cc_index ON client_config(coalesce(member_id, 0), key); return err } + +// schemaUpdate3 generates the diskpaths table, copying the data from the disks table. +func schemaUpdate3(ctx context.Context, tx *sql.Tx) error { + stmt := ` +CREATE TABLE diskpaths ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + member_id INTEGER NOT NULL, + path TEXT NOT NULL, + FOREIGN KEY (member_id) REFERENCES "internal_cluster_members" (id) ON DELETE CASCADE, + UNIQUE(member_id, path) +); +INSERT INTO diskpaths (id, member_id, path) +SELECT osd, member_id, path FROM disks; +DROP TABLE disks; + ` + _, err := tx.Exec(stmt) + + return err +}