Skip to content

Commit

Permalink
Forward query allocation and aggregate the results
Browse files Browse the repository at this point in the history
This will forward the query allocation when received
between the connected listeners and aggregates the
results of each listeners. This helps to indicate
the upstream elements which metas are supported.
For example it avoids v4l2src buffer copy when the
downstream elements doesn't report that video meta
is supported and the pitch is different than width.
  • Loading branch information
mmontero committed Oct 28, 2020
1 parent 2f9baec commit 1e2c41d
Show file tree
Hide file tree
Showing 4 changed files with 288 additions and 3 deletions.
15 changes: 15 additions & 0 deletions gst/interpipe/gstinterpipeilistener.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,21 @@ gst_inter_pipe_ilistener_push_event (GstInterPipeIListener * self,
return iface->push_event (self, event, basetime);
}

gboolean
gst_inter_pipe_ilistener_query (GstInterPipeIListener * self, GstQuery * query)
{
GstInterPipeIListenerInterface *iface;

g_return_val_if_fail (GST_INTER_PIPE_IS_ILISTENER (self), FALSE);
g_return_val_if_fail (query, FALSE);

iface = GST_INTER_PIPE_ILISTENER_GET_IFACE (self);
g_return_val_if_fail (iface->query != NULL, FALSE);

return iface->query (self, query);
}


gboolean
gst_inter_pipe_ilistener_send_eos (GstInterPipeIListener * self)
{
Expand Down
14 changes: 14 additions & 0 deletions gst/interpipe/gstinterpipeilistener.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct _GstInterPipeIListenerInterface
gboolean (* node_removed) (GstInterPipeIListener *iface, const gchar *node_removed);
gboolean (* push_buffer) (GstInterPipeIListener *iface, GstBuffer *buffer, guint64 basetime);
gboolean (* push_event) (GstInterPipeIListener *iface, GstEvent *event, guint64 basetime);
gboolean (* query) (GstInterPipeIListener *iface, GstQuery *query);
gboolean (* send_eos) (GstInterPipeIListener *iface);
};

