Commit
Add unit tests, fix docs and examples, tweak configuration parameters for best performance
drobison00 committed Jan 16, 2024
1 parent c2b1706 commit b516233
Showing 10 changed files with 180 additions and 20 deletions.
3 changes: 0 additions & 3 deletions examples/llm/common/web_scraper_module.py
@@ -71,7 +71,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->
try:
# Try to get the page content
response = session.get(url)
logger.info(f"RESPONSE TEXT: {response.text}")

if (not response.ok):
logger.warning(
@@ -93,8 +92,6 @@ def download_and_split(msg: MessageMeta, text_splitter, link_column, session) ->
row_cp.update({"page_content": text})
final_rows.append(row_cp)

logger.info(final_rows)

if isinstance(response, requests_cache.models.response.CachedResponse):
logger.debug("Processed cached page: '%s'", url)
else:
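For context, the `CachedResponse` check above comes from the `requests_cache` package the scraper uses for its HTTP session. A minimal sketch of that pattern, with an illustrative cache path (not necessarily the one this example uses):

```python
# Sketch only: the requests_cache pattern the module's session relies on.
import requests_cache

# Illustrative cache location; the example's actual path may differ.
session = requests_cache.CachedSession("./.cache/http/demo", backend="sqlite")

response = session.get("https://example.com/feed")
if getattr(response, "from_cache", False):
    print("served from cache")  # analogous to the CachedResponse branch above
else:
    print("fetched live")
```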
35 changes: 27 additions & 8 deletions examples/llm/vdb_upload/README.md
@@ -155,12 +155,31 @@ options and a pipeline to run. For the purposes of this document, we'll focus on
incorporates various functionalities like handling RSS and filesystem sources, embedding configurations, and vector
database (VDB) settings.
#### Configuration Balance Considerations

When configuring the Morpheus Pipeline, especially stages like the RSS source and the Vector Database Upload, it's important to balance responsiveness and performance.

- **RSS Source Stage**: The RSS source stage yields webpage links for processing. A large batch size at this stage reduces responsiveness, because the subsequent web scraper stage may take considerable time to retrieve and process every item in a batch. Configuring the RSS source stage with a relatively small batch size keeps the pipeline responsive; this tends to have minimal impact on overall performance while significantly reducing the time to process each batch of links.
- **Vector Database Upload Stage**: At the other end of the pipeline, the Vector Database Upload stage incurs significant per-transaction overhead. To mitigate this, configure the stage with the largest batch size practical. This amortizes the transaction cost and improves throughput, especially when dealing with large volumes of data.

Balancing these configurations, as sketched below, keeps the pipeline efficient: responsive at the RSS source stage and high-throughput at the Vector Database Upload stage.
### Run example:
Default example usage, with the pre-defined RSS source:
```bash
python examples/llm/main.py vdb_upload \
python examples/llm/main.py vdb_upload pipeline \
--enable_cache \
--enable_monitors \
--embedding_model_name all-MiniLM-L6-v2
@@ -171,7 +190,7 @@ Usage with CLI-Defined Sources:
*Example: Defining an RSS Source via CLI*

```bash
python examples/llm/main.py vdb_upload \
python examples/llm/main.py vdb_upload pipeline \
--source_type rss \
--interval_secs 300 \
--rss_request_timeout_sec 5.0 \
@@ -183,19 +202,19 @@ python examples/llm/main.py vdb_upload \
*Example: Defining a Filesystem Source via CLI*

```bash
python examples/llm/main.py vdb_upload \
python examples/llm/main.py vdb_upload pipeline \
--source_type filesystem \
--file_source "/path/to/files1" "/path/to/files2" \
--file_source "./morpheus/data/scratch/*" \
--enable_monitors \
--embedding_model_name all-MiniLM-L6-v2
```

*Example: Combining RSS and Filesystem Sources via CLI*

```bash
python examples/llm/main.py vdb_upload \
--source_type rss filesystem \
--file_source "/path/to/files" \
python examples/llm/main.py vdb_upload pipeline \
--source_type rss --source_type filesystem \
--file_source "./morpheus/data/scratch/*" \
--interval_secs 600 \
--enable_cache \
--enable_monitors \
@@ -224,7 +243,7 @@ vdb_pipeline:
```
```bash
python examples/llm/main.py vdb_upload \
python examples/llm/main.py vdb_upload pipeline \
--vdb_config_path "./vdb_config.yaml"
```

2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/common.py
@@ -185,7 +185,7 @@ def process_vdb_sources(pipe: Pipeline, config: Config, vdb_source_config: typin
"""
vdb_sources = []
for source_info in vdb_source_config:
validate_source_config(source_info) # Assuming this function exists
validate_source_config(source_info)
source_type = source_info['type']
source_name = source_info['name']
source_config = source_info['config']
5 changes: 3 additions & 2 deletions examples/llm/vdb_upload/module/rss_source_pipe.py
@@ -36,13 +36,14 @@


class RSSSourceParamContract(BaseModel):
batch_size: int = 128
batch_size: int = 32
cache_dir: str = "./.cache/http"
cooldown_interval_sec: int = 600
enable_cache: bool = False
enable_monitor: bool = True
feed_input: List[str] = Field(default_factory=list)
interval_sec: int = 600
output_batch_size: int = 2048
request_timeout_sec: float = 2.0
run_indefinitely: bool = True
stop_after: int = 0
@@ -125,7 +126,7 @@ def _rss_source_pipe(builder: mrc.Builder):
schema_transform_definition = SchemaTransformInterface.get_definition("schema_transform", transform_config)

deserialize_definition = DeserializeInterface.get_definition("deserialize",
{"batch_size": validated_config.batch_size})
{"batch_size": validated_config.output_batch_size})

monitor_m1 = Monitor.get_definition("monitor_m1", {"description": "RSSSourcePipe RSS Source",
"silence_monitors": not enable_monitor})
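As an aside, the new `batch_size`/`output_batch_size` split above can be exercised directly through the pydantic contract; a minimal, trimmed sketch (field subset only, hypothetical class name):

```python
from typing import List

from pydantic import BaseModel, Field


class RSSSourceParams(BaseModel):  # trimmed copy of RSSSourceParamContract
    batch_size: int = 32           # feed entries pulled per input batch (responsiveness)
    output_batch_size: int = 2048  # rows per deserialized batch (throughput)
    feed_input: List[str] = Field(default_factory=list)


params = RSSSourceParams(feed_input=["https://example.com/rss"])
assert params.batch_size == 32 and params.output_batch_size == 2048
```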
9 changes: 7 additions & 2 deletions examples/llm/vdb_upload/run.py
@@ -149,7 +149,9 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
'type': 'rss',
'name': 'rss-cli',
'config': {
"batch_size": pipeline_batch_size,
# RSS feeds can take a while to pull; a smaller batch size lets the pipeline feel more responsive
"batch_size": 32,
"output_batch_size": 2048,
"cache_dir": "./.cache/http",
"cooldown_interval_sec": interval_secs,
"enable_cache": enable_cache,
@@ -196,8 +198,11 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi

# Pipeline Configuration
cli_pipeline_conf = {
"edge_buffer_size": 128,
"embedding_size": embedding_size,
"feature_length": model_fea_length,
"isolate_embeddings": isolate_embeddings,
"max_batch_size": 256,
"num_threads": num_threads,
"pipeline_batch_size": pipeline_batch_size,
}
@@ -311,7 +316,7 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_
vdb_conf = merge_configs(vdb_pipeline_config.get('vdb', {}), cli_vdb_conf)

# TODO: class labels depend on this, so it should be a pipeline-level parameter, not a vdb-level parameter
pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size')
pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384)

final_config.update({
'embeddings_config': embeddings_conf,
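For reference, the fallback added in `build_final_config` means a VDB section that omits `embedding_size` now resolves to 384 (the output width of all-MiniLM-L6-v2, the example's default model) instead of `None`; a small sketch with a hypothetical `vdb_conf`:

```python
# Hypothetical VDB section that omits embedding_size.
vdb_conf = {"resource_name": "vdb_demo"}
pipeline_conf = {}

# Previously vdb_conf.get('embedding_size') could silently yield None;
# the default keeps the pipeline consistent with all-MiniLM-L6-v2.
pipeline_conf["embedding_size"] = vdb_conf.get("embedding_size", 384)
assert pipeline_conf["embedding_size"] == 384
```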
5 changes: 3 additions & 2 deletions examples/llm/vdb_upload/vdb_config.yaml
@@ -27,13 +27,13 @@ vdb_pipeline:
feature_length: 512
max_batch_size: 256
num_threads: 10
pipeline_batch_size: 1024
pipeline_batch_size: 128

sources:
- type: rss
name: "rss_cve"
config:
batch_size: 128 # Number of rss feeds to process at a time
batch_size: 32 # Number of rss feeds per batch
cache_dir: "./.cache/http"
cooldown_interval_sec: 600
enable_cache: False
@@ -72,6 +72,7 @@ vdb_pipeline:
- "https://blog.google/threat-analysis-group/rss/"
- "https://intezer.com/feed/"
interval_sec: 600
output_batch_size: 2048 # Number of chunked documents per output batch
request_timeout_sec: 2.0
run_indefinitely: true
stop_after: 0
2 changes: 1 addition & 1 deletion morpheus/stages/general/linear_modules_stage.py
@@ -103,7 +103,7 @@ def _get_cpp_module_node(self, builder: mrc.Builder) -> mrc.SegmentObject:
def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
if (isinstance(self._module_config, dict) and
"module_id" in self._module_config):
module = load_module(self._module_config["module_id"], builder=builder)
module = load_module(self._module_config, builder=builder)
else:
module = self._module_config.load(builder)

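Note on the fix above: passing only `self._module_config["module_id"]` handed `load_module` a bare string, dropping the namespace and module parameters that the full configuration dict carries; forwarding `self._module_config` unchanged restores them. (Inferred from the surrounding code rather than verified against the `load_module` signature.)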
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
123 changes: 123 additions & 0 deletions tests/examples/llm/common/test_content_extractor_module.py
@@ -0,0 +1,123 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import types
from unittest.mock import patch, MagicMock

import cudf
import fsspec
import pytest
from _utils import TEST_DIRS
from _utils import assert_results

from morpheus.config import Config
from morpheus.messages import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.linear_modules_stage import LinearModulesStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage

# Mock dependencies
file_meta_mock = MagicMock()
text_converter_mock = MagicMock()
pdf_converter_mock = MagicMock()  # referenced by the parametrized file-type test below

# TODO

@pytest.mark.use_python
@pytest.mark.use_cudf
@pytest.mark.import_mod(os.path.join(TEST_DIRS.examples_dir, 'llm/common/content_extractor_module.py'))
def test_http_client_source_stage_pipe(config: Config, mock_rest_server: str, import_mod: types.ModuleType):
url = f"{mock_rest_server}/www/index"

df = cudf.DataFrame({"link": [url]})
df_expected = cudf.DataFrame({"link": [url], "page_content": "website title some paragraph"})

web_scraper_definition = import_mod.WebScraperInterface.get_definition("web_scraper",
module_config={"web_scraper_config": {
"link_column": "link", "chunk_size": 100,
"enable_cache": False,
"cache_path": "./.cache/http/RSSDownloadStage.sqlite",
"cache_dir": "./.cache/llm/rss"}})

pipe = LinearPipeline(config)
pipe.set_source(InMemorySourceStage(config, [df]))
pipe.add_stage(LinearModulesStage(config,
web_scraper_definition,
input_type=MessageMeta,
output_type=MessageMeta,
input_port_name="input",
output_port_name="output"))
comp_stage = pipe.add_stage(CompareDataFrameStage(config, compare_df=df_expected))
pipe.run()

print(comp_stage.get_messages())

assert_results(comp_stage.get_results())


# 1. Test with Mocked Files and Converters
def test_parse_files_with_mocked_files():
with patch('your_module.get_file_meta', return_value=file_meta_mock), \
patch('your_module.TextConverter', return_value=text_converter_mock):
open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(5)]
expected_data = [{'content': 'mock content'}] * len(open_files)
text_converter_mock.convert.return_value = expected_data

result = your_module.parse_files(open_files)

assert isinstance(result, MessageMeta)
assert len(result.df) == len(open_files)
assert result.df.to_dict('records') == expected_data


# 2. Test Handling of Exceptions During File Processing
def test_parse_files_with_exception():
with patch('your_module.get_file_meta', side_effect=Exception("Error")), \
patch('your_module.logger') as logger_mock:
open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(2)]

result = your_module.parse_files(open_files)

assert logger_mock.error.called
assert isinstance(result, MessageMeta)
assert result.df.empty


# 3. Test Batch Processing
def test_parse_files_batch_processing():
batch_size = 2
open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(5)]

# Modify your_module.batch_size accordingly
your_module.batch_size = batch_size

result = your_module.parse_files(open_files)

assert len(result.df) == len(open_files) # Assuming each file results in one row


# 4. Test Processing Different File Types
@pytest.mark.parametrize("file_type, converter", [("pdf", pdf_converter_mock), ("txt", text_converter_mock)])
def test_parse_files_different_file_types(file_type, converter):
with patch('your_module.get_file_meta', return_value={"file_type": file_type}), \
patch(f'your_module.{converter.__class__.__name__}', return_value=converter):
open_files = [MagicMock(spec=fsspec.core.OpenFile) for _ in range(2)]
converter.convert.return_value = [{'content': 'mock content'}]

result = your_module.parse_files(open_files)

assert converter.convert.called
assert len(result.df) == len(open_files)
14 changes: 14 additions & 0 deletions tests/examples/llm/common/test_web_scraper_module.py
@@ -1,3 +1,17 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import types

