From cd79e51f8fec68146a7b7c898057bd38c104da17 Mon Sep 17 00:00:00 2001 From: link Date: Tue, 20 Dec 2022 14:05:16 +0800 Subject: [PATCH] Socketio modification (#771) --- cmd/migration-tool/main.go | 2 +- main.go | 12 +-- route/periodical.go | 44 +++------- route/route.go | 1 + route/socket.go | 76 +++++++++-------- route/v1/file.go | 13 ++- route/v1/system.go | 5 +- service/notify.go | 167 +++---------------------------------- service/service.go | 11 ++- service/system.go | 1 + 10 files changed, 95 insertions(+), 237 deletions(-) diff --git a/cmd/migration-tool/main.go b/cmd/migration-tool/main.go index d28663c0a..7068031c7 100644 --- a/cmd/migration-tool/main.go +++ b/cmd/migration-tool/main.go @@ -48,7 +48,7 @@ func init() { sqliteDB = sqlite.GetDb(dbFlag) // gredis.GetRedisConn(config.RedisInfo), - service.MyService = service.NewService(sqliteDB, "") + service.MyService = service.NewService(sqliteDB, "", nil) } func main() { diff --git a/main.go b/main.go index 6fe39b99a..72c5d08a6 100644 --- a/main.go +++ b/main.go @@ -11,7 +11,6 @@ import ( "github.com/IceWhaleTech/CasaOS-Common/model" "github.com/IceWhaleTech/CasaOS-Common/utils/constants" "github.com/IceWhaleTech/CasaOS-Common/utils/logger" - "github.com/IceWhaleTech/CasaOS/model/notify" "github.com/IceWhaleTech/CasaOS/pkg/cache" "github.com/IceWhaleTech/CasaOS/pkg/config" "github.com/IceWhaleTech/CasaOS/pkg/sqlite" @@ -21,6 +20,7 @@ import ( "github.com/IceWhaleTech/CasaOS/service" "github.com/IceWhaleTech/CasaOS/types" "github.com/coreos/go-systemd/daemon" + "github.com/gin-gonic/gin" "go.uber.org/zap" "github.com/robfig/cron" @@ -53,7 +53,7 @@ func init() { sqliteDB = sqlite.GetDb(*dbFlag) // gredis.GetRedisConn(config.RedisInfo), - service.MyService = service.NewService(sqliteDB, config.CommonInfo.RuntimePath) + service.MyService = service.NewService(sqliteDB, config.CommonInfo.RuntimePath, route.SocketIo()) service.Cache = cache.Init() @@ -74,15 +74,17 @@ func init() { // @name Authorization // @BasePath /v1 func main() { - service.NotifyMsg = make(chan notify.Message, 10) if *versionFlag { return } - go route.SocketInit(service.NotifyMsg) // model.Setup() // gredis.Setup() r := route.InitRouter() + defer service.SocketServer.Close() + r.GET("/v1/socketio/*any", gin.WrapH(service.SocketServer)) + r.POST("/v1/socketio/*any", gin.WrapH(service.SocketServer)) + // service.SyncTask(sqliteDB) cron2 := cron.New() // every day execution @@ -108,7 +110,7 @@ func main() { if err != nil { panic(err) } - routers := []string{"sys", "port", "file", "folder", "batch", "image", "samba", "notify"} + routers := []string{"sys", "port", "file", "folder", "batch", "image", "samba", "notify", "socketio"} for _, v := range routers { err = service.MyService.Gateway().CreateRoute(&model.Route{ Path: "/v1/" + v, diff --git a/route/periodical.go b/route/periodical.go index db8fa0923..588660b3d 100644 --- a/route/periodical.go +++ b/route/periodical.go @@ -22,37 +22,6 @@ import ( "github.com/IceWhaleTech/CasaOS/service" ) -func SendNetINfoBySocket() { - netList := service.MyService.System().GetNetInfo() - newNet := []model.IOCountersStat{} - nets := service.MyService.System().GetNet(true) - for _, n := range netList { - for _, netCardName := range nets { - if n.Name == netCardName { - item := *(*model.IOCountersStat)(unsafe.Pointer(&n)) - item.State = strings.TrimSpace(service.MyService.System().GetNetState(n.Name)) - item.Time = time.Now().Unix() - newNet = append(newNet, item) - break - } - } - } - service.MyService.Notify().SendNetInfoBySocket(newNet) -} - -func SendCPUBySocket() { - cpu := service.MyService.System().GetCpuPercent() - num := service.MyService.System().GetCpuCoreNum() - cpuData := make(map[string]interface{}) - cpuData["percent"] = cpu - cpuData["num"] = num - service.MyService.Notify().SendCPUInfoBySocket(cpuData) -} - -func SendMemBySocket() { - service.MyService.Notify().SendMemInfoBySocket(service.MyService.System().GetMemInfo()) -} - func SendAllHardwareStatusBySocket() { netList := service.MyService.System().GetNetInfo() newNet := []model.IOCountersStat{} @@ -89,7 +58,18 @@ func SendAllHardwareStatusBySocket() { memInfo := service.MyService.System().GetMemInfo() - service.MyService.Notify().SendAllHardwareStatusBySocket(memInfo, cpuData, newNet) + body := make(map[string]interface{}) + + body["sys_mem"] = memInfo + + body["sys_cpu"] = cpuData + + body["sys_net"] = newNet + systemTempMap := service.MyService.Notify().GetSystemTempMap() + for k, v := range systemTempMap { + body[k] = v + } + service.MyService.Notify().SendNotify("sys_hardware_status", body) } // func MonitoryUSB() { diff --git a/route/route.go b/route/route.go index 2084e80d5..0fd234b38 100644 --- a/route/route.go +++ b/route/route.go @@ -119,6 +119,7 @@ func InitRouter() *gin.Engine { v1FolderGroup.PUT("/name", v1.RenamePath) v1FolderGroup.GET("", v1.DirPath) ///file/dirpath v1FolderGroup.POST("", v1.MkdirAll) ///file/mkdir + v1FolderGroup.GET("/size", v1.GetSize) } v1BatchGroup := v1Group.Group("/batch") v1BatchGroup.Use() diff --git a/route/socket.go b/route/socket.go index 1ab4f8817..4d9e6378f 100644 --- a/route/socket.go +++ b/route/socket.go @@ -11,48 +11,52 @@ package route import ( - "strconv" - "time" - - "github.com/IceWhaleTech/CasaOS-Common/utils/port" - "github.com/IceWhaleTech/CasaOS/model/notify" - "github.com/IceWhaleTech/CasaOS/pkg/config" + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" "github.com/IceWhaleTech/CasaOS/service" - f "github.com/ambelovsky/gosf" + socketio "github.com/googollee/go-socket.io" + "go.uber.org/zap" ) -func SocketInit(msg chan notify.Message) { - // set socket port - socketPort := 0 - if len(config.ServerInfo.SocketPort) == 0 { - socketPort, _ = port.GetAvailablePort("tcp") - config.ServerInfo.SocketPort = strconv.Itoa(socketPort) - config.Cfg.Section("server").Key("SocketPort").SetValue(strconv.Itoa(socketPort)) - config.Cfg.SaveTo(config.SystemConfigInfo.ConfigPath) - } else { - socketPort, _ = strconv.Atoi(config.ServerInfo.SocketPort) - if !port.IsPortAvailable(socketPort, "tcp") { - socketPort, _ := port.GetAvailablePort("tcp") - config.ServerInfo.SocketPort = strconv.Itoa(socketPort) - config.Cfg.Section("server").Key("SocketPort").SetValue(strconv.Itoa(socketPort)) - config.Cfg.SaveTo(config.SystemConfigInfo.ConfigPath) - } - } - - f.OnConnect(func(c *f.Client, request *f.Request) { +func SocketIo() *socketio.Server { + server := socketio.NewServer(nil) + server.OnConnect("/", func(s socketio.Conn) error { + s.SetContext("") + logger.Info("connected", zap.Any("id", s.ID())) + s.Join("public") service.ClientCount += 1 + return nil }) - f.OnDisconnect(func(c *f.Client, request *f.Request) { - service.ClientCount -= 1 + + server.OnEvent("/", "notice", func(s socketio.Conn, msg string) { + logger.Info("notice", zap.Any("msg", msg)) + s.Emit("reply", "have "+msg) }) - go func(msg chan notify.Message) { - for v := range msg { - f.Broadcast("", v.Path, &v.Msg) - time.Sleep(time.Millisecond * 100) - } - }(msg) - f.Startup(map[string]interface{}{ - "port": socketPort, + // server.OnEvent("/chat", "msg", func(s socketio.Conn, msg string) string { + // s.SetContext(msg) + // return "recv " + msg + // }) + + // server.OnEvent("/", "bye", func(s socketio.Conn) string { + // last := s.Context().(string) + // s.Emit("bye", last) + // s.Close() + // return last + // }) + + server.OnError("/", func(s socketio.Conn, e error) { + logger.Error("meet error", zap.Any("error", e)) + }) + + server.OnDisconnect("/", func(s socketio.Conn, reason string) { + service.ClientCount -= 1 + logger.Info("closed", zap.Any("reason", reason)) }) + + go func() { + if err := server.Serve(); err != nil { + logger.Error("error when trying to listen socketio ", zap.Any("error", err)) + } + }() + return server } diff --git a/route/v1/file.go b/route/v1/file.go index 143045b5b..0fcdf96f3 100644 --- a/route/v1/file.go +++ b/route/v1/file.go @@ -184,7 +184,7 @@ func GetDownloadFile(c *gin.Context) { func GetDownloadSingleFile(c *gin.Context) { filePath := c.Query("path") if len(filePath) == 0 { - c.JSON(service.ClientCount, model.Result{ + c.JSON(common_err.CLIENT_ERROR, model.Result{ Success: common_err.INVALID_PARAMS, Message: common_err.GetMsg(common_err.INVALID_PARAMS), }) @@ -649,3 +649,14 @@ func DeleteOperateFileOrDir(c *gin.Context) { go service.MyService.Notify().SendFileOperateNotify(true) c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS)}) } +func GetSize(c *gin.Context) { + json := make(map[string]string) + c.ShouldBind(&json) + path := json["path"] + size, err := file.GetFileOrDirSize(path) + if err != nil { + c.JSON(common_err.SERVICE_ERROR, model.Result{Success: common_err.SERVICE_ERROR, Message: common_err.GetMsg(common_err.SERVICE_ERROR), Data: err.Error()}) + return + } + c.JSON(common_err.SUCCESS, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: size}) +} diff --git a/route/v1/system.go b/route/v1/system.go index e8bfd68a2..e2476c8fa 100644 --- a/route/v1/system.go +++ b/route/v1/system.go @@ -336,11 +336,10 @@ func GetSystemProxy(c *gin.Context) { func PutSystemState(c *gin.Context) { state := c.Param("state") - if state == "off" { + if strings.ToLower(state) == "off" { service.MyService.System().SystemShutdown() - } else if state == "restart" { + } else if strings.ToLower(state) == "restart" { service.MyService.System().SystemReboot() - } c.JSON(http.StatusOK, model.Result{Success: common_err.SUCCESS, Message: common_err.GetMsg(common_err.SUCCESS), Data: "The operation will be completed shortly."}) } diff --git a/service/notify.go b/service/notify.go index 1571f28da..70d544083 100644 --- a/service/notify.go +++ b/service/notify.go @@ -10,14 +10,14 @@ import ( "github.com/IceWhaleTech/CasaOS/model/notify" "github.com/IceWhaleTech/CasaOS/service/model" "github.com/IceWhaleTech/CasaOS/types" - "github.com/ambelovsky/gosf" + socketio "github.com/googollee/go-socket.io" "github.com/gorilla/websocket" "gorm.io/gorm" ) var ( - NotifyMsg chan notify.Message + //NotifyMsg chan notify.Message ClientCount int ) @@ -31,12 +31,9 @@ type NotifyServer interface { MarkRead(id string, state int) // SendText(m model.AppNotify) SendUninstallAppBySocket(app notifyCommon.Application) - SendNetInfoBySocket(netList []model2.IOCountersStat) - SendCPUInfoBySocket(cpu map[string]interface{}) - SendMemInfoBySocket(mem map[string]interface{}) + SendFileOperateNotify(nowSend bool) SendInstallAppBySocket(app notifyCommon.Application) - SendAllHardwareStatusBySocket(mem map[string]interface{}, cpu map[string]interface{}, netList []model2.IOCountersStat) SendStorageBySocket(message notify.StorageMessage) SendNotify(path string, message map[string]interface{}) SettingSystemTempData(message map[string]interface{}) @@ -55,57 +52,11 @@ func (i *notifyServer) SettingSystemTempData(message map[string]interface{}) { } func (i *notifyServer) SendNotify(path string, message map[string]interface{}) { - msg := gosf.Message{} - msg.Body = message - msg.Success = true - msg.Text = path - - notify := notify.Message{} - notify.Path = path - notify.Msg = msg - - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", path, message) } func (i *notifyServer) SendStorageBySocket(message notify.StorageMessage) { - body := make(map[string]interface{}) - body["data"] = message - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "storage_status" - - notify := notify.Message{} - notify.Path = "storage_status" - notify.Msg = msg - - NotifyMsg <- notify -} - -func (i *notifyServer) SendAllHardwareStatusBySocket(mem map[string]interface{}, cpu map[string]interface{}, netList []model2.IOCountersStat) { - body := make(map[string]interface{}) - - body["sys_mem"] = mem - - body["sys_cpu"] = cpu - - body["sys_net"] = netList - - for k, v := range i.SystemTempMap { - body[k] = v - } - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "sys_hardware_status" - - notify := notify.Message{} - notify.Path = "sys_hardware_status" - notify.Msg = msg - - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", "storage_status", message) } // Send periodic broadcast messages @@ -122,17 +73,8 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) { listMsg := make(map[string]interface{}) if len == 0 { model.Data = []string{} - listMsg["file_operate"] = model - msg := gosf.Message{} - msg.Success = true - msg.Body = listMsg - msg.Text = "file_operate" - - notify := notify.Message{} - notify.Path = "file_operate" - notify.Msg = msg - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg) return } @@ -180,16 +122,7 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) { model.Data = list listMsg["file_operate"] = model - - msg := gosf.Message{} - msg.Success = true - msg.Body = listMsg - msg.Text = "file_operate" - - notify := notify.Message{} - notify.Path = "file_operate" - notify.Msg = msg - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg) } else { for { @@ -246,99 +179,19 @@ func (i *notifyServer) SendFileOperateNotify(nowSend bool) { model.Data = list listMsg["file_operate"] = model - - msg := gosf.Message{} - msg.Success = true - msg.Body = listMsg - msg.Text = "file_operate" - - notify := notify.Message{} - notify.Path = "file_operate" - notify.Msg = msg - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", "file_operate", listMsg) time.Sleep(time.Second * 3) } } } -func (i *notifyServer) SendMemInfoBySocket(mem map[string]interface{}) { - body := make(map[string]interface{}) - body["data"] = mem - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "sys_mem" - - notify := notify.Message{} - notify.Path = "sys_mem" - notify.Msg = msg - - NotifyMsg <- notify -} - func (i *notifyServer) SendInstallAppBySocket(app notifyCommon.Application) { - body := make(map[string]interface{}) - body["data"] = app + SocketServer.BroadcastToRoom("/", "public", "app_install", app) - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "app_install" - - notify := notify.Message{} - notify.Path = "app_install" - notify.Msg = msg - - NotifyMsg <- notify -} - -func (i *notifyServer) SendCPUInfoBySocket(cpu map[string]interface{}) { - body := make(map[string]interface{}) - body["data"] = cpu - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "sys_cpu" - - notify := notify.Message{} - notify.Path = "sys_cpu" - notify.Msg = msg - - NotifyMsg <- notify -} - -func (i *notifyServer) SendNetInfoBySocket(netList []model2.IOCountersStat) { - body := make(map[string]interface{}) - body["data"] = netList - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "sys_net" - - notify := notify.Message{} - notify.Path = "sys_net" - notify.Msg = msg - - NotifyMsg <- notify } func (i *notifyServer) SendUninstallAppBySocket(app notifyCommon.Application) { - body := make(map[string]interface{}) - body["data"] = app - - msg := gosf.Message{} - msg.Body = body - msg.Success = true - msg.Text = "app_uninstall" - - notify := notify.Message{} - notify.Path = "app_uninstall" - notify.Msg = msg - - NotifyMsg <- notify + SocketServer.BroadcastToRoom("/", "public", "app_uninstall", app) } func (i *notifyServer) SSR() { diff --git a/service/service.go b/service/service.go index a215d565d..12ee71f2d 100644 --- a/service/service.go +++ b/service/service.go @@ -12,15 +12,18 @@ package service import ( "github.com/IceWhaleTech/CasaOS-Common/external" + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + socketio "github.com/googollee/go-socket.io" "github.com/gorilla/websocket" "github.com/patrickmn/go-cache" + "go.uber.org/zap" "gorm.io/gorm" ) var Cache *cache.Cache var MyService Repository - +var SocketServer *socketio.Server var ( WebSocketConns []*websocket.Conn SocketRun bool @@ -37,7 +40,11 @@ type Repository interface { Gateway() external.ManagementService } -func NewService(db *gorm.DB, RuntimePath string) Repository { +func NewService(db *gorm.DB, RuntimePath string, socket *socketio.Server) Repository { + if socket == nil { + logger.Error("socket is nil", zap.Any("error", "socket is nil")) + } + SocketServer = socket gatewayManagement, err := external.NewManagementService(RuntimePath) if err != nil && len(RuntimePath) > 0 { panic(err) diff --git a/service/system.go b/service/system.go index 80a768c9b..bc3c076a6 100644 --- a/service/system.go +++ b/service/system.go @@ -384,6 +384,7 @@ func (s *systemService) GetCPUPower() map[string]string { } func (s *systemService) SystemReboot() error { + //cmd := exec.Command("/bin/bash", "-c", "reboot") arg := []string{"6"} cmd := exec.Command("init", arg...) _, err := cmd.CombinedOutput()