Skip to content

Commit

Permalink
[Edge] Implement custom connection for edgesrc and edgesink
Browse files Browse the repository at this point in the history
 - Implement custom connection for edgesrc and edgesink
 - Enable the tests

Signed-off-by: gichan2-jang <[email protected]>
  • Loading branch information
gichan-jang committed Sep 24, 2024
1 parent 6013b83 commit 92de5fb
Show file tree
Hide file tree
Showing 10 changed files with 343 additions and 28 deletions.
4 changes: 2 additions & 2 deletions debian/rules
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ export NNSTREAMER_DECODERS=${NNSTREAMER_BUILD_ROOT_PATH}/ext/nnstreamer/tensor_d
export NNSTREAMER_CONVERTERS=${NNSTREAMER_BUILD_ROOT_PATH}/ext/nnstreamer/tensor_converter
export NNSTREAMER_TRAINERS=${NNSTREAMER_BUILD_ROOT_PATH}/ext/nnstreamer/tensor_trainer
export PYTHONIOENCODING=utf-8

ifeq ($(DEB_BUILD_ARCH_CPU), arm)
FLOAT16 := -Denable-float16=true
endif
Expand Down Expand Up @@ -95,4 +94,5 @@ else
endif

override_dh_shlibdeps:
dh_shlibdeps --exclude=libtensorflow2-lite-custom.so
dh_shlibdeps --exclude=libtensorflow2-lite-custom.so -l${NNSTREAMER_BUILD_ROOT_PATH}/tests/nnstreamer_edge

2 changes: 2 additions & 0 deletions gst/edge/edge_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ gst_edge_get_connect_type (void)
"Connect with MQTT brokers and directly sending stream frames via TCP connections."},
{NNS_EDGE_CONNECT_TYPE_MQTT, "MQTT",
"Sending stream frames via MQTT connections."},
{NNS_EDGE_CONNECT_TYPE_CUSTOM, "CUSTOM",
"Sending stream frames via CUSTOM connections."},
{0, NULL, NULL},
};
protocol = g_enum_register_static ("edge_protocol", protocols);
Expand Down
50 changes: 47 additions & 3 deletions gst/edge/edge_sink.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ enum
PROP_TOPIC,
PROP_WAIT_CONNECTION,
PROP_CONNECTION_TIMEOUT,
PROP_CUSTOM_LIB,

PROP_LAST
};
Expand All @@ -60,6 +61,7 @@ static void gst_edgesink_get_property (GObject * object,
static void gst_edgesink_finalize (GObject * object);

static gboolean gst_edgesink_start (GstBaseSink * basesink);
static gboolean gst_edgesink_stop (GstBaseSink * basesink);
static GstFlowReturn gst_edgesink_render (GstBaseSink * basesink,
GstBuffer * buffer);
static gboolean gst_edgesink_set_caps (GstBaseSink * basesink, GstCaps * caps);
Expand Down Expand Up @@ -128,6 +130,10 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
"The timeout (in milliseconds) for waiting a connection to receiver. "
"0 timeout (default) means infinite wait.", 0, G_MAXUINT64, 0,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CUSTOM_LIB,
g_param_spec_string ("custom-lib", "Custom connection lib path",
"User defined custom connection lib path.",
"", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&sinktemplate));
Expand All @@ -137,6 +143,7 @@ gst_edgesink_class_init (GstEdgeSinkClass * klass)
"Publish incoming streams", "Samsung Electronics Co., Ltd.");

gstbasesink_class->start = gst_edgesink_start;
gstbasesink_class->stop = gst_edgesink_stop;
gstbasesink_class->render = gst_edgesink_render;
gstbasesink_class->set_caps = gst_edgesink_set_caps;

Expand All @@ -158,6 +165,7 @@ gst_edgesink_init (GstEdgeSink * self)
self->connect_type = DEFAULT_CONNECT_TYPE;
self->wait_connection = FALSE;
self->connection_timeout = 0;
self->custom_lib = NULL;
}