Expand Down Expand Up @@ -193,6 +194,19 @@ gboolean gst_inter_pipe_ilistener_push_buffer (GstInterPipeIListener *iface,
gboolean gst_inter_pipe_ilistener_push_event (GstInterPipeIListener *iface,
GstEvent *event, guint64 basetime);


/**
* gst_inter_pipe_ilistener_query:
* @iface: (transfer none)(not nullable): The object that should query the #GstQuery downstream.
* @event: (transfer full)(not nullable): The #GstQuery to be pushed downstream.
*
* Ask @query to the downstream element.
*
* Return: True if the query was successful, False otherwise.
*/
gboolean gst_inter_pipe_ilistener_query (GstInterPipeIListener *iface,
GstQuery *query);

/**
* gst_inter_pipe_ilistener_send_eos:
* @iface: (transfer none)(not nullable): The object that should push the #GstEvent downstream.
Expand Down
231 changes: 228 additions & 3 deletions gst/interpipe/gstinterpipesink.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ static gboolean gst_inter_pipe_sink_set_caps (GstBaseSink * base,
GstCaps * filter);
static gboolean gst_inter_pipe_sink_event (GstBaseSink * base,
GstEvent * event);
static gboolean gst_inter_pipe_sink_propose_allocation (GstBaseSink * base,
GstQuery * query);
static gboolean gst_inter_pipe_sink_are_caps_compatible (GstInterPipeSink *
sink, GstCaps * listener_caps, GstCaps * sinkcaps);
static GstCaps *gst_inter_pipe_sink_caps_intersect (GstCaps * caps1,
Expand Down Expand Up @@ -174,7 +176,8 @@ gst_inter_pipe_sink_class_init (GstInterPipeSinkClass * klass)
basesink_class->get_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_get_caps);
basesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_set_caps);
basesink_class->event = GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_event);

basesink_class->propose_allocation =
GST_DEBUG_FUNCPTR (gst_inter_pipe_sink_propose_allocation);
}

static void
Expand Down Expand Up @@ -587,6 +590,228 @@ gst_inter_pipe_sink_event (GstBaseSink * base, GstEvent * event)
event);
}


struct AllocQueryCtx
{
GstInterPipeSink *sink;
GstQuery *query;
GstAllocationParams params;
guint size;
guint min_buffers;
gboolean first_query;
guint num_listeners;
};


static gboolean
gst_inter_pipe_sink_forward_query_allocation (gpointer key, gpointer data,
gpointer user_data)
{
struct AllocQueryCtx *ctx;
GstInterPipeIListener *listener;
gchar *listener_name;
GstInterPipeSink *sink;
GstQuery *query;
GstCaps *caps;
gboolean ret = TRUE;
guint count, i, size, min;

listener = GST_INTER_PIPE_ILISTENER (data);
listener_name = (gchar *) key;
ctx = user_data;
sink = ctx->sink;

GST_DEBUG_OBJECT (sink, "Aggregating allocation from listener %s",
listener_name);

gst_query_parse_allocation (ctx->query, &caps, NULL);

query = gst_query_new_allocation (caps, FALSE);
if (!gst_inter_pipe_ilistener_query (listener, query)) {
GST_DEBUG_OBJECT (sink,
"Allocation query failed on listener %s, ignoring allocation",
listener_name);
ret = FALSE;
goto out;
}

/* Allocation Filter, extract of code from tee element */

/* Allocation Params:
* store the maximum alignment, prefix and padding, but ignore the
* allocators and the flags which are tied to downstream allocation*/
count = gst_query_get_n_allocation_params (query);
for (i = 0; i < count; i++) {
GstAllocationParams params = { 0, };

gst_query_parse_nth_allocation_param (query, i, NULL, &params);

GST_DEBUG_OBJECT (sink, "Aggregating AllocationParams align=%"
G_GSIZE_FORMAT " prefix=%" G_GSIZE_FORMAT " padding=%"
G_GSIZE_FORMAT, params.align, params.prefix, params.padding);

if (ctx->params.align < params.align)
ctx->params.align = params.align;

if (ctx->params.prefix < params.prefix)
ctx->params.prefix = params.prefix;

if (ctx->params.padding < params.padding)
ctx->params.padding = params.padding;
}

/* Allocation Pool:
* We want to keep the biggest size and biggest minimum number of buffers to
* make sure downstream requirement can be satisfied. We don't really care
* about the maximum, as this is a parameter of the downstream provided
* pool. We only read the first allocation pool as the minimum number of
* buffers is normally constant regardless of the pool being used. */
if (gst_query_get_n_allocation_pools (query) > 0) {
gst_query_parse_nth_allocation_pool (query, 0, NULL, &size, &min, NULL);

GST_DEBUG_OBJECT (sink,
"Aggregating allocation pool size=%u min_buffers=%u", size, min);

if (ctx->size < size)
ctx->size = size;

if (ctx->min_buffers < min)
ctx->min_buffers = min;
}

/* Allocation Meta:
* For allocation meta, we'll need to aggregate the argument using the new
* GstMetaInfo::agggregate_func */
count = gst_query_get_n_allocation_metas (query);
for (i = 0; i < count; i++) {
guint ctx_index;
GType api;
const GstStructure *param;

api = gst_query_parse_nth_allocation_meta (query, i, &param);

/* For the first query, copy all metas */
if (ctx->first_query) {
gst_query_add_allocation_meta (ctx->query, api, param);
continue;
}

/* Afterward, aggregate the common params */
if (gst_query_find_allocation_meta (ctx->query, api, &ctx_index)) {
const GstStructure *ctx_param;

gst_query_parse_nth_allocation_meta (ctx->query, ctx_index, &ctx_param);

/* Keep meta which has no params */
if (ctx_param == NULL && param == NULL)
continue;

GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api));
gst_query_remove_nth_allocation_meta (ctx->query, ctx_index);
}
}

/* Finally, cleanup metas from the stored query that aren't support on this
* listener. */
count = gst_query_get_n_allocation_metas (ctx->query);
for (i = 0; i < count;) {
GType api = gst_query_parse_nth_allocation_meta (ctx->query, i, NULL);

if (!gst_query_find_allocation_meta (query, api, NULL)) {
GST_DEBUG_OBJECT (sink, "Dropping allocation meta %s", g_type_name (api));
gst_query_remove_nth_allocation_meta (ctx->query, i);
count--;
continue;
}

i++;
}

ctx->first_query = FALSE;
ctx->num_listeners++;

out:
gst_query_unref (query);
return ret;
}

