Skip to content

Commit

Permalink
1:layer0 ec2 instance_type = "t3.xlarge"
Browse files Browse the repository at this point in the history
2:Memory for the api-image updated to use 12GB memory
3:Add a new janitor combined tag and job janitors together. Removed ttl assignment.
4:Added a new method for ecs to retrieve the running task.
5:Dynamodb mode changed to "pay per request".
6:New janitor will check the all the tasks that is not in running status then delete all related deploy , job, task in the tag table.
7:New janitor deletes the jobs that has created for more than 1 hour or completed.
8:The new janitor runs every 10 mins.

Merge remote-tracking branch 'origin/v0.12.0-dev' into chunnel-prod

# Conflicts:
#	api/logic/job_logic.go
#	api/main.go
#	common/config/config.go
#	common/db/tag_store/dynamo.go
#	setup/module/api/db.tf
#	setup/module/api/environment.tf
  • Loading branch information
Victor Bi committed Oct 6, 2020
1 parent 2ad9f89 commit cec2f9b
Show file tree
Hide file tree
Showing 19 changed files with 417 additions and 100 deletions.
19 changes: 19 additions & 0 deletions api/backend/ecs/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ func NewECSTaskManager(
}
}

func (this *ECSTaskManager) ListRunningTasks() ([]string, error) {
clusterNames, err := this.Backend.ListEnvironments()
if err != nil {
return nil, err
}

taskARNs := []string{}
for _, clusterName := range clusterNames {
clusterTaskARNs, err := this.ECS.ListClusterRunningTaskARNs(clusterName.String(), id.PREFIX)
if err != nil {
return nil, err
}

taskARNs = append(taskARNs, clusterTaskARNs...)
}

return taskARNs, nil
}

