Skip to content

Commit

Permalink
Changes needed to work with livepeer/labrador
Browse files Browse the repository at this point in the history
  • Loading branch information
darkdarkdragon committed Jan 7, 2020
1 parent f17afdd commit 11dcd97
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 60 deletions.
24 changes: 20 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,16 @@ This command block until interrupted.
It provide these REST endpoints:

---
GET `/stats`
GET `/stats?latencies&base_manifest_id=basemanifestid`

if optional parameter `latencies` present, `/stats` will return raw latencies values
if optional parameter `base_manifest_id` present, then stats will be filtered by that base manifest id

returns object:
```json
{
"rtmp_active_streams": 1,
"rtm_pstreams": 1,
"rtmp_streams": 1,
"media_streams": 1,
"sent_segments": 3,
"downloaded_segments": 7,
Expand All @@ -116,7 +119,16 @@ returns object:
"bytes_downloaded": 2721864,
"success_rate": 77.77777777777779,
"connection_lost": 0,
"finished": false
"start_time": "2020-01-07T21:59:51.008881+02:00",
"finished": false,
"raw_source_latencies": [
79210696,
87020114
],
"raw_transcoded_latencies": [
298718923,
324982522
]
}
```

Expand Down Expand Up @@ -149,6 +161,7 @@ Accepts object:
"simultaneous": 1,
"profiles_num": 2,
"do_not_clear_stats": false,
"measure_latency": true,
"http_ingest": false
}

Expand All @@ -161,7 +174,10 @@ Accepts object:
Returns

```json
{"success": true}
{
"success": true,
"base_manifest_id": "manifest_id_here"
}
```

