Better calculate latencies
Record latencies per stream

closes #10
darkdarkdragon committed Jan 11, 2020
1 parent 26c2a07 commit a4a202b
Showing 8 changed files with 122 additions and 56 deletions.
4 changes: 4 additions & 0 deletions cmd/streamtester/streamtester.go
@@ -56,6 +56,7 @@ func main() {
ignoreGaps := flag.Bool("ignore-gaps", false, "Do not stop streaming if gaps found")
ignoreTimeDrift := flag.Bool("ignore-time-drift", false, "Do not stop streaming if time drift detected")
httpIngest := flag.Bool("http-ingest", false, "Use Livepeer HTTP HLS ingest")
fileArg := flag.String("file", "", "File to stream")
_ = flag.String("config", "", "config file (optional)")

ff.Parse(flag.CommandLine, os.Args[1:],
@@ -91,6 +92,9 @@ func main() {
if len(flag.Args()) > 0 {
fn = flag.Arg(0)
}
if *fileArg != "" {
fn = *fileArg
}
model.ProfilesNum = *profiles
var err error
var waitForDur time.Duration
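
The new -file flag is an alternative to the existing positional argument; because it is checked after flag.Arg(0), it takes precedence when both are given. A hypothetical invocation (binary and file names are illustrative only):

    streamtester -file source.mp4
    streamtester source.mp4        # positional form; -file wins if both are supplied
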
59 changes: 34 additions & 25 deletions internal/model/models.go
@@ -8,9 +8,11 @@ import (
)

const (
SHORT = 4
DEBUG = 5
VERBOSE = 6
SHORT = 4
DEBUG = 5
VERBOSE = 6
VVERBOSE = 7
INSANE = 12
)

// ProfilesNum number of transcoding profiles
@@ -52,28 +54,29 @@ type Latencies struct {

// Stats represents global test statistics
type Stats struct {
RTMPActiveStreams int `json:"rtmp_active_streams"` // number of active RTMP streams
RTMPstreams int `json:"rtmp_streams"` // number of RTMP streams
MediaStreams int `json:"media_streams"` // number of media streams
TotalSegmentsToSend int `json:"total_segments_to_send"`
SentSegments int `json:"sent_segments"`
DownloadedSegments int `json:"downloaded_segments"`
ShouldHaveDownloadedSegments int `json:"should_have_downloaded_segments"`
FailedToDownloadSegments int `json:"failed_to_download_segments"`
BytesDownloaded int64 `json:"bytes_downloaded"`
Retries int `json:"retries"`
SuccessRate float64 `json:"success_rate"` // DownloadedSegments/profilesNum*SentSegments
ConnectionLost int `json:"connection_lost"`
Finished bool `json:"finished"`
ProfilesNum int `json:"profiles_num"`
SourceLatencies Latencies `json:"source_latencies"`
TranscodedLatencies Latencies `json:"transcoded_latencies"`
RawSourceLatencies []time.Duration `json:"raw_source_latencies"`
RawTranscodedLatencies []time.Duration `json:"raw_transcoded_latencies"`
WowzaMode bool `json:"wowza_mode"`
Gaps int `json:"gaps"`
StartTime time.Time `json:"start_time"`
Errors map[string]int `json:"errors"`
RTMPActiveStreams int `json:"rtmp_active_streams"` // number of active RTMP streams
RTMPstreams int `json:"rtmp_streams"` // number of RTMP streams
MediaStreams int `json:"media_streams"` // number of media streams
TotalSegmentsToSend int `json:"total_segments_to_send"`
SentSegments int `json:"sent_segments"`
DownloadedSegments int `json:"downloaded_segments"`
ShouldHaveDownloadedSegments int `json:"should_have_downloaded_segments"`
FailedToDownloadSegments int `json:"failed_to_download_segments"`
BytesDownloaded int64 `json:"bytes_downloaded"`
Retries int `json:"retries"`
SuccessRate float64 `json:"success_rate"` // DownloadedSegments/profilesNum*SentSegments
ConnectionLost int `json:"connection_lost"`
Finished bool `json:"finished"`
ProfilesNum int `json:"profiles_num"`
SourceLatencies Latencies `json:"source_latencies"`
TranscodedLatencies Latencies `json:"transcoded_latencies"`
RawSourceLatencies []time.Duration `json:"raw_source_latencies"`
RawTranscodedLatencies []time.Duration `json:"raw_transcoded_latencies"`
RawTranscodeLatenciesPerStream [][]time.Duration `json:"raw_transcode_latencies_per_stream"`
WowzaMode bool `json:"wowza_mode"`
Gaps int `json:"gaps"`
StartTime time.Time `json:"start_time"`
Errors map[string]int `json:"errors"`
}

// REST requests
@@ -121,6 +124,12 @@ Bytes dowloaded: %12d`, st.RTMPstreams, st.MediaStreams,
if len(st.Errors) > 0 {
r = fmt.Sprintf("%s\nErrors: %+v\n", r, st.Errors)
}
if len(st.RawTranscodeLatenciesPerStream) > 0 {
r += "\nTranscode latencies per stream:\n"
for _, rtl := range st.RawTranscodeLatenciesPerStream {
r += fmt.Sprintf("%+v\n", rtl)
}
}
return r
}

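
The new RawTranscodeLatenciesPerStream field carries one latency slice per transcoded rendition, indexed by segment sequence number; a zero entry marks a segment that was never downloaded (see the latenciesPerStream comment in m3utester.go below). A minimal sketch, not part of this commit, of how a consumer of the stats JSON might summarize it:

package main

import (
	"fmt"
	"time"
)

// summarize prints per-rendition segment counts and the average latency,
// skipping zero entries, which mark segments that failed to download.
func summarize(perStream [][]time.Duration) {
	for i, lats := range perStream {
		var sum time.Duration
		var n int
		for _, l := range lats {
			if l == 0 {
				continue
			}
			sum += l
			n++
		}
		if n == 0 {
			fmt.Printf("stream %d: no measured segments\n", i)
			continue
		}
		fmt.Printf("stream %d: %d segments, average latency %s\n", i, n, sum/time.Duration(n))
	}
}

func main() {
	// Hypothetical data in the same shape as raw_transcode_latencies_per_stream.
	summarize([][]time.Duration{{1200 * time.Millisecond, 0, 950 * time.Millisecond}})
}
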
1 change: 1 addition & 0 deletions internal/server/server.go
@@ -90,6 +90,7 @@ func (ss *StreamerServer) handleStats(w http.ResponseWriter, r *http.Request) {
if !returnRawLatencies {
stats.RawSourceLatencies = nil
stats.RawTranscodedLatencies = nil
stats.RawTranscodeLatenciesPerStream = nil
}
// glog.Infof("Lat avg %d p50 %d p95 %d p99 %d avg %s p50 %s p95 %s p99 %s", stats.SourceLatencies.Avg, stats.SourceLatencies.P50, stats.SourceLatencies.P95,
// stats.SourceLatencies.P99, stats.SourceLatencies.Avg, stats.SourceLatencies.P50, stats.SourceLatencies.P95, stats.SourceLatencies.P99)
2 changes: 1 addition & 1 deletion internal/testers/infinite_puller.go
@@ -33,7 +33,7 @@ func (ip *infinitePuller) Start() {

done := make(chan struct{})
// var sentTimesMap *utils.SyncedTimesMap
down := newM3UTester(done, nil, true, true, ip.save)
down := newM3UTester(done, nil, true, true, ip.save, nil)
// go findSkippedSegmentsNumber(up, down)
// sr.downloaders = append(sr.downloaders, down)
msg := fmt.Sprintf("Starting to pull infinite stream from %s", ip.url)
81 changes: 62 additions & 19 deletions internal/testers/m3utester.go
@@ -46,6 +46,7 @@ type m3utester struct {
startTime time.Time
done <-chan struct{} // signals to stop
sentTimesMap *utils.SyncedTimesMap
segmentsMatcher *segmentsMatcher
downloadResults fullDownloadResultsMap
fullResultsCh chan fullDownloadResult
dm sync.Mutex
@@ -131,14 +132,15 @@ func (p downloadResultsBySeq) findByMySeqNo(seqNo uint64) *downloadResult {
}

// newM3UTester ...
func newM3UTester(done <-chan struct{}, sentTimesMap *utils.SyncedTimesMap, wowzaMode, infiniteMode, save bool) *m3utester {
func newM3UTester(done <-chan struct{}, sentTimesMap *utils.SyncedTimesMap, wowzaMode, infiniteMode, save bool, sm *segmentsMatcher) *m3utester {
t := &m3utester{
downloads: make(map[string]*mediaDownloader),
done: done,
sentTimesMap: sentTimesMap,
wowzaMode: wowzaMode,
infiniteMode: infiniteMode,
save: save,
segmentsMatcher: sm,
downloadResults: make(map[string]*fullDownloadResults),
fullResultsCh: make(chan fullDownloadResult, 16),
}
@@ -640,7 +642,7 @@ func (mt *m3utester) downloadLoop() {
// variantID := strconv.Itoa(variant.Bandwidth) + variant.Resolution
mt.mu.Lock()
if _, ok := mt.downloads[mediaURL]; !ok {
md := newMediaDownloader(variant.URI, mediaURL, variant.Resolution, mt.done, mt.sentTimesMap, mt.wowzaMode, mt.save, mt.fullResultsCh, mt.saveDirName)
md := newMediaDownloader(variant.URI, mediaURL, variant.Resolution, mt.done, mt.sentTimesMap, mt.wowzaMode, mt.save, mt.fullResultsCh, mt.saveDirName, mt.segmentsMatcher)
mt.downloads[mediaURL] = md
// md.source = strings.Contains(mediaURL, "source")
md.source = i == 0
@@ -703,7 +705,8 @@ type mediaDownloader struct {
firstSegmentTime time.Duration
done <-chan struct{} // signals to stop
sentTimesMap *utils.SyncedTimesMap
latencies []time.Duration
latencies []time.Duration // latencies stored as segments get downloaded
latenciesPerStream []time.Duration // indexed by seqNo, so a segment that failed to download leaves a zero value
source bool
wowzaMode bool
saveSegmentsToDisk bool
@@ -712,19 +715,21 @@
saveDir string
livepeerNameSchema bool
fullResultsCh chan fullDownloadResult
segmentsMatcher *segmentsMatcher
}

func newMediaDownloader(name, u, resolution string, done <-chan struct{}, sentTimesMap *utils.SyncedTimesMap, wowzaMode, save bool, frc chan fullDownloadResult,
baseSaveDir string) *mediaDownloader {
baseSaveDir string, sm *segmentsMatcher) *mediaDownloader {
pu, err := url.Parse(u)
if err != nil {
glog.Fatal(err)
}
md := &mediaDownloader{
name: name,
u: pu,
resolution: resolution,
downTasks: make(chan downloadTask, 16),
name: name,
u: pu,
resolution: resolution,
segmentsMatcher: sm,
downTasks: make(chan downloadTask, 16),
stats: downloadStats{
errors: make(map[string]int),
},
@@ -839,19 +844,46 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download
md.firstSegmentParsed = true
md.firstSegmentTime = fst
}
if md.sentTimesMap != nil {
fsttim, err := utils.GetVideoStartTime(b)
if md.segmentsMatcher != nil {
fsttim, dur, err := utils.GetVideoStartTimeAndDur(b)
if err != nil {
if err != io.EOF {
glog.Fatal(err)
}
} else {
var latency time.Duration
if st, has := md.sentTimesMap.GetTime(fsttim, fsurl); has {
latency = now.Sub(st)
latency, speedRatio, merr := md.segmentsMatcher.matchSegment(fsttim, dur, now)
src := " source"
if !md.source {
src = "transcoded"
}
glog.V(model.DEBUG).Infof("== downloaded %s seqNo %d start time %s lat %s now %s speed ratio %v merr %v", src, task.seqNo, fsttim, latency, now, speedRatio, merr)
if merr == nil {
md.mu.Lock()
md.latencies = append(md.latencies, latency)
for len(md.latenciesPerStream) <= int(task.seqNo) {
md.latenciesPerStream = append(md.latenciesPerStream, 0)
}
md.latenciesPerStream[task.seqNo] = latency
glog.V(model.VVERBOSE).Infof("lat: %+v", md.latenciesPerStream)
md.mu.Unlock()
}
// glog.Infof("== downloaded segment seqNo %d segment start time %s latency %s current time %s", task.seqNo, fsttim, latency, now)
/*
var st time.Time
var has bool
if st, has = md.sentTimesMap.GetTime(fsttim, fsurl); has {
latency = now.Sub(st)
md.latencies = append(md.latencies, latency)
md.mu.Lock()
for len(md.latenciesPerStream) <= int(task.seqNo) {
md.latenciesPerStream = append(md.latenciesPerStream, 0)
}
md.latenciesPerStream[task.seqNo] = latency
md.mu.Unlock()
}
latency2, speedRatio, merr := md.segmentsMatcher.matchSegment(fsttim, dur, now)
glog.Infof("== downloaded %s seqNo %d start time %s lat %s mlat %s now %s lat found %v sr %v merr %v",
src, task.seqNo, fsttim, latency, latency2, now, has, speedRatio, merr)
*/
}
}

@@ -967,8 +999,10 @@ func (md *mediaDownloader) downloadLoop() {
time.Sleep(time.Second)
continue
}
// fmt.Println("-----################")
// fmt.Println(string(b))
// if !md.source {
// fmt.Println("-----################")
// fmt.Println(string(b))
// }
// glog.Infoln("-----################")
// glog.Infoln(string(b))
// err = mpl.DecodeFrom(resp.Body, true)
@@ -982,8 +1016,8 @@
if err != nil {
glog.Fatal(err)
}
glog.V(model.VERBOSE).Infof("Got media playlist %s with %d (really %d) segments of url %s:", md.resolution, len(pl.Segments), countSegments(pl), surl)
glog.V(model.VERBOSE).Info(pl)
glog.V(model.INSANE).Infof("Got media playlist %s with %d (really %d) segments of url %s:", md.resolution, len(pl.Segments), countSegments(pl), surl)
glog.V(model.INSANE).Info(pl)
if !gotManifest && md.saveSegmentsToDisk {
md.savePlayList.TargetDuration = pl.TargetDuration
md.savePlayList.SeqNo = pl.SeqNo
@@ -1004,7 +1038,16 @@
}
seen.Add(segment.URI)
mySeqNo++
md.downTasks <- downloadTask{url: segment.URI, seqNo: pl.SeqNo + uint64(i), title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now}
seqNo := pl.SeqNo + uint64(i)
// attempt to parse seqNo from file name
_, fn := path.Split(segment.URI)
ext := path.Ext(fn)
fn = strings.TrimSuffix(fn, ext)
parsedSeq, err := strconv.ParseUint(fn, 10, 64)
if err == nil {
seqNo = parsedSeq
}
md.downTasks <- downloadTask{url: segment.URI, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now}
now = now.Add(time.Millisecond)
// glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId)
}
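
Two details of the m3utester changes are easy to miss. First, latenciesPerStream is indexed by segment sequence number: the slice is padded with zeros up to task.seqNo before the measured latency is stored, so a zero later reads as "not downloaded". Second, the download loop now prefers a sequence number parsed from the segment file name (Livepeer names segments <seqNo>.ts) and only falls back to pl.SeqNo + i when the name is not numeric. A standalone sketch of that parsing, with an illustrative helper name and URL:

package main

import (
	"fmt"
	"path"
	"strconv"
	"strings"
)

// seqNoFromURI mirrors the parsing added above: strip the directory and the
// extension, and use the remaining file name as the sequence number when it
// is numeric; otherwise fall back to the playlist-derived value.
func seqNoFromURI(uri string, fallback uint64) uint64 {
	_, fn := path.Split(uri)                  // ".../7.ts" -> "7.ts"
	fn = strings.TrimSuffix(fn, path.Ext(fn)) // "7.ts" -> "7"
	if parsed, err := strconv.ParseUint(fn, 10, 64); err == nil {
		return parsed
	}
	return fallback
}

func main() {
	fmt.Println(seqNoFromURI("http://host/stream/rendition/7.ts", 0)) // 7
}
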
6 changes: 3 additions & 3 deletions internal/testers/segments_matcher.go
@@ -103,11 +103,11 @@ func (sm *segmentsMatcher) matchSegment(firstPaketsPTS time.Duration, segmentDur
}

func (sm *segmentsMatcher) cleanup() {
glog.Infof(`segments matcher cleanup start len %d`, len(sm.sentFrames))
glog.V(model.VVERBOSE).Infof(`segments matcher cleanup start len %d`, len(sm.sentFrames))
// sm.mu.Lock()
// glog.Infof(`segments matcher cleanup start len %d 2`, len(sm.sentFrames))
defer func() {
glog.Infof(`segments matcher cleanup start len %d end`, len(sm.sentFrames))
glog.V(model.VVERBOSE).Infof(`segments matcher cleanup start len %d end`, len(sm.sentFrames))
}()
// defer sm.mu.Unlock()
if len(sm.sentFrames) == 0 {
@@ -122,7 +122,7 @@ func (sm *segmentsMatcher) cleanup() {
break
}
}
glog.Infof(`segments matcher cleanup len %d until %d`, len(sm.sentFrames), until)
glog.V(model.VVERBOSE).Infof(`segments matcher cleanup len %d until %d`, len(sm.sentFrames), until)
if until > 0 {
sm.sentFrames = sm.sentFrames[until:]
}
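
For context, matchSegment is what replaces the SyncedTimesMap lookup in the downloader: given the first-packet PTS and duration of a downloaded segment plus the wall-clock time the download finished, it matches the segment against the frames recorded at send time and returns an end-to-end latency and a speed ratio. A rough mental model only, not the actual implementation; all names here are illustrative:

package main

import (
	"fmt"
	"time"
)

// sentFrame records when a frame with a given PTS was pushed to the ingest.
type sentFrame struct {
	pts    time.Duration
	sentAt time.Time
}

// matchSegmentSketch finds the first recorded frame at or past the end of the
// downloaded segment and reports how long after sending it the segment arrived.
func matchSegmentSketch(frames []sentFrame, firstPTS, dur time.Duration, downloadedAt time.Time) (time.Duration, bool) {
	end := firstPTS + dur
	for _, f := range frames {
		if f.pts >= end {
			return downloadedAt.Sub(f.sentAt), true
		}
	}
	return 0, false
}

func main() {
	start := time.Now()
	frames := []sentFrame{{pts: 2 * time.Second, sentAt: start}}
	lat, ok := matchSegmentSketch(frames, 0, 2*time.Second, start.Add(800*time.Millisecond))
	fmt.Println(lat, ok) // ~800ms true
}
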
21 changes: 15 additions & 6 deletions internal/testers/streamer.go
@@ -151,17 +151,19 @@ func (sr *streamer) startStreams(baseManfistID, sourceFileName string, repeatNum
}
done := make(chan struct{})
var sentTimesMap *utils.SyncedTimesMap
var segmentsMatcher *segmentsMatcher
if measureLatency {
sentTimesMap = utils.NewSyncedTimesMap()
// sentTimesMap = utils.NewSyncedTimesMap()
segmentsMatcher = newsementsMatcher()
}
up := newRtmpStreamer(rtmpURL, sourceFileName, baseManfistID, sentTimesMap, bar, done, sr.wowzaMode, nil)
up := newRtmpStreamer(rtmpURL, sourceFileName, baseManfistID, sentTimesMap, bar, done, sr.wowzaMode, segmentsMatcher)
wg.Add(1)
go func() {
up.StartUpload(sourceFileName, rtmpURL, totalSegments, waitForTarget)
wg.Done()
}()
sr.uploaders = append(sr.uploaders, up)
down := newM3UTester(done, sentTimesMap, sr.wowzaMode, false, false)
down := newM3UTester(done, sentTimesMap, sr.wowzaMode, false, false, segmentsMatcher)
go findSkippedSegmentsNumber(up, down)
sr.downloaders = append(sr.downloaders, down)
down.Start(mediaURL)
@@ -268,13 +270,20 @@ func (sr *streamer) Stats(basedManifestID string) *model.Stats {
stats.BytesDownloaded += ds.bytes
stats.Retries += ds.retries
stats.Gaps += ds.gaps
if mt.sentTimesMap != nil {
if mt.segmentsMatcher != nil {
for _, md := range mt.downloads {
md.mu.Lock()
lcp := make([]time.Duration, len(md.latencies), len(md.latencies))
copy(lcp, md.latencies)
if md.source {
sourceLatencies.Add(md.latencies)
sourceLatencies.Add(lcp)
} else {
transcodedLatencies.Add(md.latencies)
transcodedLatencies.Add(lcp)
lcp := make([]time.Duration, len(md.latenciesPerStream))
copy(lcp, md.latenciesPerStream)
stats.RawTranscodeLatenciesPerStream = append(stats.RawTranscodeLatenciesPerStream, lcp)
}
md.mu.Unlock()
}
}
}
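
A note on the Stats aggregation above: the latency slices are now copied while md.mu is held, so Latencies.Add and the per-stream append work on private snapshots instead of racing with the downloader goroutines that keep appending to md.latencies and md.latenciesPerStream. The inner lcp deliberately shadows the outer one, giving the per-stream copy its own backing array.
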
4 changes: 2 additions & 2 deletions internal/utils/data.go
@@ -180,7 +180,7 @@ func NewSyncedTimesMap() *SyncedTimesMap {

// SetTime sets time
func (stp *SyncedTimesMap) SetTime(mark time.Duration, t time.Time) {
m := mark / (time.Millisecond * 100)
m := mark / (time.Millisecond * 500)
stp.m.Lock()
stp.times[m] = t
stp.m.Unlock()
@@ -190,7 +190,7 @@ func (stp *SyncedTimesMap) SetTime(mark time.Duration, t time.Time) {
func (stp *SyncedTimesMap) GetTime(mark time.Duration, name string) (time.Time, bool) {
// glog.Infof("=== get for name %s", name)
_, file := path.Split(name)
m := mark / (time.Millisecond * 100)
m := mark / (time.Millisecond * 500)
// stp.m.RLock()
stp.m.Lock()
t, h := stp.times[m]
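
The SetTime/GetTime change widens the quantization window used to match a downloaded segment's PTS mark back to its send time: marks are bucketed by integer division, so the writer and the reader must use the same divisor, and any two marks within the same 500 ms window now land on the same key (previously 100 ms). This path is largely superseded by segmentsMatcher in this commit, but where the map is still used the effect is as in this small sketch (names are illustrative):

package main

import (
	"fmt"
	"time"
)

const bucket = 500 * time.Millisecond

// key quantizes a PTS mark the same way SetTime and GetTime now do.
func key(mark time.Duration) time.Duration { return mark / bucket }

func main() {
	// 1.0s and 1.4s share a 500 ms bucket (both divide to 2); with the old
	// 100 ms buckets they would not (10 vs 14).
	fmt.Println(key(1000*time.Millisecond) == key(1400*time.Millisecond)) // true
	// 0.9s and 1.0s sit on opposite sides of a bucket boundary.
	fmt.Println(key(900*time.Millisecond) == key(1000*time.Millisecond)) // false
}
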
