Skip to content

Commit

Permalink
fix: task improvements (#20379)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <[email protected]>
  • Loading branch information
swordqiu and Qiu Jian authored May 29, 2024
1 parent d8c4a28 commit 8abda21
Show file tree
Hide file tree
Showing 18 changed files with 394 additions and 114 deletions.
24 changes: 24 additions & 0 deletions pkg/appsrv/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ type SWorkerManager struct {
ignoreOverflow bool

cancelPrevIdent bool

queueInitHook func() error
queueEmptyHook func() error
}

func NewWorkerManager(name string, workerCount int, backlog int, dbWorker bool) *SWorkerManager {
Expand Down Expand Up @@ -222,6 +225,14 @@ type sWorkerTask struct {
start time.Time
}

func (wm *SWorkerManager) SetQueueInitHook(f func() error) {
wm.queueInitHook = f
}

func (wm *SWorkerManager) SetQueueEmptyHook(f func() error) {
wm.queueEmptyHook = f
}

func (wm *SWorkerManager) EnableCancelPreviousIdenticalTask() {
wm.cancelPrevIdent = true
}
Expand Down Expand Up @@ -260,6 +271,12 @@ func (wm *SWorkerManager) removeWorker(worker *SWorker) {
} else {
wm.detachedWorker.removeWithLock(worker)
}
if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueEmptyHook != nil {
err := wm.queueEmptyHook()
if err != nil {
log.Errorf("queueEmptyHook fail %s", err)
}
}
}

func execCallback(task *sWorkerTask) {
Expand All @@ -283,6 +300,13 @@ func (wm *SWorkerManager) schedule() {
}

func (wm *SWorkerManager) scheduleWithLock() {
if wm.activeWorker.size()+wm.detachedWorker.size() == 0 && wm.queueInitHook != nil {
err := wm.queueInitHook()
if err != nil {
log.Errorf("queueInitHook fail %s", err)
return
}
}
queueSize := wm.queue.Size()
if wm.activeWorker.size() < wm.workerCount && queueSize > 0 {
wm.workerId += 1
Expand Down
19 changes: 19 additions & 0 deletions pkg/cloudcommon/consts/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ var (
defaultDBConnectionString string

defaultDBChecksumHashAlgorithm string

taskWorkerCount int
localTaskWorkerCount int
)

func SetDefaultDB(dialect, connStr string) {
Expand All @@ -52,3 +55,19 @@ func DefaultDBChecksumHashAlgorithm() string {
}
return "sha256"
}

func SetTaskWorkerCount(cnt int) {
taskWorkerCount = cnt
}

func SetLocalTaskWorkerCount(cnt int) {
localTaskWorkerCount = cnt
}

func TaskWorkerCount() int {
return taskWorkerCount
}

func LocalTaskWorkerCount() int {
return localTaskWorkerCount
}
22 changes: 17 additions & 5 deletions pkg/cloudcommon/db/taskman/localtaskworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ import (
"context"
"fmt"
"runtime/debug"
"sync"

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/version"

"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/mcclient/modules/yunionconf"
)

var localTaskWorkerMan *appsrv.SWorkerManager
var localTaskWorkerManLock *sync.Mutex

func init() {
localTaskWorkerMan = appsrv.NewWorkerManager("LocalTaskWorkerManager", 4, 1024, false)
localTaskWorkerManLock = &sync.Mutex{}
}

func Error2TaskData(err error) jsonutils.JSONObject {
Expand All @@ -47,9 +50,6 @@ type localTask struct {
}

func (t *localTask) Run() {
log.Debugf("XXXXXXXXXXXXXXXXXXLOCAL TASK RUN STARTXXXXXXXXXXXXXXXXX")
defer log.Debugf("XXXXXXXXXXXXXXXXXXLOCAL TASK RUN END XXXXXXXXXXXXXXXXX")

defer func() {
if r := recover(); r != nil {
yunionconf.BugReport.SendBugReport(context.Background(), version.GetShortString(), string(debug.Stack()), errors.Errorf("%s", r))
Expand Down Expand Up @@ -78,6 +78,18 @@ func LocalTaskRunWithWorkers(task ITask, proc func() (jsonutils.JSONObject, erro
wm.Run(&t, nil, nil)
}

func getLocalTaskWorkerMan() *appsrv.SWorkerManager {
localTaskWorkerManLock.Lock()
defer localTaskWorkerManLock.Unlock()

if localTaskWorkerMan != nil {
return localTaskWorkerMan
}
log.Infof("LocalTaskWorkerManager %d", consts.LocalTaskWorkerCount())
localTaskWorkerMan = appsrv.NewWorkerManager("LocalTaskWorkerManager", consts.LocalTaskWorkerCount(), 1024, false)
return localTaskWorkerMan
}

func LocalTaskRun(task ITask, proc func() (jsonutils.JSONObject, error)) {
LocalTaskRunWithWorkers(task, proc, localTaskWorkerMan)
LocalTaskRunWithWorkers(task, proc, getLocalTaskWorkerMan())
}
36 changes: 32 additions & 4 deletions pkg/cloudcommon/db/taskman/subtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"yunion.io/x/jsonutils"
"yunion.io/x/log"
"yunion.io/x/sqlchemy"

"yunion.io/x/onecloud/pkg/cloudcommon/db"
)
Expand All @@ -36,7 +37,12 @@ type SSubTaskmanager struct {
var SubTaskManager *SSubTaskmanager

func init() {
SubTaskManager = &SSubTaskmanager{SModelBaseManager: db.NewModelBaseManager(SSubTask{}, "subtasks_tbl", "subtask", "subtasks")}
SubTaskManager = &SSubTaskmanager{SModelBaseManager: db.NewModelBaseManager(
SSubTask{},
"subtasks_tbl",
"subtask",
"subtasks",
)}
}

type SSubTask struct {
Expand All @@ -62,12 +68,17 @@ func (manager *SSubTaskmanager) GetSubTask(ptaskId string, subtaskId string) *SS
return &subtask
}

func (manager *SSubTaskmanager) GetTotalSubtasks(taskId string, stage string, status string) []SSubTask {
subtasks := make([]SSubTask, 0)
func (manager *SSubTaskmanager) getTotalSubtasksQuery(taskId string, stage string, status string) *sqlchemy.SQuery {
q := manager.Query().Equals("task_id", taskId).Equals("stage", stage)
if len(status) > 0 {
q = q.Equals("status", status)
}
return q
}

func (manager *SSubTaskmanager) GetSubtasks(taskId string, stage string, status string) []SSubTask {
subtasks := make([]SSubTask, 0)
q := manager.getTotalSubtasksQuery(taskId, stage, status)
err := db.FetchModelObjects(manager, q, &subtasks)
if err != nil {
log.Errorf("GetInitSubtasks fail %s", err)
Expand All @@ -76,8 +87,25 @@ func (manager *SSubTaskmanager) GetTotalSubtasks(taskId string, stage string, st
return subtasks
}

func (manager *SSubTaskmanager) GetSubtasksCount(taskId string, stage string, status string) (int, error) {
q := manager.getTotalSubtasksQuery(taskId, stage, status)
return q.CountWithError()
}

func (manager *SSubTaskmanager) GetTotalSubtasks(taskId string, stage string) []SSubTask {
return manager.GetSubtasks(taskId, stage, "")
}

func (manager *SSubTaskmanager) GetTotalSubtasksCount(taskId string, stage string) (int, error) {
return manager.GetSubtasksCount(taskId, stage, "")
}

func (manager *SSubTaskmanager) GetInitSubtasks(taskId string, stage string) []SSubTask {
return manager.GetTotalSubtasks(taskId, stage, SUBTASK_INIT)
return manager.GetSubtasks(taskId, stage, SUBTASK_INIT)
}

func (manager *SSubTaskmanager) GetInitSubtasksCount(taskId string, stage string) (int, error) {
return manager.GetSubtasksCount(taskId, stage, SUBTASK_INIT)
}

func (self *SSubTask) SaveResults(failed bool, result jsonutils.JSONObject) error {
Expand Down
25 changes: 25 additions & 0 deletions pkg/cloudcommon/db/taskman/taskobjs.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type STaskObject struct {

TaskId string `width:"36" charset:"ascii" nullable:"false" primary:"true" index:"true"` // Column(VARCHAR(36, charset='ascii'), nullable=False, primary_key=True, index=True)
ObjId string `width:"36" charset:"ascii" nullable:"false" primary:"true"` // Column(VARCHAR(36, charset='ascii'), nullable=False, primary_key=True)
Object string `json:"object" width:"128" charset:"utf8" nullable:"false" list:"user"`
}

func (manager *STaskObjectManager) GetObjectIds(task *STask) []string {
Expand All @@ -73,6 +74,30 @@ func (manager *STaskObjectManager) GetObjectIds(task *STask) []string {
return ret
}

func (manager *STaskObjectManager) GetObjectNames(task *STask) []string {
ret := make([]string, 0)
taskobjs := manager.Query().SubQuery()
q := taskobjs.Query(taskobjs.Field("object")).Equals("task_id", task.Id)
rows, err := q.Rows()
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("TaskObjectManager GetObjectIds fail %s", err)
}
return nil
}
defer rows.Close()
for rows.Next() {
var objId string
err = rows.Scan(&objId)
if err != nil {
log.Errorf("TaskObjectManager GetObjects fetch row fail %s", err)
return nil
}
ret = append(ret, objId)
}
return ret
}

func (manager *STaskObjectManager) FetchOwnerId(ctx context.Context, data jsonutils.JSONObject) (mcclient.IIdentityProvider, error) {
return manager.SProjectizedResourceBaseManager.FetchOwnerId(ctx, data)
}
Expand Down
Loading

0 comments on commit 8abda21

Please sign in to comment.