Skip to content

Commit

Permalink
Fixes 4926: job to create latest dist for existing repos
Browse files Browse the repository at this point in the history
  • Loading branch information
rverdile committed Nov 22, 2024
1 parent d8a6297 commit ef67702
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 31 deletions.
75 changes: 75 additions & 0 deletions cmd/create_latest_distributions/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"sync"

"github.com/content-services/content-sources-backend/pkg/api"
"github.com/content-services/content-sources-backend/pkg/config"
"github.com/content-services/content-sources-backend/pkg/dao"
"github.com/content-services/content-sources-backend/pkg/db"
"github.com/content-services/content-sources-backend/pkg/pulp_client"
"github.com/content-services/content-sources-backend/pkg/tasks/helpers"
"github.com/rs/zerolog/log"
)

/*
This job creates the latest snapshot distribution for existing repos.
It calls pulp in batches of 5 so pulp does not get overloaded.
*/

func main() {
err := db.Connect()
if err != nil {
log.Fatal().Err(err).Msg("failed to connect to database")
}

daoReg := dao.GetDaoRegistry(db.DB)
ctx := context.Background()

domains, err := daoReg.Domain.List(ctx)
if err != nil {
log.Fatal().Err(err).Msg("failed to list domains")
}
for _, domain := range domains {
pulpClient := pulp_client.GetPulpClientWithDomain(domain.DomainName)
distHelper := helpers.NewPulpDistributionHelper(ctx, pulpClient)

pageData := api.PaginationData{Limit: -1}
filterData := api.FilterData{Origin: config.OriginExternal + "," + config.OriginUpload}
repos, _, err := daoReg.RepositoryConfig.List(ctx, domain.OrgId, pageData, filterData)
if err != nil {
log.Fatal().Err(err).Msg("failed to list repos")
}

batchSize := 5
for i := 0; i < batchSize; i += batchSize {
end := i + batchSize
if end > len(repos.Data) {
end = len(repos.Data)
}
batch := repos.Data[i:end]
wg := sync.WaitGroup{}
for _, repo := range batch {
lastSnapshot := repo.LastSnapshot
if lastSnapshot == nil {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
_, err = distHelper.FindOrCreateDistribution(
domain.OrgId,
repo.UUID,
helpers.GetLatestRepoDistPath(repo.UUID),
lastSnapshot.PublicationHref)
if err != nil {
log.Error().Str("repo_uuid", repo.UUID).Str("org_id", domain.OrgId).Err(err).Msg("failed to create distribution")
}
}()
}
wg.Wait()
}
}
}
57 changes: 57 additions & 0 deletions deployments/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,63 @@ objects:
- mountPath: /tmp
name: tmpdir
jobs:
- name: create-latest-distributions
podSpec:
securityContext:
runAsNonRoot: true
runAsUser: 1001
image: ${IMAGE}:${IMAGE_TAG}
inheritEnv: true
command:
- /create_latest_distributions
env:
- name: CLOWDER_ENABLED
value: ${CLOWDER_ENABLED}
- name: RH_CDN_CERT_PAIR
valueFrom:
secretKeyRef:
name: content-sources-certs
key: cdn.redhat.com
- name: SENTRY_DSN
valueFrom:
secretKeyRef:
name: content-sources-sentry
key: dsn
optional: true
- name: CLIENTS_PULP_SERVER
value: ${{CLIENTS_PULP_SERVER}}
- name: CLIENTS_PULP_DOWNLOAD_POLICY
value: ${{CLIENTS_PULP_DOWNLOAD_POLICY}}
- name: CLIENTS_PULP_USERNAME
value: ${{CLIENTS_PULP_USERNAME}}
- name: CLIENTS_PULP_PASSWORD
valueFrom:
secretKeyRef:
name: pulp-content-sources-password
key: password
optional: true
- name: LOGGING_LEVEL
value: ${{LOGGING_LEVEL}}
- name: OPTIONS_EXTERNAL_URL
value: ${OPTIONS_EXTERNAL_URL}
- name: FEATURES_SNAPSHOTS_ENABLED
value: ${FEATURES_SNAPSHOTS_ENABLED}
- name: FEATURES_SNAPSHOTS_ACCOUNTS
value: ${FEATURES_SNAPSHOTS_ACCOUNTS}
- name: FEATURES_SNAPSHOTS_ORGANIZATIONS
value: ${FEATURES_SNAPSHOTS_ORGANIZATIONS}
- name: FEATURES_ADMIN_TASKS_ENABLED
value: ${FEATURES_ADMIN_TASKS_ENABLED}
- name: FEATURES_ADMIN_TASKS_ACCOUNTS
value: ${FEATURES_ADMIN_TASKS_ACCOUNTS}
- name: FEATURES_ADMIN_TASKS_ORGANIZATIONS
value: ${FEATURES_ADMIN_TASKS_ORGANIZATIONS}
- name: CLIENTS_RBAC_BASE_URL
value: ${{CLIENTS_RBAC_BASE_URL}}
- name: OPTIONS_ALWAYS_RUN_CRON_TASKS
value: ${OPTIONS_ALWAYS_RUN_CRON_TASKS}
- name: OPTIONS_ENABLE_NOTIFICATIONS
value: ${OPTIONS_ENABLE_NOTIFICATIONS}
- name: retry-failed-tasks
podSpec:
securityContext:
Expand Down
10 changes: 10 additions & 0 deletions deployments/jobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,13 @@ objects:
appName: content-sources-backend
jobs:
- retry-failed-tasks
- apiVersion: cloud.redhat.com/v1alpha1
kind: ClowdJobInvocation
metadata:
labels:
app: content-sources-backend
name: create-latest-distributions-2024-11-20
spec:
appName: content-sources-backend
jobs:
- create-latest-distributions
19 changes: 10 additions & 9 deletions pkg/api/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
)

