Skip to content

Commit

Permalink
Add ability to pull arbitrary HLS stream and run same checks as in in…
Browse files Browse the repository at this point in the history
…finite stream testing mode.
  • Loading branch information
darkdarkdragon committed Dec 17, 2019
1 parent 03eeb9d commit 5f6e175
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 7 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Params:
- `-time` Time to stream streams (40s, 4m, 24h45m). Not compatible with repeat option

### Infinite stream testing mode
In this mode streaming is stopped only on error, so it will be infinite if transcoding is done ideally.
In this mode Stream Tester streams video to RTMP ingest point and read HLS stream back. Streaming is stopped only on error, so it will be infinite if transcoding is done ideally.

Errors can be reported to Discord.

Expand Down Expand Up @@ -71,6 +71,13 @@ Params:
- `gsbucket` Google Storage bucket (to store segments that was not successfully parsed)
- `gskey` Google Storage private key (in json format (actual key, not file name))

### Infinite HLS pull testing mode
In this mode Stream Tester pulls arbitrary HLS stream and runs same checks as in previous mode.
To use it specify `-media-url` without specifying `-rtmp-url`.




### Saving arbitrary stream to file

Running
Expand Down
9 changes: 8 additions & 1 deletion cmd/streamtester/streamtester.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func main() {
if len(flag.Args()) > 0 {
fn = flag.Arg(0)
}
model.ProfilesNum = *profiles
var err error
var waitForDur time.Duration
if *waitForTarget != "" {
Expand All @@ -100,6 +101,13 @@ func main() {
testers.IgnoreNoCodecError = *ignoreNoCodecError
testers.IgnoreGaps = *ignoreGaps
testers.IgnoreTimeDrift = *ignoreTimeDrift
if *mediaURL != "" && *rtmpURL == "" {
msg := fmt.Sprintf(`Starting infinite stream to %s`, *mediaURL)
messenger.SendMessage(msg)
sr2 := testers.NewStreamer2(*wowza)
sr2.StartPulling(*mediaURL)
return
}
if *rtmpURL != "" {
if *mediaURL == "" {
glog.Fatal("Should also specifiy -media-url")
Expand Down Expand Up @@ -129,7 +137,6 @@ func main() {
glog.Infof("Starting stream tester, file %s number of streams is %d, repeat %d times no bar %v", fn, *sim, *repeat, *noBar)

defer glog.Infof("Exiting")
model.ProfilesNum = *profiles
sr := testers.NewStreamer(*wowza)
// err = sr.StartStreams(fn, *host, *rtmp, *media, *sim, *repeat, streamDuration, *noBar, *latency, 3, 5*time.Second)
err = sr.StartStreams(fn, *host, *rtmp, *media, *sim, *repeat, streamDuration, false, *latency, *noBar, 3, 5*time.Second, waitForDur)
Expand Down
2 changes: 2 additions & 0 deletions internal/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type InfinitePuller interface {
// Streamer2 interface
type Streamer2 interface {
StartStreaming(sourceFileName string, rtmpIngestURL, mediaURL string, waitForTarget time.Duration)
// StartPulling pull arbitrary HLS stream and report found errors
StartPulling(mediaURL string)
}

// Streamer interface
Expand Down
22 changes: 17 additions & 5 deletions internal/testers/m3utester2.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,15 +460,22 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul
return
}
}
latency, speedRatio, merr := ms.segmentsMatcher.matchSegment(dres.startTime, dres.duration, dres.downloadCompetedAt)
var latency time.Duration
var speedRatio float64
var merr error
if ms.segmentsMatcher != nil {
latency, speedRatio, merr = ms.segmentsMatcher.matchSegment(dres.startTime, dres.duration, dres.downloadCompetedAt)
}
glog.Infof(`%s seqNo %4d latency is %s speedRatio is %v`, dres.resolution, dres.seqNo, latency, speedRatio)
if merr != nil {
glog.Infof("downloaded: %+v", dres)
messenger.SendFatalMessage(merr.Error())
panic(merr)
continue
}
latencyResults <- &latencyResult{name: dres.name, resolution: ms.resolution, seqNo: dres.seqNo, latency: latency, speedRatio: speedRatio}
if ms.segmentsMatcher != nil {
latencyResults <- &latencyResult{name: dres.name, resolution: ms.resolution, seqNo: dres.seqNo, latency: latency, speedRatio: speedRatio}
}
// masterDR <- dres
results = append(results, dres)
sort.Sort(results)
Expand All @@ -483,7 +490,7 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul
for i, r := range results {
problem = ""
tillNext = 0
if i < len(results)-1 {
if i < len(results)-2 {
ns := results[i+1]
tillNext = ns.startTime - r.startTime
if tillNext > 0 && !isTimeEqualM(r.duration, tillNext) {
Expand All @@ -499,7 +506,7 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul
// ms.fatalEnd(msg)
// return
// }
if problem != "" && i < len(results)-6 && fatalProblem == "" {
if problem != "" && i < len(results)-6 {
fatalProblem = msg
// break
}
Expand Down Expand Up @@ -588,6 +595,7 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) {
// glog.Infof("Got media playlist %s with %d (really %d) segments of url %s:", ms.resolution, len(pl.Segments), countSegments(pl), surl)
// glog.Info(pl)
now := time.Now()
var lastTimeDownloadStarted time.Time
for i, segment := range pl.Segments {
if segment != nil {
// glog.Infof("Segment: %+v", *segment)
Expand All @@ -596,7 +604,7 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) {
segment.URI = wowzaSessionRE.ReplaceAllString(segment.URI, "_")
}
if i == 0 && !seenAtFirst.Contains(segment.URI) && seen.Contains(segment.URI) {
glog.Infof("===> segment at first place %s (%s) seq %d", segment.URI, ms.resolution, pl.SeqNo)
glog.V(model.DEBUG).Infof("===> segment at first place %s (%s) seq %d", segment.URI, ms.resolution, pl.SeqNo)
ms.downloadResults <- &downloadResult{timeAtFirstPlace: now, name: segment.URI, seqNo: pl.SeqNo, status: "200 OK"}
seenAtFirst.Add(segment.URI)
continue
Expand All @@ -606,8 +614,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) {
}
seen.Add(segment.URI)
lastTimeNewSegmentSeen = time.Now()
if time.Since(lastTimeDownloadStarted) < 50*time.Millisecond {
time.Sleep(50 * time.Millisecond)
}
dTask := &downloadTask{baseURL: ms.u, url: segment.URI, seqNo: pl.SeqNo + uint64(i), title: segment.Title, duration: segment.Duration, appTime: now}
go downloadSegment(dTask, ms.downloadResults)
lastTimeDownloadStarted = time.Now()
now = now.Add(time.Millisecond)
// glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId)
}
Expand Down
9 changes: 9 additions & 0 deletions internal/testers/streamer2.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ func (sr *streamer2) StartStreaming(sourceFileName string, rtmpIngestURL, mediaU
messenger.SendMessage(msg)
}

// StartPulling pull arbitrary HLS stream and report found errors
func (sr *streamer2) StartPulling(mediaURL string) {
sr.downloader = newM3utester2(mediaURL, sr.wowzaMode, sr.eof, 0, nil) // starts to download at creation
started := time.Now()
<-sr.eof
msg := fmt.Sprintf(`Streaming stopped after %s`, time.Since(started))
messenger.SendMessage(msg)
}

func (sr *streamer2) waitForTCP(waitForTarget time.Duration, rtmpIngestURL string) error {
var u *url.URL
var err error
Expand Down

0 comments on commit 5f6e175

Please sign in to comment.