Can return 404 if request contains wrong parameters.
Expand Down
6 changes: 3 additions & 3 deletions cmd/streamtester/streamtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,15 @@ func main() {
} else {
sr = testers.NewHTTPLoadTester()
}
err = sr.StartStreams(fn, *bhost, *rtmp, mHost, *media, *sim, *repeat, streamDuration, false, *latency, *noBar, 3, 5*time.Second, waitForDur)
_, err = sr.StartStreams(fn, *bhost, *rtmp, mHost, *media, *sim, *repeat, streamDuration, false, *latency, *noBar, 3, 5*time.Second, waitForDur)
if err != nil {
glog.Fatal(err)
}
if *noBar {
go func() {
for {
time.Sleep(25 * time.Second)
fmt.Println(sr.Stats().FormatForConsole())
fmt.Println(sr.Stats("").FormatForConsole())
// fmt.Println(sr.DownStatsFormatted())
}
}()
Expand All @@ -179,7 +179,7 @@ func main() {
<-sr.Done()
time.Sleep(2 * time.Second)
fmt.Println("========= Stats: =========")
stats := sr.Stats()
stats := sr.Stats("")
fmt.Println(stats.FormatForConsole())
// fmt.Println(sr.AnalyzeFormatted(false))
if *latencyThreshold > 0 && stats.TranscodedLatencies.P95 > 0 {
Expand Down
18 changes: 13 additions & 5 deletions internal/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type Streamer2 interface {
// Streamer interface
type Streamer interface {
StartStreams(sourceFileName, bhost, rtmpPort, mhost, mediaPort string, simStreams, repeat uint, streamDuration time.Duration,
notFinal, measureLatency, noBar bool, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) error
Stats() *Stats
notFinal, measureLatency, noBar bool, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) (string, error)
Stats(basedManifestID string) *Stats
StatsFormatted() string
// DownStatsFormatted() string
// AnalyzeFormatted(short bool) string
Expand All @@ -53,7 +53,7 @@ type Latencies struct {
// Stats represents global test statistics
type Stats struct {
RTMPActiveStreams int `json:"rtmp_active_streams"` // number of active RTMP streams
RTMPstreams int `json:"rtm_pstreams"` // number of RTMP streams
RTMPstreams int `json:"rtmp_streams"` // number of RTMP streams
MediaStreams int `json:"media_streams"` // number of media streams
TotalSegmentsToSend int `json:"total_segments_to_send"`
SentSegments int `json:"sent_segments"`
Expand All @@ -72,7 +72,8 @@ type Stats struct {
RawTranscodedLatencies []time.Duration `json:"raw_transcoded_latencies"`
WowzaMode bool `json:"wowza_mode"`
Gaps int `json:"gaps"`
Errors map[string]int
StartTime time.Time `json:"start_time"`
Errors map[string]int `json:"errors"`
}

// REST requests
Expand All @@ -93,12 +94,19 @@ type StartStreamsReq struct {
HTTPIngest bool `json:"http_ingest"`
}

// StartStreamsRes start streams response
type StartStreamsRes struct {
Success bool `json:"success"`
BaseManifestID string `json:"base_manifest_id"`
}

// FormatForConsole formats stats to be shown in console
func (st *Stats) FormatForConsole() string {
p := message.NewPrinter(message.MatchLanguage("en"))
r := p.Sprintf(`
Number of RTMP streams: %7d
Number of media streams: %7d
Started ago %7s
Total number of segments to be sent: %7d
Total number of segments sent to broadcaster: %7d
Total number of segments read back: %7d
Expand All @@ -108,7 +116,7 @@ Success rate: %9.5f%%
Lost connection to broadcaster: %7d
Source latencies: %s
Transcoded latencies: %s
Bytes dowloaded: %12d`, st.RTMPstreams, st.MediaStreams, st.TotalSegmentsToSend, st.SentSegments, st.DownloadedSegments,
Bytes dowloaded: %12d`, st.RTMPstreams, st.MediaStreams, time.Now().Sub(st.StartTime), st.TotalSegmentsToSend, st.SentSegments, st.DownloadedSegments,
st.ShouldHaveDownloadedSegments, st.Retries, st.SuccessRate, st.ConnectionLost, st.SourceLatencies.String(), st.TranscodedLatencies.String(), st.BytesDownloaded)
if len(st.Errors) > 0 {
r = fmt.Sprintf("%s\nErrors: %+v\n", r, st.Errors)
Expand Down
42 changes: 30 additions & 12 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,9 @@ func (ss *StreamerServer) StartWebServer(bindAddr string) {
func (ss *StreamerServer) webServerHandlers(bindAddr string) *http.ServeMux {
mux := http.NewServeMux()

mux.HandleFunc("/start_streams", func(w http.ResponseWriter, r *http.Request) {
ss.handleStartStreams(w, r)
})
mux.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
ss.handleStats(w, r)
})
mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) {
ss.handleStop(w, r)
})
mux.HandleFunc("/start_streams", ss.handleStartStreams)
mux.HandleFunc("/stats", ss.handleStats)
mux.HandleFunc("/stop", ss.handleStop)
return mux
}

Expand All @@ -80,12 +74,18 @@ func (ss *StreamerServer) handleStats(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Content-Type", "application/json")
returnRawLatencies := false
var baseManifestID string
if _, ok := r.URL.Query()["latencies"]; ok {
returnRawLatencies = true
}
if bmids, ok := r.URL.Query()["base_manifest_id"]; ok {
if len(bmids) > 0 {
baseManifestID = bmids[0]
}
}
stats := &model.Stats{}
if ss.streamer != nil {
stats = ss.streamer.Stats()
stats = ss.streamer.Stats(baseManifestID)
}
if !returnRawLatencies {
stats.RawSourceLatencies = nil
Expand Down Expand Up @@ -170,9 +170,27 @@ func (ss *StreamerServer) handleStartStreams(w http.ResponseWriter, r *http.Requ
}
}

ss.streamer.StartStreams(ssr.FileName, ssr.Host, strconv.Itoa(ssr.RTMP), ssr.MHost, strconv.Itoa(ssr.Media), ssr.Simultaneous,
baseManifestID, err := ss.streamer.StartStreams(ssr.FileName, ssr.Host, strconv.Itoa(ssr.RTMP), ssr.MHost, strconv.Itoa(ssr.Media), ssr.Simultaneous,
ssr.Repeat, streamDuration, true, ssr.MeasureLatency, true, 3, 5*time.Second, 0)

if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}

w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"success": true}`))
res, err := json.Marshal(
&model.StartStreamsRes{
Success: true,
BaseManifestID: baseManifestID,
},
)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
w.Write(res)

}
31 changes: 21 additions & 10 deletions internal/testers/http_load_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,25 @@ func (hlt *HTTPLoadTester) Stop() {

// StartStreams start streaming
func (hlt *HTTPLoadTester) StartStreams(sourceFileName, bhost, rtmpPort, ohost, mediaPort string, simStreams, repeat uint, streamDuration time.Duration,
notFinal, measureLatency, noBar bool, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) error {
notFinal, measureLatency, noBar bool, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) (string, error) {

nRtmpPort, err := strconv.Atoi(rtmpPort)
if err != nil {
return err
return "", err
}
nMediaPort, err := strconv.Atoi(mediaPort)
if err != nil {
return err
return "", err
}
showProgress := false
baseManifestID := strings.ReplaceAll(path.Base(sourceFileName), ".", "") + "_" + randName()

go func() {
for i := 0; i < int(repeat); i++ {
if repeat > 1 {
glog.Infof("Starting %d streaming session", i)
}
err := hlt.startStreams(sourceFileName, bhost, nRtmpPort, nMediaPort, simStreams, showProgress, measureLatency,
err := hlt.startStreams(baseManifestID, sourceFileName, i, bhost, nRtmpPort, nMediaPort, simStreams, showProgress, measureLatency,
streamDuration, groupStartBy, startDelayBetweenGroups, waitForTarget)
if err != nil {
glog.Fatal(err)
Expand All @@ -89,34 +90,33 @@ func (hlt *HTTPLoadTester) StartStreams(sourceFileName, bhost, rtmpPort, ohost,
// hlt.Cancel()
// }
}()
return nil
return baseManifestID, nil
}

func (hlt *HTTPLoadTester) startStreams(sourceFileName, host string, nRtmpPort, nMediaPort int, simStreams uint, showProgress,
func (hlt *HTTPLoadTester) startStreams(baseManifestID, sourceFileName string, repeatNum int, host string, nRtmpPort, nMediaPort int, simStreams uint, showProgress,
measureLatency bool, stopAfter time.Duration, groupStartBy int, startDelayBetweenGroups, waitForTarget time.Duration) error {

// fmt.Printf("Starting streaming %s to %s:%d, number of streams is %d\n", sourceFileName, host, nRtmpPort, simStreams)
msg := fmt.Sprintf("Starting HTTP streaming %s to %s:%d, number of streams is %d\n", sourceFileName, host, nRtmpPort, simStreams)
messenger.SendMessage(msg)
fmt.Println(msg)
httpIngestURLTemplate := "http://%s:%d/live/%s"
baseManfistID := strings.ReplaceAll(path.Base(sourceFileName), ".", "") + "_" + randName()
var wg sync.WaitGroup
for i := 0; i < int(simStreams); i++ {
if groupStartBy > 0 && i%groupStartBy == 0 {
startDelayBetweenGroups = 2*time.Second + time.Duration(rand.Intn(4000))*time.Millisecond
glog.Infof("Waiting for %s before starting stream %d", startDelayBetweenGroups, i)
time.Sleep(startDelayBetweenGroups)
}
manifesID := fmt.Sprintf("%s_%d", baseManfistID, i)
manifesID := fmt.Sprintf("%s_%d_%d", baseManifestID, repeatNum, i)
httpIngestURL := fmt.Sprintf(httpIngestURLTemplate, host, nMediaPort, manifesID)
glog.Infof("HTTP ingest: %s", httpIngestURL)
// var bar *uiprogress.Bar
// if showProgress {
// bar = uiprogress.AddBar(totalSegments).AppendCompleted().PrependElapsed()
// }

up := newHTTPtreamer(hlt.ctx, measureLatency)
up := newHTTPtreamer(hlt.ctx, measureLatency, baseManifestID)
wg.Add(1)
go func() {
up.StartUpload(sourceFileName, httpIngestURL, manifesID, 0, waitForTarget, stopAfter)
Expand Down Expand Up @@ -154,7 +154,7 @@ func (hlt *HTTPLoadTester) AnalyzeFormatted(short bool) string {
}

// Stats ...
func (hlt *HTTPLoadTester) Stats() *model.Stats {
func (hlt *HTTPLoadTester) Stats(basedManifestID string) *model.Stats {
stats := &model.Stats{
RTMPstreams: len(hlt.streamers),
MediaStreams: len(hlt.streamers),
Expand All @@ -164,7 +164,15 @@ func (hlt *HTTPLoadTester) Stats() *model.Stats {
}
transcodedLatencies := utils.LatenciesCalculator{}
for _, st := range hlt.streamers {
if basedManifestID != "" && st.baseManifestID != basedManifestID {
continue
}
ds := st.stats()
if stats.StartTime.IsZero() {
stats.StartTime = ds.started
} else if !ds.started.IsZero() && stats.StartTime.After(ds.started) {
stats.StartTime = ds.started
}
stats.SentSegments += ds.triedToSend
stats.DownloadedSegments += ds.success
stats.FailedToDownloadSegments += ds.downloadFailures
Expand All @@ -188,5 +196,8 @@ func (hlt *HTTPLoadTester) Stats() *model.Stats {
if stats.SentSegments > 0 {
stats.SuccessRate = float64(stats.DownloadedSegments) / (float64(model.ProfilesNum) * float64(stats.SentSegments)) * 100
}
stats.ShouldHaveDownloadedSegments = model.ProfilesNum * stats.SentSegments
stats.ProfilesNum = model.ProfilesNum
stats.RawTranscodedLatencies = transcodedLatencies.Raw()
return stats
}
19 changes: 13 additions & 6 deletions internal/testers/http_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ type Uploader interface {
// then it reads resutls back, check if video segments are
// parseable, and calculates transcode latecies and succes rate
type httpStreamer struct {
ctx context.Context
saveLatencies bool
dstats httpStats
mu sync.RWMutex
ctx context.Context
baseManifestID string
saveLatencies bool
dstats httpStats
mu sync.RWMutex
}

type httpStats struct {
Expand All @@ -50,10 +51,11 @@ type httpStats struct {
errors map[string]int
latencies []time.Duration
finished bool
started time.Time
}

func newHTTPtreamer(ctx context.Context, saveLatencies bool) *httpStreamer {
hs := &httpStreamer{ctx: ctx, saveLatencies: saveLatencies}
func newHTTPtreamer(ctx context.Context, saveLatencies bool, baseManifestID string) *httpStreamer {
hs := &httpStreamer{ctx: ctx, saveLatencies: saveLatencies, baseManifestID: baseManifestID}
hs.dstats.errors = make(map[string]int)
return hs
}
Expand Down Expand Up @@ -96,6 +98,11 @@ outloop:
}

func (hs *httpStreamer) pushSegment(httpURL, manifestID string, seg *hlsSegment) {
hs.mu.Lock()
if hs.dstats.started.IsZero() {
hs.dstats.started = time.Now()
}
hs.mu.Unlock()
urlToUp := fmt.Sprintf("%s/%d.ts", httpURL, seg.seqNo)
glog.V(model.SHORT).Infof("Got segment manifes %s seqNo %d pts %s dur %s bytes %d from segmenter, uploading to %s", manifestID, seg.seqNo, seg.pts, seg.duration, len(seg.data), urlToUp)
var body io.Reader
Expand Down
6 changes: 5 additions & 1 deletion internal/testers/rtmp_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var segLen = 2 * time.Second

// rtmpStreamer streams one video file to RTMP server
type rtmpStreamer struct {
baseManifestID string
ingestURL string
counter *segmentsCounter
skippedSegments int
Expand All @@ -32,16 +33,18 @@ type rtmpStreamer struct {
wowzaMode bool
segmentsMatcher *segmentsMatcher
hasBar bool
started time.Time
}

// source is local file name for now
func newRtmpStreamer(ingestURL, source string, sentTimesMap *utils.SyncedTimesMap, bar *uiprogress.Bar, done chan struct{}, wowzaMode bool, sm *segmentsMatcher) *rtmpStreamer {
func newRtmpStreamer(ingestURL, source, baseManifestID string, sentTimesMap *utils.SyncedTimesMap, bar *uiprogress.Bar, done chan struct{}, wowzaMode bool, sm *segmentsMatcher) *rtmpStreamer {
return &rtmpStreamer{
done: done,
wowzaMode: wowzaMode,
ingestURL: ingestURL,
counter: newSegmentsCounter(segLen, bar, false, sentTimesMap),
hasBar: bar != nil,
baseManifestID: baseManifestID,
segmentsMatcher: sm,
skippedSegments: 1, // Broadcaster always skips first segment, but can skip more - this will be corrected when first
// segment downloaded back
Expand Down Expand Up @@ -154,6 +157,7 @@ func (rs *rtmpStreamer) StartUpload(fn, rtmpURL string, segmentsToStream int, wa
// conn, err := rtmp.Dial("rtmp://localhost:1935/" + manifestID)
// conn, err := rtmp.Dial(rtmpURL)
started := time.Now()
rs.started = started
for {
conn, err = rtmp.DialTimeout(rtmpURL, 4*time.Second)
if err != nil {
Expand Down
Loading

0 comments on commit 11dcd97

Please sign in to comment.