/**
Expand Down Expand Up @@ -204,6 +212,10 @@ gst_edgesink_set_property (GObject * object, guint prop_id,
case PROP_CONNECTION_TIMEOUT:
self->connection_timeout = g_value_get_uint64 (value);
break;
case PROP_CUSTOM_LIB:
g_free (self->custom_lib);
self->custom_lib = g_value_dup_string (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand Down Expand Up @@ -244,6 +256,9 @@ gst_edgesink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_CONNECTION_TIMEOUT:
g_value_set_uint64 (value, self->connection_timeout);
break;
case PROP_CUSTOM_LIB:
g_value_set_string (value, self->custom_lib);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand All @@ -267,6 +282,9 @@ gst_edgesink_finalize (GObject * object)
g_free (self->topic);
self->topic = NULL;

g_free (self->custom_lib);
self->custom_lib = NULL;

if (self->edge_h) {
nns_edge_release_handle (self->edge_h);
self->edge_h = NULL;
Expand All @@ -286,9 +304,17 @@ gst_edgesink_start (GstBaseSink * basesink)
int ret;
char *port = NULL;

ret =
nns_edge_create_handle (NULL, self->connect_type,
NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
ret = nns_edge_create_handle (NULL, self->connect_type,
NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
} else {
if (!self->custom_lib) {
nns_loge ("Failed to start edgesink. Custom library is not set.");
return FALSE;
}
ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
NNS_EDGE_NODE_TYPE_PUB, &self->edge_h);
}

if (NNS_EDGE_ERROR_NONE != ret) {
nns_loge ("Failed to get nnstreamer edge handle.");
Expand Down Expand Up @@ -350,6 +376,24 @@ gst_edgesink_start (GstBaseSink * basesink)
return TRUE;
}

/**
* @brief Stop processing of edgesink
*/
static gboolean
gst_edgesink_stop (GstBaseSink * basesink)
{
GstEdgeSink *self = GST_EDGESINK (basesink);
int ret;

ret = nns_edge_stop (self->edge_h);
if (NNS_EDGE_ERROR_NONE != ret) {
nns_loge ("Failed to stop edge. error code(%d)", ret);
return FALSE;
}

return TRUE;
}

/**
* @brief render buffer, send buffer
*/
Expand Down
2 changes: 2 additions & 0 deletions gst/edge/edge_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct _GstEdgeSink
nns_edge_h edge_h;
gboolean wait_connection;
guint64 connection_timeout;

gchar *custom_lib;
};

/**
Expand Down
51 changes: 49 additions & 2 deletions gst/edge/edge_src.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum
PROP_DEST_PORT,
PROP_CONNECT_TYPE,
PROP_TOPIC,
PROP_CUSTOM_LIB,

PROP_LAST
};
Expand All @@ -51,6 +52,7 @@ static void gst_edgesrc_get_property (GObject * object, guint prop_id,
static void gst_edgesrc_class_finalize (GObject * object);

static gboolean gst_edgesrc_start (GstBaseSrc * basesrc);
static gboolean gst_edgesrc_stop (GstBaseSrc * basesrc);
static GstFlowReturn gst_edgesrc_create (GstBaseSrc * basesrc, guint64 offset,
guint size, GstBuffer ** out_buf);

Expand Down Expand Up @@ -109,6 +111,10 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)
"The main topic of the host and option if necessary. "
"(topic)/(optional topic for main topic).", "",
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_CUSTOM_LIB,
g_param_spec_string ("custom-lib", "Custom connection lib path",
"User defined custom connection lib path.",
"", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));

gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&srctemplate));
Expand All @@ -118,6 +124,7 @@ gst_edgesrc_class_init (GstEdgeSrcClass * klass)
"Subscribe and push incoming streams", "Samsung Electronics Co., Ltd.");

gstbasesrc_class->start = gst_edgesrc_start;
gstbasesrc_class->stop = gst_edgesrc_stop;
gstbasesrc_class->create = gst_edgesrc_create;
gstelement_class->change_state = gst_edgesrc_change_state;

Expand All @@ -142,6 +149,7 @@ gst_edgesrc_init (GstEdgeSrc * self)
self->msg_queue = g_async_queue_new ();
self->connect_type = DEFAULT_CONNECT_TYPE;
self->playing = FALSE;
self->custom_lib = NULL;
}

/**
Expand Down Expand Up @@ -177,6 +185,10 @@ gst_edgesrc_set_property (GObject * object, guint prop_id, const GValue * value,
g_free (self->topic);
self->topic = g_value_dup_string (value);
break;
case PROP_CUSTOM_LIB:
g_free (self->custom_lib);
self->custom_lib = g_value_dup_string (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand Down Expand Up @@ -211,6 +223,9 @@ gst_edgesrc_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_TOPIC:
g_value_set_string (value, self->topic);
break;
case PROP_CUSTOM_LIB:
g_value_set_string (value, self->custom_lib);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
Expand All @@ -233,6 +248,9 @@ gst_edgesrc_class_finalize (GObject * object)
g_free (self->topic);
self->topic = NULL;

g_free (self->custom_lib);
self->custom_lib = NULL;

if (self->msg_queue) {
while ((data_h = g_async_queue_try_pop (self->msg_queue))) {
nns_edge_data_destroy (data_h);
Expand Down Expand Up @@ -290,6 +308,7 @@ _nns_edge_event_cb (nns_edge_event_h event_h, void *user_data)
int ret = NNS_EDGE_ERROR_NONE;

GstEdgeSrc *self = GST_EDGESRC (user_data);

if (0 != nns_edge_event_get_type (event_h, &event_type)) {
nns_loge ("Failed to get event type!");
return NNS_EDGE_ERROR_UNKNOWN;
Expand Down Expand Up @@ -327,9 +346,17 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
int ret;
char *port = NULL;

ret =
nns_edge_create_handle (NULL, self->connect_type,
if (NNS_EDGE_CONNECT_TYPE_CUSTOM != self->connect_type) {
ret = nns_edge_create_handle (NULL, self->connect_type,
NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
} else {
if (!self->custom_lib) {
nns_loge ("Failed to create custom handle. custom-lib path is not set.");
return FALSE;
}
ret = nns_edge_custom_create_handle (NULL, self->custom_lib,
NNS_EDGE_NODE_TYPE_SUB, &self->edge_h);
}

if (NNS_EDGE_ERROR_NONE != ret) {
nns_loge ("Failed to get nnstreamer edge handle.");
Expand Down Expand Up @@ -369,6 +396,26 @@ gst_edgesrc_start (GstBaseSrc * basesrc)
return TRUE;
}

/**
* @brief Stop edgesrc, called when state changed ready to null
*/
static gboolean
gst_edgesrc_stop (GstBaseSrc * basesrc)
{
GstEdgeSrc *self = GST_EDGESRC (basesrc);
int ret;

self->playing = FALSE;
ret = nns_edge_stop (self->edge_h);

if (NNS_EDGE_ERROR_NONE != ret) {
nns_loge ("Failed to stop edgesrc. error code(%d)", ret);
return FALSE;
}

return TRUE;
}

