From 06793b959144830b4e515b16ab1b0d5503b04ecc Mon Sep 17 00:00:00 2001 From: Frederik Kriewitz Date: Wed, 13 Mar 2024 17:22:18 +0000 Subject: [PATCH] add RTP/input/output stats --- data.c | 112 +++++++++++++++++++++++++++++++++++++++++++++++++ data.h | 43 +++++++++++++++++++ input.c | 46 +++++++++++++++----- output_write.c | 2 + 4 files changed, 193 insertions(+), 10 deletions(-) diff --git a/data.c b/data.c index 02e3ccc..99d38b7 100644 --- a/data.c +++ b/data.c @@ -25,6 +25,9 @@ #include #include #include +#include +#include +#include #include "libfuncs/io.h" #include "libfuncs/log.h" @@ -256,6 +259,11 @@ INPUT * input_new(const char *name, CHANNEL *channel) { r->sock = -1; r->channel = channel; + r->rtp_stats.last_sequence_number = -1; + r->traffic_stats.min.traffic = UINT64_MAX; + r->traffic_stats.min.kpbs = DBL_MAX; + r->traffic_stats.min.padding = DBL_MAX; + if (config->write_input_file) { if (asprintf(&tmp, "mptsd-input-%s.ts", channel->id) > 0) r->ifd = open(tmp, O_CREAT | O_WRONLY | O_TRUNC, 0644); @@ -294,6 +302,10 @@ OUTPUT *output_new() { OUTPUT *o = calloc(1, sizeof(OUTPUT)); o->obuf_ms = 100; + o->traffic_stats.min.traffic = UINT64_MAX; + o->traffic_stats.min.kpbs = DBL_MAX; + o->traffic_stats.min.padding = DBL_MAX; + o->psibuf = cbuf_init(50 * 1316, "psi"); if (!o->psibuf) { LOGf("ERROR: Can't allocate PSI input buffer\n"); @@ -434,9 +446,109 @@ void proxy_log(INPUT *r, char *msg) { LOGf("INPUT : [%-12s] %s fd: %d src: %s\n", r->channel->id, msg, r->sock, r->channel->source); } +void proxy_logf(INPUT *r, const char *fmt, ...) { + char msg[1024]; + va_list args; + va_start(args, fmt); + vsnprintf(msg, sizeof(msg) - 1, fmt, args); + va_end(args); + msg[sizeof(msg) - 1] = '\0'; + + proxy_log(r, msg); +} + void proxy_close(LIST *inputs, INPUT **input) { proxy_log(*input, "Stop"); // If there are no clients left, no "Timeout" messages will be logged list_del_entry(inputs, *input); input_free(input); } + +ssize_t parse_rtp(const uint8_t *buf, size_t len, RTP_HEADER *rtp_header) { + size_t rtp_length = RTP_HEADER_SIZE; + + if (len < RTP_HEADER_SIZE) + return -1; + + rtp_header->version = buf[0] >> 6; + rtp_header->padding = !!((buf[0]) & (1 << 5)); + rtp_header->extension = !!((buf[0]) & (1 << 4)); + rtp_header->cc = buf[0] & 0xFF; + + rtp_header->marker = !!((buf[1]) & (1 << 8)); + rtp_header->payload_type = buf[1] & 0x7f; + + // Sequence number + rtp_header->sequence_number = (buf[2] << 8) | (buf[3]); + + // Timestamp + rtp_header->timestamp = + (buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | (buf[7]); + + // SSRC identifier + rtp_header->ssrc = + (buf[8] << 24) | (buf[9] << 16) | (buf[10] << 8) | (buf[11]); + + rtp_length += rtp_header->cc * 4; + + return rtp_length; +} + +ssize_t handle_rtp_input(uint8_t *buf, size_t len, INPUT *input) { + RTP_HEADER rtp_header; + ssize_t rtp_length = parse_rtp(buf, len, &rtp_header); + + RTP_STATS *stats = &input->rtp_stats; + + stats->packets_received++; + if (stats->last_sequence_number >= 0) { + uint16_t expected_sequence_number = stats->last_sequence_number + 1; + if (rtp_header.sequence_number != expected_sequence_number) { + int32_t lost_packets = + rtp_header.sequence_number - expected_sequence_number; + // counter wrapped + if (lost_packets < 0) { + lost_packets += 0xFFFF; + } + proxy_logf( + input, + "RTP packets lost at sequence number %u: %i (total: %lu)", + rtp_header.sequence_number, lost_packets, stats->packets_lost); + + // if the sequence number is 1, assume that the + // source was restarted (don't update packet_lost + // counter) + if (rtp_header.sequence_number != 1) { + stats->packets_lost += lost_packets; + } + } + } + stats->last_sequence_number = rtp_header.sequence_number; + stats->ssrc = rtp_header.ssrc; + + return rtp_length; +} + +void update_traffic_stats(TRAFFIC_STATS *stats, double kbps, double padding, + uint64_t traffic) { + // last + stats->last.kpbs = kbps; + stats->last.padding = padding; + stats->last.traffic = traffic; + + // min + if (kbps < stats->min.kpbs) + stats->min.kpbs = kbps; + if (padding < stats->min.padding) + stats->min.padding = padding; + if (traffic < stats->min.traffic) + stats->min.traffic = traffic; + + // max + if (kbps > stats->max.kpbs) + stats->max.kpbs = kbps; + if (padding > stats->max.padding) + stats->max.padding = padding; + if (traffic > stats->max.traffic) + stats->max.traffic = traffic; +} \ No newline at end of file diff --git a/data.h b/data.h index 6ca8335..97eca9c 100644 --- a/data.h +++ b/data.h @@ -35,6 +35,37 @@ #include "pidref.h" +typedef struct { + uint8_t version : 2; + uint8_t padding : 1; + uint8_t extension : 1; + uint8_t cc : 4; + uint8_t marker : 1; + uint8_t payload_type : 7; + uint16_t sequence_number; + uint32_t timestamp; + uint32_t ssrc; +} RTP_HEADER; + +typedef struct { + int32_t last_sequence_number; + uint32_t ssrc; + uint64_t packets_received; + uint64_t packets_lost; +} RTP_STATS; + +typedef struct { + double kpbs; + double padding; + uint64_t traffic; +} TRAFFIC_STATS_ENTRY; + +typedef struct { + TRAFFIC_STATS_ENTRY min; + TRAFFIC_STATS_ENTRY max; + TRAFFIC_STATS_ENTRY last; +} TRAFFIC_STATS; + typedef enum { udp_sock, tcp_sock } channel_source; typedef struct { @@ -134,6 +165,12 @@ typedef struct { int cookie; /* Used in chanconf to determine if the restreamer is alrady checked */ int ifd; + uint64_t traffic; + uint64_t traffic_period; + uint64_t padding_period; + TRAFFIC_STATS traffic_stats; + RTP_STATS rtp_stats; + pthread_t thread; uint16_t output_pcr_pid; @@ -186,6 +223,8 @@ typedef struct { uint64_t traffic_period; uint64_t padding_period; + TRAFFIC_STATS traffic_stats; + uint8_t pid_pat_cont:4; uint8_t pid_nit_cont:4; uint8_t pid_sdt_cont:4; @@ -261,4 +300,8 @@ void nit_free (NIT **nit); void proxy_log (INPUT *r, char *msg); void proxy_close (LIST *inputs, INPUT **input); +ssize_t handle_rtp_input (uint8_t *buf, size_t len, INPUT *input); +ssize_t parse_rtp (const uint8_t *buf, size_t len, RTP_HEADER *rtp_header); + +void update_traffic_stats (TRAFFIC_STATS *stats, double kbps, double padding, uint64_t traffic); #endif diff --git a/input.c b/input.c index f6d55d7..1610a4e 100644 --- a/input.c +++ b/input.c @@ -21,6 +21,7 @@ #include #include #include +#include #include "libfuncs/io.h" #include "libfuncs/log.h" @@ -376,8 +377,8 @@ int in_worktime(int start, int end) { void * input_stream(void *self) { INPUT *r = self; INPUT_STREAM *s = &r->stream; + struct timeval stats_ts, now; char buffer[RTP_HEADER_SIZE + FRAME_PACKET_SIZE]; - char *buf = buffer + RTP_HEADER_SIZE; signal(SIGPIPE, SIG_IGN); @@ -387,6 +388,7 @@ void * input_stream(void *self) { proxy_log(r, "Worktime has not yet begin, sleeping."); int http_code = 0; + gettimeofday(&stats_ts, NULL); while (keep_going) { if (input_check_state(r) == 2) // r->dienow is on goto QUIT; @@ -416,12 +418,15 @@ void * input_stream(void *self) { } ssize_t readen; + ssize_t rtp_length; + int i = 0; int max_zero_reads = MAX_ZERO_READS; // Reset all stream parameters on reconnect. input_stream_reset(r); for (;;) { + gettimeofday(&now, NULL); r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end); if (!r->working) { proxy_log(r, "Worktime ended."); @@ -434,14 +439,16 @@ void * input_stream(void *self) { } if (sproto == tcp_sock) { - readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1); + readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1); } else { - if (!rtp) { - readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0); - } else { + if (!rtp) { // plain UDP + readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0); + } else { // RTP readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE + RTP_HEADER_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0); - if (readen > RTP_HEADER_SIZE) - readen -= RTP_HEADER_SIZE; + if (readen > RTP_HEADER_SIZE) { + rtp_length = handle_rtp_input((uint8_t *)buffer, readen, r); + i = rtp_length; // skip RTP header during TS reading + } } } @@ -456,14 +463,20 @@ void * input_stream(void *self) { continue; } - int i; - for (i=0; itraffic += readen - i; + r->traffic_period += readen - i; + + for (; idienow) goto QUIT; - uint8_t *ts_packet = (uint8_t *)buf + i; + uint8_t *ts_packet = (uint8_t *)buffer + i; uint16_t pid = ts_packet_get_pid(ts_packet); + if(pid == 0x1FFF) { // NULL packets (Stuffing) + r->padding_period += 188; + } + int pat_result = process_pat(r, pid, ts_packet); if (pat_result == -2) goto RECONNECT; @@ -505,6 +518,19 @@ void * input_stream(void *self) { } } + // stats + unsigned long long stats_interval = timeval_diff_msec(&stats_ts, &now); + if (stats_interval > config->timeouts.stats) { + stats_ts = now; + double kbps = (double)(r->traffic_period * 8) / 1000; + double padding = ((double)r->padding_period / r->traffic_period) * 100; + + update_traffic_stats(&r->traffic_stats, kbps, padding, r->traffic_period); + + r->traffic_period = 0; + r->padding_period = 0; + } + max_zero_reads = MAX_ZERO_READS; } proxy_log(r, "fdread timeout"); diff --git a/output_write.c b/output_write.c index 5337080..ddfeaa9 100644 --- a/output_write.c +++ b/output_write.c @@ -207,6 +207,8 @@ void * output_handle_write(void *_config) { double out_mbps = (double)out_kbps / 1000; double opadding = ((double)o->padding_period / o->traffic_period) * 100; + update_traffic_stats(&o->traffic_stats, out_kbps, opadding, o->traffic_period); + if (!conf->quiet) { LOGf("STAT : Pad:%6.2f%% Traf:%5.2f Mbps | %8.2f | %7" PRIu64 "\n", opadding,