From c9f7c169c0948826f43b4b28244c131f1db55474 Mon Sep 17 00:00:00 2001 From: maxutility2011 Date: Sun, 9 Jun 2024 21:53:55 +0000 Subject: [PATCH] removing comments --- api_server/api_server_main.go | 25 +++---------------------- job/command.go | 4 +--- job/validator.go | 8 +------- job_sqs/receiver.go | 2 +- job_sqs/sender.go | 2 +- models/drm.go | 2 +- models/worker.go | 2 +- redis_client/redis_client.go | 2 +- s3/s3.go | 2 +- scheduler/scheduler.go | 15 +++++---------- worker/app/worker_app.go | 20 +------------------- worker/transcoder/worker_transcoder.go | 2 +- worker/utils/bandwidth_reader.go | 2 +- worker/utils/cpuutil_reader.go | 3 +-- 14 files changed, 20 insertions(+), 71 deletions(-) diff --git a/api_server/api_server_main.go b/api_server/api_server_main.go index 451bfd7..b6ca52c 100755 --- a/api_server/api_server_main.go +++ b/api_server/api_server_main.go @@ -119,16 +119,10 @@ func createJob(j job.LiveJobSpec, warnings []string) (error, job.LiveJob) { if j.Output.Stream_type == job.DASH { // Example: https://bozhang-private.s3.amazonaws.com/output_70255156-26ef-4378-811b-dfc44a7c6cb5/master.mpd lj.Playback_url = "https://" + j.Output.S3_output.Bucket + ".s3.amazonaws.com/output_" + lj.Id + "/" + job.DASH_MPD_FILE_NAME - //lj.Playback_url = "http://" + server_config.Origin_server_hostname + ":" + server_config.Origin_server_port + "/" + job.Media_output_path_prefix + lj.Id + "/" + job.DASH_MPD_FILE_NAME // Test ONLY. TODO: stream output should be uploaded to cloud storage. } else if j.Output.Stream_type == job.HLS { lj.Playback_url = "https://" + j.Output.S3_output.Bucket + ".s3.amazonaws.com/output_" + lj.Id + "/" + job.HLS_MASTER_PLAYLIST_FILE_NAME - //lj.Playback_url = "http://" + server_config.Origin_server_hostname + ":" + server_config.Origin_server_port + "/" + job.Media_output_path_prefix + lj.Id + "/" + job.HLS_MASTER_PLAYLIST_FILE_NAME // Test ONLY. TODO: stream output should be uploaded to cloud storage. } - //j.IngestUrls = make([]string) - //RtmpIngestUrl = "rtmp://" + WorkerAppIp + ":" + WorkerAppPort + "/live/" + j.StreamKey - //j.IngestUrls = append(j.IngestUrls, RtmpIngestUrl) - lj.Spec = j lj.Time_created = time.Now() lj.Stop = false // Set to true when the client wants to stop this job @@ -292,13 +286,7 @@ func start_ffmpeg_live_contribution(spec demo.CreateLiveFeedSpec) error { return errors.New("DuplicateLiveFeeding") } - //liveFeedCmd = exec.Command("ffmpeg", "-stream_loop", "-1", "-re", "-i", "/tmp/1.mp4", "-c", "copy", "-vf", "drawtext=fontfile=/usr/share/fonts/truetype/freefont/FreeMonoBold.ttf:text='%{localtime}':fontcolor=white@0.8:x=7:y=7", "-an", "-f", "flv", spec.RtmpIngestUrl) liveFeedCmd = exec.Command("ffmpeg", "-stream_loop", "-1", "-re", "-i", "/tmp/1.mp4", "-c", "copy", "-f", "flv", spec.RtmpIngestUrl) - /*Log.Printf("Path: " + ffmpeg.Path + " ") - for _, arg := range ffmpeg.Args { - Log.Printf(arg + " ") - } - Log.Printf("\n")*/ Log.Println("!!!Starting live feeding...") go func() { @@ -404,13 +392,6 @@ func main_server_handler(w http.ResponseWriter, r *http.Request) { return } - // TEST ONLY!!! - /*FileContentType_test := "application/json" - w.Header().Set("Content-Type", FileContentType_test) - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(jspec) - return*/ - e1, j := createJob(jspec, warnings) if e1 != nil { http.Error(w, "500 internal server error\n Error: ", http.StatusInternalServerError) @@ -509,7 +490,7 @@ func main_server_handler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusAccepted) e1 := stopJob(j) // Update Redis - j.Stop = true // Set Stop flag to true for the local variable + j.Stop = true // Set Stop flag to true for the local variable if e1 != nil { http.Error(w, "500 internal server error\n Error: ", http.StatusInternalServerError) @@ -556,7 +537,7 @@ func main_server_handler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusAccepted) e1 := resumeJob(j) // Update Redis - j.Stop = false // Set Stop flag to false for the local variable + j.Stop = false // Set Stop flag to false for the local variable if e1 != nil { http.Error(w, "500 internal server error\n Error: ", http.StatusInternalServerError) @@ -738,4 +719,4 @@ func main() { server_addr = server_hostname + ":" + server_port fmt.Println("API server listening on: ", server_addr) http.ListenAndServe(server_addr, nil) -} +} \ No newline at end of file diff --git a/job/command.go b/job/command.go index a4ec777..3bc89e6 100755 --- a/job/command.go +++ b/job/command.go @@ -266,7 +266,6 @@ func JobSpecToShakaPackagerArgs(job_id string, j LiveJobSpec, media_output_path video_output += ("," + media_segment_template_prefix + media_segment_template) if j.Output.Stream_type == HLS { - //playlist_name := "playlist_name=" + media_output_path + output_label + ".m3u8" playlist_name := "playlist_name=" + media_output_path + output_label + "/playlist.m3u8" video_output += ("," + playlist_name) } @@ -301,7 +300,6 @@ func JobSpecToShakaPackagerArgs(job_id string, j LiveJobSpec, media_output_path audio_output += ("," + media_segment_template_prefix + media_segment_template) if j.Output.Stream_type == HLS { - //playlist_name := "playlist_name=" + media_output_path + output_label + ".m3u8" playlist_name := "playlist_name=" + media_output_path + output_label + "/playlist.m3u8" audio_output += ("," + playlist_name) @@ -643,4 +641,4 @@ func JobSpecToEncoderArgs(j LiveJobSpec, media_output_path string) ([]string, [] ffmpegArgs = append(ffmpegArgs, ffprobe_output_url) return ffmpegArgs, local_media_output_path_subdirs -} +} \ No newline at end of file diff --git a/job/validator.go b/job/validator.go index 5c68c8a..78d57d4 100755 --- a/job/validator.go +++ b/job/validator.go @@ -1,9 +1,7 @@ package job import ( - //"strings" "strconv" - //"encoding/json" "errors" ) @@ -59,10 +57,6 @@ func contains_int(a []int, v int) bool { return r } -/*func hasField(f string) bool { - -}*/ - func Validate(j *LiveJobSpec) (error, []string) { // A general note: // Any data type mismatches were already detected when (json)decoding the http job request body. @@ -312,4 +306,4 @@ func Validate(j *LiveJobSpec) (error, []string) { } return nil, warnings -} +} \ No newline at end of file diff --git a/job_sqs/receiver.go b/job_sqs/receiver.go index f2b54d9..2fa402a 100755 --- a/job_sqs/receiver.go +++ b/job_sqs/receiver.go @@ -74,4 +74,4 @@ func (receiver SqsReceiver) CreateClient() *sqs.SQS { client := sqs.New(sess) return client -} +} \ No newline at end of file diff --git a/job_sqs/sender.go b/job_sqs/sender.go index a3e142a..87ca771 100755 --- a/job_sqs/sender.go +++ b/job_sqs/sender.go @@ -65,4 +65,4 @@ func (sender SqsSender) CreateClient() *sqs.SQS { client := sqs.New(sess) return client -} +} \ No newline at end of file diff --git a/models/drm.go b/models/drm.go index 997d818..8bddd71 100755 --- a/models/drm.go +++ b/models/drm.go @@ -40,4 +40,4 @@ func Random_16bytes_as_string() (string, error) { } return rand_16bytes, nil -} +} \ No newline at end of file diff --git a/models/worker.go b/models/worker.go index 1cad090..17a7c7d 100755 --- a/models/worker.go +++ b/models/worker.go @@ -69,4 +69,4 @@ type WorkerLoad struct { type WorkerHeartbeat struct { Worker_id string LastHeartbeatTime time.Time -} +} \ No newline at end of file diff --git a/redis_client/redis_client.go b/redis_client/redis_client.go index d4fb954..0eea6b7 100755 --- a/redis_client/redis_client.go +++ b/redis_client/redis_client.go @@ -194,4 +194,4 @@ func (rc RedisClient) QLen(qname string) (int, error) { } return r, err -} +} \ No newline at end of file diff --git a/s3/s3.go b/s3/s3.go index 7944fb3..a6dc953 100755 --- a/s3/s3.go +++ b/s3/s3.go @@ -32,4 +32,4 @@ func Upload(local_file_path string, remote_file_name string, bucketname string) } return nil -} +} \ No newline at end of file diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index f675b84..e1773ed 100755 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -319,8 +319,7 @@ func sendJobToWorker(j job.LiveJob, wid string) error { } else { return errors.New("WorkerJobExecutionError") } - } //else if (j.Delete && resp.StatusCode != http.StatusAccepted) { - //} + } var e error // create_job or resume_job @@ -378,8 +377,7 @@ func sendJobToWorker(j job.LiveJob, wid string) error { // j.Id and j.StreamKey will remain the same when the job is resumed createUpdateJob(j) Log.Println("Job id = ", j.Id, " is successfully stopped on worker id = ", wid) - } // else if j.Delete { - //} + } return e } @@ -436,8 +434,7 @@ func scheduleOneJob() { Log.Println("Failed to send job to a worker") bufferJob(j) } - } // else if j.Delete { - //} + } } } @@ -701,8 +698,6 @@ func check_worker_heartbeat() error { time_now := time.Now().UnixMilli() time_lastHeartbeat := w.LastHeartbeatTime.UnixMilli() - //Log.Println("time elapsed since last heartbeat: ", time_now - time_lastHeartbeat) - //Log.Println("max time allowed no heartbeat: ", int64(max_missing_heartbeats_before_suspension * hbinterval / 1000000)) if time_lastHeartbeat != 0 && time_now-time_lastHeartbeat > int64(max_missing_heartbeats_before_suspension*hbinterval*1000) { w.State = models.WORKER_STATE_NOTAVAILABLE } @@ -814,7 +809,7 @@ func main_server_handler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(worker) } else { Log.Println("Non-existent worker id: ", UrlLastPart) - http.Error(w, "Non-existent worker id: "+UrlLastPart, http.StatusNotFound) + http.Error(w, "Non-existent worker id: " + UrlLastPart, http.StatusNotFound) } } } @@ -1002,4 +997,4 @@ func main() { http.ListenAndServe(server_addr, nil) <-quit -} +} \ No newline at end of file diff --git a/worker/app/worker_app.go b/worker/app/worker_app.go index 4567ad9..ab9ba1f 100755 --- a/worker/app/worker_app.go +++ b/worker/app/worker_app.go @@ -374,7 +374,6 @@ func launchJob(j job.LiveJob) error { var rj RunningJob rj.Job = j rj.Command = transcoderCmd - //je := running_jobs.PushBack(rj) running_jobs.PushBack(rj) var out []byte @@ -383,8 +382,6 @@ func launchJob(j job.LiveJob) error { go func() { out, err_transcoder = transcoderCmd.CombinedOutput() // This line blocks when transcoderCmd launch succeeds if err_transcoder != nil { - //running_jobs.Remove(je) // Cleanup if transcoderCmd fails - // Let's not remove the failed job from running_jobs here, but leave it to function checkJobStatus() // checkJobStatus() does more than just removing the job, it also updates worker load with scheduler Log.Println("Errors running worker transcoder: ", string(out)) @@ -409,7 +406,6 @@ func launchJob(j job.LiveJob) error { } func reportJobStatus(report models.WorkerJobReport) error { - //Log.Println("Sending job status report at time =", time.Now()) job_status_url := job_scheduler_url + "/" + "jobstatus" b, _ := json.Marshal(report) @@ -426,18 +422,6 @@ func reportJobStatus(report models.WorkerJobReport) error { return err } - /* - defer resp.Body.Close() - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - Log.Println("Error: Failed to read response body (reportJobStatus)") - return errors.New("StatusReportFailure_fail_to_read_scheduler_response") - } - - var hb_resp models.WorkerHeartbeat - json.Unmarshal(bodyBytes, &hb_resp) - */ - // TODO: Need to handle error response (other than http code 200) if resp.StatusCode != http.StatusOK { Log.Println("Bad response from scheduler (reportJobStatus)") @@ -463,7 +447,6 @@ func readCpuUtil(j RunningJob) string { } else { ps_output := string(out) re := regexp.MustCompile(".[0-9]") // CPU util is a float, so we match all the digits and "." - //r = strings.Join(re.FindAllString(ps_output, -1), "") for _, s := range re.FindAllString(ps_output, -1) { r += strings.TrimSpace(s) } @@ -622,7 +605,6 @@ func sendHeartbeat() error { hb.LastHeartbeatTime = time.Now() b, _ := json.Marshal(hb) - //Log.Println("Sending heartbeat at time =", hb.LastHeartbeatTime) worker_heartbeat_url := job_scheduler_url + "/" + "heartbeat" req, err := http.NewRequest(http.MethodPost, worker_heartbeat_url, bytes.NewReader(b)) if err != nil { @@ -842,4 +824,4 @@ func main() { if err != nil { fmt.Println("Server failed to start. Error: ", err) } -} +} \ No newline at end of file diff --git a/worker/transcoder/worker_transcoder.go b/worker/transcoder/worker_transcoder.go index be7ad11..f0dc9bd 100755 --- a/worker/transcoder/worker_transcoder.go +++ b/worker/transcoder/worker_transcoder.go @@ -755,4 +755,4 @@ func main() { set_upload_input_info_timer(local_media_output_path, remote_media_output_path_base) <-quit -} +} \ No newline at end of file diff --git a/worker/utils/bandwidth_reader.go b/worker/utils/bandwidth_reader.go index 684ab19..7b36cac 100644 --- a/worker/utils/bandwidth_reader.go +++ b/worker/utils/bandwidth_reader.go @@ -32,4 +32,4 @@ func main() { fmt.Printf("bandwidth: %f, unit: %s", bandwidth, bandwidth_unit) time.Sleep(time.Second) } -} +} \ No newline at end of file diff --git a/worker/utils/cpuutil_reader.go b/worker/utils/cpuutil_reader.go index 64bc9a7..a5bc4c1 100644 --- a/worker/utils/cpuutil_reader.go +++ b/worker/utils/cpuutil_reader.go @@ -20,11 +20,10 @@ func main() { fmt.Printf("Errors starting cpuReader. Error: %v, iftop output: %s", err, string(out)) } else { ps_output := string(out) - //lines := strings.Split(ps_output, "\n") fmt.Printf("ps output: %s\n", ps_output) re := regexp.MustCompile(".[0-9]") // CPU util is a float, so match all the digits and "." fmt.Println("FindAllString", re.FindAllString(ps_output, -1)) r := strings.Join(re.FindAllString(ps_output, -1), "") fmt.Printf("CPU util: %s\n", r) } -} +} \ No newline at end of file