Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed 4747: automatic deletion of outdated snapshots #901

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions cmd/external-repos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/content-services/content-sources-backend/pkg/dao"
"github.com/content-services/content-sources-backend/pkg/db"
"github.com/content-services/content-sources-backend/pkg/external_repos"
"github.com/content-services/content-sources-backend/pkg/models"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/content-services/content-sources-backend/pkg/tasks"
"github.com/content-services/content-sources-backend/pkg/tasks/client"
Expand Down Expand Up @@ -111,6 +112,11 @@
if err != nil {
log.Error().Err(err).Msg("error queueing snapshot tasks")
}
snapshotRetainDaysLimit := config.Get().Options.SnapshotRetainDaysLimit
err = enqueueSnapshotsCleanup(ctx, snapshotRetainDaysLimit)
if err != nil {
log.Error().Err(err).Msg("error queueing delete snapshot tasks for snapshot cleanup")
}
}
} else if args[1] == "pulp-orphan-cleanup" {
batchSize := 5
Expand Down Expand Up @@ -293,6 +299,78 @@
return nil
}

func enqueueSnapshotsCleanup(ctx context.Context, olderThanDays int) error {
q, err := queue.NewPgQueue(ctx, db.GetUrl())
if err != nil {
return fmt.Errorf("error getting new task queue: %w", err)
}
c := client.NewTaskClient(&q)
repoConfigDao := dao.GetRepositoryConfigDao(db.DB, pulp_client.GetPulpClientWithDomain(""))
snapshotDao := dao.GetSnapshotDao(db.DB)
taskInfoDao := dao.GetTaskInfoDao(db.DB)

repoConfigs, err := repoConfigDao.ListReposWithOutdatedSnapshots(ctx, olderThanDays)
if err != nil {
return fmt.Errorf("error getting repository configurations: %v", err)
}

for _, repo := range repoConfigs {
// Fetch snapshots for repo and find those which are to be deleted
snaps, err := snapshotDao.FetchForRepoConfigUUID(ctx, repo.UUID)
if err != nil {
return fmt.Errorf("error fetching snapshots for repository %v", repo.Name)
}
if len(snaps) < 2 {
return nil
}

Check failure on line 326 in cmd/external-repos/main.go

View workflow job for this annotation

GitHub Actions / Lint

undefined: slices (typecheck)
slices.SortFunc(snaps, func(s1, s2 models.Snapshot) int {
return s1.CreatedAt.Compare(s2.CreatedAt)
})
toBeDeletedSnapUUIDs := make([]string, 0, len(snaps))
for i, snap := range snaps {
if i == len(snaps)-1 && len(toBeDeletedSnapUUIDs) == len(snaps)-1 {
break
}
if snap.CreatedAt.Before(time.Now().Add(-time.Duration(olderThanDays) * 24 * time.Hour)) {
toBeDeletedSnapUUIDs = append(toBeDeletedSnapUUIDs, snap.UUID)
}
}
if len(toBeDeletedSnapUUIDs) == 0 {
return fmt.Errorf("no outdated snapshot found for repository %v", repo.Name)
}

// Check for a running delete task
inProgressTasks, err := taskInfoDao.FetchActiveTasks(ctx, repo.OrgID, repo.UUID, config.DeleteRepositorySnapshotsTask, config.DeleteSnapshotsTask)
if err != nil {
return fmt.Errorf("error fetching delete repository snapshots task for repository %v", repo.Name)
}
if len(inProgressTasks) >= 1 {
return fmt.Errorf("error, delete is already in progress for repoository %v", repo.Name)
}

// Enqueue new delete task
t := queue.Task{
Typename: config.DeleteSnapshotsTask,
Payload: payloads.DeleteSnapshotsPayload{
RepoUUID: repo.UUID,
SnapshotsUUIDs: toBeDeletedSnapUUIDs,
},
OrgId: repo.OrgID,
AccountId: repo.AccountID,
ObjectUUID: &repo.RepositoryUUID,
ObjectType: utils.Ptr(config.ObjectTypeRepository),
}

_, err = c.Enqueue(t)
if err != nil {
return fmt.Errorf("error enqueueing delete snapshots task for repository %v", repo.Name)
}
}

return nil
}