type SnapshotResponse struct {
UUID string `json:"uuid"`
CreatedAt time.Time `json:"created_at"` // Datetime the snapshot was created
RepositoryPath string `json:"repository_path"` // Path to repository snapshot contents
ContentCounts map[string]int64 `json:"content_counts"` // Count of each content type
AddedCounts map[string]int64 `json:"added_counts"` // Count of each content type
RemovedCounts map[string]int64 `json:"removed_counts"` // Count of each content type
URL string `json:"url"` // URL to the snapshot's content
RepositoryName string `json:"repository_name"` // Name of repository the snapshot belongs to
RepositoryUUID string `json:"repository_uuid"` // UUID of the repository the snapshot belongs to
UUID string `json:"uuid"`
CreatedAt time.Time `json:"created_at"` // Datetime the snapshot was created
RepositoryPath string `json:"repository_path"` // Path to repository snapshot contents
ContentCounts map[string]int64 `json:"content_counts"` // Count of each content type
AddedCounts map[string]int64 `json:"added_counts"` // Count of each content type
RemovedCounts map[string]int64 `json:"removed_counts"` // Count of each content type
URL string `json:"url"` // URL to the snapshot's content
RepositoryName string `json:"repository_name"` // Name of repository the snapshot belongs to
RepositoryUUID string `json:"repository_uuid"` // UUID of the repository the snapshot belongs to
PublicationHref string `json:"publication_href" swaggerignore:"true"` // Publication href of the snapshot in pulp
}