/**
* @brief Create a buffer containing the subscribed data
*/
Expand Down
2 changes: 2 additions & 0 deletions gst/edge/edge_src.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ struct _GstEdgeSrc
GAsyncQueue *msg_queue;

gboolean playing;

gchar* custom_lib;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion packaging/nnstreamer.spec
Original file line number Diff line number Diff line change
Expand Up @@ -989,7 +989,7 @@ export ORC_DEBUG=2
bash %{test_script} ./tests/nnstreamer_datarepo
%endif
%if 0%{?nnstreamer_edge_support}
bash %{test_script} ./tests/nnstreamer_edge
LD_LIBRARY_PATH=./tests/nnstreamer_edge bash %{test_script} ./tests/nnstreamer_edge
%endif
%if 0%{mvncsdk2_support}
LD_LIBRARY_PATH=${NNSTREAMER_BUILD_ROOT_PATH}/tests/nnstreamer_filter_mvncsdk2:. bash %{test_script} ./tests/nnstreamer_filter_mvncsdk2/unittest_filter_mvncsdk2
Expand Down
21 changes: 1 addition & 20 deletions tests/nnstreamer_edge/edge/unittest_edge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include "unittest_util.h"

static int data_received;
static const char *CUSTOM_LIB_PATH = "./libnnstreamer-edge-custom-test.so";
static const char *CUSTOM_LIB_PATH = "libnnstreamer-edge-custom-test.so";

Check warning on line 19 in tests/nnstreamer_edge/edge/unittest_edge.cc

View workflow job for this annotation

GitHub Actions / CPP-Linter

tests/nnstreamer_edge/edge/unittest_edge.cc:19:20 [cppcoreguidelines-avoid-non-const-global-variables]

variable 'CUSTOM_LIB_PATH' is non-const and globally accessible, consider making it const

/**
* @brief Test for edgesink get and set properties.
Expand Down Expand Up @@ -181,7 +181,6 @@ new_data_cb (GstElement *element, GstBuffer *buffer, gpointer user_data)
gint *output, i;
gboolean ret;

g_critical ("[DEBUG] NEW DATA RECEIVED!");
data_received++;
mem_res = gst_buffer_get_memory (buffer, 0);
ret = gst_memory_map (mem_res, &info_res, GST_MAP_READ);
Expand Down Expand Up @@ -275,9 +274,6 @@ TEST (edgeSinkSrc, runNormal)
*/
TEST (edgeCustom, sinkNormal)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand Down Expand Up @@ -305,9 +301,6 @@ TEST (edgeCustom, sinkNormal)
*/
TEST (edgeCustom, sinkInvalidProp_n)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand All @@ -331,9 +324,6 @@ TEST (edgeCustom, sinkInvalidProp_n)
*/
TEST (edgeCustom, sinkInvalidProp2_n)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand All @@ -357,9 +347,6 @@ TEST (edgeCustom, sinkInvalidProp2_n)
*/
TEST (edgeCustom, srcNormal)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand All @@ -386,9 +373,6 @@ TEST (edgeCustom, srcNormal)
*/
TEST (edgeCustom, srcInvalidProp_n)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand All @@ -411,9 +395,6 @@ TEST (edgeCustom, srcInvalidProp_n)
*/
TEST (edgeCustom, srcInvalidProp2_n)
{
/** @todo TDD: Enable this test later. */
GTEST_SKIP ();

gchar *pipeline = nullptr;
GstElement *gstpipe = nullptr;

Expand Down
8 changes: 8 additions & 0 deletions tests/nnstreamer_edge/meson.build
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Test lib for nnstreamer-edge custom connection
library('nnstreamer-edge-custom-test',
join_paths('nnstreamer-edge-custom-test.c'),
dependencies: edge_deps,
install: get_option('install-test'),
install_dir: unittest_install_dir
)

unittest_edge = executable('unittest_edge',
join_paths('edge', 'unittest_edge.cc'),
dependencies: [nnstreamer_unittest_deps],
Expand Down
Loading

0 comments on commit 92de5fb

Please sign in to comment.