Skip to content

Commit

Permalink
Merge pull request #28 from tcriess/feature-websocket-hub
Browse files Browse the repository at this point in the history
Feature websocket hub
  • Loading branch information
GRVYDEV authored Jan 16, 2021
2 parents b89fd86 + b240b95 commit 2d62791
Show file tree
Hide file tree
Showing 5 changed files with 290 additions and 111 deletions.
7 changes: 6 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,9 @@
*.out

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

# ide
/.idea/
# actual binary
/lightspeed-webrtc
149 changes: 39 additions & 110 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,42 @@ package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net"
"net/http"
"strconv"
"strings"
"sync"

"flag"

"github.com/GRVYDEV/lightspeed-webrtc/ws"
"github.com/gorilla/websocket"

"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media/samplebuilder"
)

var (
videoBuilder *samplebuilder.SampleBuilder
addr = flag.String("addr", "localhost", "http service address")
ip = flag.String("ip", "none", "IP address for webrtc")
wsPort = flag.Int("ws-port", 8080, "Port for websocket")
rtpPort = flag.Int("rtp-port", 65535, "Port for RTP")
ports = flag.String("ports", "20000-20500", "Port range for webrtc")
upgrader = websocket.Upgrader{
addr = flag.String("addr", "localhost", "http service address")
ip = flag.String("ip", "none", "IP address for webrtc")
wsPort = flag.Int("ws-port", 8080, "Port for websocket")
rtpPort = flag.Int("rtp-port", 65535, "Port for RTP")
ports = flag.String("ports", "20000-20500", "Port range for webrtc")
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

videoTrack *webrtc.TrackLocalStaticRTP

audioTrack *webrtc.TrackLocalStaticRTP

// lock for peerConnections and trackLocals
listLock sync.RWMutex
peerConnections []peerConnectionState
trackLocals map[string]*webrtc.TrackLocalStaticRTP
hub *ws.Hub
)

type websocketMessage struct {
Event string `json:"event"`
Data string `json:"data"`
}

type peerConnectionState struct {
peerConnection *webrtc.PeerConnection
websocket *threadSafeWriter
}

func main() {
flag.Parse()
log.SetFlags(0)
trackLocals = map[string]*webrtc.TrackLocalStaticRTP{}

// Open a UDP Listener for RTP Packets on port 65535
listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(*addr), Port: *rtpPort})
Expand All @@ -83,6 +66,9 @@ func main() {
panic(err)
}

hub = ws.NewHub()
go hub.Run()

// start HTTP server
go func() {
http.HandleFunc("/websocket", websocketHandler)
Expand Down Expand Up @@ -162,34 +148,18 @@ func createWebrtcApi() *webrtc.API {
return webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i), webrtc.WithSettingEngine(s))
}

func cleanConnection(peerConnection *webrtc.PeerConnection) {
listLock.Lock()
defer listLock.Unlock()

for i := range peerConnections {
if peerConnection == peerConnections[i].peerConnection {
peerConnections[i] = peerConnections[len(peerConnections)-1]
peerConnections[len(peerConnections)-1] = peerConnectionState{}
peerConnections = peerConnections[:len(peerConnections)-1]
return
}
}
}

// Handle incoming websockets
func websocketHandler(w http.ResponseWriter, r *http.Request) {

// Upgrade HTTP request to Websocket
unsafeConn, err := upgrader.Upgrade(w, r, nil)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Print("upgrade:", err)
return
}

c := &threadSafeWriter{unsafeConn, sync.Mutex{}}

// When this frame returns close the Websocket
defer c.Close() //nolint
defer conn.Close() //nolint

// Create API that takes IP and port range into account
api := createWebrtcApi()
Expand All @@ -206,12 +176,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {

// Accept one audio and one video track Outgoing
transceiverVideo, err := peerConnection.AddTransceiverFromTrack(videoTrack,
webrtc.RtpTransceiverInit{
webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},
)
transceiverAudio, err := peerConnection.AddTransceiverFromTrack(audioTrack,
webrtc.RtpTransceiverInit{
webrtc.RTPTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},
)
Expand All @@ -231,11 +201,12 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
}
}()

// Add our new PeerConnection to global list
listLock.Lock()
peerConnections = append(peerConnections, peerConnectionState{peerConnection, c})
fmt.Printf("Connections: %d\n", len(peerConnections))
listLock.Unlock()
c := ws.NewClient(hub, conn, peerConnection)

go c.WriteLoop()

// Add to the hub
hub.Register <- c

// Trickle ICE. Emit server candidate to client
peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
Expand All @@ -249,11 +220,13 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
return
}

if writeErr := c.WriteJSON(&websocketMessage{
Event: "candidate",
if msg, err := json.Marshal(ws.WebsocketMessage{
Event: ws.MessageTypeCandidate,
Data: string(candidateString),
}); writeErr != nil {
log.Println(writeErr)
}); err == nil {
c.Send <- msg
} else {
log.Println(err)
}
})

Expand All @@ -264,9 +237,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
if err := peerConnection.Close(); err != nil {
log.Print(err)
}
case webrtc.PeerConnectionStateClosed:
cleanConnection(peerConnection)
hub.Unregister <- c

case webrtc.PeerConnectionStateClosed:
hub.Unregister <- c
}
})

Expand All @@ -284,61 +258,16 @@ func websocketHandler(w http.ResponseWriter, r *http.Request) {
log.Print(err)
}

if err = c.WriteJSON(&websocketMessage{
Event: "offer",
if msg, err := json.Marshal(ws.WebsocketMessage{
Event: ws.MessageTypeOffer,
Data: string(offerString),
}); err != nil {
log.Print(err)
}); err == nil {
c.Send <- msg
} else {
log.Printf("could not marshal ws message: %s", err)
}

message := &websocketMessage{}
for {
_, raw, err := c.ReadMessage()
if err != nil {
log.Println(err)
return
} else if err := json.Unmarshal(raw, &message); err != nil {
log.Println(err)
return
}

switch message.Event {
case "candidate":

candidate := webrtc.ICECandidateInit{}
if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
log.Println(err)
return
}

if err := peerConnection.AddICECandidate(candidate); err != nil {
log.Println(err)
return
}
case "answer":

answer := webrtc.SessionDescription{}
if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
log.Println(err)
return
}

if err := peerConnection.SetRemoteDescription(answer); err != nil {
log.Println(err)
return
}
}
}
}

type threadSafeWriter struct {
*websocket.Conn
sync.Mutex
}

func (t *threadSafeWriter) WriteJSON(v interface{}) error {
t.Lock()
defer t.Unlock()
go hub.SendInfo(hub.GetInfo()) // non-blocking broadcast, required as the read loop is not started yet.

return t.Conn.WriteJSON(v)
c.ReadLoop()
}
Loading

0 comments on commit 2d62791

Please sign in to comment.