diff --git a/docs/source/developer_guide/guides/2_real_world_phishing.md b/docs/source/developer_guide/guides/2_real_world_phishing.md index e104288a35..ccdccab74a 100644 --- a/docs/source/developer_guide/guides/2_real_world_phishing.md +++ b/docs/source/developer_guide/guides/2_real_world_phishing.md @@ -761,20 +761,20 @@ def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator) ``` -The `source_generator` method is where most of the RabbitMQ-specific code exists. When we have a message that we wish to emit into the pipeline, we simply `yield` it. +The `source_generator` method is where most of the RabbitMQ-specific code exists. When we have a message that we wish to emit into the pipeline, we simply `yield` it. We continue this process until the `is_stop_requested()` method returns `True`. ```python def source_generator(self) -> collections.abc.Iterator[MessageMeta]: try: - while not self._stop_requested: - (method_frame, header_frame, body) = self._channel.basic_get(self._queue_name) + while not self.is_stop_requested(): + (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: buffer = StringIO(body.decode("utf-8")) df = cudf.io.read_json(buffer, orient='records', lines=True) yield MessageMeta(df=df) except Exception as ex: - logger.exception("Error occurred converting RabbitMQ message to Dataframe: {}".format(ex)) + logger.exception("Error occurred converting RabbitMQ message to Dataframe: %s", ex) finally: self._channel.basic_ack(method_frame.delivery_tag) else: @@ -824,11 +824,11 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): Hostname or IP of the RabbitMQ server. exchange : str Name of the RabbitMQ exchange to connect to. - exchange_type : str + exchange_type : str, optional RabbitMQ exchange type; defaults to `fanout`. - queue_name : str + queue_name : str, optional Name of the queue to listen to. If left blank, RabbitMQ will generate a random queue name bound to the exchange. - poll_interval : str + poll_interval : str, optional Amount of time between polling RabbitMQ for new messages """ @@ -854,9 +854,6 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): self._poll_interval = pd.Timedelta(poll_interval) - # Flag to indicate whether or not we should stop - self._stop_requested = False - @property def name(self) -> str: return "from-rabbitmq" @@ -867,18 +864,12 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource): def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) - def stop(self): - # Indicate we need to stop - self._stop_requested = True - - return super().stop() - def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator) def source_generator(self) -> collections.abc.Iterator[MessageMeta]: try: - while not self._stop_requested: + while not self.is_stop_requested(): (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: diff --git a/docs/source/developer_guide/guides/4_source_cpp_stage.md b/docs/source/developer_guide/guides/4_source_cpp_stage.md index 4b8f9eb601..692ba30d5b 100644 --- a/docs/source/developer_guide/guides/4_source_cpp_stage.md +++ b/docs/source/developer_guide/guides/4_source_cpp_stage.md @@ -493,13 +493,10 @@ def __init__(self, self._exchange_type = exchange_type self._queue_name = queue_name - self._connection = None + self._connection: pika.BlockingConnection = None self._channel = None self._poll_interval = pd.Timedelta(poll_interval) - - # Flag to indicate whether or not we should stop - self._stop_requested = False ``` ```python def connect(self): diff --git a/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py index cc33bbe90c..3ac84b0db9 100644 --- a/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py +++ b/examples/developer_guide/2_2_rabbitmq/rabbitmq_source_stage.py @@ -77,9 +77,6 @@ def __init__(self, self._poll_interval = pd.Timedelta(poll_interval) - # Flag to indicate whether or not we should stop - self._stop_requested = False - @property def name(self) -> str: return "from-rabbitmq" @@ -90,18 +87,12 @@ def supports_cpp_node(self) -> bool: def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) - def stop(self): - # Indicate we need to stop - self._stop_requested = True - - return super().stop() - def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: return builder.make_source(self.unique_name, self.source_generator) def source_generator(self) -> collections.abc.Iterator[MessageMeta]: try: - while not self._stop_requested: + while not self.is_stop_requested(): (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: diff --git a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py index 4516f7d87b..e02b3dc6ab 100755 --- a/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py +++ b/examples/developer_guide/4_rabbitmq_cpp_stage/src/rabbitmq_cpp_stage/rabbitmq_source_stage.py @@ -67,14 +67,11 @@ def __init__(self, self._exchange_type = exchange_type self._queue_name = queue_name - self._connection = None + self._connection: pika.BlockingConnection = None self._channel = None self._poll_interval = pd.Timedelta(poll_interval) - # Flag to indicate whether or not we should stop - self._stop_requested = False - @property def name(self) -> str: return "from-rabbitmq" @@ -117,7 +114,7 @@ def connect(self): def source_generator(self): try: - while not self._stop_requested: + while not self.is_stop_requested(): (method_frame, _, body) = self._channel.basic_get(self._queue_name) if method_frame is not None: try: diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py b/examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py index 16b70a8d3c..effe04ad47 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py @@ -128,7 +128,7 @@ def _polling_generate_frames_fsspec(self) -> typing.Iterable[fsspec.core.OpenFil curr_time = time.monotonic() next_update_epoch = curr_time - while (True): + while (not self.is_stop_requested()): # Before doing any work, find the next update epoch after the current time while (next_update_epoch <= curr_time): # Only ever add `self._watch_interval` to next_update_epoch so all updates are at repeating intervals diff --git a/python/morpheus/morpheus/controllers/rss_controller.py b/python/morpheus/morpheus/controllers/rss_controller.py index a1972c406f..13ed6b438c 100644 --- a/python/morpheus/morpheus/controllers/rss_controller.py +++ b/python/morpheus/morpheus/controllers/rss_controller.py @@ -15,8 +15,12 @@ import logging import os import time +from collections.abc import Callable +from collections.abc import Iterable from dataclasses import asdict from dataclasses import dataclass +from datetime import datetime +from datetime import timedelta from urllib.parse import urlparse import requests @@ -24,6 +28,8 @@ import cudf +from morpheus.messages import MessageMeta + logger = logging.getLogger(__name__) IMPORT_EXCEPTION = None @@ -72,6 +78,12 @@ class RSSController: Request timeout in secs to fetch the feed. strip_markup : bool, optional, default = False When true, strip HTML & XML markup from the from the content, summary and title fields. + stop_after: int, default = 0 + Stops ingesting after emitting `stop_after` records (rows in the dataframe). Useful for testing. Disabled if `0` + interval_secs : float, optional, default = 600 + Interval in seconds between fetching new feed items. + should_stop_fn: Callable[[], bool] + Function that returns a boolean indicating if the watcher should stop processing files. """ # Fields which may contain HTML or XML content @@ -89,7 +101,10 @@ def __init__(self, cache_dir: str = "./.cache/http", cooldown_interval: int = 600, request_timeout: float = 2.0, - strip_markup: bool = False): + strip_markup: bool = False, + stop_after: int = 0, + interval_secs: float = 600, + should_stop_fn: Callable[[], bool] = None): if IMPORT_EXCEPTION is not None: raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION @@ -104,6 +119,11 @@ def __init__(self, self._request_timeout = request_timeout self._strip_markup = strip_markup + if should_stop_fn is None: + self._should_stop_fn = lambda: False + else: + self._should_stop_fn = should_stop_fn + # Validate feed_input for f in self._feed_input: if not RSSController.is_url(f) and not os.path.exists(f): @@ -113,7 +133,14 @@ def __init__(self, # If feed_input is URL. Runs indefinitely run_indefinitely = any(RSSController.is_url(f) for f in self._feed_input) + if (stop_after > 0 and run_indefinitely): + raise ValueError("Cannot set both `stop_after` and `run_indefinitely` to True.") + + self._stop_after = stop_after self._run_indefinitely = run_indefinitely + self._interval_secs = interval_secs + self._interval_td = timedelta(seconds=self._interval_secs) + self._enable_cache = enable_cache if enable_cache: @@ -381,3 +408,45 @@ def is_url(cls, feed_input: str) -> bool: return parsed_url.scheme != '' and parsed_url.netloc != '' except Exception: return False + + def feed_generator(self) -> Iterable[MessageMeta]: + """ + Fetch RSS feed entries and yield as MessageMeta object. + """ + stop_requested = False + records_emitted = 0 + + while (not stop_requested and not self._should_stop_fn()): + try: + for df in self.fetch_dataframes(): + df_size = len(df) + + if logger.isEnabledFor(logging.DEBUG): + logger.info("Received %d new entries...", df_size) + logger.info("Emitted %d records so far.", records_emitted) + + yield MessageMeta(df=df) + + records_emitted += df_size + + if (0 < self._stop_after <= records_emitted): + stop_requested = True + logger.info("Stop limit reached... preparing to halt the source.") + break + + except Exception as exc: + if not self.run_indefinitely: + logger.error("Failed either in the process of fetching or processing entries: %s.", exc) + raise + logger.error("Failed either in the process of fetching or processing entries: %s.", exc) + + if not self.run_indefinitely: + stop_requested = True + continue + + logger.info("Waiting for %d seconds before fetching again...", self._interval_secs) + sleep_until = datetime.now() + self._interval_td + while (datetime.now() < sleep_until and not self._should_stop_fn()): + time.sleep(1) + + logger.info("RSS source exhausted, stopping.") diff --git a/python/morpheus/morpheus/modules/input/rss_source.py b/python/morpheus/morpheus/modules/input/rss_source.py index 1454a67b05..719ecec3f2 100644 --- a/python/morpheus/morpheus/modules/input/rss_source.py +++ b/python/morpheus/morpheus/modules/input/rss_source.py @@ -13,13 +13,11 @@ # limitations under the License. import logging -import time import mrc from pydantic import ValidationError from morpheus.controllers.rss_controller import RSSController -from morpheus.messages import MessageMeta from morpheus.modules.schemas.rss_source_schema import RSSSourceSchema from morpheus.utils.module_utils import ModuleLoaderFactory from morpheus.utils.module_utils import register_module @@ -57,6 +55,7 @@ def _rss_source(builder: mrc.Builder): module_config = builder.get_current_module_config() rss_config = module_config.get("rss_source", {}) + try: validated_config = RSSSourceSchema(**rss_config) except ValidationError as e: @@ -74,50 +73,10 @@ def _rss_source(builder: mrc.Builder): cache_dir=validated_config.cache_dir, cooldown_interval=validated_config.cooldown_interval_sec, request_timeout=validated_config.request_timeout_sec, - strip_markup=validated_config.strip_markup) - - stop_requested = False - - def fetch_feeds() -> MessageMeta: - """ - Fetch RSS feed entries and yield as MessageMeta object. - """ - nonlocal stop_requested - records_emitted = 0 - - while (not stop_requested): - try: - for df in controller.fetch_dataframes(): - df_size = len(df) - - if logger.isEnabledFor(logging.DEBUG): - logger.info("Received %d new entries...", df_size) - logger.info("Emitted %d records so far.", records_emitted) - - yield MessageMeta(df=df) - - records_emitted += df_size - - if (0 < validated_config.stop_after_rec <= records_emitted): - stop_requested = True - logger.info("Stop limit reached... preparing to halt the source.") - break - - except Exception as exc: - if not controller.run_indefinitely: - logger.error("Failed either in the process of fetching or processing entries: %s.", exc) - raise - logger.error("Failed either in the process of fetching or processing entries: %s.", exc) - - if not controller.run_indefinitely: - stop_requested = True - continue - - logger.info("Waiting for %d seconds before fetching again...", validated_config.interval_sec) - time.sleep(validated_config.interval_sec) - - logger.info("RSS source exhausted, stopping.") + strip_markup=validated_config.strip_markup, + stop_after=validated_config.stop_after_rec, + interval_secs=validated_config.interval_sec) - node = builder.make_source("fetch_feeds", fetch_feeds) + node = builder.make_source("fetch_feeds", controller.feed_generator) builder.register_module_output("output", node) diff --git a/python/morpheus/morpheus/pipeline/single_output_source.py b/python/morpheus/morpheus/pipeline/single_output_source.py index c9bd1fd826..c0b3480f24 100644 --- a/python/morpheus/morpheus/pipeline/single_output_source.py +++ b/python/morpheus/morpheus/pipeline/single_output_source.py @@ -40,6 +40,10 @@ def __init__(self, c: Config): self._create_ports(0, 1) + # Flag to indicate if we need to stop, subclasses should check this value periodically, typically at the start + # of a polling loop + self._stop_requested = False + # pylint: disable=unused-argument def _post_build_single(self, builder: mrc.Builder, out_node: mrc.SegmentObject) -> mrc.SegmentObject: return out_node @@ -74,3 +78,31 @@ def _post_build(self, builder: mrc.Builder, out_ports_nodes: list[mrc.SegmentObj logger.info("Added source: %s\n └─> %s", self, pretty_print_type_name(self.output_ports[0].output_type)) return [ret_val] + + def stop(self): + """ + This method is invoked by the pipeline whenever there is an unexpected shutdown. + Subclasses should override this method to perform any necessary cleanup operations. + """ + + # Indicate we need to stop + self.request_stop() + + return super().stop() + + def request_stop(self): + """ + Request the source to stop processing data. + """ + self._stop_requested = True + + def is_stop_requested(self) -> bool: + """ + Returns `True` if a stop has been requested. + + Returns + ------- + bool: + True if a stop has been requested, False otherwise. + """ + return self._stop_requested diff --git a/python/morpheus/morpheus/stages/input/appshield_source_stage.py b/python/morpheus/morpheus/stages/input/appshield_source_stage.py index 3343bd30f9..acd22a54fa 100644 --- a/python/morpheus/morpheus/stages/input/appshield_source_stage.py +++ b/python/morpheus/morpheus/stages/input/appshield_source_stage.py @@ -108,7 +108,8 @@ def __init__(self, sort_glob=sort_glob, recursive=recursive, queue_max_size=queue_max_size, - batch_timeout=batch_timeout) + batch_timeout=batch_timeout, + should_stop_fn=self.is_stop_requested) @property def name(self) -> str: diff --git a/python/morpheus/morpheus/stages/input/arxiv_source.py b/python/morpheus/morpheus/stages/input/arxiv_source.py index c1ed77c0cb..180a74dad2 100644 --- a/python/morpheus/morpheus/stages/input/arxiv_source.py +++ b/python/morpheus/morpheus/stages/input/arxiv_source.py @@ -96,7 +96,6 @@ def __init__(self, self._total_pdfs = 0 self._total_pages = 0 self._total_chunks = 0 - self._stop_requested = False self._cache_dir = cache_dir @property @@ -142,7 +141,7 @@ def _generate_frames(self): ) for x in search_results.results(): - if self._stop_requested: + if self.is_stop_requested(): break full_path = os.path.join(self._cache_dir, x._get_default_filename()) @@ -175,7 +174,7 @@ def _process_pages(self, pdf_path: str): logger.debug("Processing %s/%s: %s", len(documents), self._total_pages, pdf_path) if self._total_pages > self._max_pages: - self._stop_requested = True + self.request_stop() return documents except PdfStreamError: diff --git a/python/morpheus/morpheus/stages/input/autoencoder_source_stage.py b/python/morpheus/morpheus/stages/input/autoencoder_source_stage.py index ba9cbcc1d3..b0529c7164 100644 --- a/python/morpheus/morpheus/stages/input/autoencoder_source_stage.py +++ b/python/morpheus/morpheus/stages/input/autoencoder_source_stage.py @@ -106,7 +106,8 @@ def __init__(self, sort_glob=sort_glob, recursive=recursive, queue_max_size=queue_max_size, - batch_timeout=batch_timeout) + batch_timeout=batch_timeout, + should_stop_fn=self.is_stop_requested) @property def input_count(self) -> int: diff --git a/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py b/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py index 602b360c8d..a4ed7d501d 100644 --- a/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py +++ b/python/morpheus/morpheus/stages/input/control_message_kafka_source_stage.py @@ -110,9 +110,6 @@ def __init__(self, self._async_commits = async_commits self._client = None - # Flag to indicate whether we should stop - self._stop_requested = False - self._poll_interval = pd.Timedelta(poll_interval).total_seconds() self._started = False @@ -150,7 +147,7 @@ def _process_msg(self, consumer, msg): consumer.commit(message=msg, asynchronous=self._async_commits) if self._stop_after > 0 and self._records_emitted >= self._stop_after: - self._stop_requested = True + self.request_stop() return control_messages @@ -162,7 +159,7 @@ def _source_generator(self): do_sleep = False - while not self._stop_requested: + while not self.is_stop_requested(): msg = consumer.poll(timeout=1.0) if msg is None: @@ -180,7 +177,7 @@ def _source_generator(self): else: raise ck.KafkaException(msg_error) - if do_sleep and not self._stop_requested: + if do_sleep and not self.is_stop_requested(): time.sleep(self._poll_interval) finally: diff --git a/python/morpheus/morpheus/stages/input/http_client_source_stage.py b/python/morpheus/morpheus/stages/input/http_client_source_stage.py index 4a101e0992..8924f9093a 100644 --- a/python/morpheus/morpheus/stages/input/http_client_source_stage.py +++ b/python/morpheus/morpheus/stages/input/http_client_source_stage.py @@ -185,7 +185,7 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: request_args.update(self._requst_kwargs) - while (self._stop_after == 0 or num_records_emitted < self._stop_after): + while (not self.is_stop_requested() and (self._stop_after == 0 or num_records_emitted < self._stop_after)): if self._query_params_fn is not None: request_args['params'] = self._query_params_fn() diff --git a/python/morpheus/morpheus/stages/input/http_server_source_stage.py b/python/morpheus/morpheus/stages/input/http_server_source_stage.py index ed8d99612f..0c1619c905 100644 --- a/python/morpheus/morpheus/stages/input/http_server_source_stage.py +++ b/python/morpheus/morpheus/stages/input/http_server_source_stage.py @@ -149,6 +149,17 @@ def supports_cpp_node(self) -> bool: def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) + def stop(self): + """ + Performs cleanup steps when pipeline is stopped. + """ + logger.debug("Stopping HttpServerSourceStage") + # Indicate we need to stop + if self._http_server is not None: + self._http_server.stop() + + return super().stop() + def _parse_payload(self, payload: str) -> HttpParseResponse: try: if self._payload_to_df_fn is not None: @@ -238,10 +249,10 @@ def _generate_frames(self) -> typing.Iterator[MessageMeta]: # shutdown since we already returned an OK response to the client. df = None try: - df = self._queue.get() + df = self._queue.get(block=False) self._queue_size -= 1 except queue.Empty: - if (not self._http_server.is_running()): + if (not self._http_server.is_running() or self.is_stop_requested()): self._processing = False else: logger.debug("Queue empty, sleeping ...") diff --git a/python/morpheus/morpheus/stages/input/kafka_source_stage.py b/python/morpheus/morpheus/stages/input/kafka_source_stage.py index 8893110cf8..af5ac7be3e 100644 --- a/python/morpheus/morpheus/stages/input/kafka_source_stage.py +++ b/python/morpheus/morpheus/stages/input/kafka_source_stage.py @@ -125,9 +125,6 @@ def __init__(self, self._async_commits = async_commits self._client = None - # Flag to indicate whether or not we should stop - self._stop_requested = False - self._poll_interval = pd.Timedelta(poll_interval).total_seconds() self._started = False @@ -144,16 +141,6 @@ def supports_cpp_node(self): def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) - def stop(self): - """ - Performs cleanup steps when pipeline is stopped. - """ - - # Indicate we need to stop - self._stop_requested = True - - return super().stop() - def _process_batch(self, consumer, batch): message_meta = None if len(batch): @@ -183,7 +170,7 @@ def _process_batch(self, consumer, batch): self._num_messages += 1 if self._stop_after > 0 and self._records_emitted >= self._stop_after: - self._stop_requested = True + self.request_stop() batch.clear() @@ -197,7 +184,7 @@ def _source_generator(self): batch = [] - while not self._stop_requested: + while not self.is_stop_requested(): do_process_batch = False do_sleep = False @@ -224,7 +211,7 @@ def _source_generator(self): if message_meta is not None: yield message_meta - if do_sleep and not self._stop_requested: + if do_sleep and not self.is_stop_requested(): time.sleep(self._poll_interval) message_meta = self._process_batch(consumer, batch) diff --git a/python/morpheus/morpheus/stages/input/rss_source_stage.py b/python/morpheus/morpheus/stages/input/rss_source_stage.py index a67d7997cb..c9d9d01ac3 100644 --- a/python/morpheus/morpheus/stages/input/rss_source_stage.py +++ b/python/morpheus/morpheus/stages/input/rss_source_stage.py @@ -18,8 +18,8 @@ from morpheus.cli import register_stage from morpheus.config import Config +from morpheus.controllers.rss_controller import RSSController from morpheus.messages import MessageMeta -from morpheus.modules.input.rss_source import RSSSourceLoaderFactory from morpheus.pipeline.preallocator_mixin import PreallocatorMixin from morpheus.pipeline.single_output_source import SingleOutputSource from morpheus.pipeline.stage_schema import StageSchema @@ -69,45 +69,25 @@ def __init__(self, request_timeout: float = 2.0, strip_markup: bool = False): super().__init__(c) - self._stop_requested = False - if (batch_size is None): batch_size = c.pipeline_batch_size - if (stop_after > 0): - if (run_indefinitely): - raise ValueError("Cannot set both `stop_after` and `run_indefinitely` to True.") - - run_indefinitely = False - - self._module_config = { - "rss_source": { - "feed_input": feed_input, - "interval_sec": interval_secs, - "stop_after_rec": stop_after, - "run_indefinitely": run_indefinitely, - "batch_size": batch_size, - "enable_cache": enable_cache, - "cache_dir": cache_dir, - "cooldown_interval_sec": cooldown_interval, - "request_timeout_sec": request_timeout, - "strip_markup": strip_markup - } - } - - self._module_loader = RSSSourceLoaderFactory.get_instance("rss_source_stage", self._module_config) + self._controller = RSSController(feed_input=feed_input, + batch_size=batch_size, + run_indefinitely=run_indefinitely, + enable_cache=enable_cache, + cache_dir=cache_dir, + cooldown_interval=cooldown_interval, + request_timeout=request_timeout, + strip_markup=strip_markup, + stop_after=stop_after, + interval_secs=interval_secs, + should_stop_fn=self.is_stop_requested) @property def name(self) -> str: return "from-rss" - def stop(self): - """ - Stop the RSS source stage. - """ - self._stop_requested = True - return super().stop() - def supports_cpp_node(self): return False @@ -115,8 +95,5 @@ def compute_schema(self, schema: StageSchema): schema.output_schema.set_type(MessageMeta) def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject: - module = self._module_loader.load(builder=builder) - - mod_out_node = module.output_port("output") - - return mod_out_node + source = builder.make_source(self.unique_name, self._controller.feed_generator) + return source diff --git a/python/morpheus/morpheus/utils/directory_watcher.py b/python/morpheus/morpheus/utils/directory_watcher.py index 3fe6274b44..b0c79fe97d 100644 --- a/python/morpheus/morpheus/utils/directory_watcher.py +++ b/python/morpheus/morpheus/utils/directory_watcher.py @@ -16,6 +16,7 @@ import logging import os import queue +from collections.abc import Callable import mrc from watchdog.events import FileSystemEvent @@ -58,6 +59,8 @@ class DirectoryWatcher(): Maximum queue size to hold the file paths to be processed that match `input_glob`. batch_timeout: float Timeout to retrieve batch messages from the queue. + should_stop_fn: Callable[[], bool] + Function that returns a boolean indicating if the watcher should stop processing files. """ def __init__(self, @@ -67,7 +70,8 @@ def __init__(self, sort_glob: bool, recursive: bool, queue_max_size: int, - batch_timeout: float): + batch_timeout: float, + should_stop_fn: Callable[[], bool] = None): self._input_glob = input_glob self._watch_directory = watch_directory @@ -76,6 +80,10 @@ def __init__(self, self._recursive = recursive self._queue_max_size = queue_max_size self._batch_timeout = batch_timeout + if should_stop_fn is None: + self._should_stop_fn = lambda: False + else: + self._should_stop_fn = should_stop_fn # Determine the directory to watch and the match pattern from the glob glob_split = self._input_glob.split("*", 1) @@ -152,7 +160,7 @@ def _generate_via_polling(self): snapshot = EmptyDirectorySnapshot() - while (True): + while (not self._should_stop_fn()): # Get a new snapshot new_snapshot = DirectorySnapshot(self._dir_to_watch, recursive=self._recursive)