diff --git a/state/action.go b/state/action.go index 8099dcae20bb..4a2cd4367dc6 100644 --- a/state/action.go +++ b/state/action.go @@ -829,41 +829,3 @@ func (st *State) matchingActionsByReceiverAndStatus(tag names.Tag, statusConditi } return actions, errors.Trace(iter.Close()) } - -// PruneOperations removes operation entries and their sub-tasks until -// only logs newer than remain and also ensures -// that the actions collection is smaller than after the deletion. -func PruneOperations(stop <-chan struct{}, st *State, maxHistoryTime time.Duration, maxHistoryMB int) error { - // There may be older actions without parent operations so try those first. - hasNoOperation := bson.D{{"$or", []bson.D{ - {{"operation", ""}}, - {{"operation", bson.D{{"$exists", false}}}}, - }}} - coll, closer := st.db().GetRawCollection(actionsC) - defer closer() - err := pruneCollection(stop, st, maxHistoryTime, maxHistoryMB, coll, "completed", hasNoOperation, GoTime) - if err != nil { - return errors.Trace(err) - } - // First calculate the average ratio of tasks to operations. Since deletion is - // done at the operation level, and any associated tasks are then deleted, but - // the actions collection is where the disk space goes, we approximate the - // number of operations to delete to achieve a given size deduction based on - // the average ratio of number of operations to tasks. - operationsColl, closer := st.db().GetRawCollection(operationsC) - defer closer() - operationsCount, err := operationsColl.Count() - if err != nil { - return errors.Annotate(err, "retrieving operations collection count") - } - actionsColl, closer := st.db().GetRawCollection(actionsC) - defer closer() - actionsCount, err := actionsColl.Count() - if err != nil { - return errors.Annotate(err, "retrieving actions collection count") - } - sizeFactor := float64(actionsCount) / float64(operationsCount) - - err = pruneCollectionAndChildren(stop, st, maxHistoryTime, maxHistoryMB, operationsColl, actionsColl, "completed", "operation", nil, sizeFactor, GoTime) - return errors.Trace(err) -} diff --git a/state/prune.go b/state/prune.go index 1b8bd7823a7d..0206a2380ddd 100644 --- a/state/prune.go +++ b/state/prune.go @@ -4,11 +4,8 @@ package state import ( - "fmt" - "math" "time" - "github.com/dustin/go-humanize" "github.com/juju/errors" "github.com/juju/mgo/v3" "github.com/juju/mgo/v3/bson" @@ -17,60 +14,6 @@ import ( "github.com/juju/juju/internal/mongo" ) -// pruneCollection removes collection entries until -// only entries newer than remain and also ensures -// that the collection is smaller than after the -// deletion. -func pruneCollection( - stop <-chan struct{}, - mb modelBackend, maxHistoryTime time.Duration, maxHistoryMB int, - coll *mgo.Collection, ageField string, filter bson.D, - timeUnit TimeUnit, -) error { - return pruneCollectionAndChildren(stop, mb, maxHistoryTime, maxHistoryMB, coll, nil, ageField, "", filter, 1, timeUnit) -} - -// pruneCollectionAndChildren removes collection entries until -// only entries newer than remain and also ensures -// that the collection (or child collection if specified) is smaller -// than after the deletion. -func pruneCollectionAndChildren(stop <-chan struct{}, mb modelBackend, maxHistoryTime time.Duration, maxHistoryMB int, - coll, childColl *mgo.Collection, ageField, parentRefField string, - filter bson.D, sizeFactor float64, timeUnit TimeUnit, -) error { - p := collectionPruner{ - st: mb, - coll: coll, - childColl: childColl, - parentRefField: parentRefField, - childCountRatio: sizeFactor, - maxAge: maxHistoryTime, - maxSize: maxHistoryMB, - ageField: ageField, - filter: filter, - timeUnit: timeUnit, - } - if err := p.validate(); err != nil { - return errors.Trace(err) - } - if err := p.pruneByAge(stop); err != nil { - return errors.Trace(err) - } - // First try pruning, excluding any items that - // have an age field that is not yet set. - // ie only prune completed items. - if err := p.pruneBySize(stop); err != nil { - return errors.Trace(err) - } - if ageField == "" { - return nil - } - // If needed, prune additional incomplete items to - // get under the size limit. - p.ageField = "" - return errors.Trace(p.pruneBySize(stop)) -} - const historyPruneBatchSize = 1000 const historyPruneProgressSeconds = 15 @@ -78,243 +21,6 @@ type doneCheck func() (bool, error) type TimeUnit string -const ( - NanoSeconds TimeUnit = "nanoseconds" - GoTime TimeUnit = "goTime" -) - -type collectionPruner struct { - st modelBackend - coll *mgo.Collection - filter bson.D - - // If specified, these fields define subordinate - // entries to delete in a related collection. - // The child records refer to the parents via - // the value of the parentRefField. - childColl *mgo.Collection - parentRefField string - childCountRatio float64 // ratio of child records to parent records. - - maxAge time.Duration - maxSize int - - ageField string - timeUnit TimeUnit -} - -func (p *collectionPruner) validate() error { - if p.maxSize < 0 { - return errors.NotValidf("non-positive max size") - } - if p.maxAge < 0 { - return errors.NotValidf("non-positive max age") - } - if p.maxSize == 0 && p.maxAge == 0 { - return errors.NewNotValid(nil, "backlog size and age constraints are both 0") - } - if p.childColl != nil && p.parentRefField == "" { - return errors.NewNotValid(nil, "missing parent reference field when a child collection is specified") - } - return nil -} - -func (p *collectionPruner) pruneByAge(stop <-chan struct{}) error { - if p.maxAge == 0 { - return nil - } - - t := p.st.clock().Now().Add(-p.maxAge) - var age interface{} - var notSet interface{} - - if p.timeUnit == NanoSeconds { - age = t.UnixNano() - notSet = 0 - } else { - age = t - notSet = time.Time{} - } - - query := bson.D{ - {"model-uuid", p.st.ModelUUID()}, - {p.ageField, bson.M{"$gt": notSet, "$lt": age}}, - } - query = append(query, p.filter...) - iter := p.coll.Find(query).Select(bson.M{"_id": 1}).Iter() - defer func() { _ = iter.Close() }() - - modelName, err := p.st.modelName() - if err != nil { - return errors.Trace(err) - } - logTemplate := fmt.Sprintf("%s age pruning (%s): %%d rows deleted", p.coll.Name, modelName) - deleted, err := deleteInBatches(stop, p.coll, p.childColl, p.parentRefField, iter, logTemplate, corelogger.INFO, noEarlyFinish) - if err != nil { - return errors.Trace(err) - } - if deleted > 0 { - logger.Debugf("%s age pruning (%s): %d rows deleted", p.coll.Name, modelName, deleted) - } - return errors.Trace(iter.Close()) -} - -func collStats(coll *mgo.Collection) (bson.M, error) { - var result bson.M - err := coll.Database.Run(bson.D{ - {"collStats", coll.Name}, - {"scale", humanize.KiByte}, - }, &result) - if err != nil { - return nil, errors.Trace(err) - } - // For mongo > 4.4, if the collection exists, - // there will be a "capped" attribute. - _, ok := result["capped"] - if !ok { - return nil, errors.NotFoundf("Collection [%s.%s]", coll.Database.Name, coll.Name) - } - return result, nil -} - -// dbCollectionSizeToInt processes the result of Database.collStats() -func dbCollectionSizeToInt(result bson.M, collectionName string) (int, error) { - size, ok := result["size"] - if !ok { - logger.Warningf("mongo collStats did not return a size field for %q", collectionName) - // this wasn't considered an error in the past, just treat it as size 0 - return 0, nil - } - if asint, ok := size.(int); ok { - if asint < 0 { - return 0, errors.Errorf("mongo collStats for %q returned a negative value: %v", collectionName, size) - } - return asint, nil - } - if asint64, ok := size.(int64); ok { - // 2billion megabytes is 2 petabytes, which is outside our range anyway. - if asint64 > math.MaxInt32 { - return math.MaxInt32, nil - } - if asint64 < 0 { - return 0, errors.Errorf("mongo collStats for %q returned a negative value: %v", collectionName, size) - } - return int(asint64), nil - } - return 0, errors.Errorf( - "mongo collStats for %q did not return an int or int64 for size, returned %T: %v", - collectionName, size, size) -} - -// getCollectionKB returns the size of a MongoDB collection (in -// kilobytes), excluding space used by indexes. -func getCollectionKB(coll *mgo.Collection) (int, error) { - stats, err := collStats(coll) - if err != nil { - return 0, errors.Trace(err) - } - return dbCollectionSizeToInt(stats, coll.Name) -} - -func (*collectionPruner) toDeleteCalculator(coll *mgo.Collection, maxSizeMB int, countRatio float64) (int, error) { - collKB, err := getCollectionKB(coll) - if err != nil { - return 0, errors.Annotate(err, "retrieving collection size") - } - maxSizeKB := maxSizeMB * humanize.KiByte - if collKB <= maxSizeKB { - return 0, nil - } - count, err := coll.Count() - if err == mgo.ErrNotFound || count <= 0 { - return 0, nil - } - if err != nil { - return 0, errors.Annotatef(err, "counting %s records", coll.Name) - } - // For large numbers of items we are making an assumption that the size of - // items can be averaged to give a reasonable number of items to drop to - // reach the goal size. - sizePerItem := float64(collKB) / float64(count) - if sizePerItem == 0 { - return 0, errors.Errorf("unexpected result calculating %s entry size", coll.Name) - } - return int(float64(collKB-maxSizeKB) / (sizePerItem * countRatio)), nil -} - -func (p *collectionPruner) pruneBySize(stop <-chan struct{}) error { - if !p.st.IsController() { - // Only prune by size in the controller. Otherwise we might - // find that multiple pruners are trying to delete the latest - // 1000 rows and end up with more deleted than we expect. - return nil - } - if p.maxSize == 0 { - return nil - } - var toDelete int - var err error - if p.childColl == nil { - // We are only operating on a single collection so calculate the number - // of items to delete based on the size of that collection. - toDelete, err = p.toDeleteCalculator(p.coll, p.maxSize, 1.0) - } else { - // We need to free up space in a child collection so calculate the number - // of parent items to delete based on the size of the child collection and - // the ratio of child items per parent item. - toDelete, err = p.toDeleteCalculator(p.childColl, p.maxSize, p.childCountRatio) - } - if err != nil { - return errors.Annotate(err, "calculating items to delete") - } - if toDelete <= 0 { - return nil - } - - // If age field is set, add a filter which - // excludes those items where the age field - // is not set, ie only prune completed items. - var filter bson.D - if p.ageField != "" { - var notSet interface{} - if p.timeUnit == NanoSeconds { - notSet = 0 - } else { - notSet = time.Time{} - } - filter = bson.D{ - {p.ageField, bson.M{"$gt": notSet}}, - } - } - filter = append(filter, p.filter...) - query := p.coll.Find(filter) - if p.ageField != "" { - query = query.Sort(p.ageField) - } - iter := query.Limit(toDelete).Select(bson.M{"_id": 1}).Iter() - defer func() { _ = iter.Close() }() - - template := fmt.Sprintf("%s size pruning: deleted %%d of %d (estimated)", p.coll.Name, toDelete) - deleted, err := deleteInBatches(stop, p.coll, p.childColl, p.parentRefField, iter, template, corelogger.INFO, func() (bool, error) { - // Check that we still need to delete more - collKB, err := getCollectionKB(p.coll) - if err != nil { - return false, errors.Annotatef(err, "retrieving %s collection size", p.coll.Name) - } - if collKB <= p.maxSize*humanize.KiByte { - return true, nil - } - return false, nil - }) - - if err != nil { - return errors.Trace(err) - } - - logger.Infof("%s size pruning finished: %d rows deleted", p.coll.Name, deleted) - return errors.Trace(iter.Close()) -} - func deleteInBatches( stop <-chan struct{}, coll *mgo.Collection, diff --git a/state/status.go b/state/status.go index 9b916a152c9f..8d58ef6676c4 100644 --- a/state/status.go +++ b/state/status.go @@ -595,12 +595,3 @@ func statusHistory(args *statusHistoryArgs) ([]status.StatusInfo, error) { results = partial return results, nil } - -// PruneStatusHistory prunes the status history collection. -func PruneStatusHistory(stop <-chan struct{}, st *State, maxHistoryTime time.Duration, maxHistoryMB int) error { - coll, closer := st.db().GetRawCollection(statusesHistoryC) - defer closer() - - err := pruneCollection(stop, st, maxHistoryTime, maxHistoryMB, coll, "updated", nil, NanoSeconds) - return errors.Trace(err) -}