From ec1d82e6ea16a13abaf811e5b1747c99a30e9b5e Mon Sep 17 00:00:00 2001 From: Kittisak Phormraksa Date: Sat, 22 Jun 2024 09:13:15 +0700 Subject: [PATCH] Control RTSP stream speed (#11) Control RTSP speed --- internal/streams/custom.go | 58 +++++++++++++++++++++++++++++++++++++ internal/streams/streams.go | 3 ++ pkg/core/connection.go | 2 ++ pkg/rtsp/client.go | 5 ++++ 4 files changed, 68 insertions(+) create mode 100644 internal/streams/custom.go diff --git a/internal/streams/custom.go b/internal/streams/custom.go new file mode 100644 index 00000000..8233875e --- /dev/null +++ b/internal/streams/custom.go @@ -0,0 +1,58 @@ +package streams + +import ( + "net/http" + "strconv" + + "github.com/AlexxIT/go2rtc/pkg/rtsp" +) + +// only support RTSP sources +func apiStreamsSpeed(w http.ResponseWriter, r *http.Request) { + query := r.URL.Query() + streamName := query.Get("name") + + if streamName == "" { + http.Error(w, "name required", http.StatusBadRequest) + return + } + + switch r.Method { + case "PUT": + streamsMu.RLock() + stream := Get(streamName) + defer streamsMu.RUnlock() + + if stream == nil { + http.Error(w, "", http.StatusNotFound) + return + } + + speedStr := query.Get("speed") + if speedStr == "" { + http.Error(w, "speed required", http.StatusBadRequest) + return + } + + _, err := strconv.ParseFloat(speedStr, 64) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + for _, producer := range stream.producers { + if conn, ok := producer.conn.(*rtsp.Conn); ok { + conn.Connection.Speed = speedStr + err := conn.Play() + if err != nil { + log.Error().Msgf("[stream] conn.Play(): %+v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } + } + + return + } + + http.Error(w, "", http.StatusNotFound) +} diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 737c9a96..a772a02d 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -29,6 +29,9 @@ func Init() { api.HandleFunc("api/streams", apiStreams) api.HandleFunc("api/streams.dot", apiStreamsDOT) + // custom + api.HandleFunc("api/custom/streams/speed", apiStreamsSpeed) + if cfg.Publish == nil { return } diff --git a/pkg/core/connection.go b/pkg/core/connection.go index ba06a8b1..9fd7bb06 100644 --- a/pkg/core/connection.go +++ b/pkg/core/connection.go @@ -50,8 +50,10 @@ type Connection struct { Transport any `json:"-"` + // custom stopBitrateWorker chan struct{} `json:"-"` Bitrate int `json:"bitrate,omitempty"` // bytes per second + Speed string `json:"speed,omitempty"` // empty string indicates normal speed } func (c *Connection) GetMedias() []*Media { diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index 352c00a1..c10b5eae 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -313,6 +313,11 @@ func (c *Conn) SetupMedia(media *core.Media) (byte, error) { func (c *Conn) Play() (err error) { req := &tcp.Request{Method: MethodPlay, URL: c.URL} + if c.Speed != "" { + req.Header = map[string][]string{ + "Scale": {c.Speed}, + } + } return c.WriteRequest(req) }