Skip to content

Commit

Permalink
feat: clear api (#17)
Browse files Browse the repository at this point in the history
refactor: cfg
  • Loading branch information
Tohrusky authored Aug 15, 2024
1 parent 683328a commit a3d016d
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 63 deletions.
16 changes: 16 additions & 0 deletions common/db/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ func CheckTaskStart(videoKey string) bool {
return task.EncodeParam != ""
}

// CheckTaskComplete checks if a task has completed
func CheckTaskComplete(videoKey string) bool {
task, err := GetTask(videoKey)
if err != nil {
return false
}
return task.EncodeKey != ""
}

// InsertTask inserts a new uncompleted task into the database
func InsertTask(videoKey string) error {
coll := db.DB.Collection(TASK_COLLECTION)
Expand Down Expand Up @@ -53,3 +62,10 @@ func GetTask(videoKey string) (Task, error) {

return task, err
}

// DeleteTask deletes a task from the database
func DeleteTask(videoKey string) error {
coll := db.DB.Collection(TASK_COLLECTION)
_, err := coll.DeleteOne(context.TODO(), Task{Key: videoKey})
return err
}
1 change: 1 addition & 0 deletions common/db/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type VideoClipInfo struct {
Total int `bson:"total,omitempty"`
ClipKey string `bson:"clip_key,omitempty"`
EncodeKey string `bson:"encode_key,omitempty"`
TaskID string `bson:"task_id,omitempty"`
}

type Task struct {
Expand Down
7 changes: 7 additions & 0 deletions common/db/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ func UpdateVideo(filter VideoClipInfo, update VideoClipInfo) error {
return nil
}

// DeleteVideoClips 删除所有视频切片
func DeleteVideoClips(videoKey string) error {
coll := db.DB.Collection(VIDEO_COLLECTION)
_, err := coll.DeleteMany(context.TODO(), VideoClipInfo{Key: videoKey})
return err
}

// GetVideoProgress 获取视频处理进度和每个切片的状态
func GetVideoProgress(videoKey string) ([]bool, error) {
infos, err := GetVideoClips(videoKey)
Expand Down
1 change: 0 additions & 1 deletion common/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const (
// CutTaskPayload is a struct that represents the payload for cut task.
type CutTaskPayload struct {
VideoKey string `json:"video_key"`
Retry bool `json:"retry"`
}

// EncodeTaskPayload is a struct that represents the payload for encode task.
Expand Down
32 changes: 21 additions & 11 deletions deploy/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,31 @@ networks:
driver: bridge

services:
# worker-encode:
# image: lychee0/finalrip-worker-encode:latest
# container_name: finalrip-worker-encode
# restart: always
# environment:
# - FINALRIP_REMOTE_CONFIG_HOST=EASYTIER
# networks:
# - backend
worker-encode:
image: lychee0/finalrip-worker-encode:latest
# image: lychee0/finalrip-worker-encode-pytorch:dev
image: lychee0/finalrip-worker-encode-pytorch
container_name: finalrip-worker-encode
restart: always
environment:
- FINALRIP_REMOTE_CONFIG_HOST=EASYTIER
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# device_ids:
# - "0"
# capabilities: [gpu]
- FINALRIP_REMOTE_CONFIG_HOST=consul:8500
- FINALRIP_DB_HOST=mongodb
- FINALRIP_REDIS_HOST=redis
- FINALRIP_OSS_ENDPOINT=192.168.0.109:9000
deploy:
resources:
reservations:
devices:
- driver: nvidia
device_ids:
- "0"
capabilities: [gpu]
networks:
- backend
# easytier:
Expand Down
13 changes: 3 additions & 10 deletions module/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"
"time"

"github.com/TensoRaws/FinalRip/module/util"
"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
_ "github.com/spf13/viper/remote"
Expand Down Expand Up @@ -94,20 +93,14 @@ func initialize() {
}

func updateRemoteConfigOnChange() {
lastCfg, err := util.DeepCopyMap(config.AllSettings())
if err != nil {
fmt.Println("failed to copy config: " + err.Error())
}
lastCfg := config.AllSettings()

err = config.WatchRemoteConfig()
err := config.WatchRemoteConfig()
if err != nil {
fmt.Println("failed to watch remote config: " + err.Error())
}

cfg, err := util.DeepCopyMap(config.AllSettings())
if err != nil {
fmt.Println("failed to copy config: " + err.Error())
}
cfg := config.AllSettings()

if !reflect.DeepEqual(lastCfg, cfg) {
// 配置文件发生变更之后,重新初始化配置
Expand Down
12 changes: 6 additions & 6 deletions module/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,7 @@ func PutBytes(key string, data []byte) error {

// GetWithPath downloads and saves the object as a file in the local filesystem by key.
func GetWithPath(key string, path string) error {
err = oss.FGetObject(context.Background(), config.OSSConfig.Bucket, key, path, minio.GetObjectOptions{})
if err != nil {
return err
}

return nil
return oss.FGetObject(context.Background(), config.OSSConfig.Bucket, key, path, minio.GetObjectOptions{})
}

// Get gets the file pointed to by key.
Expand Down Expand Up @@ -105,6 +100,11 @@ func GetBytes(key string) ([]byte, error) {
return buf.Bytes(), nil
}

// Delete deletes the file pointed to by key.
func Delete(key string) error {
return oss.RemoveObject(context.Background(), config.OSSConfig.Bucket, key, minio.RemoveObjectOptions{})
}

// GetPresignedURL gets the presigned URL for the file pointed to by key.
func GetPresignedURL(key string, fileName string, expiration time.Duration) (string, error) {
// Set request parameters
Expand Down
11 changes: 6 additions & 5 deletions server/internal/router/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ func NewAPI() *gin.Engine {

api := r.Group("/api/v1/")
{
processGroup := api.Group("task/")
taskGroup := api.Group("task/")
{
processGroup.POST("new", task.New)
processGroup.POST("start", task.Start)
processGroup.GET("progress", task.Progress)
processGroup.GET("oss/presigned", task.OSSPresigned)
taskGroup.POST("new", task.New)
taskGroup.POST("start", task.Start)
taskGroup.GET("progress", task.Progress)
taskGroup.GET("oss/presigned", task.OSSPresigned)
taskGroup.POST("clear", task.Clear)
}
}

Expand Down
110 changes: 110 additions & 0 deletions server/internal/service/task/clear.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package task

import (
"sync"

"github.com/TensoRaws/FinalRip/common/db"
"github.com/TensoRaws/FinalRip/module/log"
"github.com/TensoRaws/FinalRip/module/oss"
"github.com/TensoRaws/FinalRip/module/queue"
"github.com/TensoRaws/FinalRip/module/resp"
"github.com/gin-gonic/gin"
)

type ClearRequest struct {
VideoKey string `form:"video_key" binding:"required"`
}

// Clear 清理任务 (POST /new)
func Clear(c *gin.Context) {
// 绑定参数
var req NewRequest
if err := c.ShouldBind(&req); err != nil {
resp.AbortWithMsg(c, err.Error())
return
}

// 检查任务是否存在
if !db.CheckTaskExist(req.VideoKey) {
resp.AbortWithMsg(c, "Task not found, please check the video key")
return
}

clips, err := db.GetVideoClips(req.VideoKey)
if err != nil {
resp.AbortWithMsg(c, err.Error())
return
}

// 检查 Cut 任务是否处理完成
if len(clips) == 0 {
resp.AbortWithMsg(c, "Please wait for the cut task to complete before clearing the task")
return
}

// 清理 OSS
ossDelObjKeys := make([]string, 0)
task, _ := db.GetTask(req.VideoKey)
ossDelObjKeys = append(ossDelObjKeys, task.Key)
ossDelObjKeys = append(ossDelObjKeys, task.EncodeKey)

for _, clip := range clips {
if clip.ClipKey != "" {
ossDelObjKeys = append(ossDelObjKeys, clip.ClipKey)
}
if clip.EncodeKey != "" {
ossDelObjKeys = append(ossDelObjKeys, clip.EncodeKey)
}
}

ossDelMulti(ossDelObjKeys)
log.Logger.Infof("Deleted files from OSS: %v", ossDelObjKeys)

// 清理数据库
err = db.DeleteTask(req.VideoKey)
if err != nil {
resp.AbortWithMsg(c, err.Error())
return
}

err = db.DeleteVideoClips(req.VideoKey)
if err != nil {
resp.AbortWithMsg(c, err.Error())
return
}
log.Logger.Infof("Deleted task from database: %s", req.VideoKey)

// 一定要在删除数据库记录和 OSS 之后再取消任务,否则会导致 Merge 任务无法正常取消,以及 Encode 任务异常下载 OSS
// 检查任务是否处理完成
if !db.CheckTaskComplete(req.VideoKey) {
// 清理任务队列,倒序删除
for i := len(clips) - 1; i >= 0; i-- {
clip := clips[i]
err = queue.Isp.CancelProcessing(clip.TaskID)
if err != nil {
log.Logger.Errorf("Failed to cancel processing task: %s", err)
}
err := queue.Isp.DeleteTask(queue.ENCODE_QUEUE, clip.TaskID)
if err != nil {
log.Logger.Errorf("Failed to delete task from encode queue: %s", err)
}
}
}

resp.OK(c)
}

func ossDelMulti(keys []string) {
var wg sync.WaitGroup
for _, key := range keys {
wg.Add(1)
go func(k string) {
err := oss.Delete(k)
if err != nil {
log.Logger.Errorf("Failed to delete file from OSS: %s", err)
}
wg.Done()
}(key)
}
wg.Wait()
}
2 changes: 1 addition & 1 deletion server/internal/service/task/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func New(c *gin.Context) {
return
}

// 检查视频是否存在
// 检查任务是否存在
if db.CheckTaskExist(req.VideoKey) {
resp.AbortWithMsg(c, "Task already exists, please wait for it to complete or delete it.")
return
Expand Down
14 changes: 14 additions & 0 deletions server/internal/service/task/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ func HandleStart(req StartRequest) {
return
}

err = db.UpdateVideo(db.VideoClipInfo{Key: req.VideoKey, ClipKey: clip.ClipKey},
db.VideoClipInfo{TaskID: info.ID})
if err != nil {
log.Logger.Error("Failed to enqueue task: " + err.Error())
return
}

log.Logger.Info("Successfully enqueued task: " + util.StructToString(clip))

wg.Add(1)
Expand Down Expand Up @@ -154,6 +161,13 @@ func HandleStart(req StartRequest) {
log.Logger.Error("Failed to get video clips: " + err.Error())
return
}

// 如果已经clear,不再合并
if len(clips) == 0 {
log.Logger.Info("No clips to merge.")
return
}

payload, err = sonic.Marshal(task.MergeTaskPayload{
Clips: clips,
})
Expand Down
25 changes: 0 additions & 25 deletions worker/internal/cut/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,6 @@ func Handler(ctx context.Context, t *asynq.Task) error {

key := p.VideoKey + "-clip-" + strconv.FormatInt(int64(index), 10) + ".mkv"

// 重试情况
if p.Retry {
err := oss.PutByPath(key, file)
if err != nil {
log.Logger.Errorf("Failed to upload video %v: %v", index, file)
}

if !db.CheckVideoExist(db.VideoClipInfo{
Key: p.VideoKey,
ClipKey: key,
}) {
err = db.InsertVideo(db.VideoClipInfo{
Key: p.VideoKey,
Index: index,
Total: total,
ClipKey: key,
})
if err != nil {
log.Logger.Errorf("Failed to insert video %s: %v", key, err)
}
}

return
}

// 正常情况
if db.CheckVideoExist(db.VideoClipInfo{
Key: p.VideoKey,
Expand Down
15 changes: 11 additions & 4 deletions worker/internal/encode/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package encode

import (
"context"
"errors"
"os"
"path"
"strconv"
Expand Down Expand Up @@ -90,16 +91,22 @@ func Handler(ctx context.Context, t *asynq.Task) error {
return nil
}

// 检查任务是否被取消
if !db.CheckVideoExist(db.VideoClipInfo{
Key: p.Clip.Key,
ClipKey: p.Clip.ClipKey,
}) {
log.Logger.Errorf("Encode Video Clip %s has been canceled", key)
return errors.New("encode video clip has been canceled")
}

err = oss.PutByPath(key, tempEncodedVideo)
if err != nil {
log.Logger.Errorf("Failed to upload encode video %s: %s", key, err)
return err
}

err = db.UpdateVideo(db.VideoClipInfo{Key: p.Clip.Key}, db.VideoClipInfo{
ClipKey: p.Clip.ClipKey,
EncodeKey: key,
})
err = db.UpdateVideo(db.VideoClipInfo{Key: p.Clip.Key, ClipKey: p.Clip.ClipKey}, db.VideoClipInfo{EncodeKey: key})
if err != nil {
log.Logger.Errorf("Failed to upload encode video %s: %s", key, err)
return err
Expand Down

0 comments on commit a3d016d

Please sign in to comment.