Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notify remote to stop publishing when last local subscriber is closed. #860

Merged
merged 5 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions janus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ type dummyGatewayListener struct {
func (l *dummyGatewayListener) ConnectionInterrupted() {
}

type JanusGatewayInterface interface {
Info(context.Context) (*InfoMsg, error)
Create(context.Context) (*JanusSession, error)
Close() error

send(map[string]interface{}, *transaction) (uint64, error)
removeTransaction(uint64)

removeSession(*JanusSession)
}

// Gateway represents a connection to an instance of the Janus Gateway.
type JanusGateway struct {
listener GatewayListener
Expand Down Expand Up @@ -560,12 +571,18 @@ func (gateway *JanusGateway) Create(ctx context.Context) (*JanusSession, error)

// Store this session
gateway.Lock()
defer gateway.Unlock()
gateway.Sessions[session.Id] = session
gateway.Unlock()

return session, nil
}

func (gateway *JanusGateway) removeSession(session *JanusSession) {
gateway.Lock()
defer gateway.Unlock()
delete(gateway.Sessions, session.Id)
}

// Session represents a session instance on the Janus Gateway.
type JanusSession struct {
// Id is the session_id of this session
Expand All @@ -578,7 +595,7 @@ type JanusSession struct {
// and Session.Unlock() methods provided by the embedded sync.Mutex.
sync.Mutex

gateway *JanusGateway
gateway JanusGatewayInterface
}

func (session *JanusSession) send(msg map[string]interface{}, t *transaction) (uint64, error) {
Expand Down Expand Up @@ -670,9 +687,7 @@ func (session *JanusSession) Destroy(ctx context.Context) (*janus.AckMsg, error)
}

// Remove this session from the gateway
session.gateway.Lock()
delete(session.gateway.Sessions, session.Id)
session.gateway.Unlock()
session.gateway.removeSession(session)

return ack, nil
}
Expand Down
3 changes: 2 additions & 1 deletion mcu_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ type RemotePublisherController interface {
PublisherId() string

StartPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error
StopPublishing(ctx context.Context, publisher McuRemotePublisherProperties) error
GetStreams(ctx context.Context) ([]PublisherStream, error)
}

Expand Down Expand Up @@ -214,7 +215,7 @@ type McuPublisher interface {

GetStreams(ctx context.Context) ([]PublisherStream, error)
PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
UnpublishRemote(ctx context.Context, remoteId string) error
UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error
}

type McuSubscriber interface {
Expand Down
29 changes: 24 additions & 5 deletions mcu_janus.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func convertIntValue(value interface{}) (uint64, error) {
return uint64(t), nil
case uint64:
return t, nil
case int:
if t < 0 {
return 0, fmt.Errorf("Unsupported int number: %+v", t)
}
return uint64(t), nil
case int64:
if t < 0 {
return 0, fmt.Errorf("Unsupported int64 number: %+v", t)
Expand All @@ -92,7 +97,7 @@ func convertIntValue(value interface{}) (uint64, error) {
}
return uint64(r), nil
default:
return 0, fmt.Errorf("Unknown number type: %+v", t)
return 0, fmt.Errorf("Unknown number type: %+v (%T)", t, t)
}
}

Expand Down Expand Up @@ -170,7 +175,9 @@ type mcuJanus struct {

settings McuSettings

gw *JanusGateway
createJanusGateway func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error)

gw JanusGatewayInterface
session *JanusSession
handle *JanusHandle

Expand Down Expand Up @@ -213,6 +220,9 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
publishers: make(map[string]*mcuJanusPublisher),
remotePublishers: make(map[string]*mcuJanusRemotePublisher),

createJanusGateway: func(ctx context.Context, wsURL string, listener GatewayListener) (JanusGatewayInterface, error) {
return NewJanusGateway(ctx, wsURL, listener)
},
reconnectInterval: initialReconnectInterval,
}
mcu.onConnected.Store(emptyOnConnected)
Expand All @@ -222,8 +232,10 @@ func NewMcuJanus(ctx context.Context, url string, config *goconf.ConfigFile) (Mc
mcu.doReconnect(context.Background())
})
mcu.reconnectTimer.Stop()
if err := mcu.reconnect(ctx); err != nil {
return nil, err
if mcu.url != "" {
if err := mcu.reconnect(ctx); err != nil {
return nil, err
}
}
return mcu, nil
}
Expand Down Expand Up @@ -252,7 +264,7 @@ func (m *mcuJanus) disconnect() {

func (m *mcuJanus) reconnect(ctx context.Context) error {
m.disconnect()
gw, err := NewJanusGateway(ctx, m.url, m)
gw, err := m.createJanusGateway(ctx, m.url, m)
if err != nil {
return err
}
Expand Down Expand Up @@ -317,6 +329,11 @@ func (m *mcuJanus) hasRemotePublisher() bool {
}

func (m *mcuJanus) Start(ctx context.Context) error {
if m.url == "" {
if err := m.reconnect(ctx); err != nil {
return err
}
}
info, err := m.gw.Info(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -785,6 +802,8 @@ func (m *mcuJanus) getOrCreateRemotePublisher(ctx context.Context, controller Re
settings: settings,
},

controller: controller,

port: int(port),
rtcpPort: int(rtcp_port),
}
Expand Down
10 changes: 5 additions & 5 deletions mcu_janus_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,16 +380,16 @@ func (p *mcuJanusPublisher) GetStreams(ctx context.Context) ([]PublisherStream,
return streams, nil
}

func getPublisherRemoteId(id string, remoteId string) string {
return fmt.Sprintf("%s@%s", id, remoteId)
func getPublisherRemoteId(id string, remoteId string, hostname string, port int, rtcpPort int) string {
return fmt.Sprintf("%s-%s@%s:%d:%d", id, remoteId, hostname, port, rtcpPort)
}

func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "publish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, remoteId),
"remote_id": getPublisherRemoteId(p.id, remoteId, hostname, port, rtcpPort),
"host": hostname,
"port": port,
"rtcp_port": rtcpPort,
Expand Down Expand Up @@ -421,12 +421,12 @@ func (p *mcuJanusPublisher) PublishRemote(ctx context.Context, remoteId string,
return nil
}

func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string) error {
func (p *mcuJanusPublisher) UnpublishRemote(ctx context.Context, remoteId string, hostname string, port int, rtcpPort int) error {
msg := map[string]interface{}{
"request": "unpublish_remotely",
"room": p.roomId,
"publisher_id": streamTypeUserIds[p.streamType],
"remote_id": getPublisherRemoteId(p.id, remoteId),
"remote_id": getPublisherRemoteId(p.id, remoteId, hostname, port, rtcpPort),
}
response, err := p.handle.Request(ctx, msg)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions mcu_janus_remote_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type mcuJanusRemotePublisher struct {

ref atomic.Int64

controller RemotePublisherController

port int
rtcpPort int
}
Expand Down Expand Up @@ -116,6 +118,10 @@ func (p *mcuJanusRemotePublisher) Close(ctx context.Context) {
return
}

if err := p.controller.StopPublishing(ctx, p); err != nil {
log.Printf("Error stopping remote publisher %s in room %d: %s", p.id, p.roomId, err)
}

p.mu.Lock()
if handle := p.handle; handle != nil {
response, err := p.handle.Request(ctx, map[string]interface{}{
Expand Down
Loading
Loading