From a3d016d8da6171bde47fc8fd39a731033c52cd7a Mon Sep 17 00:00:00 2001 From: Tohru <65994850+Tohrusky@users.noreply.github.com> Date: Thu, 15 Aug 2024 20:28:10 +0800 Subject: [PATCH] feat: clear api (#17) refactor: cfg --- common/db/task.go | 16 ++++ common/db/type.go | 1 + common/db/video.go | 7 ++ common/task/task.go | 1 - deploy/docker-compose/docker-compose.yml | 32 ++++--- module/config/config.go | 13 +-- module/oss/oss.go | 12 +-- server/internal/router/api/v1/api.go | 11 +-- server/internal/service/task/clear.go | 110 +++++++++++++++++++++++ server/internal/service/task/new.go | 2 +- server/internal/service/task/start.go | 14 +++ worker/internal/cut/worker.go | 25 ------ worker/internal/encode/worker.go | 15 +++- 13 files changed, 196 insertions(+), 63 deletions(-) create mode 100644 server/internal/service/task/clear.go diff --git a/common/db/task.go b/common/db/task.go index 29e92ec..0207ec3 100644 --- a/common/db/task.go +++ b/common/db/task.go @@ -23,6 +23,15 @@ func CheckTaskStart(videoKey string) bool { return task.EncodeParam != "" } +// CheckTaskComplete checks if a task has completed +func CheckTaskComplete(videoKey string) bool { + task, err := GetTask(videoKey) + if err != nil { + return false + } + return task.EncodeKey != "" +} + // InsertTask inserts a new uncompleted task into the database func InsertTask(videoKey string) error { coll := db.DB.Collection(TASK_COLLECTION) @@ -53,3 +62,10 @@ func GetTask(videoKey string) (Task, error) { return task, err } + +// DeleteTask deletes a task from the database +func DeleteTask(videoKey string) error { + coll := db.DB.Collection(TASK_COLLECTION) + _, err := coll.DeleteOne(context.TODO(), Task{Key: videoKey}) + return err +} diff --git a/common/db/type.go b/common/db/type.go index 8e6d4cf..f0b45ee 100644 --- a/common/db/type.go +++ b/common/db/type.go @@ -11,6 +11,7 @@ type VideoClipInfo struct { Total int `bson:"total,omitempty"` ClipKey string `bson:"clip_key,omitempty"` EncodeKey string `bson:"encode_key,omitempty"` + TaskID string `bson:"task_id,omitempty"` } type Task struct { diff --git a/common/db/video.go b/common/db/video.go index 3fb52e4..933cb9c 100644 --- a/common/db/video.go +++ b/common/db/video.go @@ -61,6 +61,13 @@ func UpdateVideo(filter VideoClipInfo, update VideoClipInfo) error { return nil } +// DeleteVideoClips 删除所有视频切片 +func DeleteVideoClips(videoKey string) error { + coll := db.DB.Collection(VIDEO_COLLECTION) + _, err := coll.DeleteMany(context.TODO(), VideoClipInfo{Key: videoKey}) + return err +} + // GetVideoProgress 获取视频处理进度和每个切片的状态 func GetVideoProgress(videoKey string) ([]bool, error) { infos, err := GetVideoClips(videoKey) diff --git a/common/task/task.go b/common/task/task.go index b7a85e8..f947098 100644 --- a/common/task/task.go +++ b/common/task/task.go @@ -11,7 +11,6 @@ const ( // CutTaskPayload is a struct that represents the payload for cut task. type CutTaskPayload struct { VideoKey string `json:"video_key"` - Retry bool `json:"retry"` } // EncodeTaskPayload is a struct that represents the payload for encode task. diff --git a/deploy/docker-compose/docker-compose.yml b/deploy/docker-compose/docker-compose.yml index 2b7c371..bb160d3 100644 --- a/deploy/docker-compose/docker-compose.yml +++ b/deploy/docker-compose/docker-compose.yml @@ -7,21 +7,31 @@ networks: driver: bridge services: + # worker-encode: + # image: lychee0/finalrip-worker-encode:latest + # container_name: finalrip-worker-encode + # restart: always + # environment: + # - FINALRIP_REMOTE_CONFIG_HOST=EASYTIER + # networks: + # - backend worker-encode: - image: lychee0/finalrip-worker-encode:latest - # image: lychee0/finalrip-worker-encode-pytorch:dev + image: lychee0/finalrip-worker-encode-pytorch container_name: finalrip-worker-encode restart: always environment: - - FINALRIP_REMOTE_CONFIG_HOST=EASYTIER - # deploy: - # resources: - # reservations: - # devices: - # - driver: nvidia - # device_ids: - # - "0" - # capabilities: [gpu] + - FINALRIP_REMOTE_CONFIG_HOST=consul:8500 + - FINALRIP_DB_HOST=mongodb + - FINALRIP_REDIS_HOST=redis + - FINALRIP_OSS_ENDPOINT=192.168.0.109:9000 + deploy: + resources: + reservations: + devices: + - driver: nvidia + device_ids: + - "0" + capabilities: [gpu] networks: - backend # easytier: diff --git a/module/config/config.go b/module/config/config.go index e7646c3..fbfb2aa 100644 --- a/module/config/config.go +++ b/module/config/config.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/TensoRaws/FinalRip/module/util" "github.com/fsnotify/fsnotify" "github.com/spf13/viper" _ "github.com/spf13/viper/remote" @@ -94,20 +93,14 @@ func initialize() { } func updateRemoteConfigOnChange() { - lastCfg, err := util.DeepCopyMap(config.AllSettings()) - if err != nil { - fmt.Println("failed to copy config: " + err.Error()) - } + lastCfg := config.AllSettings() - err = config.WatchRemoteConfig() + err := config.WatchRemoteConfig() if err != nil { fmt.Println("failed to watch remote config: " + err.Error()) } - cfg, err := util.DeepCopyMap(config.AllSettings()) - if err != nil { - fmt.Println("failed to copy config: " + err.Error()) - } + cfg := config.AllSettings() if !reflect.DeepEqual(lastCfg, cfg) { // 配置文件发生变更之后,重新初始化配置 diff --git a/module/oss/oss.go b/module/oss/oss.go index 0f23a5c..ef2b74c 100644 --- a/module/oss/oss.go +++ b/module/oss/oss.go @@ -71,12 +71,7 @@ func PutBytes(key string, data []byte) error { // GetWithPath downloads and saves the object as a file in the local filesystem by key. func GetWithPath(key string, path string) error { - err = oss.FGetObject(context.Background(), config.OSSConfig.Bucket, key, path, minio.GetObjectOptions{}) - if err != nil { - return err - } - - return nil + return oss.FGetObject(context.Background(), config.OSSConfig.Bucket, key, path, minio.GetObjectOptions{}) } // Get gets the file pointed to by key. @@ -105,6 +100,11 @@ func GetBytes(key string) ([]byte, error) { return buf.Bytes(), nil } +// Delete deletes the file pointed to by key. +func Delete(key string) error { + return oss.RemoveObject(context.Background(), config.OSSConfig.Bucket, key, minio.RemoveObjectOptions{}) +} + // GetPresignedURL gets the presigned URL for the file pointed to by key. func GetPresignedURL(key string, fileName string, expiration time.Duration) (string, error) { // Set request parameters diff --git a/server/internal/router/api/v1/api.go b/server/internal/router/api/v1/api.go index 55fdfe2..e6d5852 100644 --- a/server/internal/router/api/v1/api.go +++ b/server/internal/router/api/v1/api.go @@ -22,12 +22,13 @@ func NewAPI() *gin.Engine { api := r.Group("/api/v1/") { - processGroup := api.Group("task/") + taskGroup := api.Group("task/") { - processGroup.POST("new", task.New) - processGroup.POST("start", task.Start) - processGroup.GET("progress", task.Progress) - processGroup.GET("oss/presigned", task.OSSPresigned) + taskGroup.POST("new", task.New) + taskGroup.POST("start", task.Start) + taskGroup.GET("progress", task.Progress) + taskGroup.GET("oss/presigned", task.OSSPresigned) + taskGroup.POST("clear", task.Clear) } } diff --git a/server/internal/service/task/clear.go b/server/internal/service/task/clear.go new file mode 100644 index 0000000..0de2dea --- /dev/null +++ b/server/internal/service/task/clear.go @@ -0,0 +1,110 @@ +package task + +import ( + "sync" + + "github.com/TensoRaws/FinalRip/common/db" + "github.com/TensoRaws/FinalRip/module/log" + "github.com/TensoRaws/FinalRip/module/oss" + "github.com/TensoRaws/FinalRip/module/queue" + "github.com/TensoRaws/FinalRip/module/resp" + "github.com/gin-gonic/gin" +) + +type ClearRequest struct { + VideoKey string `form:"video_key" binding:"required"` +} + +// Clear 清理任务 (POST /new) +func Clear(c *gin.Context) { + // 绑定参数 + var req NewRequest + if err := c.ShouldBind(&req); err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + + // 检查任务是否存在 + if !db.CheckTaskExist(req.VideoKey) { + resp.AbortWithMsg(c, "Task not found, please check the video key") + return + } + + clips, err := db.GetVideoClips(req.VideoKey) + if err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + + // 检查 Cut 任务是否处理完成 + if len(clips) == 0 { + resp.AbortWithMsg(c, "Please wait for the cut task to complete before clearing the task") + return + } + + // 清理 OSS + ossDelObjKeys := make([]string, 0) + task, _ := db.GetTask(req.VideoKey) + ossDelObjKeys = append(ossDelObjKeys, task.Key) + ossDelObjKeys = append(ossDelObjKeys, task.EncodeKey) + + for _, clip := range clips { + if clip.ClipKey != "" { + ossDelObjKeys = append(ossDelObjKeys, clip.ClipKey) + } + if clip.EncodeKey != "" { + ossDelObjKeys = append(ossDelObjKeys, clip.EncodeKey) + } + } + + ossDelMulti(ossDelObjKeys) + log.Logger.Infof("Deleted files from OSS: %v", ossDelObjKeys) + + // 清理数据库 + err = db.DeleteTask(req.VideoKey) + if err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + + err = db.DeleteVideoClips(req.VideoKey) + if err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + log.Logger.Infof("Deleted task from database: %s", req.VideoKey) + + // 一定要在删除数据库记录和 OSS 之后再取消任务,否则会导致 Merge 任务无法正常取消,以及 Encode 任务异常下载 OSS + // 检查任务是否处理完成 + if !db.CheckTaskComplete(req.VideoKey) { + // 清理任务队列,倒序删除 + for i := len(clips) - 1; i >= 0; i-- { + clip := clips[i] + err = queue.Isp.CancelProcessing(clip.TaskID) + if err != nil { + log.Logger.Errorf("Failed to cancel processing task: %s", err) + } + err := queue.Isp.DeleteTask(queue.ENCODE_QUEUE, clip.TaskID) + if err != nil { + log.Logger.Errorf("Failed to delete task from encode queue: %s", err) + } + } + } + + resp.OK(c) +} + +func ossDelMulti(keys []string) { + var wg sync.WaitGroup + for _, key := range keys { + wg.Add(1) + go func(k string) { + err := oss.Delete(k) + if err != nil { + log.Logger.Errorf("Failed to delete file from OSS: %s", err) + } + wg.Done() + }(key) + } + wg.Wait() +} diff --git a/server/internal/service/task/new.go b/server/internal/service/task/new.go index 178e2e2..5cf052f 100644 --- a/server/internal/service/task/new.go +++ b/server/internal/service/task/new.go @@ -20,7 +20,7 @@ func New(c *gin.Context) { return } - // 检查视频是否存在 + // 检查任务是否存在 if db.CheckTaskExist(req.VideoKey) { resp.AbortWithMsg(c, "Task already exists, please wait for it to complete or delete it.") return diff --git a/server/internal/service/task/start.go b/server/internal/service/task/start.go index d9a20ea..d2839fc 100644 --- a/server/internal/service/task/start.go +++ b/server/internal/service/task/start.go @@ -122,6 +122,13 @@ func HandleStart(req StartRequest) { return } + err = db.UpdateVideo(db.VideoClipInfo{Key: req.VideoKey, ClipKey: clip.ClipKey}, + db.VideoClipInfo{TaskID: info.ID}) + if err != nil { + log.Logger.Error("Failed to enqueue task: " + err.Error()) + return + } + log.Logger.Info("Successfully enqueued task: " + util.StructToString(clip)) wg.Add(1) @@ -154,6 +161,13 @@ func HandleStart(req StartRequest) { log.Logger.Error("Failed to get video clips: " + err.Error()) return } + + // 如果已经clear,不再合并 + if len(clips) == 0 { + log.Logger.Info("No clips to merge.") + return + } + payload, err = sonic.Marshal(task.MergeTaskPayload{ Clips: clips, }) diff --git a/worker/internal/cut/worker.go b/worker/internal/cut/worker.go index ef13773..f683819 100644 --- a/worker/internal/cut/worker.go +++ b/worker/internal/cut/worker.go @@ -81,31 +81,6 @@ func Handler(ctx context.Context, t *asynq.Task) error { key := p.VideoKey + "-clip-" + strconv.FormatInt(int64(index), 10) + ".mkv" - // 重试情况 - if p.Retry { - err := oss.PutByPath(key, file) - if err != nil { - log.Logger.Errorf("Failed to upload video %v: %v", index, file) - } - - if !db.CheckVideoExist(db.VideoClipInfo{ - Key: p.VideoKey, - ClipKey: key, - }) { - err = db.InsertVideo(db.VideoClipInfo{ - Key: p.VideoKey, - Index: index, - Total: total, - ClipKey: key, - }) - if err != nil { - log.Logger.Errorf("Failed to insert video %s: %v", key, err) - } - } - - return - } - // 正常情况 if db.CheckVideoExist(db.VideoClipInfo{ Key: p.VideoKey, diff --git a/worker/internal/encode/worker.go b/worker/internal/encode/worker.go index ce924c9..6d66d20 100644 --- a/worker/internal/encode/worker.go +++ b/worker/internal/encode/worker.go @@ -2,6 +2,7 @@ package encode import ( "context" + "errors" "os" "path" "strconv" @@ -90,16 +91,22 @@ func Handler(ctx context.Context, t *asynq.Task) error { return nil } + // 检查任务是否被取消 + if !db.CheckVideoExist(db.VideoClipInfo{ + Key: p.Clip.Key, + ClipKey: p.Clip.ClipKey, + }) { + log.Logger.Errorf("Encode Video Clip %s has been canceled", key) + return errors.New("encode video clip has been canceled") + } + err = oss.PutByPath(key, tempEncodedVideo) if err != nil { log.Logger.Errorf("Failed to upload encode video %s: %s", key, err) return err } - err = db.UpdateVideo(db.VideoClipInfo{Key: p.Clip.Key}, db.VideoClipInfo{ - ClipKey: p.Clip.ClipKey, - EncodeKey: key, - }) + err = db.UpdateVideo(db.VideoClipInfo{Key: p.Clip.Key, ClipKey: p.Clip.ClipKey}, db.VideoClipInfo{EncodeKey: key}) if err != nil { log.Logger.Errorf("Failed to upload encode video %s: %s", key, err) return err