diff --git a/cmd/support-cron-scheduler/res/configuration.yaml b/cmd/support-cron-scheduler/res/configuration.yaml index f77b676bb4..098e8bb0b0 100644 --- a/cmd/support-cron-scheduler/res/configuration.yaml +++ b/cmd/support-cron-scheduler/res/configuration.yaml @@ -31,3 +31,8 @@ Database: Timeout: 5s Type: postgres +Retention: + Enabled: true + Interval: 24h # Purging interval defines when the database should be rid of records above the high watermark. + MaxCap: 10000 # The maximum capacity defines where the high watermark of records should be detected for purging the amount of the records to the minimum capacity. + MinCap: 8000 # The minimum capacity defines where the total count of records should be returned to during purging. diff --git a/internal/pkg/infrastructure/postgres/scheduleactionrecord.go b/internal/pkg/infrastructure/postgres/scheduleactionrecord.go index d2d154c07c..e27fd17ccd 100644 --- a/internal/pkg/infrastructure/postgres/scheduleactionrecord.go +++ b/internal/pkg/infrastructure/postgres/scheduleactionrecord.go @@ -83,6 +83,19 @@ func (c *Client) LatestScheduleActionRecordsByJobName(ctx context.Context, jobNa return records, nil } +// LatestScheduleActionRecordsByOffset queries the latest schedule action records by offset +func (c *Client) LatestScheduleActionRecordsByOffset(ctx context.Context, offset uint32) (model.ScheduleActionRecord, errors.EdgeX) { + records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllWithPaginationDescByCol(scheduleActionRecordTableName, createdCol), offset, 1) + if err != nil { + return model.ScheduleActionRecord{}, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all schedule action records", err) + } + + if len(records) == 0 { + return model.ScheduleActionRecord{}, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("no schedule action record found with offset '%d'", offset), err) + } + return records[0], nil +} + // ScheduleActionRecordsByStatus queries the schedule action 
records by status with the given range, offset, and limit func (c *Client) ScheduleActionRecordsByStatus(ctx context.Context, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) { var err errors.EdgeX diff --git a/internal/support/cronscheduler/application/scheduleactionrecord.go b/internal/support/cronscheduler/application/scheduleactionrecord.go index 227dfa2ddd..d24edc5222 100644 --- a/internal/support/cronscheduler/application/scheduleactionrecord.go +++ b/internal/support/cronscheduler/application/scheduleactionrecord.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/robfig/cron/v3" @@ -24,6 +25,8 @@ import ( "github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/container" ) +var asyncPurgeRecordOnce sync.Once + // AllScheduleActionRecords query the schedule action records with the specified offset, limit, and time range func AllScheduleActionRecords(ctx context.Context, start, end int64, offset, limit int, dic *di.Container) (scheduleActionRecordDTOs []dtos.ScheduleActionRecord, totalCount uint32, err errors.EdgeX) { dbClient := container.DBClientFrom(dic.Get) @@ -166,6 +169,31 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container, return nil, len(missedRecords) > 0 } +// AsyncPurgeRecord purge schedule action records according to the retention capability. 
+func AsyncPurgeRecord(ctx context.Context, dic *di.Container, interval time.Duration) { + asyncPurgeRecordOnce.Do(func() { + go func() { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + timer := time.NewTimer(interval) + for { + timer.Reset(interval) + select { + case <-ctx.Done(): + lc.Info("Exiting schedule action records retention") + return + case <-timer.C: + lc.Info("Start checking the schedule action records and purge the outdated ones according to the retention settings") + err := purgeRecord(ctx, dic) + if err != nil { + lc.Errorf("Failed to purge schedule action records, %v", err) + break + } + } + } + }() + }) +} + func generateMissedRuns(def models.ScheduleDef, latestTime time.Time) (missedRuns []time.Time, err errors.EdgeX) { currentTime := time.Now() @@ -231,3 +259,26 @@ func parseCronExpression(cronExpr string) (cron.Schedule, error) { } return schedule, nil } + +func purgeRecord(ctx context.Context, dic *di.Container) errors.EdgeX { + lc := bootstrapContainer.LoggingClientFrom(dic.Get) + dbClient := container.DBClientFrom(dic.Get) + config := container.ConfigurationFrom(dic.Get) + total, err := dbClient.ScheduleActionRecordTotalCount(ctx, 0, time.Now().UnixMilli()) + if err != nil { + return errors.NewCommonEdgeX(errors.Kind(err), "failed to query schedule action record total count, %v", err) + } + if total >= config.Retention.MaxCap { + lc.Debugf("Purging the schedule action record amount %d to the minimum capacity %d", total, config.Retention.MinCap) + record, err := dbClient.LatestScheduleActionRecordsByOffset(ctx, config.Retention.MinCap) + if err != nil { + return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records with offset '%d'", config.Retention.MinCap), err) + } + age := time.Now().UnixMilli() - record.Created + err = dbClient.DeleteScheduleActionRecordByAge(ctx, age) + if err != nil { + return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to delete schedule action records by 
age '%d'", age), err) + } + } + return nil +} diff --git a/internal/support/cronscheduler/application/scheduleactionrecord_test.go b/internal/support/cronscheduler/application/scheduleactionrecord_test.go index 8598989324..e51ede71b4 100644 --- a/internal/support/cronscheduler/application/scheduleactionrecord_test.go +++ b/internal/support/cronscheduler/application/scheduleactionrecord_test.go @@ -6,11 +6,23 @@ package application import ( + "context" "testing" "time" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container" + "github.com/edgexfoundry/go-mod-bootstrap/v3/di" + "github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger" + "github.com/edgexfoundry/go-mod-core-contracts/v3/models" + + "github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/config" + "github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/container" + dbMock "github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/infrastructure/interfaces/mocks" ) var ( @@ -102,3 +114,52 @@ func TestFindMissedCronRuns(t *testing.T) { }) } } + +func TestPurgeRecord(t *testing.T) { + ctx := context.Background() + configuration := &config.ConfigurationStruct{ + Retention: config.RecordRetention{ + Enabled: true, + Interval: "1s", + MaxCap: 5, + MinCap: 3, + }, + } + dic := di.NewContainer(di.ServiceConstructorMap{ + container.ConfigurationName: func(get di.Get) interface{} { + return configuration + }, + bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} { + return logger.NewMockClient() + }, + }) + + tests := []struct { + name string + recordCount uint32 + }{ + {"invoke schedule action record purging", configuration.Retention.MaxCap}, + {"not invoke schedule action record purging", configuration.Retention.MinCap}, + } + for _, testCase := range tests { + t.Run(testCase.name, 
// RecordRetention holds the retention settings for schedule action records.
type RecordRetention struct {
	// Enabled switches the periodic purge of schedule action records on or off.
	Enabled bool
	// Interval is the purge period written as a duration string, for example "24h".
	Interval string
	// MaxCap is the high watermark: purging is triggered once the record count reaches it.
	// MinCap is the record count that purging brings the table back down to.
	MaxCap, MinCap uint32
}
func (c *ConfigurationStruct) UpdateFromRaw(rawConfig any) bool { diff --git a/internal/support/cronscheduler/infrastructure/interfaces/db.go b/internal/support/cronscheduler/infrastructure/interfaces/db.go index 65387325cf..bbb5b22095 100644 --- a/internal/support/cronscheduler/infrastructure/interfaces/db.go +++ b/internal/support/cronscheduler/infrastructure/interfaces/db.go @@ -27,6 +27,7 @@ type DBClient interface { AddScheduleActionRecords(ctx context.Context, scheduleActionRecord []model.ScheduleActionRecord) ([]model.ScheduleActionRecord, errors.EdgeX) AllScheduleActionRecords(ctx context.Context, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) LatestScheduleActionRecordsByJobName(ctx context.Context, jobName string) ([]model.ScheduleActionRecord, errors.EdgeX) + LatestScheduleActionRecordsByOffset(ctx context.Context, offset uint32) (model.ScheduleActionRecord, errors.EdgeX) ScheduleActionRecordsByStatus(ctx context.Context, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) ScheduleActionRecordsByJobName(ctx context.Context, jobName string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) ScheduleActionRecordsByJobNameAndStatus(ctx context.Context, jobName, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) diff --git a/internal/support/cronscheduler/infrastructure/interfaces/mocks/DBClient.go b/internal/support/cronscheduler/infrastructure/interfaces/mocks/DBClient.go index 93512feb3d..3a10e342f5 100644 --- a/internal/support/cronscheduler/infrastructure/interfaces/mocks/DBClient.go +++ b/internal/support/cronscheduler/infrastructure/interfaces/mocks/DBClient.go @@ -250,6 +250,36 @@ func (_m *DBClient) LatestScheduleActionRecordsByJobName(ctx context.Context, jo return r0, r1 } +// LatestScheduleActionRecordsByOffset provides a mock function with given fields: ctx, offset +func (_m *DBClient) 
LatestScheduleActionRecordsByOffset(ctx context.Context, offset uint32) (models.ScheduleActionRecord, errors.EdgeX) { + ret := _m.Called(ctx, offset) + + if len(ret) == 0 { + panic("no return value specified for LatestScheduleActionRecordsByOffset") + } + + var r0 models.ScheduleActionRecord + var r1 errors.EdgeX + if rf, ok := ret.Get(0).(func(context.Context, uint32) (models.ScheduleActionRecord, errors.EdgeX)); ok { + return rf(ctx, offset) + } + if rf, ok := ret.Get(0).(func(context.Context, uint32) models.ScheduleActionRecord); ok { + r0 = rf(ctx, offset) + } else { + r0 = ret.Get(0).(models.ScheduleActionRecord) + } + + if rf, ok := ret.Get(1).(func(context.Context, uint32) errors.EdgeX); ok { + r1 = rf(ctx, offset) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(errors.EdgeX) + } + } + + return r0, r1 +} + // ScheduleActionRecordCountByJobName provides a mock function with given fields: ctx, jobName, start, end func (_m *DBClient) ScheduleActionRecordCountByJobName(ctx context.Context, jobName string, start int64, end int64) (uint32, errors.EdgeX) { ret := _m.Called(ctx, jobName, start, end) diff --git a/internal/support/cronscheduler/init.go b/internal/support/cronscheduler/init.go index b86f39de0a..11760f84e9 100644 --- a/internal/support/cronscheduler/init.go +++ b/internal/support/cronscheduler/init.go @@ -17,6 +17,7 @@ package cronscheduler import ( "context" "sync" + "time" "github.com/labstack/echo/v4" @@ -62,5 +63,15 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, _ return false } + config := container.ConfigurationFrom(dic.Get) + if config.Retention.Enabled { + retentionInterval, err := time.ParseDuration(config.Retention.Interval) + if err != nil { + lc.Errorf("Failed to parse schedule action record retention interval, %v", err) + return false + } + application.AsyncPurgeRecord(ctx, dic, retentionInterval) + } + return true }