diff --git a/example/python_example/README.md b/example/python_example/README.md
new file mode 100644
index 0000000..0d2fe60
--- /dev/null
+++ b/example/python_example/README.md
@@ -0,0 +1,55 @@
+# Dynamic source switching with interpipes
+
+## Introduction
+
+This example shows dynamic switching of interpipes: when an `rtsp` source goes down, a dummy source takes its place.
+
+NOTE: An rtsp source is used here, but the same approach can be applied to any other source.
+
+## Usage
+
+1. The following command is used to run the pipeline:
+
+   ```bash
+   python3 test_pipeline.py <rtsp_url_1> [rtsp_url_2] ... [rtsp_url_N]
+   ```
+
+   * For example:
+
+     `python3 test_pipeline.py "rtsp://127.x.x.1:8554/stream1" "rtsp://127.x.x.2:8554/stream2"`
+
+## Explanation
+
+The example code ingests rtsp stream(s) and saves every frame as a jpg image.
+
+This functionality is achieved by splitting the pipeline into three parts:
+
+* Original Source Pipeline(s) (OSP) -> Contains the rtsp source, whose stream is decoded into raw buffers.
+
+* Main Pipeline -> Ingests decoded raw buffers from the various source pipelines and saves them as jpg images.
+
+* Dummy Pipeline(s) -> Acts as a substitute and sends frames whenever an OSP is down.
+
+### Initial flow of buffers
+![Initial Pipeline](img_files/Source%20Switching.drawio.png)
+
+### Flow of buffers after a source has been switched
+![Pipeline with Source switched](img_files/Source%20Switched.drawio.png)
+
+## How is this achieved?
+
+A buffer probe attached to the depayloading element (`rtph264depay`) triggers the `source_stream_pad_buffer_probe` function, which keeps recording the arrival time of the most recent buffer that passed through that element.
+
+The variable `PERMISSIBLE_TIMEOUT` holds the time limit, in seconds, that an OSP can be down before the switch to a dummy source is initiated.
+
+The function `handle_source_downtime` runs in a separate thread and is executed every `PERMISSIBLE_TIMEOUT/4` seconds.
+
+### How does it work?
+
+* The function checks whether a source pipeline is alive by checking whether the age of its most recent buffer exceeds `PERMISSIBLE_TIMEOUT`. If it does, the interpipesrc _`listen-to`_ property is changed to the corresponding dummy source pipeline.
+
+* Since `handle_source_downtime` runs every _`PERMISSIBLE_TIMEOUT/4`_ seconds, the interpipesrc(s) currently listening to a dummy source are tracked in `current_dummies`. The function iterates over the most recent buffer arrival times stored in `timers2` and checks whether any of the previously "dead" source pipelines are pushing out buffers again.
+
+* If a new buffer still hasn't arrived from the OSP _(the age of its latest buffer still exceeds `PERMISSIBLE_TIMEOUT`)_, the source pipeline is restarted.
+
+* If a source is back online _(the difference between the current time and its latest buffer arrival time is less than `PERMISSIBLE_TIMEOUT`)_, the interpipesrc _`listen-to`_ property is changed back to the corresponding source interpipesink. A condensed sketch of this decision is shown below.
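+
+The sketch below condenses the probe and the switching decision into a few lines. It is illustrative only: `buffer_probe`, `check_source`, `last_buffer_time` and `interpipesrc_list` are made-up names, while the interpipesink names `sink{i}` and `dummy_source_{i}` match the ones created in `test_pipeline.py`. The complete logic, including restarting a dead OSP, lives in `test_utils.py`.
+
+```python
+import time
+
+import gi
+
+gi.require_version("Gst", "1.0")
+from gi.repository import Gst
+
+PERMISSIBLE_TIMEOUT = 10  # seconds an OSP may stay silent before switching
+
+
+def buffer_probe(pad, info, last_buffer_time, i):
+    """Record the wall-clock arrival time of every buffer from OSP i."""
+    last_buffer_time[i] = time.time()
+    return Gst.PadProbeReturn.OK
+
+
+def check_source(i, last_buffer_time, interpipesrc_list):
+    """Run every PERMISSIBLE_TIMEOUT/4 seconds for each source index i."""
+    if time.time() - last_buffer_time[i] > PERMISSIBLE_TIMEOUT:
+        # OSP i looks dead: listen to its dummy interpipesink instead.
+        interpipesrc_list[i].set_property("listen-to", f"dummy_source_{i}")
+    else:
+        # OSP i is alive (again): listen back to the real interpipesink.
+        interpipesrc_list[i].set_property("listen-to", f"sink{i}")
+```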
diff --git a/example/python_example/img_files/Source Switched.drawio.png b/example/python_example/img_files/Source Switched.drawio.png
new file mode 100644
index 0000000..0f1543f
Binary files /dev/null and b/example/python_example/img_files/Source Switched.drawio.png differ
diff --git a/example/python_example/img_files/Source Switching.drawio.png b/example/python_example/img_files/Source Switching.drawio.png
new file mode 100644
index 0000000..5d00101
Binary files /dev/null and b/example/python_example/img_files/Source Switching.drawio.png differ
diff --git a/example/python_example/test_pipeline.py b/example/python_example/test_pipeline.py
new file mode 100644
index 0000000..3bcd0b4
--- /dev/null
+++ b/example/python_example/test_pipeline.py
@@ -0,0 +1,273 @@
+"""A sample GStreamer pipeline to showcase dynamic switching of sources."""
+import gi
+
+gi.require_version("Gst", "1.0")
+import sys
+import time
+from pathlib import Path
+from threading import Timer
+
+import test_utils
+from gi.repository import GLib, Gst
+from logzero import logger
+from test_utils import create_gst_ele
+
+PERMISSIBLE_TIMEOUT = 10
+INTERPIPESINK = []
+INTERPIPESRC = []
+DUMMYSINKS = []
+SRC_PIPELINES = []
+VIDEO_SOURCE_TIMER = []
+
+
+class RepeatedTimer:
+    """Custom timer that executes a function every `interval` seconds."""
+
+    def __init__(self, interval, function, *args, **kwargs):
+        """
+        Run `function` repeatedly without spawning overlapping threads, compensating for time drift (https://stackoverflow.com/a/40965385).
+
+        Args:
+            interval (int): Time in seconds between calls of the function
+            function (callable): The function that is to be called repeatedly
+        """
+        self._timer = None
+        self.interval = interval
+        self.function = function
+        self.args = args
+        self.kwargs = kwargs
+        self.is_running = False
+        self.next_call = time.time()
+        self.start()
+
+    def _run(self):
+        self.is_running = False
+        self.start()
+        self.function(*self.args, **self.kwargs)
+
+    def start(self):
+        """Start the timer thread."""
+        if not self.is_running:
+            self.next_call += self.interval
+            self._timer = Timer(self.next_call - time.time(), self._run)
+            self._timer.start()
+            self.is_running = True
+
+    def stop(self):
+        """Stop the timer thread."""
+        self._timer.cancel()
+        self.is_running = False
+
+
+def create_dummy_pipeline(index, dummy_pipeline):
+    """Add a black videotestsrc -> interpipesink branch that substitutes for source `index`."""
+    vid_test_source = create_gst_ele("videotestsrc", f"dummysrc_{index}")
+    vid_test_source.set_property("is-live", True)
+    vid_test_source.set_property("pattern", "black")
+
+    dummy_inter_sink = create_gst_ele("interpipesink")
+    dummy_inter_sink.set_property("name", f"dummy_source_{index}")
+    dummy_inter_sink.set_property("forward-eos", True)
+    dummy_inter_sink.set_property("sync", False)
+    DUMMYSINKS.append(dummy_inter_sink)
+
+    dummy_vidconv = create_gst_ele("videoconvert", f"dummy_conv_{index}")
+    video_scale = create_gst_ele("videoscale", f"videoscaler{index}")
+    caps_vc = create_gst_ele("capsfilter", f"normal_caps{index}")
+    caps_vc.set_property("caps", Gst.Caps.from_string("video/x-raw,width=1080,height=720,format=RGB"))
+
+    dummy_pipeline.add(vid_test_source)
+    dummy_pipeline.add(dummy_vidconv)
+    dummy_pipeline.add(video_scale)
+    dummy_pipeline.add(caps_vc)
+    dummy_pipeline.add(dummy_inter_sink)
+
+    vid_test_source.link(dummy_vidconv)
+    dummy_vidconv.link(video_scale)
+    video_scale.link(caps_vc)
+    caps_vc.link(dummy_inter_sink)
+
+
+def create_source_pipeline(index, url):
+    """Build the original source pipeline (OSP): rtspsrc -> depay -> decode -> interpipesink."""
+    src_pipeline = Gst.Pipeline.new(f"source-pipeline{index}")
+    if not src_pipeline:
logger.error("Unable to create source-pipeline") + + def on_rtspsrc_pad_added(r, pad): + r.link(queue) + + rtsp_source = create_gst_ele("rtspsrc") + rtsp_source.set_property("location", url) + rtsp_source.set_property("do-rtsp-keep-alive", 1) + rtsp_source.connect("pad-added", on_rtspsrc_pad_added) + + queue = create_gst_ele("queue", "queue") + rtsp_decoder = create_gst_ele("rtph264depay", f"decoder{index}") + h264parser = create_gst_ele("h264parse", "parser1") + avdec_decoder = create_gst_ele("avdec_h264", "av_decoder") + + vid_converter = create_gst_ele("videoconvert", f"src_convertor{index}") + video_scale = create_gst_ele("videoscale", f"src_videoscaler_{index}") + + filter1 = create_gst_ele("capsfilter", f"filter{index}") + filter1.set_property("caps", Gst.Caps.from_string("video/x-raw,width=1080,height=720,format=RGB")) + + input_interpipesink = create_gst_ele("interpipesink") + input_interpipesink.set_property("name", f"sink{index}") + input_interpipesink.set_property("forward-eos", False) + input_interpipesink.set_property("sync", False) + input_interpipesink.set_property("drop", True) + input_interpipesink.set_property("forward-events", True) + + + logger.debug("Adding elements to Source Pipeline \n") + src_pipeline.add(rtsp_source) + src_pipeline.add(queue) + src_pipeline.add(rtsp_decoder) + src_pipeline.add(h264parser) + src_pipeline.add(avdec_decoder) + + src_pipeline.add(vid_converter) + src_pipeline.add(video_scale) + src_pipeline.add(filter1) + + src_pipeline.add(input_interpipesink) + + logger.debug("Linking elements in the Source Pipeline \n") + # rtsp_source.link(queue) + queue.link(rtsp_decoder) + rtsp_decoder.link(h264parser) + h264parser.link(avdec_decoder) + + avdec_decoder.link(vid_converter) + vid_converter.link(video_scale) + video_scale.link(filter1) + filter1.link(input_interpipesink) + + INTERPIPESINK.append(input_interpipesink) + SRC_PIPELINES.append(src_pipeline) + VIDEO_SOURCE_TIMER.append(0) + + decoder_srcpad = rtsp_decoder.get_static_pad("src") + if not decoder_srcpad: + logger.error("Unable to create src pad\n") + decoder_srcpad.add_probe( + Gst.PadProbeType.BUFFER, + test_utils.source_stream_pad_buffer_probe, + VIDEO_SOURCE_TIMER, + index, + rtsp_decoder, + ) + + +def create_main_pipeline(index, main_pipeline): + output_path = f"./source_{index}/image_%05d.jpg" + interpipesrc = create_gst_ele("interpipesrc", f"interpipesrc{index}") + interpipesrc.set_property("listen-to", f"sink{index}") + interpipesrc.set_property("is-live", True) + interpipesrc.set_property("stream-sync", 1) + interpipesrc.set_property("emit-signals", True) + interpipesrc.set_property("allow-renegotiation", True) + interpipesrc.set_property("do-timestamp", True) + + vid_converter = create_gst_ele("videoconvert", f"main_convertor{index}") + + filter1 = create_gst_ele("capsfilter", f"filter{index}") + filter1.set_property("caps", Gst.Caps.from_string("video/x-raw, format=RGB")) + + img_enc = create_gst_ele("jpegenc", f"image_decoder_{index}") + + sink = create_gst_ele("multifilesink", f"image_sink_{index}") + + Path(output_path).parent.mkdir(parents=True,exist_ok=True) + sink.set_property("location", output_path) + sink.set_property("qos", True) + + main_pipeline.add(interpipesrc) + main_pipeline.add(vid_converter) + main_pipeline.add(filter1) + main_pipeline.add(img_enc) + main_pipeline.add(sink) + + interpipesrc.link(vid_converter) + + vid_converter.link(filter1) + filter1.link(img_enc) + img_enc.link(sink) + INTERPIPESRC.append(interpipesrc) + +def main(args): + if len(args) < 2: + 
logger.warning("usage: %s [rtsp_url_2] ... [rtsp_url_N]\n" % args[0]) + sys.exit() + + global perf_data + number_sources = len(args) - 1 + sources_urls = args[1:] + + # Standard GStreamer initialization + Gst.init(None) + logger.debug("Creating Pipeline \n ") + + global dummy_pipeline, main_pipeline + main_pipeline = Gst.Pipeline() + if not main_pipeline: + logger.error("Unable to create Pipeline") + dummy_pipeline = Gst.Pipeline.new(f"dummy-pipeline") + if not dummy_pipeline: + logger.error("Unable to create dummy-pipeline") + + + logger.debug("Creating element \n ") + for index, stream_url in enumerate(sources_urls): + create_source_pipeline(index, stream_url) + create_dummy_pipeline(index, dummy_pipeline) + create_main_pipeline(index, main_pipeline) + + # create an event loop and feed gstreamer bus mesages to it + loop = GLib.MainLoop() + for src_pipeline in SRC_PIPELINES: + src_bus = src_pipeline.get_bus() + src_bus.add_signal_watch() + src_bus.connect("message", test_utils.bus_call_src_pipeline, loop, src_pipeline) + + + bus = main_pipeline.get_bus() + bus.add_signal_watch() + bus.connect("message", test_utils.bus_call, loop, main_pipeline) + + logger.debug("Starting pipeline \n") + # start play back and listed to events + dummy_pipeline.set_state(Gst.State.PLAYING) + for src_pipeline1 in SRC_PIPELINES: + src_pipeline1.set_state(Gst.State.PLAYING) + time.sleep(number_sources) + main_pipeline.set_state(Gst.State.PLAYING) + + repeated_timer = RepeatedTimer( + PERMISSIBLE_TIMEOUT / 4, + test_utils.handle_source_downtime, + VIDEO_SOURCE_TIMER, + INTERPIPESRC, + INTERPIPESINK, + DUMMYSINKS, + SRC_PIPELINES, + PERMISSIBLE_TIMEOUT, +) + try: + loop.run() + except: + pass + + # cleanup + logger.debug("Exiting app\n") + repeated_timer.stop() + main_pipeline.set_state(Gst.State.NULL) + for src_pipeline2 in SRC_PIPELINES: + src_pipeline2.set_state(Gst.State.NULL) + dummy_pipeline.set_state(Gst.State.NULL) + +if __name__ == '__main__': + main(sys.argv) diff --git a/example/python_example/test_utils.py b/example/python_example/test_utils.py new file mode 100644 index 0000000..06403eb --- /dev/null +++ b/example/python_example/test_utils.py @@ -0,0 +1,223 @@ +import time +import gi + +gi.require_version("Gst", "1.0") +from gi.repository import Gst +from logzero import logger + + +def handle_source_downtime(timers2, interpipesrcs, interpipesinks, dummysink, src_pipelines, permissible_cam_timeout): + """ + Change to dummy source, if buffers stop flowing in original source pipeline. + + Args: + timers2 (list): Timestamps of when buffers were recently updated + interpipesrcs (list): List of interpipesources in mainpipeline + interpipesinks (list): List of interpipesinks which send buffers to mainpipeline + dummysink (list): List of dummy interpipesinks in dummypipeline + src_pipelines (list): List containing source pipelines + permissible_cam_timeout (float): Maximum time for which a video source can be without fresh data. + """ + time_1 = time.time() + current_dummies = [ + i for i, _ in enumerate(timers2) if interpipesrcs[i].get_property("listen-to").startswith("dummy_source_") + ] + # Iterate through all the sources' latest buffer timestamps + for i, time_2 in enumerate(timers2): + # If the first buffer for said video source hasn't flown yet, then the dummy source will not be + # able to negotiate a proper shape, etc to send videotestsrc buffers in + # NOTE: Lines 157-159 are commented out as caps aren't set(or required) for dummy buffers from original source. 
+        # if data_started[i] is False:
+        #     logger.debug(f"Missing first contact from source {i+1} to gst core pipeline")
+        #     continue
+
+        # If said video source is currently linked to a dummy source
+        if i in current_dummies:
+            # Restart the source if the actual video source hasn't sent a buffer in the recent past
+            if time_1 - time_2 > permissible_cam_timeout:
+                src_pipelines[i].set_state(Gst.State.NULL)
+                src_pipelines[i].set_state(Gst.State.PLAYING)
+
+            # Else switch the dummy input back to the actual video source
+            else:
+                original_sink = interpipesinks[i].get_property("name")
+                interpipesrcs[i].set_property("listen-to", original_sink)
+                current_dummies.remove(i)
+                logger.debug(f"Input {i} is listening back to {original_sink}\n")
+
+        # If said video source is linked to an actual video source albeit a zombie (no buffers for a while)
+        elif (i not in current_dummies) and time_1 - time_2 > permissible_cam_timeout:
+            logger.debug(f"No buffers are received from actual source: {i}\n")
+            dummy = Gst.Object.get_name(dummysink[i])
+            interpipesrcs[i].set_property("listen-to", dummy)
+            current_dummies.append(i)
+            logger.debug(f"Input: {i} has been changed to its dummy input\n")
+        # If no switch is to be made for said video source, move on to the next source
+        else:
+            continue
+        logger.debug(f"Current dummies: {current_dummies}\n")
+
+
+def source_stream_pad_buffer_probe(_pad, info, vid_source_timer, index, _source):
+    """Record the wall-clock arrival time of every buffer coming out of source `index`."""
+    gst_buffer = info.get_buffer()
+    # logger.info(f"buffers coming through for source_pipeline:{_source.name}-{index} \n")
+    if not gst_buffer:
+        logger.error("Unable to get GstBuffer ")
+        return Gst.PadProbeReturn.DROP
+    time_2 = time.time()
+    vid_source_timer[index] = time_2
+    return Gst.PadProbeReturn.OK
+
+
+def create_gst_ele(element, name=""):
+    """
+    Create a GStreamer element and log an error if creation fails.
+
+    Args:
+        element (str): Type of element to be made
+        name (str): Name to be assigned internally in GStreamer for that element
+
+    Returns:
+        Gst.Element: The element that was requested (None if creation failed)
+    """
+    if name:
+        ele = Gst.ElementFactory.make(element, name)
+    else:
+        ele = Gst.ElementFactory.make(element)
+    if not ele:
+        logger.error(f"Unable to create Element: {element}, with name {name}")
+    return ele
+
+
+def bus_call(_bus, message, loop, gst_pipeline):
+    """
+    Handle all messages coming through to the core pipeline.
+
+    Args:
+        bus (Gst.Bus): Reference to the bus of the pipeline
+        message (Gst.Message): Message that is to be handled
+        loop (GLib.MainLoop): Main event loop
+        gst_pipeline (Gst.Pipeline): Reference to the main pipeline
+
+    Returns:
+        bool: Always True, so the handler stays attached.
+    """
+    type_ = message.type
+    if type_ == Gst.MessageType.EOS:
+        logger.debug(f"End-of-stream from: {Gst.Object.get_name(message.src)}\n")
+        loop.quit()
+    elif type_ == Gst.MessageType.WARNING:
+        err, debug = message.parse_warning()
+        # "Impossible to configure latency" warnings are expected with live sources and are ignored
+        if "Impossible to configure latency" not in debug:
+            logger.debug(f"{err}:- {debug}\n")
+    elif type_ == Gst.MessageType.ERROR:
+        err, debug = message.parse_error()
+        if "watchdog" in Gst.Object.get_name(message.src):
+            logger.debug("GStreamer watchdog triggered")
+            gst_pipeline.send_event(Gst.Event.new_eos())
+            logger.debug("Sent EoS")
+            gst_pipeline.set_state(Gst.State.NULL)
+            gst_pipeline.set_state(Gst.State.PLAYING)
+            logger.debug("Restart completed")
+        elif "interpipesrc" in str(Gst.Object.get_name(message.src)).lower():
+            if "Internal data stream error" in str(err):
+                logger.debug(f"Internal data stream error in {Gst.Object.get_name(message.src)}\n")
+            else:
+                logger.debug(f"{err}:- {debug}\n")
+        else:
+            logger.debug(f"{err}:- {debug}\n")
+            loop.quit()
+    elif type_ in (
+        Gst.MessageType.STATE_CHANGED,
+        Gst.MessageType.STREAM_STATUS,
+        Gst.MessageType.ELEMENT,
+        Gst.MessageType.TAG,
+        Gst.MessageType.DURATION_CHANGED,
+        Gst.MessageType.ASYNC_DONE,
+        Gst.MessageType.NEW_CLOCK,
+        Gst.MessageType.PROGRESS,
+        Gst.MessageType.BUFFERING,
+        Gst.MessageType.QOS,
+        Gst.MessageType.LATENCY,
+    ):
+        pass
+    elif type_ == Gst.MessageType.STREAM_START:
+        logger.debug("New Data Stream started\n")
+    else:
+        logger.debug(f"Unknown message type {type_}: {message}\n")
+    return True
+
+
+def bus_call_src_pipeline(_bus, message, loop, gst_pipeline):
+    """
+    Handle all messages coming through to the source pipeline.
+
+    Args:
+        bus (Gst.Bus): Reference to the bus of the pipeline
+        message (Gst.Message): Message that is to be handled
+        loop (GLib.MainLoop): Main event loop
+        gst_pipeline (Gst.Pipeline): Reference to the source pipeline
+
+    Returns:
+        bool: Always True, so the handler stays attached.
+    """
+    type_ = message.type
+    if type_ == Gst.MessageType.EOS:
+        logger.debug("EoS from URI\n")
+        loop.quit()
+    elif type_ == Gst.MessageType.WARNING:
+        err, debug = message.parse_warning()
+        logger.debug(f"{err}:- {debug}\n")
+    elif type_ == Gst.MessageType.ERROR:
+        parse_error_message(message, gst_pipeline, loop)
+    elif type_ in (
+        Gst.MessageType.STATE_CHANGED,
+        Gst.MessageType.STREAM_STATUS,
+        Gst.MessageType.ELEMENT,
+        Gst.MessageType.TAG,
+        Gst.MessageType.DURATION_CHANGED,
+        Gst.MessageType.ASYNC_DONE,
+        Gst.MessageType.NEW_CLOCK,
+        Gst.MessageType.PROGRESS,
+        Gst.MessageType.BUFFERING,
+        Gst.MessageType.QOS,
+        Gst.MessageType.LATENCY,
+    ):
+        pass
+    elif type_ == Gst.MessageType.STREAM_START:
+        logger.debug(f"New Data Stream started at source {Gst.Object.get_name(gst_pipeline)}\n")
+    else:
+        if "tsdemux" in str(Gst.Object.get_name(message.src)):
+            return True
+        logger.debug(f"Unknown message type {type_}: {message}, source: {Gst.Object.get_name(message.src)}\n")
+    return True
+
+
+def parse_error_message(message, gst_pipeline, loop):
+    """
+    Parse error messages sent from the bus and process them accordingly.
+
+    Args:
+        message (Gst.Message): Error message from the bus
+        gst_pipeline (Gst.Pipeline): Reference to the source pipeline
+        loop (GLib.MainLoop): Main event loop
+    """
+    err, debug = message.parse_error()
+    if any(domain in err.domain for domain in ("stream", "resource")):
+        logger.debug(f"Stream/Resource error. Stopping the pipeline {Gst.Object.get_name(gst_pipeline)}\n")
+        try:
+            gst_pipeline.set_state(Gst.State.NULL)
+        # If the error occurs during startup, the pipeline may not be fully created yet
+        except AttributeError:
+            pass
+    else:
+        logger.error(f"{err}:- {debug}\n")
+        loop.quit()