From 683328a08ec839c7701baadb8d73712331d01347 Mon Sep 17 00:00:00 2001 From: Tohru <65994850+Tohrusky@users.noreply.github.com> Date: Thu, 15 Aug 2024 15:40:46 +0800 Subject: [PATCH] feat: new api (#16) * refactor: some pkg * feat: oss * fix: cros * faet: new api --- common/db/completed.go | 47 ---------------- common/db/task.go | 55 +++++++++++++++++++ common/db/type.go | 6 +- common/db/video.go | 30 ---------- conf/finalrip.yml | 4 +- module/oss/oss.go | 11 ++++ server/internal/middleware/cros/cros.go | 11 +++- server/internal/router/api/v1/api.go | 16 +++--- server/internal/service/task/new.go | 37 +++++++++++++ server/internal/service/task/oss.go | 39 +++++++++++++ .../service/{process => task}/progress.go | 4 +- .../service/{process => task}/start.go | 22 +++++++- worker/internal/encode/worker.go | 5 +- worker/internal/merge/worker.go | 2 +- 14 files changed, 188 insertions(+), 101 deletions(-) delete mode 100644 common/db/completed.go create mode 100644 common/db/task.go create mode 100644 server/internal/service/task/new.go create mode 100644 server/internal/service/task/oss.go rename server/internal/service/{process => task}/progress.go (96%) rename server/internal/service/{process => task}/start.go (88%) diff --git a/common/db/completed.go b/common/db/completed.go deleted file mode 100644 index 30ca330..0000000 --- a/common/db/completed.go +++ /dev/null @@ -1,47 +0,0 @@ -package db - -import ( - "context" - - "github.com/TensoRaws/FinalRip/module/db" - "go.mongodb.org/mongo-driver/bson" -) - -// InsertUncompletedTask inserts a new uncompleted task into the database -func InsertUncompletedTask(videoKey string, encodeParam string, script string) error { - coll := db.DB.Collection(COMPLETED_COLLECTION) - _, err := coll.InsertOne(context.TODO(), CompletedTask{ - Key: videoKey, - EncodeParam: encodeParam, - Script: script, - }) - return err -} - -// UpdateUncompletedTask updates an uncompleted task in the database -func UpdateUncompletedTask(videoKey string, encodeKey string) error { - coll := db.DB.Collection(COMPLETED_COLLECTION) - - filter := CompletedTask{ - Key: videoKey, - } - - up := bson.D{{"$set", CompletedTask{ //nolint: govet - EncodeKey: encodeKey, - }}} - - _, err := coll.UpdateOne(context.TODO(), filter, up) - if err != nil { - return err - } - return nil -} - -// GetCompletedTask gets a completed Task from the database -func GetCompletedTask(videoKey string) (CompletedTask, error) { - coll := db.DB.Collection(COMPLETED_COLLECTION) - var task CompletedTask - err := coll.FindOne(context.TODO(), CompletedTask{Key: videoKey}).Decode(&task) - - return task, err -} diff --git a/common/db/task.go b/common/db/task.go new file mode 100644 index 0000000..29e92ec --- /dev/null +++ b/common/db/task.go @@ -0,0 +1,55 @@ +package db + +import ( + "context" + + "github.com/TensoRaws/FinalRip/module/db" + "go.mongodb.org/mongo-driver/bson" +) + +// CheckTaskExist checks if a task exists in the database +func CheckTaskExist(videoKey string) bool { + coll := db.DB.Collection(TASK_COLLECTION) + count, _ := coll.CountDocuments(context.TODO(), Task{Key: videoKey}) + return count > 0 +} + +// CheckTaskStart checks if a task has started +func CheckTaskStart(videoKey string) bool { + task, err := GetTask(videoKey) + if err != nil { + return false + } + return task.EncodeParam != "" +} + +// InsertTask inserts a new uncompleted task into the database +func InsertTask(videoKey string) error { + coll := db.DB.Collection(TASK_COLLECTION) + _, err := coll.InsertOne(context.TODO(), Task{ + Key: videoKey, + }) + return err +} + +// UpdateTask updates an uncompleted task in the database +func UpdateTask(filter Task, update Task) error { + coll := db.DB.Collection(TASK_COLLECTION) + + up := bson.D{{"$set", update}} //nolint:govet + + _, err := coll.UpdateOne(context.TODO(), filter, up) + if err != nil { + return err + } + return nil +} + +// GetTask gets a completed Task from the database +func GetTask(videoKey string) (Task, error) { + coll := db.DB.Collection(TASK_COLLECTION) + var task Task + err := coll.FindOne(context.TODO(), Task{Key: videoKey}).Decode(&task) + + return task, err +} diff --git a/common/db/type.go b/common/db/type.go index 9047913..8e6d4cf 100644 --- a/common/db/type.go +++ b/common/db/type.go @@ -1,8 +1,8 @@ package db const ( - VIDEO_COLLECTION = "video" - COMPLETED_COLLECTION = "completed" + VIDEO_COLLECTION = "video" + TASK_COLLECTION = "task" ) type VideoClipInfo struct { @@ -13,7 +13,7 @@ type VideoClipInfo struct { EncodeKey string `bson:"encode_key,omitempty"` } -type CompletedTask struct { +type Task struct { Key string `bson:"key,omitempty"` EncodeKey string `bson:"encode_key,omitempty"` EncodeParam string `bson:"encode_param,omitempty"` diff --git a/common/db/video.go b/common/db/video.go index d5b1abe..3fb52e4 100644 --- a/common/db/video.go +++ b/common/db/video.go @@ -23,15 +23,6 @@ func InsertVideo(info VideoClipInfo) error { return err } -// InsertManyVideo 批量插入视频信息 -func InsertManyVideo(infos []VideoClipInfo) error { - coll := db.DB.Collection(VIDEO_COLLECTION) - _, err := coll.InsertMany(context.TODO(), []interface{}{ - infos, - }) - return err -} - // GetVideoClips 获取所有视频切片信息,按照索引排序 func GetVideoClips(videoKey string) ([]VideoClipInfo, error) { coll := db.DB.Collection(VIDEO_COLLECTION) @@ -70,27 +61,6 @@ func UpdateVideo(filter VideoClipInfo, update VideoClipInfo) error { return nil } -// UpdateVideoEncodeClip 更新 Encode 后视频切片信息 -func UpdateVideoEncodeClip(videoKey string, clipKey string, encodeKey string) error { - coll := db.DB.Collection(VIDEO_COLLECTION) - - filter := VideoClipInfo{ - Key: videoKey, - ClipKey: clipKey, - } - - update := bson.D{{"$set", VideoClipInfo{ //nolint: govet - EncodeKey: encodeKey, - }}} - - _, err := coll.UpdateOne(context.TODO(), filter, update) - if err != nil { - return err - } - - return nil -} - // GetVideoProgress 获取视频处理进度和每个切片的状态 func GetVideoProgress(videoKey string) ([]bool, error) { infos, err := GetVideoClips(videoKey) diff --git a/conf/finalrip.yml b/conf/finalrip.yml index cf3af22..3d99444 100644 --- a/conf/finalrip.yml +++ b/conf/finalrip.yml @@ -28,8 +28,8 @@ redis: oss: type: minio # minio, cos endpoint: 127.0.0.1:9000 - accessKey: ChYm7ufIwNAOzq6PQPCA - secretKey: udicP52IwRbmo2hf6lFvjUS7NP5BhlAdsGNIuDE5 + accessKey: homo + secretKey: homo114514 region: local bucket: finalrip ssl: false diff --git a/module/oss/oss.go b/module/oss/oss.go index 2a3df1c..0f23a5c 100644 --- a/module/oss/oss.go +++ b/module/oss/oss.go @@ -119,3 +119,14 @@ func GetPresignedURL(key string, fileName string, expiration time.Duration) (str } return presignedURL.String(), nil } + +// GetUploadPresignedURL gets the presigned URL for the file upload, use PUT method in frontend. +func GetUploadPresignedURL(key string, expiration time.Duration) (string, error) { + // Generate presigned put object url + presignedURL, err := oss.PresignedPutObject(context.Background(), config.OSSConfig.Bucket, key, expiration) + if err != nil { + log.Logger.Error("Failed to generate presigned URL: " + key + err.Error()) + return "", err + } + return presignedURL.String(), nil +} diff --git a/server/internal/middleware/cros/cros.go b/server/internal/middleware/cros/cros.go index fd7229a..0f29683 100644 --- a/server/internal/middleware/cros/cros.go +++ b/server/internal/middleware/cros/cros.go @@ -1,15 +1,15 @@ package cros import ( + "net/http" + "github.com/gin-gonic/gin" ) // Cors 设置跨域 func Cors() gin.HandlerFunc { return func(c *gin.Context) { - origin := c.GetHeader("origin") - - c.Header("Access-Control-Allow-Origin", origin) + c.Header("Access-Control-Allow-Origin", "*") c.Header("Access-Control-Allow-Headers", "Content-Type, AccessToken, X-CSRF-Token, Authorization, Token, X-Token, X-User-Id") c.Header("Access-Control-Allow-Methods", "POST, GET") @@ -18,6 +18,11 @@ func Cors() gin.HandlerFunc { "Access-Control-Allow-Headers, Content-Type, New-Token, New-Expires-At") c.Header("Access-Control-Allow-Credentials", "true") + if c.Request.Method == "OPTIONS" { + c.JSON(http.StatusOK, "Options Request!") + return + } + c.Next() } } diff --git a/server/internal/router/api/v1/api.go b/server/internal/router/api/v1/api.go index 7e87ccf..55fdfe2 100644 --- a/server/internal/router/api/v1/api.go +++ b/server/internal/router/api/v1/api.go @@ -6,15 +6,13 @@ import ( "github.com/TensoRaws/FinalRip/server/internal/middleware/auth" "github.com/TensoRaws/FinalRip/server/internal/middleware/cros" "github.com/TensoRaws/FinalRip/server/internal/middleware/logger" - "github.com/TensoRaws/FinalRip/server/internal/service/process" + "github.com/TensoRaws/FinalRip/server/internal/service/task" "github.com/gin-gonic/gin" ) func NewAPI() *gin.Engine { r := gin.New() - r.Use(cros.Cors()) // 跨域中间件 - r.Use(logger.DefaultLogger(), gin.Recovery()) // 日志中间件 - r.Use(auth.RequireAuth()) // 鉴权中间件 + r.Use(cros.Cors(), logger.DefaultLogger(), gin.Recovery(), auth.RequireAuth()) r.GET("/", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ @@ -24,12 +22,12 @@ func NewAPI() *gin.Engine { api := r.Group("/api/v1/") { - processGroup := api.Group("process/") + processGroup := api.Group("task/") { - // 开始压制 - processGroup.POST("start", process.Start) - // 查看进度 - processGroup.GET("progress", process.Progress) + processGroup.POST("new", task.New) + processGroup.POST("start", task.Start) + processGroup.GET("progress", task.Progress) + processGroup.GET("oss/presigned", task.OSSPresigned) } } diff --git a/server/internal/service/task/new.go b/server/internal/service/task/new.go new file mode 100644 index 0000000..178e2e2 --- /dev/null +++ b/server/internal/service/task/new.go @@ -0,0 +1,37 @@ +package task + +import ( + "github.com/TensoRaws/FinalRip/common/db" + "github.com/TensoRaws/FinalRip/module/log" + "github.com/TensoRaws/FinalRip/module/resp" + "github.com/gin-gonic/gin" +) + +type NewRequest struct { + VideoKey string `form:"video_key" binding:"required"` +} + +// New 创建任务 (POST /new) +func New(c *gin.Context) { + // 绑定参数 + var req NewRequest + if err := c.ShouldBind(&req); err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + + // 检查视频是否存在 + if db.CheckTaskExist(req.VideoKey) { + resp.AbortWithMsg(c, "Task already exists, please wait for it to complete or delete it.") + return + } + + err := db.InsertTask(req.VideoKey) + if err != nil { + log.Logger.Error("Failed to insert task: " + err.Error()) + resp.AbortWithMsg(c, err.Error()) + return + } + + resp.OK(c) +} diff --git a/server/internal/service/task/oss.go b/server/internal/service/task/oss.go new file mode 100644 index 0000000..d5f52d2 --- /dev/null +++ b/server/internal/service/task/oss.go @@ -0,0 +1,39 @@ +package task + +import ( + "time" + + "github.com/TensoRaws/FinalRip/module/log" + "github.com/TensoRaws/FinalRip/module/oss" + "github.com/TensoRaws/FinalRip/module/resp" + "github.com/gin-gonic/gin" +) + +type OSSPresignedRequest struct { + VideoKey string `form:"video_key" binding:"required"` +} + +type OSSPresignedResponse struct { + URL string `json:"url"` +} + +// OSSPresigned 获取 OSS 上传 URL (GET /oss/presigned) +func OSSPresigned(c *gin.Context) { + // 绑定参数 + var req OSSPresignedRequest + if err := c.ShouldBindQuery(&req); err != nil { + resp.AbortWithMsg(c, err.Error()) + return + } + + url, err := oss.GetUploadPresignedURL(req.VideoKey, 48*time.Hour) + if err != nil { + log.Logger.Error("get upload presigned url failed: " + err.Error()) + resp.AbortWithMsg(c, "get upload presigned url failed: "+err.Error()) + return + } + + resp.OKWithData(c, &OSSPresignedResponse{ + URL: url, + }) +} diff --git a/server/internal/service/process/progress.go b/server/internal/service/task/progress.go similarity index 96% rename from server/internal/service/process/progress.go rename to server/internal/service/task/progress.go index fd78a28..17cf0b9 100644 --- a/server/internal/service/process/progress.go +++ b/server/internal/service/task/progress.go @@ -1,4 +1,4 @@ -package process +package task import ( "time" @@ -38,7 +38,7 @@ func Progress(c *gin.Context) { return } - task, err := db.GetCompletedTask(req.VideoKey) + task, err := db.GetTask(req.VideoKey) if err != nil { log.Logger.Errorf("db.GetCompletedEncodeKey failed, err: %v", err) resp.AbortWithMsg(c, err.Error()) diff --git a/server/internal/service/process/start.go b/server/internal/service/task/start.go similarity index 88% rename from server/internal/service/process/start.go rename to server/internal/service/task/start.go index 5ebe11f..d9a20ea 100644 --- a/server/internal/service/process/start.go +++ b/server/internal/service/task/start.go @@ -1,4 +1,4 @@ -package process +package task import ( "errors" @@ -31,9 +31,25 @@ func Start(c *gin.Context) { return } - err := db.InsertUncompletedTask(req.VideoKey, req.EncodeParam, req.Script) + // 检查任务是否 new 上传 + if !db.CheckTaskExist(req.VideoKey) { + resp.AbortWithMsg(c, "Task not found, please upload video first.") + return + } + + // 检查任务是否已经开始 + if db.CheckTaskStart(req.VideoKey) { + resp.AbortWithMsg(c, "Task already started.") + return + } + + // 更新任务 + err := db.UpdateTask(db.Task{Key: req.VideoKey}, db.Task{ + EncodeParam: req.EncodeParam, + Script: req.Script, + }) if err != nil { - log.Logger.Error("Failed to insert uncompleted task: " + err.Error()) + log.Logger.Error("Failed to update task: " + err.Error()) resp.AbortWithMsg(c, err.Error()) return } diff --git a/worker/internal/encode/worker.go b/worker/internal/encode/worker.go index 922492a..ce924c9 100644 --- a/worker/internal/encode/worker.go +++ b/worker/internal/encode/worker.go @@ -96,7 +96,10 @@ func Handler(ctx context.Context, t *asynq.Task) error { return err } - err = db.UpdateVideoEncodeClip(p.Clip.Key, p.Clip.ClipKey, key) + err = db.UpdateVideo(db.VideoClipInfo{Key: p.Clip.Key}, db.VideoClipInfo{ + ClipKey: p.Clip.ClipKey, + EncodeKey: key, + }) if err != nil { log.Logger.Errorf("Failed to upload encode video %s: %s", key, err) return err diff --git a/worker/internal/merge/worker.go b/worker/internal/merge/worker.go index 9ad60a4..7f9df59 100644 --- a/worker/internal/merge/worker.go +++ b/worker/internal/merge/worker.go @@ -120,7 +120,7 @@ func Handler(ctx context.Context, t *asynq.Task) error { } // 保存合并后的视频信息 - err = db.UpdateUncompletedTask(p.Clips[0].Key, mergedKey) + err = db.UpdateTask(db.Task{Key: p.Clips[0].Key}, db.Task{EncodeKey: mergedKey}) if err != nil { log.Logger.Errorf("Failed to update completed task: %v", err) return err