diff --git a/examples/llm/common/web_scraper_module.py b/examples/llm/common/web_scraper_module.py
index ad4d7509ce..6b665b4aff 100644
--- a/examples/llm/common/web_scraper_module.py
+++ b/examples/llm/common/web_scraper_module.py
@@ -71,7 +71,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->
         try:
             # Try to get the page content
             response = session.get(url)
-            logger.info(f"RESPONSE TEXT: {response.text}")
 
             if (not response.ok):
                 logger.warning(
@@ -93,8 +92,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->
                 row_cp.update({"page_content": text})
                 final_rows.append(row_cp)
 
-            logger.info(final_rows)
-
             if isinstance(response, requests_cache.models.response.CachedResponse):
                 logger.debug("Processed cached page: '%s'", url)
             else:
diff --git a/examples/llm/vdb_upload/README.md b/examples/llm/vdb_upload/README.md
index 5b73616aa5..a3c81960a7 100644
--- a/examples/llm/vdb_upload/README.md
+++ b/examples/llm/vdb_upload/README.md
@@ -155,12 +155,31 @@ options and a pipeline to run. For the purposes of this document, we'll focus on
 incorporates various functionalities like handling RSS and filesystem sources, embedding configurations, and vector
 database (VDB) settings.
 
+#### Configuration Balance Considerations
+
+When configuring the Morpheus Pipeline, especially for stages like the RSS source and the Vector Database Upload, it's
+important to balance responsiveness and performance.
+
+- **RSS Source Stage**: The RSS source stage is responsible for yielding webpage links for processing. A larger batch size
+  at this stage can lead to decreased responsiveness, as the subsequent web scraper stage may take a considerable amount of
+  time to retrieve and process all the items in each batch. To ensure a responsive experience for users, it's recommended
+  to configure the RSS source stage with a relatively smaller batch size. This adjustment tends to have minimal impact on
+  overall performance while significantly improving the time to process each batch of links.
+
+- **Vector Database Upload Stage**: At the other end of the pipeline, the Vector Database Upload stage has its own
+  considerations. This stage experiences a significant transaction overhead. To mitigate this, it is advisable to configure
+  this stage with the largest batch size possible. This approach helps in efficiently managing transaction overheads and
+  improves the throughput of the pipeline, especially when dealing with large volumes of data.
+
+Balancing these configurations ensures that the pipeline runs efficiently, with optimized responsiveness at the RSS
+source stage and improved throughput at the Vector Database Upload stage.
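+
+As a concrete illustration, the sketch below shows how these two knobs appear together in an RSS source entry. The
+field names mirror the `vdb_config.yaml` used by this example; the values are illustrative starting points, not tuned
+recommendations:
+
+```yaml
+vdb_pipeline:
+  sources:
+    - type: rss
+      name: "rss_cve"
+      config:
+        batch_size: 32            # Number of RSS feeds per batch; keep small so the pipeline stays responsive
+        output_batch_size: 2048   # Number of chunked documents per output batch; keep large to amortize downstream overhead
+```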
+
 ### Run example: Default example usage, with pre-defined RSS source
 
 ```bash
-python examples/llm/main.py vdb_upload \
+python examples/llm/main.py vdb_upload pipeline \
     --enable_cache \
     --enable_monitors \
     --embedding_model_name all-MiniLM-L6-v2
@@ -171,7 +190,7 @@ Usage with CLI-Defined Sources:
 *Example: Defining an RSS Source via CLI*
 
 ```bash
-python examples/llm/main.py vdb_upload \
+python examples/llm/main.py vdb_upload pipeline \
     --source_type rss \
     --interval_secs 300 \
     --rss_request_timeout_sec 5.0 \
@@ -183,9 +202,9 @@ python examples/llm/main.py vdb_upload \
 *Example: Defining a Filesystem Source via CLI*
 
 ```bash
-python examples/llm/main.py vdb_upload \
+python examples/llm/main.py vdb_upload pipeline \
     --source_type filesystem \
-    --file_source "/path/to/files1" "/path/to/files2" \
+    --file_source "./morpheus/data/scratch/*" \
     --enable_monitors \
     --embedding_model_name all-MiniLM-L6-v2
 ```
@@ -193,9 +212,9 @@ python examples/llm/main.py vdb_upload \
 *Example: Combining RSS and Filesystem Sources via CLI*
 
 ```bash
-python examples/llm/main.py vdb_upload \
-    --source_type rss filesystem \
-    --file_source "/path/to/files" \
+python examples/llm/main.py vdb_upload pipeline \
+    --source_type rss --source_type filesystem \
+    --file_source "./morpheus/data/scratch/*" \
     --interval_secs 600 \
     --enable_cache \
     --enable_monitors \
@@ -224,7 +243,7 @@ vdb_pipeline:
 ```
 
 ```bash
-python examples/llm/main.py vdb_upload \
+python examples/llm/main.py vdb_upload pipeline \
     --vdb_config_path "./vdb_config.yaml"
 ```
 
diff --git a/examples/llm/vdb_upload/common.py b/examples/llm/vdb_upload/common.py
index 330bb3cc70..43a692d5fd 100644
--- a/examples/llm/vdb_upload/common.py
+++ b/examples/llm/vdb_upload/common.py
@@ -185,7 +185,7 @@ def process_vdb_sources(pipe: Pipeline, config: Config, vdb_source_config: typin
     """
     vdb_sources = []
     for source_info in vdb_source_config:
-        validate_source_config(source_info)  # Assuming this function exists
+        validate_source_config(source_info)
         source_type = source_info['type']
         source_name = source_info['name']
         source_config = source_info['config']
diff --git a/examples/llm/vdb_upload/module/rss_source_pipe.py b/examples/llm/vdb_upload/module/rss_source_pipe.py
index 45a7d646f5..f8c9b6bf05 100644
--- a/examples/llm/vdb_upload/module/rss_source_pipe.py
+++ b/examples/llm/vdb_upload/module/rss_source_pipe.py
@@ -36,13 +36,14 @@ class RSSSourceParamContract(BaseModel):
-    batch_size: int = 128
+    batch_size: int = 32
     cache_dir: str = "./.cache/http"
     cooldown_interval_sec: int = 600
     enable_cache: bool = False
     enable_monitor: bool = True
     feed_input: List[str] = Field(default_factory=list)
     interval_sec: int = 600
+    output_batch_size: int = 2048
     request_timeout_sec: float = 2.0
     run_indefinitely: bool = True
     stop_after: int = 0
@@ -125,7 +126,7 @@ def _rss_source_pipe(builder: mrc.Builder):
     schema_transform_definition = SchemaTransformInterface.get_definition("schema_transform", transform_config)
 
     deserialize_definition = DeserializeInterface.get_definition("deserialize",
-                                                                 {"batch_size": validated_config.batch_size})
+                                                                 {"batch_size": validated_config.output_batch_size})
 
     monitor_m1 = Monitor.get_definition("monitor_m1",
                                         {"description": "RSSSourcePipe RSS Source",
                                          "silence_monitors": not enable_monitor})
diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py
index c6b881d018..78395ec718 100644
--- a/examples/llm/vdb_upload/run.py
+++ b/examples/llm/vdb_upload/run.py
@@ -149,7 +149,9 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
             'type': 'rss',
             'name': 'rss-cli',
             'config': {
-                "batch_size": pipeline_batch_size,
+                # RSS feeds can take a while to pull, smaller batch sizes allow the pipeline to feel more responsive
+                "batch_size": 32,
+                "output_batch_size": 2048,
                 "cache_dir": "./.cache/http",
                 "cooldown_interval_sec": interval_secs,
                 "enable_cache": enable_cache,
@@ -196,8 +198,11 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
     # Pipeline Configuration
     cli_pipeline_conf = {
+        "edge_buffer_size": 128,
+        "embedding_size": embedding_size,
         "feature_length": model_fea_length,
         "isolate_embeddings": isolate_embeddings,
+        "max_batch_size": 256,
         "num_threads": num_threads,
         "pipeline_batch_size": pipeline_batch_size,
     }
@@ -311,7 +316,7 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_
     vdb_conf = merge_configs(vdb_pipeline_config.get('vdb', {}), cli_vdb_conf)
 
     # TODO: class labels depends on this, so it should be a pipeline level parameter, not a vdb level parameter
-    pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size')
+    pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384)
 
     final_config.update({
         'embeddings_config': embeddings_conf,
diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml
index 5fca0894a7..5a5397771d 100644
--- a/examples/llm/vdb_upload/vdb_config.yaml
+++ b/examples/llm/vdb_upload/vdb_config.yaml
@@ -27,13 +27,13 @@ vdb_pipeline:
   feature_length: 512
   max_batch_size: 256
   num_threads: 10
-  pipeline_batch_size: 1024
+  pipeline_batch_size: 128
 
   sources:
     - type: rss
       name: "rss_cve"
       config:
-        batch_size: 128 # Number of rss feeds to process at a time
+        batch_size: 32 # Number of RSS feeds per batch
         cache_dir: "./.cache/http"
         cooldown_interval_sec: 600
         enable_cache: False
@@ -72,6 +72,7 @@ vdb_pipeline:
           - "https://blog.google/threat-analysis-group/rss/"
           - "https://intezer.com/feed/"
         interval_sec: 600
+        output_batch_size: 2048 # Number of chunked documents per output batch
         request_timeout_sec: 2.0
         run_indefinitely: true
         stop_after: 0
diff --git a/morpheus/stages/general/linear_modules_stage.py b/morpheus/stages/general/linear_modules_stage.py
index a0a67c081a..c4489f0ae2 100644
--- a/morpheus/stages/general/linear_modules_stage.py
+++ b/morpheus/stages/general/linear_modules_stage.py
@@ -103,7 +103,7 @@ def _get_cpp_module_node(self, builder: mrc.Builder) -> mrc.SegmentObject:
 
     def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
 
         if (isinstance(self._module_config, dict) and "module_id" in self._module_config):
-            module = load_module(self._module_config["module_id"], builder=builder)
+            module = load_module(self._module_config, builder=builder)
         else:
             module = self._module_config.load(builder)
diff --git a/tests/conftest.py b/tests/conftest.py
index d1726eebca..76752f2feb 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,4 +1,4 @@
-# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
diff --git a/tests/examples/llm/common/test_content_extractor_module.py b/tests/examples/llm/common/test_content_extractor_module.py
new file mode 100644
index 0000000000..1548b31970
--- /dev/null
+++ b/tests/examples/llm/common/test_content_extractor_module.py
@@ -0,0 +1,123 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import types
+from unittest.mock import patch, MagicMock
+
+import cudf
+import fsspec
+import pytest
+from _utils import TEST_DIRS
+from _utils import assert_results
+
+from morpheus.config import Config
+from morpheus.messages import MessageMeta
+from morpheus.pipeline import LinearPipeline
+from morpheus.stages.general.linear_modules_stage import LinearModulesStage
+from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
+from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage
+
+# Mock dependencies
+file_meta_mock = MagicMock()
+text_converter_mock = MagicMock()
+pdf_converter_mock = MagicMock()  # referenced by the parametrized file-type test below
+
+# TODO
+
+@pytest.mark.use_python
+@pytest.mark.use_cudf
+@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'llm/common/content_extractor_module.py'))
+def test_http_client_source_stage_pipe(config: Config, mock_rest_server: str, import_mod: types.ModuleType):
+    url = f"{mock_rest_server}/www/index"
+
+    df = cudf.DataFrame({"link": [url]})
+    df_expected = cudf.DataFrame({"link": [url], "page_content": "website title some paragraph"})
+
+    web_scraper_definition = import_mod.WebScraperInterface.get_definition("web_scraper",
+                                                                           module_config={"web_scraper_config": {
+                                                                               "link_column": "link", "chunk_size": 100,
+                                                                               "enable_cache": False,
+                                                                               "cache_path": "./.cache/http/RSSDownloadStage.sqlite",
+                                                                               "cache_dir": "./.cache/llm/rss"}})
+
+    pipe = LinearPipeline(config)
+    pipe.set_source(InMemorySourceStage(config, [df]))
+    pipe.add_stage(LinearModulesStage(config,
+                                      web_scraper_definition,
+                                      input_type=MessageMeta,
+                                      output_type=MessageMeta,
+                                      input_port_name="input",
+                                      output_port_name="output"))
+    comp_stage = pipe.add_stage(CompareDataFrameStage(config, compare_df=df_expected))
+    pipe.run()
+
+    print(comp_stage.get_messages())
+
+    assert_results(comp_stage.get_results())
+
+
+# NOTE: The tests below are placeholders: 'your_module' stands in for the content extractor module under test and is
+# not imported in this file.
+# 1. Test with Mocked Files and Converters
+def test_parse_files_with_mocked_files():
+    with patch('your_module.get_file_meta', return_value=file_meta_mock), \
+            patch('your_module.TextConverter', return_value=text_converter_mock):
+        open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(5)]
+        expected_data = [{'content': 'mock content'}] * len(open_files)
+        text_converter_mock.convert.return_value = expected_data
+
+        result = your_module.parse_files(open_files)
+
+        assert isinstance(result, MessageMeta)
+        assert len(result.df) == len(open_files)
+        assert result.df.to_dict('records') == expected_data
+
+
+# 2. Test Handling of Exceptions During File Processing
+def test_parse_files_with_exception():
+    with patch('your_module.get_file_meta', side_effect=Exception("Error")), \
+            patch('your_module.logger') as logger_mock:
+        open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(2)]
+
+        result = your_module.parse_files(open_files)
+
+        assert logger_mock.error.called
+        assert isinstance(result, MessageMeta)
+        assert result.df.empty
+
+
+# 3. Test Batch Processing
+def test_parse_files_batch_processing():
+    batch_size = 2
+    open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(5)]
+
+    # Modify your_module.batch_size accordingly
+    your_module.batch_size = batch_size
+
+    result = your_module.parse_files(open_files)
+
+    assert len(result.df) == len(open_files)  # Assuming each file results in one row
+
+
+# 4. Test Processing Different File Types
+@pytest.mark.parametrize("file_type, converter", [("pdf", pdf_converter_mock), ("txt", text_converter_mock)])
+def test_parse_files_different_file_types(file_type, converter):
+    with patch('your_module.get_file_meta', return_value={"file_type": file_type}), \
+            patch(f'your_module.{converter.__class__.__name__}', return_value=converter):
+        open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(2)]
+        converter.convert.return_value = [{'content': 'mock content'}]
+
+        result = your_module.parse_files(open_files)
+
+        assert converter.convert.called
+        assert len(result.df) == len(open_files)
diff --git a/tests/examples/llm/common/test_web_scraper_module.py b/tests/examples/llm/common/test_web_scraper_module.py
index 868afa59db..1502b62f5f 100644
--- a/tests/examples/llm/common/test_web_scraper_module.py
+++ b/tests/examples/llm/common/test_web_scraper_module.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2023, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import os
 import types