Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Schedule action record retention implementation #4966

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/support-cron-scheduler/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,8 @@ Database:
Timeout: 5s
Type: postgres

Retention:
Enabled: true
Interval: 24h # Purging interval defines when the database should be rid of records above the high watermark.
MaxCap: 10000 # The maximum capacity defines where the high watermark of records should be detected for purging the amount of the records to the minimum capacity.
MinCap: 8000 # The minimum capacity defines where the total count of records should be returned to during purging.
13 changes: 13 additions & 0 deletions internal/pkg/infrastructure/postgres/scheduleactionrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ func (c *Client) LatestScheduleActionRecordsByJobName(ctx context.Context, jobNa
return records, nil
}

// LatestScheduleActionRecordsByOffset queries the latest schedule action records by offset
func (c *Client) LatestScheduleActionRecordsByOffset(ctx context.Context, offset uint32) (model.ScheduleActionRecord, errors.EdgeX) {
records, err := queryScheduleActionRecords(ctx, c.ConnPool, sqlQueryAllWithPaginationDescByCol(scheduleActionRecordTableName, createdCol), offset, 1)
if err != nil {
return model.ScheduleActionRecord{}, errors.NewCommonEdgeX(errors.KindDatabaseError, "failed to query all schedule action records", err)
}

if len(records) == 0 {
return model.ScheduleActionRecord{}, errors.NewCommonEdgeX(errors.KindServerError, fmt.Sprintf("no schedule action record found with offset '%d'", offset), err)
}
return records[0], nil
}

// ScheduleActionRecordsByStatus queries the schedule action records by status with the given range, offset, and limit
func (c *Client) ScheduleActionRecordsByStatus(ctx context.Context, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX) {
var err errors.EdgeX
Expand Down
51 changes: 51 additions & 0 deletions internal/support/cronscheduler/application/scheduleactionrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/robfig/cron/v3"
Expand All @@ -24,6 +25,8 @@ import (
"github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/container"
)

var asyncPurgeRecordOnce sync.Once

// AllScheduleActionRecords query the schedule action records with the specified offset, limit, and time range
func AllScheduleActionRecords(ctx context.Context, start, end int64, offset, limit int, dic *di.Container) (scheduleActionRecordDTOs []dtos.ScheduleActionRecord, totalCount uint32, err errors.EdgeX) {
dbClient := container.DBClientFrom(dic.Get)
Expand Down Expand Up @@ -166,6 +169,31 @@ func GenerateMissedScheduleActionRecords(ctx context.Context, dic *di.Container,
return nil, len(missedRecords) > 0
}

// AsyncPurgeRecord purge schedule action records according to the retention capability.
func AsyncPurgeRecord(ctx context.Context, dic *di.Container, interval time.Duration) {
asyncPurgeRecordOnce.Do(func() {
go func() {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
timer := time.NewTimer(interval)
for {
timer.Reset(interval)
select {
case <-ctx.Done():
lc.Info("Exiting schedule action records retention")
return
case <-timer.C:
lc.Info("Start checking the schedule action records and purge the outdated ones according to the retention settings")
err := purgeRecord(ctx, dic)
if err != nil {
lc.Errorf("Failed to purge schedule action records, %v", err)
break
}
}
}
}()
})
}

func generateMissedRuns(def models.ScheduleDef, latestTime time.Time) (missedRuns []time.Time, err errors.EdgeX) {
currentTime := time.Now()

Expand Down Expand Up @@ -231,3 +259,26 @@ func parseCronExpression(cronExpr string) (cron.Schedule, error) {
}
return schedule, nil
}

func purgeRecord(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
dbClient := container.DBClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
total, err := dbClient.ScheduleActionRecordTotalCount(ctx, 0, time.Now().UnixMilli())
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), "failed to query schedule action record total count, %v", err)
}
if total >= config.Retention.MaxCap {
lc.Debugf("Purging the schedule action record amount %d to the minimum capacity %d", total, config.Retention.MinCap)
record, err := dbClient.LatestScheduleActionRecordsByOffset(ctx, config.Retention.MinCap)
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to query schedule action records with offset '%d'", config.Retention.MinCap), err)
}
age := time.Now().UnixMilli() - record.Created
err = dbClient.DeleteScheduleActionRecordByAge(ctx, age)
if err != nil {
return errors.NewCommonEdgeX(errors.Kind(err), fmt.Sprintf("failed to delete schedule action records by age '%d'", age), err)
}
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@
package application

