From 9005dda37068e531cf1ba193f266cf3c86260cd8 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Wed, 12 Jun 2024 19:31:35 +0100 Subject: [PATCH] record-tester: Add support for copy-only recordings (#356) * go.mod: Update go-api-client * recordtester: Allow configuring recording spec * rt: Check for the right nunmber of profiles * recordtester: Fix checking of source ready * go.mod: Update to merged go-api-client --- cmd/recordtester/recordtester.go | 9 ++ go.mod | 4 +- go.sum | 8 +- internal/app/recordtester/recordtester_app.go | 153 ++++++++++++------ 4 files changed, 122 insertions(+), 52 deletions(-) diff --git a/cmd/recordtester/recordtester.go b/cmd/recordtester/recordtester.go index 73d7e792..31b65875 100644 --- a/cmd/recordtester/recordtester.go +++ b/cmd/recordtester/recordtester.go @@ -176,6 +176,7 @@ func main() { testTranscode := fs.Bool("transcode", false, "Check Transcode API workflow") catalystPipelineStrategy := fs.String("catalyst-pipeline-strategy", "", "Which catalyst pipeline strategy to use regarding. The appropriate values are defined by catalyst-api itself.") recordObjectStoreId := fs.String("record-object-store-id", "", "ID for the Object Store to use for recording storage. Forwarded to the streams created in the API") + recordingSpecStr := fs.String("recording-spec", "", "JSON object with the `recordingSpec` field to use in the test streams. Forwarded to the streams created in the API") // Discord related flags discordURL := fs.String("discord-url", "", "URL of Discord's webhook to send messages to Discord channel") @@ -275,6 +276,13 @@ func main() { } } + var recordingSpec *api.RecordingSpec + if *recordingSpecStr != "" { + if err := json.Unmarshal([]byte(*recordingSpecStr), &recordingSpec); err != nil { + glog.Fatalf("Error parsing --recording-spec argument: %v", err) + } + } + serfMembers, err := getSerfMembers(*useSerf, *serfRPCAddr) if err != nil { glog.Fatalf("failed to process serf members: %v", err) @@ -338,6 +346,7 @@ func main() { Analyzers: lanalyzers, Ingest: ingest, RecordObjectStoreId: *recordObjectStoreId, + RecordingSpec: recordingSpec, UseForceURL: *forceRecordingUrl, RecordingWaitTime: *recordingWaitTime, UseHTTP: *useHttp, diff --git a/go.mod b/go.mod index 7cae9582..6026d9ca 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,10 @@ require ( github.com/Necroforger/dgrouter v0.0.0-20200517224846-e66453b957c1 github.com/PagerDuty/go-pagerduty v1.7.0 github.com/bwmarrin/discordgo v0.27.1 - github.com/golang/glog v1.1.2 + github.com/golang/glog v1.2.1 github.com/gosuri/uilive v0.0.3 // indirect github.com/gosuri/uiprogress v0.0.1 - github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 + github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 github.com/livepeer/joy4 v0.1.2-0.20220210094601-95e4d28f5f07 github.com/livepeer/leaderboard-serverless v1.0.0 diff --git a/go.sum b/go.sum index 59e827c7..f04a173b 100644 --- a/go.sum +++ b/go.sum @@ -274,8 +274,8 @@ github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOW github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= -github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= -github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= +github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= +github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -555,8 +555,8 @@ github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4n github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= github.com/livepeer/catalyst-api v0.1.1 h1:WP4rHH88b+lsxo33wPCjl0yvqVDNyxkleZH1sA0M5GE= github.com/livepeer/catalyst-api v0.1.1/go.mod h1:d6XPE9ehhCutWhCqqcmlYqQa+e9bf3Ke92x+gRZlzoQ= -github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93 h1:vQYapLFJ9EyRWTjOsJr1ullF0wiazRme2fSJDZnFrIs= -github.com/livepeer/go-api-client v0.4.14-0.20231130155418-dd87e78bea93/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b h1:J8cWLpnTINGAWVPU503SnUhTmuDhXnm9QxpC/CFGk2k= +github.com/livepeer/go-api-client v0.4.24-0.20240607131835-949d242a631b/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719 h1:468kFmwQFaI00eNCLL8qA5XuIBMwqqVgKEXvqS7msa8= github.com/livepeer/go-livepeer v0.7.2-0.20231110152159-b17a70dfe719/go.mod h1:d6qTStiNmXTQ/5YLB9fhzgDV9MdXg3KmqESQpur2Ak0= github.com/livepeer/go-tools v0.3.0 h1:xK0mJyPWWyvj9Oi9nfLglhCtk0KM8883WB7VO1oPF8g= diff --git a/internal/app/recordtester/recordtester_app.go b/internal/app/recordtester/recordtester_app.go index fc3147f4..19fee3dd 100644 --- a/internal/app/recordtester/recordtester_app.go +++ b/internal/app/recordtester/recordtester_app.go @@ -9,6 +9,7 @@ import ( "math/rand" "net/http" "os" + "strings" "time" "github.com/golang/glog" @@ -51,6 +52,7 @@ type ( Analyzers testers.AnalyzerByRegion Ingest *api.Ingest RecordObjectStoreId string + RecordingSpec *api.RecordingSpec UseForceURL bool RecordingWaitTime time.Duration UseHTTP bool @@ -65,6 +67,7 @@ type ( lanalyzers testers.AnalyzerByRegion ingest *api.Ingest recordObjectStoreId string + recordingSpec *api.RecordingSpec useForceURL bool recordingWaitTime time.Duration useHTTP bool @@ -89,6 +92,7 @@ func NewRecordTester(gctx context.Context, opts RecordTesterOptions, serfOpts Se ctx: ctx, cancel: cancel, recordObjectStoreId: opts.RecordObjectStoreId, + recordingSpec: opts.RecordingSpec, useForceURL: opts.UseForceURL, recordingWaitTime: opts.RecordingWaitTime, useHTTP: opts.UseHTTP, @@ -132,7 +136,12 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. streamName := fmt.Sprintf("%s_%s", hostName, time.Now().Format("2006-01-02T15:04:05Z07:00")) var stream *api.Stream for { - stream, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId}) + stream, err = rt.lapi.CreateStream(api.CreateStreamReq{ + Name: streamName, + Record: true, + RecordingSpec: rt.recordingSpec, + RecordObjectStoreId: rt.recordObjectStoreId, + }) if err != nil { if testers.Timedout(err) && apiTry < 3 { apiTry++ @@ -247,6 +256,16 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. if err := rt.isCancelled(); err != nil { return 0, err } + + lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{ + Server: rt.lapi.GetServer(), + AccessToken: "", // test playback info call without API key + Timeout: 8 * time.Second, + }) + if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { + return code, err + } + glog.Infof("Waiting 10 seconds. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) time.Sleep(10 * time.Second) // now get sessions @@ -284,33 +303,84 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. } glog.Infof("Streaming done, waiting for recording URL to appear. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) + + deadline := time.Now().Add(rt.recordingWaitTime) if rt.useForceURL { + deadline = time.Now().Add(5 * time.Second) + } + + // For checking if sourcePlayback was available we see if at least 1 session + // recording (asset) got a playbackUrl before the processing was done. + var sourcePlayback bool + for errCode, errs := -1, []error{}; errCode != 0; { + if time.Now().After(deadline) { + errsStrs := make([]string, len(errs)) + for i, err := range errs { + errsStrs[i] = err.Error() + } + err := fmt.Errorf("timeout waiting for recording URL to appear: %s", strings.Join(errsStrs, "; ")) + return errCode, err + } else if err = rt.isCancelled(); err != nil { + return 0, err + } time.Sleep(5 * time.Second) - } else { - time.Sleep(rt.recordingWaitTime) + + errCode, errs = 0, nil + for _, sess := range sessions { + // currently the assetID is the same as the sessionID so we could just query on that but just in case that + // ever changes, we can use the ListAssets call to find the asset + assets, _, err := rt.lapi.ListAssets(api.ListOptions{ + Limit: 1, + Filters: map[string]interface{}{ + "sourceSessionId": sess.ID, + }, + }) + if err != nil { + errCode, errs = 248, append(errs, err) + continue + } + + if len(assets) != 1 { + err := fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets)) + errCode, errs = 247, append(errs, err) + continue + } + asset := assets[0] + + if code, err := checkPlaybackInfo(asset.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { + errCode, errs = code, append(errs, err) + } else { + // if we get playback before the processing is done it means source playback was provided + if asset.Status.Phase != "ready" { + sourcePlayback = true + } + } + + if asset.Status.Phase != "ready" { + err := fmt.Errorf("asset status is %s but should be ready", asset.Status.Phase) + errCode, errs = 246, append(errs, err) + } + } } - if err = rt.isCancelled(); err != nil { - return 0, err + if !sourcePlayback { + return 246, errors.New("source playback was not provided") } - sessions, err = rt.lapi.GetSessionsNew(stream.ID, rt.useForceURL) + // check actual recordings playback + sessions, err = rt.lapi.GetSessionsNew(stream.ID, false) if err != nil { - err := fmt.Errorf("error getting sessions for stream id=%s err=%v", stream.ID, err) + glog.Errorf("Error getting sessions err=%v streamId=%s playbackId=%s", err, stream.ID, stream.PlaybackID) return 252, err } glog.V(model.DEBUG).Infof("Sessions: %+v streamId=%s playbackId=%s", sessions, stream.ID, stream.PlaybackID) - if err = rt.isCancelled(); err != nil { - return 0, err - } - lapiNoAPIKey := api.NewAPIClient(api.ClientOptions{ - Server: rt.lapi.GetServer(), - AccessToken: "", // test playback info call without API key - Timeout: 8 * time.Second, - }) - if code, err := checkPlaybackInfo(stream.PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { - return code, err + if len(sessions) != expectedSessions { + err := fmt.Errorf("invalid session count, expected %d but got %d", + expectedSessions, len(sessions)) + glog.Error(err) + return 251, err } + for _, sess := range sessions { statusShould := api.RecordingStatusReady if rt.useForceURL { @@ -330,40 +400,21 @@ func (rt *recordTester) Start(fileName string, testDuration, pauseDuration time. return 0, err } if rt.mp4 { - es, err := rt.checkDownMp4(stream, sess.Mp4Url, testDuration) + es, err := rt.checkRecordingMp4(stream, sess.Mp4Url, testDuration) if err != nil { return es, err } } - es, err := rt.checkDown(stream, sess.RecordingURL, testDuration) - if err != nil { - return es, err + if err = rt.isCancelled(); err != nil { + return 0, err } - - // currently the assetID is the same as the sessionID so we could just query on that but just in case that - // ever changes, we can use the ListAssets call to find the asset - assets, _, err := rt.lapi.ListAssets(api.ListOptions{ - Limit: 1, - Filters: map[string]interface{}{ - "sourceSessionId": sess.ID, - }, - }) + es, err := rt.checkRecordingHls(stream, sess.RecordingURL, testDuration) if err != nil { - return 248, err - } - - if len(assets) != 1 { - return 247, fmt.Errorf("unexpected number of assets. expected: 1 actual: %d", len(assets)) - } - if !assets[0].SourcePlaybackReady { - return 246, fmt.Errorf("source playback was not ready") - } - - if code, err := checkPlaybackInfo(assets[0].PlaybackID, rt.lapi, lapiNoAPIKey); err != nil { - return code, err + return es, err } } + glog.Infof("Done Record Test. streamId=%s playbackId=%s", stream.ID, stream.PlaybackID) rt.lapi.DeleteStream(stream.ID) @@ -418,7 +469,13 @@ func (rt *recordTester) doOneHTTPStream(fileName, streamName, broadcasterURL str var err error apiTry := 0 for { - session, err = rt.lapi.CreateStream(api.CreateStreamReq{Name: streamName, Record: true, RecordObjectStoreId: rt.recordObjectStoreId, ParentID: stream.ID}) + session, err = rt.lapi.CreateStream(api.CreateStreamReq{ + Name: streamName, + Record: true, + RecordingSpec: rt.recordingSpec, + RecordObjectStoreId: rt.recordObjectStoreId, + ParentID: stream.ID, + }) if err != nil { if testers.Timedout(err) && apiTry < 3 { apiTry++ @@ -449,7 +506,7 @@ func (rt *recordTester) isCancelled() error { return nil } -func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { +func (rt *recordTester) checkRecordingMp4(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { es := 0 started := time.Now() glog.V(model.VERBOSE).Infof("Downloading mp4 url=%s streamId=%s playbackId=%s", url, stream.ID, stream.PlaybackID) @@ -500,7 +557,7 @@ func (rt *recordTester) checkDownMp4(stream *api.Stream, url string, streamDurat return es, nil } -func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { +func (rt *recordTester) checkRecordingHls(stream *api.Stream, url string, streamDuration time.Duration) (int, error) { es := 0 started := time.Now() downloader := testers.NewM3utester2(rt.ctx, url, false, false, false, false, 5*time.Second, nil, false) @@ -511,7 +568,11 @@ func (rt *recordTester) checkDown(stream *api.Stream, url string, streamDuration } vs := downloader.VODStats() rt.vodStats = vs - if len(vs.SegmentsNum) != len(api.StandardProfiles)+1 { + expectedProfiles := len(api.StandardProfiles) + 1 + if rt.recordingSpec != nil && rt.recordingSpec.Profiles != nil { + expectedProfiles = len(*rt.recordingSpec.Profiles) + 1 + } + if len(vs.SegmentsNum) != expectedProfiles { glog.Warningf("Number of renditions doesn't match! Has %d should %d. streamId=%s playbackId=%s", len(vs.SegmentsNum), len(api.StandardProfiles)+1, stream.ID, stream.PlaybackID) es = 35 }