From a0920e160e5d9e98d2a20a6f9a69a0bcb7cd26ff Mon Sep 17 00:00:00 2001 From: Richard LT Date: Tue, 22 Jun 2021 10:59:09 +0200 Subject: [PATCH] fix(cdn): clean old worker cache items (#5856) --- engine/cdn/cdn_file.go | 50 +++++++++++++++++++++++++------------- engine/cdn/item/dao.go | 36 ++++++++++++++++++--------- engine/cdn/item_handler.go | 2 +- 3 files changed, 58 insertions(+), 30 deletions(-) diff --git a/engine/cdn/cdn_file.go b/engine/cdn/cdn_file.go index 281ccdbb19..870deb5f0e 100644 --- a/engine/cdn/cdn_file.go +++ b/engine/cdn/cdn_file.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "os" + "sort" "github.com/ovh/cds/engine/cdn/item" "github.com/ovh/cds/engine/cdn/storage" @@ -126,11 +127,6 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re } defer tx.Rollback() //nolint - // Check and Clean file with same ref - if err := s.cleanPreviousFileItem(ctx, tx, sig, itemType, apiRef.ToFilename()); err != nil { - return err - } - // Insert Item if err := item.Insert(ctx, s.Mapper, tx, it); err != nil { return err @@ -186,22 +182,42 @@ func (s *Service) storeFile(ctx context.Context, sig cdn.Signature, reader io.Re } s.Units.PushInSyncQueue(ctx, it.ID, it.Created) + + // For worker cache item clean others with same ref to purge old cached data + if itemType == sdk.CDNTypeItemWorkerCache { + tx, err := s.mustDBWithCtx(ctx).Begin() + if err != nil { + return sdk.WithStack(err) + } + defer tx.Rollback() //nolint + + if err := s.cleanPreviousCachedData(ctx, tx, sig, apiRef.ToFilename()); err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return sdk.WithStack(err) + } + } + return nil } -func (s *Service) cleanPreviousFileItem(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, itemType sdk.CDNItemType, name string) error { - switch itemType { - case sdk.CDNTypeItemWorkerCache: - // Check if item already exist - existingItem, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, tx, itemType, sig.ProjectKey, name) - if err != nil { - if !sdk.ErrorIs(err, sdk.ErrNotFound) { - return err - } - return nil +// Mark to delete all items for given cache tag except the most recent one. +func (s *Service) cleanPreviousCachedData(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, sig cdn.Signature, cacheTag string) error { + items, err := item.LoadWorkerCacheItemsByProjectAndCacheTag(ctx, s.Mapper, tx, sig.ProjectKey, cacheTag) + if err != nil { + return err + } + + sort.Slice(items, func(i, j int) bool { return items[i].Created.Before(items[j].Created) }) + + for i := 0; i < len(items)-1; i++ { + items[i].ToDelete = true + if err := item.Update(ctx, s.Mapper, tx, &items[i]); err != nil { + return err } - existingItem.ToDelete = true - return item.Update(ctx, s.Mapper, tx, existingItem) } + return nil } diff --git a/engine/cdn/item/dao.go b/engine/cdn/item/dao.go index ad50514ddc..581ff50dbd 100644 --- a/engine/cdn/item/dao.go +++ b/engine/cdn/item/dao.go @@ -144,13 +144,13 @@ func Update(ctx context.Context, m *gorpmapper.Mapper, db gorpmapper.SqlExecutor func MarkToDeleteByRunIDs(db gorpmapper.SqlExecutorWithTx, runID int64) error { query := ` - UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1 + UPDATE item SET to_delete = true WHERE (api_ref->>'run_id')::int = $1 ` _, err := db.Exec(query, runID) return sdk.WrapError(err, "unable to mark item to delete for run %d", runID) } -func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, itemType sdk.CDNItemType, projKey string, cacheTag string) (*sdk.CDNItem, error) { +func LoadWorkerCacheItemByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) (*sdk.CDNItem, error) { query := gorpmapper.NewQuery(` SELECT * FROM item @@ -158,17 +158,30 @@ func LoadFileByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db AND (api_ref->>'project_key')::text = $2 AND (api_ref->>'cache_tag')::text = $3 AND to_delete = false - - `).Args(itemType, projKey, cacheTag) + ORDER BY created DESC + LIMIT 1 + `).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag) return getItem(ctx, m, db, query) } +func LoadWorkerCacheItemsByProjectAndCacheTag(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, projKey string, cacheTag string) ([]sdk.CDNItem, error) { + query := gorpmapper.NewQuery(` + SELECT * + FROM item + WHERE type = $1 + AND (api_ref->>'project_key')::text = $2 + AND (api_ref->>'cache_tag')::text = $3 + AND to_delete = false + `).Args(sdk.CDNTypeItemWorkerCache, projKey, cacheTag) + return getItems(ctx, m, db, query) +} + // LoadByAPIRefHashAndType load an item by his job id, step order and type func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp.SqlExecutor, hash string, itemType sdk.CDNItemType, opts ...gorpmapper.GetOptionFunc) (*sdk.CDNItem, error) { query := gorpmapper.NewQuery(` SELECT * FROM item - WHERE api_ref_hash = $1 + WHERE api_ref_hash = $1 AND type = $2 AND to_delete = false `).Args(hash, itemType) @@ -179,7 +192,7 @@ func LoadByAPIRefHashAndType(ctx context.Context, m *gorpmapper.Mapper, db gorp. func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) { query := ` SELECT COALESCE(SUM(size), 0) FROM item - WHERE id = ANY($1) + WHERE id = ANY($1) ` size, err := db.SelectInt(query, pq.StringArray(itemIDs)) if err != nil { @@ -191,9 +204,9 @@ func ComputeSizeByIDs(db gorp.SqlExecutor, itemIDs []string) (int64, error) { func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, error) { var IDs []int64 query := ` - SELECT + SELECT DISTINCT((api_ref->>'node_run_id')::int) - FROM item + FROM item WHERE api_ref->>'project_key' = $1 ` _, err := db.Select(&IDs, query, projectKey) @@ -206,7 +219,7 @@ func ListNodeRunByProject(db gorp.SqlExecutor, projectKey string) ([]int64, erro // ComputeSizeByProjectKey returns the size used by a project func ComputeSizeByProjectKey(db gorp.SqlExecutor, projectKey string) (int64, error) { query := ` - SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1 + SELECT SUM(size) FROM item WHERE api_ref->>'project_key' = $1 ` size, err := db.SelectInt(query, projectKey) if err != nil { @@ -223,7 +236,7 @@ type Stat struct { func CountItems(db gorp.SqlExecutor) (res []Stat, err error) { _, err = db.Select(&res, ` - SELECT status, type, count(status) as "number" + SELECT status, type, count(status) as "number" FROM item GROUP BY status, type`) return res, sdk.WithStack(err) @@ -231,7 +244,7 @@ func CountItems(db gorp.SqlExecutor) (res []Stat, err error) { func CountItemsToDelete(db gorp.SqlExecutor) (int64, error) { query := `SELECT count(1) as "number" - FROM item + FROM item WHERE to_delete = true` nb, err := db.SelectInt(query) return nb, sdk.WithStack(err) @@ -334,5 +347,4 @@ func LoadRunResultByRunID(ctx context.Context, m *gorpmapper.Mapper, db gorp.Sql SELECT * FROM item WHERE id IN (SELECT id FROM deduplication) `).Args(runID, sdk.CDNTypeItemRunResult) return getItems(ctx, m, db, query) - } diff --git a/engine/cdn/item_handler.go b/engine/cdn/item_handler.go index c20da2d979..31f763b608 100644 --- a/engine/cdn/item_handler.go +++ b/engine/cdn/item_handler.go @@ -265,7 +265,7 @@ func (s *Service) getWorkerCache(ctx context.Context, r *http.Request, w http.Re if projectKey == "" || cachetag == "" { return sdk.WrapError(sdk.ErrWrongRequest, "invalid data to get worker cache") } - item, err := item.LoadFileByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), sdk.CDNTypeItemWorkerCache, projectKey, cachetag) + item, err := item.LoadWorkerCacheItemByProjectAndCacheTag(ctx, s.Mapper, s.mustDBWithCtx(ctx), projectKey, cachetag) if err != nil { return err }