Skip to content

Commit

Permalink
Merge pull request #2909 from cloudfoundry/prune-cooldowns
Browse files Browse the repository at this point in the history
fix(operator): Clean up scaling cooldowns
  • Loading branch information
silvestre authored May 15, 2024
2 parents 58893a4 + e35ec79 commit 70f2736
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 14 deletions.
2 changes: 2 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ if has bbl && [[ -d "$bbl_state_dir" ]]
then
eval "$(bbl print-env --state-dir "$bbl_state_dir")"
fi

export DBURL="postgres://postgres:postgres@localhost/autoscaler"
1 change: 1 addition & 0 deletions src/autoscaler/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type ScalingEngineDB interface {
CountScalingHistories(ctx context.Context, appId string, start int64, end int64, includeAll bool) (int, error)
RetrieveScalingHistories(ctx context.Context, appId string, start int64, end int64, orderType OrderType, includeAll bool, page int, resultsPerPAge int) ([]*models.AppScalingHistory, error)
PruneScalingHistories(ctx context.Context, before int64) error
PruneCooldowns(ctx context.Context, before int64) error
UpdateScalingCooldownExpireTime(appId string, expireAt int64) error
CanScaleApp(appId string) (bool, int64, error)
GetActiveSchedule(appId string) (*models.ActiveSchedule, error)
Expand Down
9 changes: 9 additions & 0 deletions src/autoscaler/db/sqldb/scalingengine_sqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ func (sdb *ScalingEngineSQLDB) PruneScalingHistories(ctx context.Context, before
return err
}

func (sdb *ScalingEngineSQLDB) PruneCooldowns(ctx context.Context, before int64) error {
query := sdb.sqldb.Rebind("DELETE FROM scalingcooldown WHERE expireat < ?")
_, err := sdb.sqldb.ExecContext(ctx, query, before)
if err != nil {
sdb.logger.Error("failed-prune-scaling-cooldowns-from-scalingcooldown-table", err, lager.Data{"query": query, "before": before})
}
return err
}

func (sdb *ScalingEngineSQLDB) CanScaleApp(appId string) (bool, int64, error) {
query := sdb.sqldb.Rebind("SELECT expireat FROM scalingcooldown WHERE appid = ?")
rows, err := sdb.sqldb.Query(query, appId)
Expand Down
64 changes: 64 additions & 0 deletions src/autoscaler/db/sqldb/scalingengine_sqldb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sqldb_test
import (
"context"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -72,10 +73,12 @@ var _ = Describe("ScalingEngineSqldb", func() {
cleanupForApp(appId)
cleanupForApp(appId2)
cleanupForApp(appId3)
cleanUpCooldownTable()
DeferCleanup(func() {
cleanupForApp(appId)
cleanupForApp(appId2)
cleanupForApp(appId3)
cleanUpCooldownTable()
})
})

Expand Down Expand Up @@ -545,6 +548,67 @@ var _ = Describe("ScalingEngineSqldb", func() {
})
})

Describe("PruneCooldowns", Serial, func() {
var appIds []string

BeforeEach(func() {

appIds = make([]string, 10)
for i := 0; i < 10; i++ {
appIds[i] = addProcessIdTo("an-app-id-" + strconv.Itoa(i))
err := sdb.UpdateScalingCooldownExpireTime(appIds[i], 111111*int64(i+1))
Expect(err).NotTo(HaveOccurred())
}

})

JustBeforeEach(func() {
err = sdb.PruneCooldowns(context.TODO(), before)
})

Context("when pruning cooldowns before all the timestamps", func() {
BeforeEach(func() {
before = 111111
})

It("does not remove any cooldowns", func() {
Expect(err).NotTo(HaveOccurred())
Expect(getNumberOfCooldownEntries()).To(Equal(10))
})
})

Context("when pruning all the cooldowns", func() {
BeforeEach(func() {
before = time.Now().UnixNano()
})

It("empties the scalingcooldowns table", func() {
Expect(err).NotTo(HaveOccurred())
Expect(getNumberOfCooldownEntries()).To(Equal(0))
})
})

Context("when pruning part of the cooldowns", func() {
BeforeEach(func() {
before = 333333
})

It("removes cooldowns before the time specified", func() {
Expect(err).NotTo(HaveOccurred())
Expect(getNumberOfCooldownEntries()).To(Equal(8))
})
})

Context("when db fails", func() {
BeforeEach(func() {
_ = sdb.Close()
})
It("should error", func() {
Expect(err).To(MatchError(MatchRegexp("sql: .*")))
})
})
})

Describe("UpdateScalingCooldownExpireTime", func() {

JustBeforeEach(func() {
Expand Down
16 changes: 15 additions & 1 deletion src/autoscaler/db/sqldb/sqldb_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,29 @@ func removeScalingHistoryForApp(appId string) {
FailOnError("can not clean table scalinghistory", err)
}

func getNumberOfCooldownEntries() int {
var num int
query := dbHelper.Rebind("SELECT COUNT(*) FROM scalingcooldown")
err := dbHelper.QueryRow(query).Scan(&num)
FailOnError("can not count the number of records in table scalingcooldown", err)
return num
}

func removeCooldownForApp(appId string) {
query := dbHelper.Rebind("DELETE from scalingcooldown where appId = ?")
_, err := dbHelper.Exec(query, appId)
FailOnError("can not remove scalingcooldown for app", err)
}

func cleanUpCooldownTable() {
_, err := dbHelper.Exec("DELETE from scalingcooldown")
FailOnError("can not clean table scalingcooldown", err)
}

func removeActiveScheduleForApp(appId string) {
query := dbHelper.Rebind("DELETE from activeschedule where appId = ?")
_, err := dbHelper.Exec(query, appId)
FailOnError("can not clean table scalingcooldown", err)
FailOnError("can not remove actives schedules for app", err)
}

func hasScalingHistory(appId string, timestamp int64) bool {
Expand Down
30 changes: 17 additions & 13 deletions src/autoscaler/operator/scalingenginedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,35 @@ import (
)

type ScalingEngineDbPruner struct {
scalingEngineDb db.ScalingEngineDB
cutoffDuration time.Duration
clock clock.Clock
logger lager.Logger
scalingEngineDb db.ScalingEngineDB
scalingHistoriesCutoffDuration time.Duration
clock clock.Clock
logger lager.Logger
}

func NewScalingEngineDbPruner(scalingEngineDb db.ScalingEngineDB, cutoffDuration time.Duration, clock clock.Clock, logger lager.Logger) *ScalingEngineDbPruner {
func NewScalingEngineDbPruner(scalingEngineDb db.ScalingEngineDB, scalingHistoriesCutoffDuration time.Duration, clock clock.Clock, logger lager.Logger) *ScalingEngineDbPruner {
return &ScalingEngineDbPruner{
scalingEngineDb: scalingEngineDb,
cutoffDuration: cutoffDuration,
clock: clock,
logger: logger.Session("scaling_engine_db_pruner"),
scalingEngineDb: scalingEngineDb,
scalingHistoriesCutoffDuration: scalingHistoriesCutoffDuration,
clock: clock,
logger: logger.Session("scaling_engine_db_pruner"),
}
}

func (sdp ScalingEngineDbPruner) Operate(ctx context.Context) {
timestamp := sdp.clock.Now().Add(-sdp.cutoffDuration).UnixNano()
historyCutoffTimestamp := sdp.clock.Now().Add(-sdp.scalingHistoriesCutoffDuration).UnixNano()

logger := sdp.logger.Session("pruning-scaling-histories", lager.Data{"cutoff-time": timestamp})
logger := sdp.logger.Session("pruning-scaling-histories-and-cooldowns", lager.Data{"history-cutoff-time": historyCutoffTimestamp})
logger.Info("starting")
defer logger.Info("completed")

err := sdp.scalingEngineDb.PruneScalingHistories(ctx, timestamp)
err := sdp.scalingEngineDb.PruneScalingHistories(ctx, historyCutoffTimestamp)
if err != nil {
sdp.logger.Error("failed-prune-scaling-histories", err)
return
}

err = sdp.scalingEngineDb.PruneCooldowns(ctx, sdp.clock.Now().UnixNano())
if err != nil {
sdp.logger.Error("failed-prune-scaling-cooldowns", err)
}
}
20 changes: 20 additions & 0 deletions src/autoscaler/operator/scalingenginedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,25 @@ var _ = Describe("ScalingEngineDbPruner", func() {
Eventually(buffer).Should(gbytes.Say("test error"))
})
})

Context("when pruning records from scalingcooldown table", func() {
It("prunes expired cooldowns", func() {
Eventually(scalingEngineDB.PruneCooldownsCallCount).Should(Equal(1))
_, cutoffTime := scalingEngineDB.PruneCooldownsArgsForCall(0)
Expect(cutoffTime).To(Equal(fclock.Now().UnixNano()))
})
})

Context("when pruning records from scalingcooldown table fails", func() {
BeforeEach(func() {
scalingEngineDB.PruneCooldownsReturns(errors.New("test error"))
})

It("should error", func() {
Eventually(scalingEngineDB.PruneCooldownsCallCount).Should(Equal(1))
Eventually(buffer).Should(gbytes.Say("test error"))
})
})

})
})

0 comments on commit 70f2736

Please sign in to comment.