Skip to content

Commit

Permalink
add RTP/input/output stats
Browse files Browse the repository at this point in the history
  • Loading branch information
freddy36 committed Mar 14, 2024
1 parent a2b6e4d commit 06793b9
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 10 deletions.
112 changes: 112 additions & 0 deletions data.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stdint.h>
#include <float.h>

#include "libfuncs/io.h"
#include "libfuncs/log.h"
Expand Down Expand Up @@ -256,6 +259,11 @@ INPUT * input_new(const char *name, CHANNEL *channel) {
r->sock = -1;
r->channel = channel;

r->rtp_stats.last_sequence_number = -1;
r->traffic_stats.min.traffic = UINT64_MAX;
r->traffic_stats.min.kpbs = DBL_MAX;
r->traffic_stats.min.padding = DBL_MAX;

if (config->write_input_file) {
if (asprintf(&tmp, "mptsd-input-%s.ts", channel->id) > 0)
r->ifd = open(tmp, O_CREAT | O_WRONLY | O_TRUNC, 0644);
Expand Down Expand Up @@ -294,6 +302,10 @@ OUTPUT *output_new() {
OUTPUT *o = calloc(1, sizeof(OUTPUT));
o->obuf_ms = 100;

o->traffic_stats.min.traffic = UINT64_MAX;
o->traffic_stats.min.kpbs = DBL_MAX;
o->traffic_stats.min.padding = DBL_MAX;

o->psibuf = cbuf_init(50 * 1316, "psi");
if (!o->psibuf) {
LOGf("ERROR: Can't allocate PSI input buffer\n");
Expand Down Expand Up @@ -434,9 +446,109 @@ void proxy_log(INPUT *r, char *msg) {
LOGf("INPUT : [%-12s] %s fd: %d src: %s\n", r->channel->id, msg, r->sock, r->channel->source);
}

void proxy_logf(INPUT *r, const char *fmt, ...) {
char msg[1024];
va_list args;
va_start(args, fmt);
vsnprintf(msg, sizeof(msg) - 1, fmt, args);
va_end(args);
msg[sizeof(msg) - 1] = '\0';

proxy_log(r, msg);
}

void proxy_close(LIST *inputs, INPUT **input) {
proxy_log(*input, "Stop");
// If there are no clients left, no "Timeout" messages will be logged
list_del_entry(inputs, *input);
input_free(input);
}

ssize_t parse_rtp(const uint8_t *buf, size_t len, RTP_HEADER *rtp_header) {
size_t rtp_length = RTP_HEADER_SIZE;

if (len < RTP_HEADER_SIZE)
return -1;

rtp_header->version = buf[0] >> 6;
rtp_header->padding = !!((buf[0]) & (1 << 5));
rtp_header->extension = !!((buf[0]) & (1 << 4));
rtp_header->cc = buf[0] & 0xFF;

rtp_header->marker = !!((buf[1]) & (1 << 8));
rtp_header->payload_type = buf[1] & 0x7f;

// Sequence number
rtp_header->sequence_number = (buf[2] << 8) | (buf[3]);

// Timestamp
rtp_header->timestamp =
(buf[4] << 24) | (buf[5] << 16) | (buf[6] << 8) | (buf[7]);

// SSRC identifier
rtp_header->ssrc =
(buf[8] << 24) | (buf[9] << 16) | (buf[10] << 8) | (buf[11]);

rtp_length += rtp_header->cc * 4;

return rtp_length;
}

ssize_t handle_rtp_input(uint8_t *buf, size_t len, INPUT *input) {
RTP_HEADER rtp_header;
ssize_t rtp_length = parse_rtp(buf, len, &rtp_header);

RTP_STATS *stats = &input->rtp_stats;

stats->packets_received++;
if (stats->last_sequence_number >= 0) {
uint16_t expected_sequence_number = stats->last_sequence_number + 1;
if (rtp_header.sequence_number != expected_sequence_number) {
int32_t lost_packets =
rtp_header.sequence_number - expected_sequence_number;
// counter wrapped
if (lost_packets < 0) {
lost_packets += 0xFFFF;
}
proxy_logf(
input,
"RTP packets lost at sequence number %u: %i (total: %lu)",
rtp_header.sequence_number, lost_packets, stats->packets_lost);

// if the sequence number is 1, assume that the
// source was restarted (don't update packet_lost
// counter)
if (rtp_header.sequence_number != 1) {
stats->packets_lost += lost_packets;
}
}
}
stats->last_sequence_number = rtp_header.sequence_number;
stats->ssrc = rtp_header.ssrc;

return rtp_length;
}

void update_traffic_stats(TRAFFIC_STATS *stats, double kbps, double padding,
uint64_t traffic) {
// last
stats->last.kpbs = kbps;
stats->last.padding = padding;
stats->last.traffic = traffic;

// min
if (kbps < stats->min.kpbs)
stats->min.kpbs = kbps;
if (padding < stats->min.padding)
stats->min.padding = padding;
if (traffic < stats->min.traffic)
stats->min.traffic = traffic;

// max
if (kbps > stats->max.kpbs)
stats->max.kpbs = kbps;
if (padding > stats->max.padding)
stats->max.padding = padding;
if (traffic > stats->max.traffic)
stats->max.traffic = traffic;
}
43 changes: 43 additions & 0 deletions data.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,37 @@

#include "pidref.h"

typedef struct {
uint8_t version : 2;
uint8_t padding : 1;
uint8_t extension : 1;
uint8_t cc : 4;
uint8_t marker : 1;
uint8_t payload_type : 7;
uint16_t sequence_number;
uint32_t timestamp;
uint32_t ssrc;
} RTP_HEADER;

typedef struct {
int32_t last_sequence_number;
uint32_t ssrc;
uint64_t packets_received;
uint64_t packets_lost;
} RTP_STATS;

typedef struct {
double kpbs;
double padding;
uint64_t traffic;
} TRAFFIC_STATS_ENTRY;

typedef struct {
TRAFFIC_STATS_ENTRY min;
TRAFFIC_STATS_ENTRY max;
TRAFFIC_STATS_ENTRY last;
} TRAFFIC_STATS;

typedef enum { udp_sock, tcp_sock } channel_source;

typedef struct {
Expand Down Expand Up @@ -134,6 +165,12 @@ typedef struct {
int cookie; /* Used in chanconf to determine if the restreamer is alrady checked */
int ifd;

uint64_t traffic;
uint64_t traffic_period;
uint64_t padding_period;
TRAFFIC_STATS traffic_stats;
RTP_STATS rtp_stats;

pthread_t thread;

uint16_t output_pcr_pid;
Expand Down Expand Up @@ -186,6 +223,8 @@ typedef struct {
uint64_t traffic_period;
uint64_t padding_period;

TRAFFIC_STATS traffic_stats;

uint8_t pid_pat_cont:4;
uint8_t pid_nit_cont:4;
uint8_t pid_sdt_cont:4;
Expand Down Expand Up @@ -261,4 +300,8 @@ void nit_free (NIT **nit);
void proxy_log (INPUT *r, char *msg);
void proxy_close (LIST *inputs, INPUT **input);

ssize_t handle_rtp_input (uint8_t *buf, size_t len, INPUT *input);
ssize_t parse_rtp (const uint8_t *buf, size_t len, RTP_HEADER *rtp_header);

void update_traffic_stats (TRAFFIC_STATS *stats, double kbps, double padding, uint64_t traffic);
#endif
46 changes: 36 additions & 10 deletions input.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <inttypes.h>

#include "libfuncs/io.h"
#include "libfuncs/log.h"
Expand Down Expand Up @@ -376,8 +377,8 @@ int in_worktime(int start, int end) {
void * input_stream(void *self) {
INPUT *r = self;
INPUT_STREAM *s = &r->stream;
struct timeval stats_ts, now;
char buffer[RTP_HEADER_SIZE + FRAME_PACKET_SIZE];
char *buf = buffer + RTP_HEADER_SIZE;

signal(SIGPIPE, SIG_IGN);

Expand All @@ -387,6 +388,7 @@ void * input_stream(void *self) {
proxy_log(r, "Worktime has not yet begin, sleeping.");

int http_code = 0;
gettimeofday(&stats_ts, NULL);
while (keep_going) {
if (input_check_state(r) == 2) // r->dienow is on
goto QUIT;
Expand Down Expand Up @@ -416,12 +418,15 @@ void * input_stream(void *self) {
}

ssize_t readen;
ssize_t rtp_length;
int i = 0;
int max_zero_reads = MAX_ZERO_READS;

// Reset all stream parameters on reconnect.
input_stream_reset(r);

for (;;) {
gettimeofday(&now, NULL);
r->working = in_worktime(r->channel->worktime_start, r->channel->worktime_end);
if (!r->working) {
proxy_log(r, "Worktime ended.");
Expand All @@ -434,14 +439,16 @@ void * input_stream(void *self) {
}

if (sproto == tcp_sock) {
readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, TCP_READ_TIMEOUT, TCP_READ_RETRIES, 1);
} else {
if (!rtp) {
readen = fdread_ex(r->sock, buf, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
} else {
if (!rtp) { // plain UDP
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
} else { // RTP
readen = fdread_ex(r->sock, buffer, FRAME_PACKET_SIZE + RTP_HEADER_SIZE, UDP_READ_TIMEOUT, UDP_READ_RETRIES, 0);
if (readen > RTP_HEADER_SIZE)
readen -= RTP_HEADER_SIZE;
if (readen > RTP_HEADER_SIZE) {
rtp_length = handle_rtp_input((uint8_t *)buffer, readen, r);
i = rtp_length; // skip RTP header during TS reading
}
}
}

Expand All @@ -456,14 +463,20 @@ void * input_stream(void *self) {
continue;
}

int i;
for (i=0; i<readen; i+=188) {
r->traffic += readen - i;
r->traffic_period += readen - i;

for (; i<readen; i+=188) {

if (r->dienow)
goto QUIT;
uint8_t *ts_packet = (uint8_t *)buf + i;
uint8_t *ts_packet = (uint8_t *)buffer + i;
uint16_t pid = ts_packet_get_pid(ts_packet);

if(pid == 0x1FFF) { // NULL packets (Stuffing)
r->padding_period += 188;
}

int pat_result = process_pat(r, pid, ts_packet);
if (pat_result == -2)
goto RECONNECT;
Expand Down Expand Up @@ -505,6 +518,19 @@ void * input_stream(void *self) {
}
}

// stats
unsigned long long stats_interval = timeval_diff_msec(&stats_ts, &now);
if (stats_interval > config->timeouts.stats) {
stats_ts = now;
double kbps = (double)(r->traffic_period * 8) / 1000;
double padding = ((double)r->padding_period / r->traffic_period) * 100;

update_traffic_stats(&r->traffic_stats, kbps, padding, r->traffic_period);

r->traffic_period = 0;
r->padding_period = 0;
}

max_zero_reads = MAX_ZERO_READS;
}
proxy_log(r, "fdread timeout");
Expand Down
2 changes: 2 additions & 0 deletions output_write.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ void * output_handle_write(void *_config) {
double out_mbps = (double)out_kbps / 1000;
double opadding = ((double)o->padding_period / o->traffic_period) * 100;

update_traffic_stats(&o->traffic_stats, out_kbps, opadding, o->traffic_period);

if (!conf->quiet) {
LOGf("STAT : Pad:%6.2f%% Traf:%5.2f Mbps | %8.2f | %7" PRIu64 "\n",
opadding,
Expand Down

0 comments on commit 06793b9

Please sign in to comment.