type ListSnapshotByDateRequest struct {
Expand Down
17 changes: 9 additions & 8 deletions pkg/dao/repository_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,14 +1138,15 @@ func ModelToApiFields(repoConfig models.RepositoryConfiguration, apiRepo *api.Re
apiRepo.LastSnapshotUUID = repoConfig.LastSnapshotUUID
if repoConfig.LastSnapshot != nil {
apiRepo.LastSnapshot = &api.SnapshotResponse{
UUID: repoConfig.LastSnapshot.UUID,
CreatedAt: repoConfig.LastSnapshot.CreatedAt,
ContentCounts: repoConfig.LastSnapshot.ContentCounts,
AddedCounts: repoConfig.LastSnapshot.AddedCounts,
RemovedCounts: repoConfig.LastSnapshot.RemovedCounts,
RepositoryPath: repoConfig.LastSnapshot.RepositoryPath,
RepositoryUUID: repoConfig.UUID,
RepositoryName: repoConfig.Name,
UUID: repoConfig.LastSnapshot.UUID,
CreatedAt: repoConfig.LastSnapshot.CreatedAt,
ContentCounts: repoConfig.LastSnapshot.ContentCounts,
AddedCounts: repoConfig.LastSnapshot.AddedCounts,
RemovedCounts: repoConfig.LastSnapshot.RemovedCounts,
RepositoryPath: repoConfig.LastSnapshot.RepositoryPath,
PublicationHref: repoConfig.LastSnapshot.PublicationHref,
RepositoryUUID: repoConfig.UUID,
RepositoryName: repoConfig.Name,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/dao/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ func SnapshotModelToApi(model models.Snapshot, resp *api.SnapshotResponse) {
resp.RemovedCounts = model.RemovedCounts
resp.RepositoryName = model.RepositoryConfiguration.Name
resp.RepositoryUUID = model.RepositoryConfiguration.UUID
resp.PublicationHref = model.PublicationHref
}

// pulpContentURL combines content path and repository path to get content URL
Expand Down
1 change: 0 additions & 1 deletion pkg/pulp_client/rpm_distributions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (r *pulpDaoImpl) FindDistributionByPath(ctx context.Context, path string) (
if err != nil {
return nil, errorWithResponseBody("error listing rpm distributions", httpResp, err)
}
defer httpResp.Body.Close()
res := resp.GetResults()
if len(res) > 0 {
return &res[0], nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/tasks/delete_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (ds *DeleteSnapshots) deleteOrUpdatePulpContent(snap models.Snapshot, repo
return err
}

_, _, err = ds.pulpDistHelper.CreateOrUpdateDistribution(ds.orgID, distName, distPath, snaps.PublicationHref)
_, _, err = ds.pulpDistHelper.CreateOrUpdateDistribution(ds.orgID, snaps.PublicationHref, distName, distPath)
if err != nil {
return err
}
Expand All @@ -159,7 +159,7 @@ func (ds *DeleteSnapshots) deleteOrUpdatePulpContent(snap models.Snapshot, repo
if err != nil {
return err
}
_, _, err = ds.pulpDistHelper.CreateOrUpdateDistribution(ds.orgID, repo.UUID, latestPathIdent, latestSnap.PublicationHref)
_, _, err = ds.pulpDistHelper.CreateOrUpdateDistribution(ds.orgID, latestSnap.PublicationHref, repo.UUID, latestPathIdent)
if err != nil {
return err
}
Expand Down
34 changes: 30 additions & 4 deletions pkg/tasks/helpers/pulp_distribution_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
zest "github.com/content-services/zest/release/v2024"
)

func GetLatestRepoDistPath(repoUUID string) string {
return fmt.Sprintf("%v/%v", repoUUID, "latest")
}

func NewPulpDistributionHelper(ctx context.Context, client pulp_client.PulpClient) *PulpDistributionHelper {
return &PulpDistributionHelper{
pulpClient: client,
Expand Down Expand Up @@ -46,7 +50,7 @@ func (pdh *PulpDistributionHelper) CreateDistribution(orgID, publicationHref, di
return distResp, nil
}

func (pdh *PulpDistributionHelper) CreateOrUpdateDistribution(orgId, distName, distPath, publicationHref string) (string, bool, error) {
func (pdh *PulpDistributionHelper) CreateOrUpdateDistribution(orgID, publicationHref, distName, distPath string) (string, bool, error) {
addedContentGuard := false
distTask := &zest.TaskResponse{}
resp, err := pdh.pulpClient.FindDistributionByPath(pdh.ctx, distPath)
Expand All @@ -55,8 +59,8 @@ func (pdh *PulpDistributionHelper) CreateOrUpdateDistribution(orgId, distName, d
}

var contentGuardHref *string
if orgId != config.RedHatOrg && config.Get().Clients.Pulp.CustomRepoContentGuards {
href, err := pdh.FetchContentGuard(orgId)
if orgID != config.RedHatOrg && config.Get().Clients.Pulp.CustomRepoContentGuards {
href, err := pdh.FetchContentGuard(orgID)
if err != nil {
return "", addedContentGuard, err
}
Expand All @@ -65,7 +69,7 @@ func (pdh *PulpDistributionHelper) CreateOrUpdateDistribution(orgId, distName, d
}

if resp == nil {
distTask, err = pdh.CreateDistribution(orgId, publicationHref, distName, distPath)
distTask, err = pdh.CreateDistribution(orgID, publicationHref, distName, distPath)
if err != nil {
return "", addedContentGuard, err
}
Expand All @@ -84,6 +88,28 @@ func (pdh *PulpDistributionHelper) CreateOrUpdateDistribution(orgId, distName, d
return *resp.PulpHref, addedContentGuard, err
}

func (pdh *PulpDistributionHelper) FindOrCreateDistribution(orgID, distName, distPath, publicationHref string) (string, error) {
resp, err := pdh.pulpClient.FindDistributionByPath(pdh.ctx, distPath)
if err != nil {
return "", err
}

if resp != nil && resp.PulpHref != nil {
return *resp.PulpHref, err
}

distTask, err := pdh.CreateDistribution(orgID, publicationHref, distName, distPath)
if err != nil {
return "", err
}
distHrefPtr := pulp_client.SelectRpmDistributionHref(distTask)
if distHrefPtr == nil {
return "", fmt.Errorf("could not find a distribution href in task: %v", distTask.PulpHref)
}

return *distTask.PulpHref, err
}

func (pdh *PulpDistributionHelper) FetchContentGuard(orgId string) (*string, error) {
if orgId != config.RedHatOrg && config.Get().Clients.Pulp.CustomRepoContentGuards {
href, err := pdh.pulpClient.CreateOrUpdateGuardsForOrg(pdh.ctx, orgId)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tasks/helpers/pulp_distribution_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,6 @@ func (s *PulpDistributionHelperTest) TestRedHatDistributionUpdate() {
PulpHref: &distHref,
}, nil)

_, _, err := helper.CreateOrUpdateDistribution(orgId, distName, distPath, pubHref)
_, _, err := helper.CreateOrUpdateDistribution(orgId, pubHref, distName, distPath)
assert.NoError(s.T(), err)
}
6 changes: 3 additions & 3 deletions pkg/tasks/snapshot_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ func (sh *SnapshotHelper) Run(versionHref string) error {
distPath := fmt.Sprintf("%v/%v", sh.repo.UUID, *sh.payload.GetSnapshotIdent())
helper := helpers.NewPulpDistributionHelper(sh.ctx, sh.pulpClient)

distHref, addedContentGuard, err := helper.CreateOrUpdateDistribution(sh.orgId, *sh.payload.GetSnapshotIdent(), distPath, publicationHref)
distHref, addedContentGuard, err := helper.CreateOrUpdateDistribution(sh.orgId, publicationHref, *sh.payload.GetSnapshotIdent(), distPath)
if err != nil {
return err
}

latestPathIdent := fmt.Sprintf("%v/%v", sh.repo.UUID, "latest")
latestPathIdent := helpers.GetLatestRepoDistPath(sh.repo.UUID)

_, _, err = helper.CreateOrUpdateDistribution(sh.orgId, sh.repo.UUID, latestPathIdent, publicationHref)
_, _, err = helper.CreateOrUpdateDistribution(sh.orgId, publicationHref, sh.repo.UUID, latestPathIdent)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tasks/update_latest_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (t *UpdateLatestSnapshot) updateLatestSnapshot(repo api.RepositoryResponse,
return err
}

_, _, err = helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateOrUpdateDistribution(t.orgID, distName, distPath, snap.PublicationHref)
_, _, err = helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateOrUpdateDistribution(t.orgID, snap.PublicationHref, distName, distPath)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/tasks/update_template_content.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (t *UpdateTemplateContent) handleReposAdded(reposAdded []string, snapshots
return err
}

distResp, err := helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateDistribution(repo.OrgID, snapshots[snapIndex].PublicationHref, distName, distPath)
distResp, err := helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateDistribution(repo.OrgID, distName, distPath, snapshots[snapIndex].PublicationHref)
if err != nil {
return err
}
Expand Down Expand Up @@ -246,7 +246,7 @@ func (t *UpdateTemplateContent) handleReposUnchanged(reposUnchanged []string, sn
return err
}

_, _, err = helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateOrUpdateDistribution(repo.OrgID, distName, distPath, snapshots[snapIndex].PublicationHref)
_, _, err = helpers.NewPulpDistributionHelper(t.ctx, t.pulpClient).CreateOrUpdateDistribution(repo.OrgID, snapshots[snapIndex].PublicationHref, distName, distPath)
if err != nil {
return err
}
Expand Down

0 comments on commit ef67702

Please sign in to comment.