func (this *ECSTaskManager) ListTasks() ([]string, error) {
clusterNames, err := this.Backend.ListEnvironments()
if err != nil {
Expand Down
35 changes: 35 additions & 0 deletions api/backend/ecs/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ func TestListTasks(t *testing.T) {
assert.Equal(t, expected, result)
}

func TestListRunningTasks(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ecsEnvironmentIDs := []id.ECSEnvironmentID{
id.ECSEnvironmentID("env_id1"),
id.ECSEnvironmentID("env_id2"),
}

mockTask := NewMockECSTaskManager(ctrl)
mockTask.Backend.EXPECT().
ListEnvironments().
Return(ecsEnvironmentIDs, nil)

for i, ecsEnvironmentID := range ecsEnvironmentIDs {
arn := fmt.Sprintf("arn_%d", i)

mockTask.ECS.EXPECT().
ListClusterRunningTaskARNs(ecsEnvironmentID.String(), id.PREFIX).
Return([]string{arn}, nil)
}

result, err := mockTask.Task().ListRunningTasks()
if err != nil {
t.Fatal(err)
}

expected := []string{
"arn_0",
"arn_1",
}

assert.Equal(t, expected, result)
}

func TestDeleteTask(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
1 change: 1 addition & 0 deletions api/backend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Backend interface {

CreateTask(environmentID, deployID string, overrides []models.ContainerOverride) (string, error)
ListTasks() ([]string, error)
ListRunningTasks() ([]string, error)
GetTask(environmentID, taskARN string) (*models.Task, error)
GetEnvironmentTasks(environmentID string) (map[string]*models.Task, error)
DeleteTask(environmentID, taskARN string) error
Expand Down
13 changes: 13 additions & 0 deletions api/backend/mock_backend/mock_backend.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

130 changes: 130 additions & 0 deletions api/logic/janitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package logic

import (
"time"

"github.com/quintilesims/layer0/common/db/job_store"
"github.com/quintilesims/layer0/common/db/tag_store"
"github.com/quintilesims/layer0/common/errors"
"github.com/quintilesims/layer0/common/logutils"
"github.com/quintilesims/layer0/common/models"
"github.com/quintilesims/layer0/common/types"
"github.com/quintilesims/layer0/common/waitutils"
)

const (
JOB_LIFETIME = time.Hour * 1
JANITOR_SLEEP_DURATION = time.Minute * 10
)

var janitorLogger = logutils.NewStackTraceLogger("Janitor")

type Janitor struct {
JobLogic JobLogic
TaskLogic TaskLogic
TagStore tag_store.TagStore
JobStore job_store.JobStore
Clock waitutils.Clock
}

func NewJanitor(jobLogic JobLogic, taskLogic TaskLogic, jobStore job_store.JobStore, tagStore tag_store.TagStore) *Janitor {
return &Janitor{
JobLogic: jobLogic,
TaskLogic: taskLogic,
JobStore: jobStore,
TagStore: tagStore,
Clock: waitutils.RealClock{},
}
}

func (this *Janitor) Run() {
go func() {
for {
janitorLogger.Info("Starting cleanup")
this.pulse()
janitorLogger.Infof("Finished cleanup")
this.Clock.Sleep(JANITOR_SLEEP_DURATION)
}
}()
}

func (this *Janitor) pulse() error {

//clean jobs that is not running
jobs, err := this.JobLogic.ListJobs()
if err != nil {
janitorLogger.Errorf("Failed to list jobs: %v", err)
return err
}

errs := []error{}

for _, job := range jobs {
timeSinceCreated := this.Clock.Since(job.TimeCreated)
if job.JobStatus != int64(types.InProgress) {
if timeSinceCreated > JOB_LIFETIME && job.JobStatus != int64(types.Error) {
janitorLogger.Infof("Deleting job '%s'", job.JobID)
if err := this.JobLogic.Delete(job.JobID); err != nil {
errs = append(errs, err)
//If by any reasons the job can not be deleted (data corruption), because it already pass the Job life time, just delete the item from dynamodb.
if err := this.JobStore.Delete(job.JobID); err != nil {
errs = append(errs, err)
}

} else {
janitorLogger.Infof("Finished deleting job '%s'", job.JobID)
}
}
}
}

// start clean tags table, Clean non-running tasks in db
tasks, err := this.TaskLogic.ListRunningTasks()
if err != nil {
janitorLogger.Errorf("Failed to list tasks: %v", err)
return err
}

taskExists := func(id string) bool {
for _, t := range tasks {
if t.TaskID == id {
return true
}
}
return false
}

//clean the task in db that not in ecs task
taskTags, err := this.TagStore.SelectByType("task")
if err != nil {
janitorLogger.Errorln("Could not query for tag store for task entity type - ", err.Error())
}

for _, tag := range taskTags {
if !taskExists(tag.EntityID) {
if err := this.TagStore.Delete(tag.EntityType, tag.EntityID, tag.Key); err != nil {
continue
}
//to do
janitorLogger.Infof("Tag for task (%s) has been deleted\n", tag.EntityID)
}
}

//clean up orphan api deploy
deployTags, err := this.TagStore.SelectByType("deploy")
for _, dtag := range deployTags.WithValue("job") {
if !taskTags.WithKey("deploy_id").Any(func(t models.Tag) bool {
if t.Value == dtag.EntityID {
return true
}
return false
}) {
if err := this.TagStore.Delete("deploy", dtag.EntityID, "name"); err != nil {
janitorLogger.Errorf("Failed to delete tag '%s' %s : %v", dtag.EntityType, dtag.EntityID, err)
errs = append(errs, err)
}
janitorLogger.Infof("orphan deploy record (%s) has been deleted\n", dtag.EntityID)
}
}
return errors.MultiError(errs)
}
41 changes: 41 additions & 0 deletions api/logic/janitor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package logic

import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/quintilesims/layer0/api/logic/mock_logic"
"github.com/quintilesims/layer0/common/models"
)

func TestJanitorPulse(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

jobLogicMock := mock_logic.NewMockJobLogic(ctrl)

jobs := []*models.Job{
{
JobID: "old_job",
TimeCreated: time.Now().Add(-(JOB_LIFETIME * 2)),
},
{
JobID: "young_job",
TimeCreated: time.Now(),
},
}

jobLogicMock.EXPECT().
ListJobs().
Return(jobs, nil)

jobLogicMock.EXPECT().
Delete("old_job").
Return(nil)

janitor := NewJobJanitor(jobLogicMock)
if err := janitor.pulse(); err != nil {
t.Fatal(err)
}
}
27 changes: 11 additions & 16 deletions api/logic/job_janitor.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package logic

import (
"time"

"github.com/quintilesims/layer0/common/errors"
"github.com/quintilesims/layer0/common/logutils"
"github.com/quintilesims/layer0/common/types"
"github.com/quintilesims/layer0/common/waitutils"
)

const (
JOB_LIFETIME = time.Hour * 1
JANITOR_SLEEP_DURATION = time.Minute * 10
)

var jobLogger = logutils.NewStackTraceLogger("Job Janitor")

type JobJanitor struct {
Expand Down Expand Up @@ -48,15 +42,16 @@ func (this *JobJanitor) pulse() error {
errs := []error{}
for _, job := range jobs {
timeSinceCreated := this.Clock.Since(job.TimeCreated)

if timeSinceCreated > JOB_LIFETIME {
jobLogger.Infof("Deleting job '%s'", job.JobID)

if err := this.jobLogic.Delete(job.JobID); err != nil {
jobLogger.Errorf("Failed to delete job '%s': %v", job.JobID, err)
errs = append(errs, err)
} else {
jobLogger.Infof("Finished deleting job '%s'", job.JobID)
if job.JobStatus != int64(types.InProgress) {
if timeSinceCreated > JOB_LIFETIME || job.JobStatus == int64(types.Completed) {
jobLogger.Infof("Deleting job '%s'", job.JobID)

if err := this.jobLogic.Delete(job.JobID); err != nil {
jobLogger.Errorf("Failed to delete job '%s': %v", job.JobID, err)
errs = append(errs, err)
} else {
jobLogger.Infof("Finished deleting job '%s'", job.JobID)
}
}
}
}
Expand Down
33 changes: 17 additions & 16 deletions api/logic/job_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,30 @@ func (this *L0JobLogic) CreateJob(jobType types.JobType, request interface{}) (*
if err != nil {
return nil, err
}
var jobTTLHours int
switch jobType {
case types.CreateTaskJob:
jobTTLHours = config.CREATE_TASK_JOB_TTL
case types.DeleteEnvironmentJob:
jobTTLHours = config.DELETE_ENVIRONMENT_JOB_TTL
case types.DeleteLoadBalancerJob:
jobTTLHours = config.DELETE_LOAD_BALANCER_JOB_TTL
case types.DeleteServiceJob:
jobTTLHours = config.DELETE_SERVICE_JOB_TTL
case types.DeleteTaskJob:
jobTTLHours = config.DELETE_TASK_JOB_TTL
default:
jobTTLHours = 24
}
/*
var jobTTLHours int
switch jobType {
case types.CreateTaskJob:
jobTTLHours = config.CREATE_TASK_JOB_TTL
case types.DeleteEnvironmentJob:
jobTTLHours = config.DELETE_ENVIRONMENT_JOB_TTL
case types.DeleteLoadBalancerJob:
jobTTLHours = config.DELETE_LOAD_BALANCER_JOB_TTL
case types.DeleteServiceJob:
jobTTLHours = config.DELETE_SERVICE_JOB_TTL
case types.DeleteTaskJob:
jobTTLHours = config.DELETE_TASK_JOB_TTL
default:
jobTTLHours = 24
}
*/
job := &models.Job{
JobID: jobID,
TaskID: taskID,
JobStatus: int64(types.Pending),
JobType: int64(jobType),
Request: reqStr,
TimeCreated: time.Now(),
TimeToExist: time.Now().Add(time.Hour * time.Duration(jobTTLHours)).Unix(),
}

if err := this.JobStore.Insert(job); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions api/logic/mock_logic/mock_task_logic.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cec2f9b

Please sign in to comment.