From e35ec79cada10b9e9b4032bc0c66885373f68945 Mon Sep 17 00:00:00 2001 From: Silvestre Zabala Date: Mon, 29 Apr 2024 15:41:43 +0200 Subject: [PATCH] fix(operator): Clean up scaling cooldowns # Issue The table `scalingcooldowns` will gain an entry once an app is scaled. This entry is never removed, thus a resource leak is created. The table can get so big, that a simple select starts to take time (as there is no index on the table). # Fix The table is now periodically cleaned by the `operator`. --- .envrc | 2 + src/autoscaler/db/db.go | 1 + .../db/sqldb/scalingengine_sqldb.go | 9 +++ .../db/sqldb/scalingengine_sqldb_test.go | 64 +++++++++++++++++++ src/autoscaler/db/sqldb/sqldb_suite_test.go | 16 ++++- src/autoscaler/operator/scalingenginedb.go | 30 +++++---- .../operator/scalingenginedb_test.go | 20 ++++++ 7 files changed, 128 insertions(+), 14 deletions(-) diff --git a/.envrc b/.envrc index 6d48b37efb..bdc753ffe8 100644 --- a/.envrc +++ b/.envrc @@ -21,3 +21,5 @@ if has bbl && [[ -d "$bbl_state_dir" ]] then eval "$(bbl print-env --state-dir "$bbl_state_dir")" fi + +export DBURL="postgres://postgres:postgres@localhost/autoscaler" diff --git a/src/autoscaler/db/db.go b/src/autoscaler/db/db.go index d38eb19952..5fbe95550e 100644 --- a/src/autoscaler/db/db.go +++ b/src/autoscaler/db/db.go @@ -103,6 +103,7 @@ type ScalingEngineDB interface { CountScalingHistories(ctx context.Context, appId string, start int64, end int64, includeAll bool) (int, error) RetrieveScalingHistories(ctx context.Context, appId string, start int64, end int64, orderType OrderType, includeAll bool, page int, resultsPerPAge int) ([]*models.AppScalingHistory, error) PruneScalingHistories(ctx context.Context, before int64) error + PruneCooldowns(ctx context.Context, before int64) error UpdateScalingCooldownExpireTime(appId string, expireAt int64) error CanScaleApp(appId string) (bool, int64, error) GetActiveSchedule(appId string) (*models.ActiveSchedule, error) diff --git a/src/autoscaler/db/sqldb/scalingengine_sqldb.go b/src/autoscaler/db/sqldb/scalingengine_sqldb.go index a14595fa72..929a9ec808 100644 --- a/src/autoscaler/db/sqldb/scalingengine_sqldb.go +++ b/src/autoscaler/db/sqldb/scalingengine_sqldb.go @@ -170,6 +170,15 @@ func (sdb *ScalingEngineSQLDB) PruneScalingHistories(ctx context.Context, before return err } +func (sdb *ScalingEngineSQLDB) PruneCooldowns(ctx context.Context, before int64) error { + query := sdb.sqldb.Rebind("DELETE FROM scalingcooldown WHERE expireat < ?") + _, err := sdb.sqldb.ExecContext(ctx, query, before) + if err != nil { + sdb.logger.Error("failed-prune-scaling-cooldowns-from-scalingcooldown-table", err, lager.Data{"query": query, "before": before}) + } + return err +} + func (sdb *ScalingEngineSQLDB) CanScaleApp(appId string) (bool, int64, error) { query := sdb.sqldb.Rebind("SELECT expireat FROM scalingcooldown WHERE appid = ?") rows, err := sdb.sqldb.Query(query, appId) diff --git a/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go b/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go index 94d1a22e51..ccf880ecb3 100644 --- a/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go +++ b/src/autoscaler/db/sqldb/scalingengine_sqldb_test.go @@ -3,6 +3,7 @@ package sqldb_test import ( "context" "os" + "strconv" "strings" "time" @@ -72,10 +73,12 @@ var _ = Describe("ScalingEngineSqldb", func() { cleanupForApp(appId) cleanupForApp(appId2) cleanupForApp(appId3) + cleanUpCooldownTable() DeferCleanup(func() { cleanupForApp(appId) cleanupForApp(appId2) cleanupForApp(appId3) + cleanUpCooldownTable() }) }) @@ -545,6 +548,67 @@ var _ = Describe("ScalingEngineSqldb", func() { }) }) + Describe("PruneCooldowns", Serial, func() { + var appIds []string + + BeforeEach(func() { + + appIds = make([]string, 10) + for i := 0; i < 10; i++ { + appIds[i] = addProcessIdTo("an-app-id-" + strconv.Itoa(i)) + err := sdb.UpdateScalingCooldownExpireTime(appIds[i], 111111*int64(i+1)) + Expect(err).NotTo(HaveOccurred()) + } + + }) + + JustBeforeEach(func() { + err = sdb.PruneCooldowns(context.TODO(), before) + }) + + Context("when pruning cooldowns before all the timestamps", func() { + BeforeEach(func() { + before = 111111 + }) + + It("does not remove any cooldowns", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(getNumberOfCooldownEntries()).To(Equal(10)) + }) + }) + + Context("when pruning all the cooldowns", func() { + BeforeEach(func() { + before = time.Now().UnixNano() + }) + + It("empties the scalingcooldowns table", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(getNumberOfCooldownEntries()).To(Equal(0)) + }) + }) + + Context("when pruning part of the cooldowns", func() { + BeforeEach(func() { + before = 333333 + }) + + It("removes cooldowns before the time specified", func() { + Expect(err).NotTo(HaveOccurred()) + Expect(getNumberOfCooldownEntries()).To(Equal(8)) + }) + }) + + Context("when db fails", func() { + BeforeEach(func() { + _ = sdb.Close() + }) + It("should error", func() { + Expect(err).To(MatchError(MatchRegexp("sql: .*"))) + }) + }) + }) + Describe("UpdateScalingCooldownExpireTime", func() { JustBeforeEach(func() { diff --git a/src/autoscaler/db/sqldb/sqldb_suite_test.go b/src/autoscaler/db/sqldb/sqldb_suite_test.go index c870f6b9b9..9a118ecf5e 100644 --- a/src/autoscaler/db/sqldb/sqldb_suite_test.go +++ b/src/autoscaler/db/sqldb/sqldb_suite_test.go @@ -220,15 +220,29 @@ func removeScalingHistoryForApp(appId string) { FailOnError("can not clean table scalinghistory", err) } +func getNumberOfCooldownEntries() int { + var num int + query := dbHelper.Rebind("SELECT COUNT(*) FROM scalingcooldown") + err := dbHelper.QueryRow(query).Scan(&num) + FailOnError("can not count the number of records in table scalingcooldown", err) + return num +} + func removeCooldownForApp(appId string) { query := dbHelper.Rebind("DELETE from scalingcooldown where appId = ?") _, err := dbHelper.Exec(query, appId) + FailOnError("can not remove scalingcooldown for app", err) +} + +func cleanUpCooldownTable() { + _, err := dbHelper.Exec("DELETE from scalingcooldown") FailOnError("can not clean table scalingcooldown", err) } + func removeActiveScheduleForApp(appId string) { query := dbHelper.Rebind("DELETE from activeschedule where appId = ?") _, err := dbHelper.Exec(query, appId) - FailOnError("can not clean table scalingcooldown", err) + FailOnError("can not remove actives schedules for app", err) } func hasScalingHistory(appId string, timestamp int64) bool { diff --git a/src/autoscaler/operator/scalingenginedb.go b/src/autoscaler/operator/scalingenginedb.go index 4bd1416bf5..9572dc1be8 100644 --- a/src/autoscaler/operator/scalingenginedb.go +++ b/src/autoscaler/operator/scalingenginedb.go @@ -10,31 +10,35 @@ import ( ) type ScalingEngineDbPruner struct { - scalingEngineDb db.ScalingEngineDB - cutoffDuration time.Duration - clock clock.Clock - logger lager.Logger + scalingEngineDb db.ScalingEngineDB + scalingHistoriesCutoffDuration time.Duration + clock clock.Clock + logger lager.Logger } -func NewScalingEngineDbPruner(scalingEngineDb db.ScalingEngineDB, cutoffDuration time.Duration, clock clock.Clock, logger lager.Logger) *ScalingEngineDbPruner { +func NewScalingEngineDbPruner(scalingEngineDb db.ScalingEngineDB, scalingHistoriesCutoffDuration time.Duration, clock clock.Clock, logger lager.Logger) *ScalingEngineDbPruner { return &ScalingEngineDbPruner{ - scalingEngineDb: scalingEngineDb, - cutoffDuration: cutoffDuration, - clock: clock, - logger: logger.Session("scaling_engine_db_pruner"), + scalingEngineDb: scalingEngineDb, + scalingHistoriesCutoffDuration: scalingHistoriesCutoffDuration, + clock: clock, + logger: logger.Session("scaling_engine_db_pruner"), } } func (sdp ScalingEngineDbPruner) Operate(ctx context.Context) { - timestamp := sdp.clock.Now().Add(-sdp.cutoffDuration).UnixNano() + historyCutoffTimestamp := sdp.clock.Now().Add(-sdp.scalingHistoriesCutoffDuration).UnixNano() - logger := sdp.logger.Session("pruning-scaling-histories", lager.Data{"cutoff-time": timestamp}) + logger := sdp.logger.Session("pruning-scaling-histories-and-cooldowns", lager.Data{"history-cutoff-time": historyCutoffTimestamp}) logger.Info("starting") defer logger.Info("completed") - err := sdp.scalingEngineDb.PruneScalingHistories(ctx, timestamp) + err := sdp.scalingEngineDb.PruneScalingHistories(ctx, historyCutoffTimestamp) if err != nil { sdp.logger.Error("failed-prune-scaling-histories", err) - return + } + + err = sdp.scalingEngineDb.PruneCooldowns(ctx, sdp.clock.Now().UnixNano()) + if err != nil { + sdp.logger.Error("failed-prune-scaling-cooldowns", err) } } diff --git a/src/autoscaler/operator/scalingenginedb_test.go b/src/autoscaler/operator/scalingenginedb_test.go index 3e56a506ae..97d00b49ca 100644 --- a/src/autoscaler/operator/scalingenginedb_test.go +++ b/src/autoscaler/operator/scalingenginedb_test.go @@ -57,5 +57,25 @@ var _ = Describe("ScalingEngineDbPruner", func() { Eventually(buffer).Should(gbytes.Say("test error")) }) }) + + Context("when pruning records from scalingcooldown table", func() { + It("prunes expired cooldowns", func() { + Eventually(scalingEngineDB.PruneCooldownsCallCount).Should(Equal(1)) + _, cutoffTime := scalingEngineDB.PruneCooldownsArgsForCall(0) + Expect(cutoffTime).To(Equal(fclock.Now().UnixNano())) + }) + }) + + Context("when pruning records from scalingcooldown table fails", func() { + BeforeEach(func() { + scalingEngineDB.PruneCooldownsReturns(errors.New("test error")) + }) + + It("should error", func() { + Eventually(scalingEngineDB.PruneCooldownsCallCount).Should(Equal(1)) + Eventually(buffer).Should(gbytes.Say("test error")) + }) + }) + }) })