From 23e0882d3f8ae50e9dc06060cf74e42cfa3ce361 Mon Sep 17 00:00:00 2001 From: Kittisak Phormraksa Date: Tue, 30 Apr 2024 15:53:53 +0700 Subject: [PATCH] Add bitrate to webrtc/mse/mp4 consumer info (#5) --- Makefile | 2 ++ pkg/core/core.go | 2 ++ pkg/custom/custom.go | 21 +++++++++++++++++++++ pkg/isapi/client.go | 9 ++++++++- pkg/isapi/consumer.go | 8 +++++--- pkg/isapi/custom.go | 8 ++++++++ pkg/mp4/consumer.go | 4 ++++ pkg/mp4/custom.go | 8 ++++++++ pkg/webrtc/conn.go | 6 ++++++ pkg/webrtc/consumer.go | 1 + pkg/webrtc/custom.go | 10 ++++++++++ 11 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 Makefile create mode 100644 pkg/custom/custom.go create mode 100644 pkg/isapi/custom.go create mode 100644 pkg/mp4/custom.go create mode 100644 pkg/webrtc/custom.go diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..b6198160 --- /dev/null +++ b/Makefile @@ -0,0 +1,2 @@ +dev: + goreload . \ No newline at end of file diff --git a/pkg/core/core.go b/pkg/core/core.go index 146533e3..2d5885d6 100644 --- a/pkg/core/core.go +++ b/pkg/core/core.go @@ -100,6 +100,7 @@ type Info struct { Senders []*Sender `json:"senders,omitempty"` Recv int `json:"recv,omitempty"` Send int `json:"send,omitempty"` + Bitrate int `json:"bitrate,omitempty"` // bytes per second } const ( @@ -147,6 +148,7 @@ type SuperConsumer struct { Medias []*Media `json:"medias,omitempty"` Senders []*Sender `json:"senders,omitempty"` Send int `json:"send,omitempty"` + Bitrate int `json:"bitrate,omitempty"` // bytes per second } func (s *SuperConsumer) GetMedias() []*Media { diff --git a/pkg/custom/custom.go b/pkg/custom/custom.go new file mode 100644 index 00000000..8d72150e --- /dev/null +++ b/pkg/custom/custom.go @@ -0,0 +1,21 @@ +package custom + +import ( + "time" +) + +func StartBitrateWorker(send, bitrate *int, stop chan struct{}) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + prevSendBytes := *send + for { + select { + case <-ticker.C: + *bitrate = *send - prevSendBytes + prevSendBytes = *send + case <-stop: + return + } + } +} diff --git a/pkg/isapi/client.go b/pkg/isapi/client.go index e5dfafd4..ff25a79f 100644 --- a/pkg/isapi/client.go +++ b/pkg/isapi/client.go @@ -21,6 +21,9 @@ type Client struct { medias []*core.Media sender *core.Sender send int + + bitrate int // bytes per second + stopBitrateWorker chan struct{} } func NewClient(rawURL string) (*Client, error) { @@ -33,7 +36,9 @@ func NewClient(rawURL string) (*Client, error) { u.Scheme = "http" u.Path = "" - return &Client{url: u.String()}, nil + c := &Client{url: u.String()} + c.startBitrateWorker() + return c, nil } func (c *Client) Dial() (err error) { @@ -145,6 +150,8 @@ func (c *Client) Close() (err error) { tcp.Close(res) + c.stopBitrateWorker <- struct{}{} + return nil } diff --git a/pkg/isapi/consumer.go b/pkg/isapi/consumer.go index c7b51c9d..56aa1f8c 100644 --- a/pkg/isapi/consumer.go +++ b/pkg/isapi/consumer.go @@ -2,6 +2,7 @@ package isapi import ( "encoding/json" + "github.com/AlexxIT/go2rtc/pkg/core" "github.com/pion/rtp" ) @@ -52,9 +53,10 @@ func (c *Client) Stop() (err error) { func (c *Client) MarshalJSON() ([]byte, error) { info := &core.Info{ - Type: "ISAPI active consumer", - Medias: c.medias, - Send: c.send, + Type: "ISAPI active consumer", + Medias: c.medias, + Send: c.send, + Bitrate: c.bitrate, } if c.sender != nil { info.Senders = []*core.Sender{c.sender} diff --git a/pkg/isapi/custom.go b/pkg/isapi/custom.go new file mode 100644 index 00000000..b6a2e29b --- /dev/null +++ b/pkg/isapi/custom.go @@ -0,0 +1,8 @@ +package isapi + +import "github.com/AlexxIT/go2rtc/pkg/custom" + +func (c *Client) startBitrateWorker() { + c.stopBitrateWorker = make(chan struct{}) + go custom.StartBitrateWorker(&c.send, &c.bitrate, c.stopBitrateWorker) +} diff --git a/pkg/mp4/consumer.go b/pkg/mp4/consumer.go index 83b2d2e3..a21138e1 100644 --- a/pkg/mp4/consumer.go +++ b/pkg/mp4/consumer.go @@ -23,6 +23,8 @@ type Consumer struct { Rotate int `json:"-"` ScaleX int `json:"-"` ScaleY int `json:"-"` + + stopBitrateWorker chan struct{} } func NewConsumer(medias []*core.Media) *Consumer { @@ -52,6 +54,7 @@ func NewConsumer(medias []*core.Media) *Consumer { wr: core.NewWriteBuffer(nil), } cons.Medias = medias + cons.startBitrateWorker() return cons } @@ -185,5 +188,6 @@ func (c *Consumer) WriteTo(wr io.Writer) (int64, error) { func (c *Consumer) Stop() error { _ = c.SuperConsumer.Close() + c.stopBitrateWorker <- struct{}{} return c.wr.Close() } diff --git a/pkg/mp4/custom.go b/pkg/mp4/custom.go new file mode 100644 index 00000000..6b1ca355 --- /dev/null +++ b/pkg/mp4/custom.go @@ -0,0 +1,8 @@ +package mp4 + +import "github.com/AlexxIT/go2rtc/pkg/custom" + +func (c *Consumer) startBitrateWorker() { + c.stopBitrateWorker = make(chan struct{}) + go custom.StartBitrateWorker(&c.SuperConsumer.Send, &c.SuperConsumer.Bitrate, c.stopBitrateWorker) +} diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 64835353..b9f8baf7 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -28,6 +28,9 @@ type Conn struct { offer string remote string closed core.Waiter + + bitrate int // bytes per second + stopBitrateWorker chan struct{} } func NewConn(pc *webrtc.PeerConnection) *Conn { @@ -127,11 +130,14 @@ func NewConn(pc *webrtc.PeerConnection) *Conn { } }) + c.startBitrateWorker() + return c } func (c *Conn) Close() error { c.closed.Done(nil) + c.stopBitrateWorker <- struct{}{} return c.pc.Close() } diff --git a/pkg/webrtc/consumer.go b/pkg/webrtc/consumer.go index 784b93fe..f86268ee 100644 --- a/pkg/webrtc/consumer.go +++ b/pkg/webrtc/consumer.go @@ -93,6 +93,7 @@ func (c *Conn) MarshalJSON() ([]byte, error) { Senders: c.senders, Recv: c.recv, Send: c.send, + Bitrate: c.bitrate, } return json.Marshal(info) } diff --git a/pkg/webrtc/custom.go b/pkg/webrtc/custom.go new file mode 100644 index 00000000..16c94b20 --- /dev/null +++ b/pkg/webrtc/custom.go @@ -0,0 +1,10 @@ +package webrtc + +import ( + "github.com/AlexxIT/go2rtc/pkg/custom" +) + +func (c *Conn) startBitrateWorker() { + c.stopBitrateWorker = make(chan struct{}) + go custom.StartBitrateWorker(&c.send, &c.bitrate, c.stopBitrateWorker) +}