import (
"context"
"testing"
"time"

"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v3/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v3/di"
"github.com/edgexfoundry/go-mod-core-contracts/v3/clients/logger"
"github.com/edgexfoundry/go-mod-core-contracts/v3/models"

"github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/config"
"github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/container"
dbMock "github.com/edgexfoundry/edgex-go/internal/support/cronscheduler/infrastructure/interfaces/mocks"
)

var (
Expand Down Expand Up @@ -102,3 +114,52 @@ func TestFindMissedCronRuns(t *testing.T) {
})
}
}

func TestPurgeRecord(t *testing.T) {
ctx := context.Background()
configuration := &config.ConfigurationStruct{
Retention: config.RecordRetention{
Enabled: true,
Interval: "1s",
MaxCap: 5,
MinCap: 3,
},
}
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
return configuration
},
bootstrapContainer.LoggingClientInterfaceName: func(get di.Get) interface{} {
return logger.NewMockClient()
},
})

tests := []struct {
name string
recordCount uint32
}{
{"invoke schedule action record purging", configuration.Retention.MaxCap},
{"not invoke schedule action record purging", configuration.Retention.MinCap},
}
for _, testCase := range tests {
t.Run(testCase.name, func(t *testing.T) {
dbClientMock := &dbMock.DBClient{}
record := models.ScheduleActionRecord{}
dbClientMock.On("LatestScheduleActionRecordsByOffset", ctx, configuration.Retention.MinCap).Return(record, nil)
dbClientMock.On("ScheduleActionRecordTotalCount", ctx, int64(0), mock.AnythingOfType("int64")).Return(testCase.recordCount, nil)
dbClientMock.On("DeleteScheduleActionRecordByAge", ctx, mock.AnythingOfType("int64")).Return(nil)
dic.Update(di.ServiceConstructorMap{
container.DBClientInterfaceName: func(get di.Get) interface{} {
return dbClientMock
},
})
err := purgeRecord(ctx, dic)
require.NoError(t, err)
if testCase.recordCount >= configuration.Retention.MaxCap {
dbClientMock.AssertCalled(t, "DeleteScheduleActionRecordByAge", ctx, mock.AnythingOfType("int64"))
} else {
dbClientMock.AssertNotCalled(t, "DeleteScheduleActionRecordByAge", ctx, mock.AnythingOfType("int64"))
}
})
}
}
8 changes: 8 additions & 0 deletions internal/support/cronscheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ConfigurationStruct struct {
Service bootstrapConfig.ServiceInfo
Clients bootstrapConfig.ClientsCollection
MessageBus bootstrapConfig.MessageBusInfo
Retention RecordRetention
}

type WritableInfo struct {
Expand All @@ -25,6 +26,13 @@ type WritableInfo struct {
Telemetry bootstrapConfig.TelemetryInfo
}

type RecordRetention struct {
Enabled bool
Interval string
MaxCap uint32
MinCap uint32
}

// UpdateFromRaw converts configuration received from the registry to a service-specific configuration struct which is
// then used to overwrite the service's existing configuration struct.
func (c *ConfigurationStruct) UpdateFromRaw(rawConfig any) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type DBClient interface {
AddScheduleActionRecords(ctx context.Context, scheduleActionRecord []model.ScheduleActionRecord) ([]model.ScheduleActionRecord, errors.EdgeX)
AllScheduleActionRecords(ctx context.Context, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX)
LatestScheduleActionRecordsByJobName(ctx context.Context, jobName string) ([]model.ScheduleActionRecord, errors.EdgeX)
LatestScheduleActionRecordsByOffset(ctx context.Context, offset uint32) (model.ScheduleActionRecord, errors.EdgeX)
ScheduleActionRecordsByStatus(ctx context.Context, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX)
ScheduleActionRecordsByJobName(ctx context.Context, jobName string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX)
ScheduleActionRecordsByJobNameAndStatus(ctx context.Context, jobName, status string, start, end int64, offset, limit int) ([]model.ScheduleActionRecord, errors.EdgeX)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions internal/support/cronscheduler/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package cronscheduler
import (
"context"
"sync"
"time"

"github.com/labstack/echo/v4"

Expand Down Expand Up @@ -62,5 +63,15 @@ func (b *Bootstrap) BootstrapHandler(ctx context.Context, wg *sync.WaitGroup, _
return false
}

config := container.ConfigurationFrom(dic.Get)
if config.Retention.Enabled {
retentionInterval, err := time.ParseDuration(config.Retention.Interval)
if err != nil {
lc.Errorf("Failed to parse schedule action record retention interval, %v", err)
return false
}
application.AsyncPurgeRecord(ctx, dic, retentionInterval)
}

return true
}
Loading