func pulpOrphanCleanup(ctx context.Context, db *gorm.DB, batchSize int) error {
var err error
daoReg := dao.GetDaoRegistry(db)
Expand Down
1 change: 1 addition & 0 deletions configs/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ options:
introspect_api_time_limit_sec: 0
enable_notifications: true
template_event_topic: "platform.content-sources.template"
snapshot_retain_days_limit: 90
metrics:
path: "/metrics"
port: 9000
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ type Options struct {
// url (https://servername) to access the api, used to reference gpg keys
// Supports partial hostnames (i.e. http://.server.example.com).
// If this is encountered (and clowder is used), it will prepend the envName from clowder
ExternalURL string `mapstructure:"external_url"`
ExternalURL string `mapstructure:"external_url"`
SnapshotRetainDaysLimit int `mapstructure:"snapshot_retain_days_limit"`
}

type Metrics struct {
Expand Down Expand Up @@ -243,6 +244,7 @@ func setDefaults(v *viper.Viper) {
v.SetDefault("options.template_event_topic", "platform.content-sources.template")
v.SetDefault("options.repository_import_filter", "")
v.SetDefault("options.external_url", "http://pulp.content:8000")
v.SetDefault("options.snapshot_retain_days_limit", 90)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably set the default to a higher value (365), and then we can customize it in app-interface to use a lower value for stage.

v.SetDefault("logging.level", "info")
v.SetDefault("logging.metrics_level", "")
v.SetDefault("logging.console", true)
Expand Down
1 change: 1 addition & 0 deletions pkg/dao/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type RepositoryConfigDao interface {
Update(ctx context.Context, orgID, uuid string, repoParams api.RepositoryUpdateRequest) (bool, error)
Fetch(ctx context.Context, orgID string, uuid string) (api.RepositoryResponse, error)
InternalOnly_ListReposToSnapshot(ctx context.Context, filter *ListRepoFilter) ([]models.RepositoryConfiguration, error)
ListReposWithOutdatedSnapshots(ctx context.Context, olderThanDays int) ([]models.RepositoryConfiguration, error)
List(ctx context.Context, orgID string, paginationData api.PaginationData, filterData api.FilterData) (api.RepositoryCollectionResponse, int64, error)
Delete(ctx context.Context, orgID string, uuid string) error
SoftDelete(ctx context.Context, orgID string, uuid string) error
Expand Down
17 changes: 17 additions & 0 deletions pkg/dao/repository_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,23 @@ func (r repositoryConfigDaoImpl) InternalOnly_ListReposToSnapshot(ctx context.Co
return dbRepos, nil
}

// ListReposWithOutdatedSnapshots returns the distinct repository
// configurations that have at least one snapshot whose created_at is at or
// before the database's NOW() minus olderThanDays days. The query is further
// narrowed by snapshottableRepoConfigs. Any query error is returned together
// with whatever the query populated.
func (r repositoryConfigDaoImpl) ListReposWithOutdatedSnapshots(ctx context.Context, olderThanDays int) ([]models.RepositoryConfiguration, error) {
	var outdated []models.RepositoryConfiguration

	// Passed as a bound parameter and cast to an INTERVAL by the database.
	retention := fmt.Sprintf("%d days", olderThanDays)

	base := r.db.WithContext(ctx).
		Distinct("repository_configurations.*").
		Joins("INNER JOIN snapshots s ON repository_configurations.uuid = s.repository_configuration_uuid").
		Joins("INNER JOIN repositories r on r.uuid = repository_configurations.repository_uuid").
		Where("s.created_at <= (NOW() - CAST(? AS INTERVAL))", retention)

	err := snapshottableRepoConfigs(base).Find(&outdated).Error
	return outdated, err
}

func (r repositoryConfigDaoImpl) List(
ctx context.Context,
OrgID string,
Expand Down
34 changes: 31 additions & 3 deletions pkg/dao/repository_configs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions pkg/dao/repository_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"slices"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -1842,6 +1843,37 @@ func (suite *RepositoryConfigSuite) TestListFilterSearch() {
assert.Equal(t, quantity, total)
}

// TestListReposWithOutdatedSnaps seeds three repository configurations with
// two snapshots each and checks that a 90-day limit reports only the two
// repositories that own at least one snapshot older than that.
func (suite *RepositoryConfigSuite) TestListReposWithOutdatedSnaps() {
	t := suite.T()

	seeded, err := seeds.SeedRepositoryConfigurations(suite.tx, 3, seeds.SeedOptions{
		OrgID: orgIDTest,
	})
	assert.Nil(t, err)

	const day = 24 * time.Hour
	recentOnly, oneOutdated, allOutdated := seeded[0], seeded[1], seeded[2]

	// Each entry creates one snapshot of the given age for the given repository.
	fixtures := []struct {
		repo models.RepositoryConfiguration
		age  time.Duration
	}{
		{recentOnly, 2 * time.Hour},
		{recentOnly, 1 * time.Hour},
		{oneOutdated, 100 * day},
		{oneOutdated, 2 * time.Hour},
		{allOutdated, 101 * day},
		{allOutdated, 100 * day},
	}
	for _, f := range fixtures {
		_ = suite.createSnapshotAtSpecifiedTime(f.repo, time.Now().Add(-f.age))
	}

	repoConfigDao := GetRepositoryConfigDao(suite.tx, suite.mockPulpClient)
	response, err := repoConfigDao.ListReposWithOutdatedSnapshots(context.Background(), 90)
	assert.Nil(t, err)
	assert.Len(t, response, 2)

	// positionOf reports where the wanted repository sits in the response, or -1.
	positionOf := func(want models.RepositoryConfiguration) int {
		return slices.IndexFunc(response, func(rc models.RepositoryConfiguration) bool {
			return rc.UUID == want.UUID
		})
	}
	assert.NotEqual(t, -1, positionOf(oneOutdated))
	assert.NotEqual(t, -1, positionOf(allOutdated))
}

func (suite *RepositoryConfigSuite) TestSavePublicUrls() {
t := suite.T()
tx := suite.tx
Expand Down Expand Up @@ -2801,3 +2833,24 @@ func (suite *RepositoryConfigSuite) TestCombineStatus() {
assert.Equal(t, testCase.Expected, result, testCase.Name)
}
}

// createSnapshotAtSpecifiedTime stores a snapshot for rConfig whose creation
// timestamp is set to CreatedAt and returns the stored model. The distribution
// path gets a fresh UUID suffix on every call; the remaining fields are fixed
// placeholder values. A failure to create the snapshot is reported through the
// suite's testing.T.
func (suite *RepositoryConfigSuite) createSnapshotAtSpecifiedTime(rConfig models.RepositoryConfiguration, CreatedAt time.Time) models.Snapshot {
	t := suite.T()

	distPath := fmt.Sprintf("/path/to/%v", uuid.NewString())
	created := models.Snapshot{
		Base:                        models.Base{CreatedAt: CreatedAt},
		VersionHref:                 "/pulp/version",
		PublicationHref:             "/pulp/publication",
		DistributionPath:            distPath,
		RepositoryConfigurationUUID: rConfig.UUID,
		ContentCounts:               models.ContentCountsType{"rpm.package": int64(3), "rpm.advisory": int64(1)},
		AddedCounts:                 models.ContentCountsType{"rpm.package": int64(1), "rpm.advisory": int64(3)},
		RemovedCounts:               models.ContentCountsType{"rpm.package": int64(2), "rpm.advisory": int64(2)},
	}

	snapDao := snapshotDaoImpl{db: suite.tx}
	assert.NoError(t, snapDao.Create(context.Background(), &created))
	return created
}
Loading