diff --git a/cmd/external-repos/main.go b/cmd/external-repos/main.go index 3074328d9..f2a700009 100644 --- a/cmd/external-repos/main.go +++ b/cmd/external-repos/main.go @@ -14,6 +14,7 @@ import ( "github.com/content-services/content-sources-backend/pkg/dao" "github.com/content-services/content-sources-backend/pkg/db" "github.com/content-services/content-sources-backend/pkg/external_repos" + "github.com/content-services/content-sources-backend/pkg/models" "github.com/content-services/content-sources-backend/pkg/pulp_client" "github.com/content-services/content-sources-backend/pkg/tasks" "github.com/content-services/content-sources-backend/pkg/tasks/client" @@ -111,6 +112,11 @@ func main() { if err != nil { log.Error().Err(err).Msg("error queueing snapshot tasks") } + snapshotRetainDaysLimit := config.Get().Options.SnapshotRetainDaysLimit + err = enqueueSnapshotsCleanup(ctx, snapshotRetainDaysLimit) + if err != nil { + log.Error().Err(err).Msg("error queueing delete snapshot tasks for snapshot cleanup") + } } } else if args[1] == "pulp-orphan-cleanup" { batchSize := 5 @@ -293,6 +299,78 @@ func enqueueSnapshotRepos(ctx context.Context, urls *[]string, interval *int) er return nil } +func enqueueSnapshotsCleanup(ctx context.Context, olderThanDays int) error { + q, err := queue.NewPgQueue(ctx, db.GetUrl()) + if err != nil { + return fmt.Errorf("error getting new task queue: %w", err) + } + c := client.NewTaskClient(&q) + repoConfigDao := dao.GetRepositoryConfigDao(db.DB, pulp_client.GetPulpClientWithDomain("")) + snapshotDao := dao.GetSnapshotDao(db.DB) + taskInfoDao := dao.GetTaskInfoDao(db.DB) + + repoConfigs, err := repoConfigDao.ListReposWithOutdatedSnapshots(ctx, olderThanDays) + if err != nil { + return fmt.Errorf("error getting repository configurations: %v", err) + } + + for _, repo := range repoConfigs { + // Fetch snapshots for repo and find those which are to be deleted + snaps, err := snapshotDao.FetchForRepoConfigUUID(ctx, repo.UUID) + if err != nil { + 
return fmt.Errorf("error fetching snapshots for repository %v", repo.Name) + } + if len(snaps) < 2 { + return nil + } + + slices.SortFunc(snaps, func(s1, s2 models.Snapshot) int { + return s1.CreatedAt.Compare(s2.CreatedAt) + }) + toBeDeletedSnapUUIDs := make([]string, 0, len(snaps)) + for i, snap := range snaps { + if i == len(snaps)-1 && len(toBeDeletedSnapUUIDs) == len(snaps)-1 { + break + } + if snap.CreatedAt.Before(time.Now().Add(-time.Duration(olderThanDays) * 24 * time.Hour)) { + toBeDeletedSnapUUIDs = append(toBeDeletedSnapUUIDs, snap.UUID) + } + } + if len(toBeDeletedSnapUUIDs) == 0 { + return fmt.Errorf("no outdated snapshot found for repository %v", repo.Name) + } + + // Check for a running delete task + inProgressTasks, err := taskInfoDao.FetchActiveTasks(ctx, repo.OrgID, repo.UUID, config.DeleteRepositorySnapshotsTask, config.DeleteSnapshotsTask) + if err != nil { + return fmt.Errorf("error fetching delete repository snapshots task for repository %v", repo.Name) + } + if len(inProgressTasks) >= 1 { + return fmt.Errorf("error, delete is already in progress for repoository %v", repo.Name) + } + + // Enqueue new delete task + t := queue.Task{ + Typename: config.DeleteSnapshotsTask, + Payload: payloads.DeleteSnapshotsPayload{ + RepoUUID: repo.UUID, + SnapshotsUUIDs: toBeDeletedSnapUUIDs, + }, + OrgId: repo.OrgID, + AccountId: repo.AccountID, + ObjectUUID: &repo.RepositoryUUID, + ObjectType: utils.Ptr(config.ObjectTypeRepository), + } + + _, err = c.Enqueue(t) + if err != nil { + return fmt.Errorf("error enqueueing delete snapshots task for repository %v", repo.Name) + } + } + + return nil +} + func pulpOrphanCleanup(ctx context.Context, db *gorm.DB, batchSize int) error { var err error daoReg := dao.GetDaoRegistry(db) diff --git a/configs/config.yaml.example b/configs/config.yaml.example index 0b541a629..e138d38a8 100644 --- a/configs/config.yaml.example +++ b/configs/config.yaml.example @@ -61,6 +61,7 @@ options: introspect_api_time_limit_sec: 0 
enable_notifications: true template_event_topic: "platform.content-sources.template" + snapshot_retain_days_limit: 90 metrics: path: "/metrics" port: 9000 diff --git a/pkg/config/config.go b/pkg/config/config.go index 6e06e2f4b..fda25a481 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -175,7 +175,8 @@ type Options struct { // url (https://servername) to access the api, used to reference gpg keys // Supports partial hostnames (i.e. http://.server.example.com). // If this is encountered (and clowder is used), it will prepend the envName from clowder - ExternalURL string `mapstructure:"external_url"` + ExternalURL string `mapstructure:"external_url"` + SnapshotRetainDaysLimit int `mapstructure:"snapshot_retain_days_limit"` } type Metrics struct { @@ -243,6 +244,7 @@ func setDefaults(v *viper.Viper) { v.SetDefault("options.template_event_topic", "platform.content-sources.template") v.SetDefault("options.repository_import_filter", "") v.SetDefault("options.external_url", "http://pulp.content:8000") + v.SetDefault("options.snapshot_retain_days_limit", 90) v.SetDefault("logging.level", "info") v.SetDefault("logging.metrics_level", "") v.SetDefault("logging.console", true) diff --git a/pkg/dao/interfaces.go b/pkg/dao/interfaces.go index 30483db06..8225016ef 100644 --- a/pkg/dao/interfaces.go +++ b/pkg/dao/interfaces.go @@ -68,6 +68,7 @@ type RepositoryConfigDao interface { Update(ctx context.Context, orgID, uuid string, repoParams api.RepositoryUpdateRequest) (bool, error) Fetch(ctx context.Context, orgID string, uuid string) (api.RepositoryResponse, error) InternalOnly_ListReposToSnapshot(ctx context.Context, filter *ListRepoFilter) ([]models.RepositoryConfiguration, error) + ListReposWithOutdatedSnapshots(ctx context.Context, olderThanDays int) ([]models.RepositoryConfiguration, error) List(ctx context.Context, orgID string, paginationData api.PaginationData, filterData api.FilterData) (api.RepositoryCollectionResponse, int64, error) Delete(ctx 
context.Context, orgID string, uuid string) error SoftDelete(ctx context.Context, orgID string, uuid string) error diff --git a/pkg/dao/repository_configs.go b/pkg/dao/repository_configs.go index 9864d0484..3e583ba03 100644 --- a/pkg/dao/repository_configs.go +++ b/pkg/dao/repository_configs.go @@ -312,6 +312,23 @@ func (r repositoryConfigDaoImpl) InternalOnly_ListReposToSnapshot(ctx context.Co return dbRepos, nil } +func (r repositoryConfigDaoImpl) ListReposWithOutdatedSnapshots(ctx context.Context, olderThanDays int) ([]models.RepositoryConfiguration, error) { + var dbRepos []models.RepositoryConfiguration + pdb := r.db.WithContext(ctx) + + query := pdb. + Distinct("repository_configurations.*"). + Joins("INNER JOIN snapshots s ON repository_configurations.uuid = s.repository_configuration_uuid"). + Joins("INNER JOIN repositories r on r.uuid = repository_configurations.repository_uuid"). + Where("s.created_at <= (NOW() - CAST(? AS INTERVAL))", fmt.Sprintf("%d days", olderThanDays)) + result := snapshottableRepoConfigs(query).Find(&dbRepos) + if result.Error != nil { + return dbRepos, result.Error + } + + return dbRepos, nil +} + func (r repositoryConfigDaoImpl) List( ctx context.Context, OrgID string, diff --git a/pkg/dao/repository_configs_mock.go b/pkg/dao/repository_configs_mock.go index 988896c2d..858f68515 100644 --- a/pkg/dao/repository_configs_mock.go +++ b/pkg/dao/repository_configs_mock.go @@ -6,10 +6,8 @@ import ( context "context" api "github.com/content-services/content-sources-backend/pkg/api" - - mock "github.com/stretchr/testify/mock" - models "github.com/content-services/content-sources-backend/pkg/models" + mock "github.com/stretchr/testify/mock" ) // MockRepositoryConfigDao is an autogenerated mock type for the RepositoryConfigDao type @@ -311,6 +309,36 @@ func (_m *MockRepositoryConfigDao) InternalOnly_ListReposToSnapshot(ctx context. 
// ListReposWithOutdatedSnapshots provides a mock function with given fields: ctx, olderThanDays
func (_m *MockRepositoryConfigDao) ListReposWithOutdatedSnapshots(ctx context.Context, olderThanDays int) ([]models.RepositoryConfiguration, error) {
	ret := _m.Called(ctx, olderThanDays)

	// The expectation must configure at least one return value.
	if len(ret) == 0 {
		panic("no return value specified for ListReposWithOutdatedSnapshots")
	}

	var r0 []models.RepositoryConfiguration
	var r1 error
	// A single function with the full signature supplies both results.
	if rf, ok := ret.Get(0).(func(context.Context, int) ([]models.RepositoryConfiguration, error)); ok {
		return rf(ctx, olderThanDays)
	}
	// Otherwise the first return is either a function producing the slice or
	// the slice itself (left nil when the configured value is nil).
	if rf, ok := ret.Get(0).(func(context.Context, int) []models.RepositoryConfiguration); ok {
		r0 = rf(ctx, olderThanDays)
	} else {
		if ret.Get(0) != nil {
			r0 = ret.Get(0).([]models.RepositoryConfiguration)
		}
	}

	// The second return is either a function producing the error or the error
	// value itself.
	if rf, ok := ret.Get(1).(func(context.Context, int) error); ok {
		r1 = rf(ctx, olderThanDays)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}
suite.createSnapshotAtSpecifiedTime(r1, time.Now().Add(-2*time.Hour)) + _ = suite.createSnapshotAtSpecifiedTime(r1, time.Now().Add(-1*time.Hour)) + + _ = suite.createSnapshotAtSpecifiedTime(r2, time.Now().Add(-100*24*time.Hour)) + _ = suite.createSnapshotAtSpecifiedTime(r2, time.Now().Add(-2*time.Hour)) + + _ = suite.createSnapshotAtSpecifiedTime(r3, time.Now().Add(-101*24*time.Hour)) + _ = suite.createSnapshotAtSpecifiedTime(r3, time.Now().Add(-100*24*time.Hour)) + + repoConfigDao := GetRepositoryConfigDao(suite.tx, suite.mockPulpClient) + response, err := repoConfigDao.ListReposWithOutdatedSnapshots(context.Background(), 90) + assert.Nil(t, err) + assert.Len(t, response, 2) + assert.NotEqual(t, -1, slices.IndexFunc(response, func(rc models.RepositoryConfiguration) bool { + return rc.UUID == r2.UUID + })) + assert.NotEqual(t, -1, slices.IndexFunc(response, func(rc models.RepositoryConfiguration) bool { + return rc.UUID == r3.UUID + })) +} + func (suite *RepositoryConfigSuite) TestSavePublicUrls() { t := suite.T() tx := suite.tx @@ -2801,3 +2833,24 @@ func (suite *RepositoryConfigSuite) TestCombineStatus() { assert.Equal(t, testCase.Expected, result, testCase.Name) } } + +func (suite *RepositoryConfigSuite) createSnapshotAtSpecifiedTime(rConfig models.RepositoryConfiguration, CreatedAt time.Time) models.Snapshot { + t := suite.T() + tx := suite.tx + + snap := models.Snapshot{ + Base: models.Base{CreatedAt: CreatedAt}, + VersionHref: "/pulp/version", + PublicationHref: "/pulp/publication", + DistributionPath: fmt.Sprintf("/path/to/%v", uuid.NewString()), + RepositoryConfigurationUUID: rConfig.UUID, + ContentCounts: models.ContentCountsType{"rpm.package": int64(3), "rpm.advisory": int64(1)}, + AddedCounts: models.ContentCountsType{"rpm.package": int64(1), "rpm.advisory": int64(3)}, + RemovedCounts: models.ContentCountsType{"rpm.package": int64(2), "rpm.advisory": int64(2)}, + } + + sDao := snapshotDaoImpl{db: tx} + err := sDao.Create(context.Background(), &snap) + 
assert.NoError(t, err) + return snap +}