static gboolean
gst_inter_pipe_sink_propose_allocation (GstBaseSink * base, GstQuery * query)
{
struct AllocQueryCtx ctx = { 0 };
GstInterPipeSink *sink;
GHashTable *listeners;
GHashTableIter iter;
gboolean ret = TRUE;
gpointer key, value;

sink = GST_INTER_PIPE_SINK (base);

g_mutex_lock (&sink->listeners_mutex);
listeners = GST_INTER_PIPE_SINK_LISTENERS (sink);

ctx.sink = sink;
ctx.query = query;
ctx.first_query = TRUE;
gst_allocation_params_init (&ctx.params);

g_hash_table_iter_init (&iter, listeners);

while (g_hash_table_iter_next (&iter, &key, &value)) {
ret |= gst_inter_pipe_sink_forward_query_allocation (key, value, &ctx);
}

if (ret) {
guint count = gst_query_get_n_allocation_metas (query);
guint i;

GST_DEBUG_OBJECT (sink,
"Final allocation parameters: align=%" G_GSIZE_FORMAT " prefix=%"
G_GSIZE_FORMAT " padding %" G_GSIZE_FORMAT, ctx.params.align,
ctx.params.prefix, ctx.params.padding);

GST_DEBUG_OBJECT (sink, "Final allocation pools: size=%u min_buffers=%u",
ctx.size, ctx.min_buffers);

GST_DEBUG_OBJECT (sink, "Final %u allocation meta:", count);

for (i = 0; i < count; i++) {
GST_DEBUG_OBJECT (sink, " + aggregated allocation meta %s",
g_type_name (gst_query_parse_nth_allocation_meta (ctx.query, i,
NULL)));
}

/* Allocate one more buffers when multiplexing so we don't starve the
* downstream threads. */
if (ctx.num_listeners > 1)
ctx.min_buffers++;

/* Check that we actually have parameters besides the defaults. */
if (ctx.params.align || ctx.params.prefix || ctx.params.padding) {
gst_query_add_allocation_param (ctx.query, NULL, &ctx.params);
}

/* When size == 0, buffers created from this pool would have no memory
* allocated. */
if (ctx.size) {
gst_query_add_allocation_pool (ctx.query, NULL, ctx.size,
ctx.min_buffers, 0);
}

} else {
guint count = gst_query_get_n_allocation_metas (query);
guint i;

for (i = 1; i <= count; i++) {
gst_query_remove_nth_allocation_meta (query, count - i);
}
}

g_mutex_unlock (&sink->listeners_mutex);

return ret;
}

/* Appsink Callbacks */
static void
gst_inter_pipe_sink_push_to_listener (gpointer key, gpointer data,
Expand Down Expand Up @@ -747,9 +972,9 @@ gst_inter_pipe_sink_add_listener (GstInterPipeINode * iface,
has_listeners = 0 != g_hash_table_size (listeners);

if (!sink->caps_negotiated && !has_listeners
&& !gst_caps_is_equal (srccaps, sinkcaps)) {
&& !gst_caps_is_equal (srccaps, sinkcaps)) {

if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink),
if (!gst_pad_push_event (GST_INTER_PIPE_SINK_PAD (sink),
gst_event_new_reconfigure ()))
goto reconfigure_event_error;

Expand Down
31 changes: 31 additions & 0 deletions gst/interpipe/gstinterpipesrc.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ static gboolean gst_inter_pipe_src_push_buffer (GstInterPipeIListener * iface,
static gboolean gst_inter_pipe_src_push_event (GstInterPipeIListener * iface,
GstEvent * event, guint64 basetime);
static gboolean gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface);
static gboolean gst_inter_pipe_src_push_query (GstInterPipeIListener * iface,
GstQuery * query);
static gboolean gst_inter_pipe_src_listen_node (GstInterPipeSrc * src,
const gchar * node_name);
static gboolean gst_inter_pipe_src_start (GstBaseSrc * base);
Expand Down Expand Up @@ -508,6 +510,7 @@ gst_inter_pipe_ilistener_init (GstInterPipeIListenerInterface * iface)
iface->set_caps = gst_inter_pipe_src_set_caps;
iface->push_buffer = gst_inter_pipe_src_push_buffer;
iface->push_event = gst_inter_pipe_src_push_event;
iface->query = gst_inter_pipe_src_push_query;
iface->send_eos = gst_inter_pipe_src_send_eos;
}

Expand Down Expand Up @@ -739,6 +742,7 @@ gst_inter_pipe_src_push_event (GstInterPipeIListener * iface, GstEvent * event,
}
}


static gboolean
gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface)
{
Expand All @@ -758,6 +762,33 @@ gst_inter_pipe_src_send_eos (GstInterPipeIListener * iface)
return TRUE;
}


static gboolean
gst_inter_pipe_src_push_query (GstInterPipeIListener * iface, GstQuery * query)
{
GstInterPipeSrc *src;
GstPad *srcpad;
GstPad *peerpad;
gboolean ret = TRUE;

src = GST_INTER_PIPE_SRC (iface);
srcpad = GST_INTER_PIPE_SRC_PAD (GST_APP_SRC (src));

peerpad = gst_pad_get_peer (srcpad);
if (!peerpad) {
ret = FALSE;
goto out;
}

ret = gst_pad_query (peerpad, query);

gst_object_unref (peerpad);

out:
return ret;
}


static gboolean
gst_inter_pipe_src_listen_node (GstInterPipeSrc * src, const gchar * node_name)
{
Expand Down

0 comments on commit 1e2c41d

Please sign in to comment.