From 8b1abc82ce7a219f26012bc5082653be1caaa32b Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 12:41:40 -0700 Subject: [PATCH 1/7] Re-work the CLI & Yaml config parsing diferentiate the explicit cli args the user specified on the command line from the cli args which include default values the user didn't specify. Move default values out of the code blocks into globals Precedence is: Explicit cli args Yaml config (if there is one) Default cli args In the previous version there were conflicting docstrings over which takes precedence, however in practice CLI args took precenence, since there was no separation of the explicit vs default values in practice the yaml config was never used. Fix bug type-o causing yaml schema definitions to be ignored. Resulting code is 122 lines shorter --- examples/llm/vdb_upload/run.py | 26 +- examples/llm/vdb_upload/vdb_config.yaml | 2 +- examples/llm/vdb_upload/vdb_utils.py | 698 ++++++++++-------------- 3 files changed, 311 insertions(+), 415 deletions(-) diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index b889f6c17a..973f2cfcb0 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -17,7 +17,7 @@ import click from vdb_upload.vdb_utils import build_cli_configs -from vdb_upload.vdb_utils import build_final_config +from vdb_upload.vdb_utils import build_config from vdb_upload.vdb_utils import is_valid_service logger = logging.getLogger(__name__) @@ -144,7 +144,8 @@ def run(): default="http://localhost:19530", help="URI for connecting to Vector Database server.", ) -def pipeline(**kwargs): +@click.pass_context +def pipeline(ctx: click.Context, **kwargs): """ Configure and run the data processing pipeline based on the specified command-line options. @@ -154,6 +155,8 @@ def pipeline(**kwargs): Parameters ---------- + ctx: click.Context + Click context object. **kwargs : dict Keyword arguments containing command-line options. @@ -161,18 +164,19 @@ def pipeline(**kwargs): ------- The result of the internal pipeline function call. """ + vdb_config_path = kwargs.pop('vdb_config_path', None) - cli_source_conf, cli_embed_conf, cli_pipe_conf, cli_tok_conf, cli_vdb_conf = build_cli_configs(**kwargs) - final_config = build_final_config(vdb_config_path, - cli_source_conf, - cli_embed_conf, - cli_pipe_conf, - cli_tok_conf, - cli_vdb_conf) + # When a config file is provided, only merge the explicit flags set by the user + explicit_cli_args = {} + for (key, value) in kwargs.items(): + if ctx.get_parameter_source(key) is not click.core.ParameterSource.DEFAULT: + explicit_cli_args[key] = value + + config = build_config(vdb_conf_path=vdb_config_path, explicit_cli_args=explicit_cli_args, implicit_cli_args=kwargs) # Call the internal pipeline function with the final config dictionary from .pipeline import pipeline as _pipeline - return _pipeline(**final_config) + return _pipeline(**config) @run.command() @@ -188,7 +192,7 @@ def pipeline(**kwargs): type=click.Path(file_okay=True, dir_okay=False), help="Location to save the cache to", ) -def langchain(**kwargs): +def langchain(**kwargs) from .langchain import chain return chain(**kwargs) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 0931665637..406c4dced8 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -75,7 +75,7 @@ vdb_pipeline: output_batch_size: 2048 # Number of chunked documents per output batch request_timeout_sec: 2.0 run_indefinitely: true - stop_after_rec: 0 + stop_after_rec: 64 strip_markup: true web_scraper_config: chunk_overlap: 51 diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 3aee5584ec..ba5b49dd36 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -24,6 +24,120 @@ logger = logging.getLogger(__name__) +DEFAULT_RSS_URLS = [ + "https://www.theregister.com/security/headlines.atom", + "https://isc.sans.edu/dailypodcast.xml", + "https://threatpost.com/feed/", + "http://feeds.feedburner.com/TheHackersNews?format=xml", + "https://www.bleepingcomputer.com/feed/", + "https://therecord.media/feed/", + "https://blog.badsectorlabs.com/feeds/all.atom.xml", + "https://krebsonsecurity.com/feed/", + "https://www.darkreading.com/rss_simple.asp", + "https://blog.malwarebytes.com/feed/", + "https://msrc.microsoft.com/blog/feed", + "https://securelist.com/feed", + "https://www.crowdstrike.com/blog/feed/", + "https://threatconnect.com/blog/rss/", + "https://news.sophos.com/en-us/feed/", + "https://www.us-cert.gov/ncas/current-activity.xml", + "https://www.csoonline.com/feed", + "https://www.cyberscoop.com/feed", + "https://research.checkpoint.com/feed", + "https://feeds.fortinet.com/fortinet/blog/threat-research", + "https://www.mcafee.com/blogs/rss", + "https://www.digitalshadows.com/blog-and-research/rss.xml", + "https://www.nist.gov/news-events/cybersecurity/rss.xml", + "https://www.sentinelone.com/blog/rss/", + "https://www.bitdefender.com/blog/api/rss/labs/", + "https://www.welivesecurity.com/feed/", + "https://unit42.paloaltonetworks.com/feed/", + "https://mandiant.com/resources/blog/rss.xml", + "https://www.wired.com/feed/category/security/latest/rss", + "https://www.wired.com/feed/tag/ai/latest/rss", + "https://blog.google/threat-analysis-group/rss/", + "https://intezer.com/feed/", +] + +DEFAULT_RSS_CONFIG = { + # RSS feeds can take a while to pull, smaller batch sizes allows the pipeline to feel more responsive + "batch_size": 32, + "output_batch_size": 2048, + "cache_dir": "./.cache/http", + "stop_after_rec": 0, + "feed_input": DEFAULT_RSS_URLS, + "strip_markup": True, + "web_scraper_config": {} +} + +DEFAULT_EMBEDDINGS_MODEL_KWARGS = {"force_convert_inputs": True, "use_shared_memory": False} + +DEFAULT_PIPELINE_CONFIG = {"edge_buffer_size": 128, "max_batch_size": 256} + +DEFAULT_TOKENIZER_CONFIG = { + "model_name": "bert-base-uncased-hash", + "model_kwargs": { + "add_special_tokens": False, + "column": "content", + "do_lower_case": True, + "truncation": True, + "vocab_hash_file": "data/bert-base-uncased-hash.txt", + } +} + +DEFAULT_VDB_CONFIG = { + # Vector db upload has some significant transaction overhead, batch size here should be as large as possible + 'batch_size': 16384, + 'recreate': True, + 'truncate_long_strings': True +} + +DEFAULT_MILVUS_CONFIG = { + "index_conf": { + "field_name": "embedding", + "metric_type": "L2", + "index_type": "HNSW", + "params": { + "M": 8, + "efConstruction": 64, + }, + }, + "schema_conf": { + "enable_dynamic_field": True, + "schema_fields": [ + { + "name": "id", + "dtype": "INT64", + "description": "Primary key for the collection", + "is_primary": True, + "auto_id": True + }, + { + "name": "title", "dtype": "VARCHAR", "description": "The title of the RSS Page", "max_length": 65_535 + }, + { + "name": "source", "dtype": "VARCHAR", "description": "The URL of the RSS Page", "max_length": 65_535 + }, + { + "name": "summary", + "dtype": "VARCHAR", + "description": "The summary of the RSS Page", + "max_length": 65_535 + }, + { + "name": "content", + "dtype": "VARCHAR", + "description": "A chunk of text from the RSS Page", + "max_length": 65_535 + }, + { + "name": "embedding", "dtype": "FLOAT_VECTOR", "description": "Embedding vectors", "dim": 384 + }, + ], + "description": "RSS collection schema" + } +} + def build_milvus_config(resource_schema_config: dict): schema_fields = [] @@ -87,248 +201,14 @@ def merge_dicts(dict_1, dict_2): dict The merged dictionary. """ + result = dict_1.copy() for key, value in dict_2.items(): - if key in dict_1 and isinstance(dict_1[key], dict) and isinstance(value, dict): - merge_dicts(dict_1[key], value) + dict_1_value = dict_1.get(key) + if isinstance(dict_1_value, dict) and isinstance(value, dict): + result[key] = merge_dicts(dict_1_value, value) else: - dict_1[key] = value - return dict_1 - - -def merge_configs(file_config, cli_config): - """ - Merge two configuration dictionaries, prioritizing the CLI configuration. - - This function merges configurations provided from a file and the CLI, with the CLI configuration taking precedence - in case of overlapping keys. Nested dictionaries are merged recursively. - - Parameters - ---------- - file_config : dict - The configuration dictionary loaded from a file. - cli_config : dict - The configuration dictionary provided via CLI arguments. - - Returns - ------- - dict - A merged dictionary with CLI configurations overriding file configurations where they overlap. - """ - return merge_dicts(file_config.copy(), {k: v for k, v in cli_config.items() if v is not None}) - - -def _build_default_rss_source(enable_cache, - enable_monitors, - interval_secs, - run_indefinitely, - stop_after, - vector_db_resource_name, - content_chunking_size, - rss_request_timeout_sec, - feed_inputs): - return { - 'type': 'rss', - 'name': 'rss-cli', - 'config': { - # RSS feeds can take a while to pull, smaller batch sizes allows the pipeline to feel more responsive - "batch_size": 32, - "output_batch_size": 2048, - "cache_dir": "./.cache/http", - "cooldown_interval_sec": interval_secs, - "stop_after_rec": stop_after or 0, - "enable_cache": enable_cache, - "enable_monitor": enable_monitors, - "feed_input": feed_inputs if feed_inputs else build_rss_urls(), - "interval_sec": interval_secs, - "request_timeout_sec": rss_request_timeout_sec, - "run_indefinitely": run_indefinitely, - "strip_markup": True, - "vdb_resource_name": vector_db_resource_name, - "web_scraper_config": { - "chunk_size": content_chunking_size, - "enable_cache": enable_cache, - } - } - } - - -def _build_default_filesystem_source(enable_monitors, - file_source, - pipeline_batch_size, - run_indefinitely, - vector_db_resource_name, - content_chunking_size, - num_threads): - return { - 'type': 'filesystem', - 'name': 'filesystem-cli', - 'config': { - "batch_size": pipeline_batch_size, - "enable_monitor": enable_monitors, - "extractor_config": { - "chunk_size": content_chunking_size, - "num_threads": num_threads, - }, - "filenames": file_source, - "vdb_resource_name": vector_db_resource_name, - "watch": run_indefinitely, - } - } - - -def build_cli_configs(source_type, - enable_cache, - embedding_size, - isolate_embeddings, - embedding_model_name, - enable_monitors, - file_source, - interval_secs, - pipeline_batch_size, - run_indefinitely, - stop_after, - vector_db_resource_name, - vector_db_service, - vector_db_uri, - content_chunking_size, - num_threads, - rss_request_timeout_sec, - model_max_batch_size, - model_fea_length, - triton_server_url, - feed_inputs): - """ - Create configuration dictionaries based on CLI arguments. - - Constructs individual configuration dictionaries for various components of the data processing pipeline, - such as source, embeddings, pipeline, tokenizer, and vector database configurations. - - Parameters - ---------- - source_type : list of str - Types of data sources (e.g., 'rss', 'filesystem'). - enable_cache : bool - Flag to enable caching. - embedding_size : int - Size of the embeddings. - isolate_embeddings : bool - Flag to isolate embeddings. - embedding_model_name : str - Name of the embedding model. - enable_monitors : bool - Flag to enable monitor functionality. - file_source : list of str - File sources or paths to be processed. - interval_secs : int - Interval in seconds for operations. - pipeline_batch_size : int - Batch size for the pipeline. - run_indefinitely : bool - Flag to run the process indefinitely. - stop_after : int - Stop after a certain number of records. - vector_db_resource_name : str - Name of the resource in the vector database. - vector_db_service : str - Name of the vector database service. - vector_db_uri : str - URI for the vector database server. - content_chunking_size : int - Size of content chunks. - num_threads : int - Number of threads to use. - rss_request_timeout_sec : float - Timeout in seconds for RSS requests. - model_max_batch_size : int - Maximum batch size for the model. - model_fea_length : int - Feature length for the model. - triton_server_url : str - URL of the Triton server. - feed_inputs : list of str - RSS feed inputs. - - Returns - ------- - tuple - A tuple containing five dictionaries for source, embeddings, pipeline, tokenizer, and vector database - configurations. - """ - - # Source Configuration - cli_source_conf = {} - if 'rss' in source_type: - cli_source_conf['rss'] = _build_default_rss_source(enable_cache, - enable_monitors, - interval_secs, - run_indefinitely, - stop_after, - vector_db_resource_name, - content_chunking_size, - rss_request_timeout_sec, - feed_inputs) - if 'filesystem' in source_type: - cli_source_conf['filesystem'] = _build_default_filesystem_source(enable_monitors, - file_source, - pipeline_batch_size, - run_indefinitely, - vector_db_resource_name, - content_chunking_size, - num_threads) - - # Embeddings Configuration - cli_embeddings_conf = { - "feature_length": model_fea_length, - "max_batch_size": model_max_batch_size, - "model_kwargs": { - "force_convert_inputs": True, - "model_name": embedding_model_name, - "server_url": triton_server_url, - "use_shared_memory": False, - }, - "num_threads": num_threads, - } - - # Pipeline Configuration - cli_pipeline_conf = { - "edge_buffer_size": 128, - "embedding_size": embedding_size, - "feature_length": model_fea_length, - "isolate_embeddings": isolate_embeddings, - "max_batch_size": 256, - "num_threads": num_threads, - "pipeline_batch_size": pipeline_batch_size, - } - - # Tokenizer Configuration - cli_tokenizer_conf = { - "model_name": "bert-base-uncased-hash", - "model_kwargs": { - "add_special_tokens": False, - "column": "content", - "do_lower_case": True, - "truncation": True, - "vocab_hash_file": "data/bert-base-uncased-hash.txt", - } - } - - # VDB Configuration - cli_vdb_conf = { - # Vector db upload has some significant transaction overhead, batch size here should be as large as possible - 'batch_size': 16384, - 'embedding_size': embedding_size, - 'recreate': True, - 'resource_name': vector_db_resource_name, - 'resource_schemas': { - vector_db_resource_name: - build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None, - }, - 'service': vector_db_service, - 'truncate_long_strings': True, - 'uri': vector_db_uri, - } - - return cli_source_conf, cli_embeddings_conf, cli_pipeline_conf, cli_tokenizer_conf, cli_vdb_conf + result[key] = value + return result def build_pipeline_config(pipeline_config: dict): @@ -366,161 +246,206 @@ def build_pipeline_config(pipeline_config: dict): return config -def build_final_config(vdb_conf_path, - cli_source_conf, - cli_embeddings_conf, - cli_pipeline_conf, - cli_tokenizer_conf, - cli_vdb_conf): +def _set_values_if_exists(dest_dict: dict[str, typing.Any], src_dict: dict[str, typing.Any], mapping: dict[str, str]): + """ + Set values in dest_dict if they exist in src_dict + Since a single source key can map to multiple destination keys, the mapping dictionary is in the form of: + {dest_key: src_key} + """ + for dest_key, src_key in mapping.items(): + # Explicitly using an `in` test here since `None` is a valid value + if src_key in src_dict: + dest_dict[dest_key] = src_dict[src_key] + + +def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool = False) -> dict: + """ + CLI arguments are in a flat structure, this function converts them to the nested structure used byt the yaml congig + allowing for easy merging of the two. + """ + config = {} + source_configs = [] + source_type = cli_args.get('source_type', []) + if 'rss' in source_type: + if include_defaults: + rss_config = DEFAULT_RSS_CONFIG.copy() + else: + rss_config = {"web_scraper_config": {}} + + _set_values_if_exists(rss_config['web_scraper_config'], + cli_args, { + "content_chunking_size": "chunk_size", "enable_cache": "enable_cache" + }) + _set_values_if_exists( + rss_config, + cli_args, + { + "cooldown_interval_sec": "interval_secs", + "stop_after_rec": "stop_after", + "enable_cache": "enable_cache", + "enable_monitor": "enable_monitors", + "interval_sec": "interval_secs", + "request_timeout_sec": "rss_request_timeout_sec", + "run_indefinitely": "run_indefinitely", + "vdb_resource_name": "vector_db_resource_name", + }) + + # Handle feed inputs separately + if len(cli_args.get('feed_inputs', [])) > 0: + rss_config['feed_input'] = cli_args['feed_inputs'] + + source_configs.append({'type': 'rss', 'name': 'rss-cli', 'config': rss_config}) + + if 'filesystem' in source_type: + fs_config = {"extractor_config": {}} + _set_values_if_exists(fs_config["extractor_config"], + cli_args, { + "chunk_size": "content_chunking_size", "num_threads": "num_threads" + }) + + _set_values_if_exists( + fs_config, + cli_args, + { + "batch_size": "pipeline_batch_size", + "enable_monitor": "enable_monitors", + "filenames": "file_source", + "vdb_resource_name": "vector_db_resource_name", + "watch": "run_indefinitely" + }) + + source_configs.append({'type': 'filesystem', 'name': 'filesystem-cli', 'config': fs_config}) + + config['source_configs'] = source_configs + + embeddings_model_kwargs = {} + if include_defaults: + embeddings_model_kwargs.update(DEFAULT_EMBEDDINGS_MODEL_KWARGS.copy()) + + _set_values_if_exists(embeddings_model_kwargs, + cli_args, { + "model_name": "embedding_model_name", "server_url": "triton_server_url" + }) + + embeddings_config = {"model_kwargs": embeddings_model_kwargs} + + _set_values_if_exists(embeddings_config, + cli_args, + { + "feature_length": "model_fea_length", + "max_batch_size": "model_max_batch_size", + "num_threads": "num_threads" + }) + + config['embeddings_config'] = embeddings_config + + # These values will be replaced later with a morpheus.config.Config object + pipeline_config = {} + if include_defaults: + pipeline_config.update(DEFAULT_PIPELINE_CONFIG.copy()) + + _set_values_if_exists( + pipeline_config, + cli_args, + { + "embedding_size": "embedding_size", + "feature_length": "model_fea_length", + "isolate_embeddings": "isolate_embeddings", + "num_threads": "num_threads", + "pipeline_batch_size": "pipeline_batch_size", + }) + + config['pipeline_config'] = pipeline_config + + if include_defaults: + config['tokenizer_config'] = DEFAULT_TOKENIZER_CONFIG.copy() + + vdb_config = {} + if include_defaults: + vdb_config.update(DEFAULT_VDB_CONFIG.copy()) + + _set_values_if_exists( + vdb_config, + cli_args, + { + 'embedding_size': 'embedding_size', + 'resource_name': 'vector_db_resource_name', + 'service': 'vector_db_service', + 'uri': 'vector_db_uri' + }) + + # Milvus configs to be built later if needed. The reason here is that we could use the default embedding size, but + # override the service type or name in other levels, we need the final resolved values of all three, for now just + # stub in hte resource name + resource_name = vdb_config.get('resource_name') + if resource_name is not None: + vdb_config["resource_schemas"] = {resource_name: None} + + return config + + +def build_config(vdb_conf_path: str | None, + explicit_cli_args: dict[str, typing.Any], + implicit_cli_args: dict[str, typing.Any]): """ Load and merge configurations from the CLI and YAML file. This function combines the configurations provided via the CLI with those specified in a YAML file. - If a YAML configuration file is specified and exists, it will merge its settings with the CLI settings, - with the YAML settings taking precedence. + If a YAML configuration file is specified and exists, it will merge its settings with the CLI settings. + + The order of precedence is as follows: Explict CLI args set by user > YAML settings > default values of CLI args. Parameters ---------- vdb_conf_path : str Path to the YAML configuration file. - cli_source_conf : dict - Source configuration provided via CLI. - cli_embeddings_conf : dict - Embeddings configuration provided via CLI. - cli_pipeline_conf : dict - Pipeline configuration provided via CLI. - cli_tokenizer_conf : dict - Tokenizer configuration provided via CLI. - cli_vdb_conf : dict - Vector Database (VDB) configuration provided via CLI. + explicit_cli_args : dict[str, typing.Any] + CLI args explicitly set by the user. + implicit_cli_args : dict[str, typing.Any] + CLI args including default values not explicitly set by the user. Returns ------- dict - A dictionary containing the final merged configuration for the pipeline, including source, embeddings, - tokenizer, and VDB configurations. - - Notes - ----- - The function prioritizes the YAML file configurations over CLI configurations. In case of overlapping - settings, the values from the YAML file will overwrite those from the CLI. + A dictionary containing the final merged configuration for the pipeline. """ - final_config = {} # Load and merge configurations from the YAML file if it exists if vdb_conf_path: with open(vdb_conf_path, 'r', encoding='utf-8') as file: - vdb_pipeline_config = yaml.safe_load(file).get('vdb_pipeline', {}) - - embeddings_conf = merge_configs(vdb_pipeline_config.get('embeddings', {}), cli_embeddings_conf) - pipeline_conf = merge_configs(vdb_pipeline_config.get('pipeline', {}), cli_pipeline_conf) - source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values()) - tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf) - vdb_conf = vdb_pipeline_config.get('vdb', {}) - resource_schema = vdb_conf.pop("resource_schema", None) - - if resource_schema: - vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema) - vdb_conf = merge_configs(vdb_conf, cli_vdb_conf) - - pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384) - - final_config.update({ - 'embeddings_config': embeddings_conf, - 'pipeline_config': build_pipeline_config(pipeline_conf), - 'source_config': source_conf, - 'tokenizer_config': tokenizer_conf, - 'vdb_config': vdb_conf, - }) - else: - # Use CLI configurations only - final_config.update({ - 'embeddings_config': cli_embeddings_conf, - 'pipeline_config': build_pipeline_config(cli_pipeline_conf), - 'source_config': list(cli_source_conf.values()), - 'tokenizer_config': cli_tokenizer_conf, - 'vdb_config': cli_vdb_conf, - }) - - # If no sources are specified either via CLI or in the yaml config, add a default RSS source - if (not final_config['source_config']): - final_config['source_config'].append( - _build_default_rss_source(enable_cache=True, - enable_monitors=True, - interval_secs=60, - run_indefinitely=True, - stop_after=None, - vector_db_resource_name="RSS", - content_chunking_size=128, - rss_request_timeout_sec=30, - feed_inputs=build_rss_urls())) + yaml_config = yaml.safe_load(file).get('vdb_pipeline', {}) - return final_config + yaml_config['vdb_config'] = yaml_config.pop('vdb', {}) + else: + yaml_config = {} + implicit_cli_config = _cli_args_to_config(implicit_cli_args, include_defaults=True) + explicit_cli_config = _cli_args_to_config(explicit_cli_args, include_defaults=False) -def build_defualt_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]: - """ - Builds the configuration for Milvus. + final_config = merge_dicts(implicit_cli_config, yaml_config) + final_config = merge_dicts(final_config, explicit_cli_config) - This function creates a dictionary configuration for a Milvus collection. - It includes the index configuration and the schema configuration, with - various fields like id, title, link, summary, page_content, and embedding. + # Handle the resource schema separately, the reason is we need both the service type, resource name and the + # embedding size to all be defined, some or all of these values could be defined at any level. + vdb_config = final_config['vdb_config'] + if vdb_config.get('service') == 'milvus': + # Replace the resource schema configs with Milvus config objects + resource_schema_configs = vdb_config.pop("resource_schemas", {}) + resource_schemas = {} + for (resource_name, resource_schema) in resource_schema_configs.items(): + if resource_schema is None: + resource_schema = DEFAULT_MILVUS_CONFIG.copy() + # Update the embedding_size + resource_schema['schema_conf']['schema_fields'][-1]['dim'] = vdb_config['embedding_size'] - Parameters - ---------- - embedding_size : int - The size of the embedding vector. + resource_schemas[resource_name] = build_milvus_config(resource_schema) - Returns - ------- - typing.Dict[str, Any] - A dictionary containing the configuration settings for Milvus. - """ + vdb_config['resource_schemas'] = resource_schemas - milvus_resource_kwargs = { - "index_conf": { - "field_name": "embedding", - "metric_type": "L2", - "index_type": "HNSW", - "params": { - "M": 8, - "efConstruction": 64, - }, - }, - "schema_conf": { - "enable_dynamic_field": True, - "schema_fields": [ - pymilvus.FieldSchema(name="id", - dtype=pymilvus.DataType.INT64, - description="Primary key for the collection", - is_primary=True, - auto_id=True).to_dict(), - pymilvus.FieldSchema(name="title", - dtype=pymilvus.DataType.VARCHAR, - description="The title of the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="source", - dtype=pymilvus.DataType.VARCHAR, - description="The URL of the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="summary", - dtype=pymilvus.DataType.VARCHAR, - description="The summary of the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="content", - dtype=pymilvus.DataType.VARCHAR, - description="A chunk of text from the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="embedding", - dtype=pymilvus.DataType.FLOAT_VECTOR, - description="Embedding vectors", - dim=embedding_size).to_dict(), - ], - "description": "Test collection schema" - } - } + # convert the pipeline config to a morpheus.config.Config object + final_config['pipeline_config'] = build_pipeline_config(final_config['pipeline_config']) - return milvus_resource_kwargs + return final_config def build_rss_urls() -> typing.List[str]: @@ -533,37 +458,4 @@ def build_rss_urls() -> typing.List[str]: A list of URLs as strings, each pointing to a different RSS feed. """ - return [ - "https://www.theregister.com/security/headlines.atom", - "https://isc.sans.edu/dailypodcast.xml", - "https://threatpost.com/feed/", - "http://feeds.feedburner.com/TheHackersNews?format=xml", - "https://www.bleepingcomputer.com/feed/", - "https://therecord.media/feed/", - "https://blog.badsectorlabs.com/feeds/all.atom.xml", - "https://krebsonsecurity.com/feed/", - "https://www.darkreading.com/rss_simple.asp", - "https://blog.malwarebytes.com/feed/", - "https://msrc.microsoft.com/blog/feed", - "https://securelist.com/feed", - "https://www.crowdstrike.com/blog/feed/", - "https://threatconnect.com/blog/rss/", - "https://news.sophos.com/en-us/feed/", - "https://www.us-cert.gov/ncas/current-activity.xml", - "https://www.csoonline.com/feed", - "https://www.cyberscoop.com/feed", - "https://research.checkpoint.com/feed", - "https://feeds.fortinet.com/fortinet/blog/threat-research", - "https://www.mcafee.com/blogs/rss", - "https://www.digitalshadows.com/blog-and-research/rss.xml", - "https://www.nist.gov/news-events/cybersecurity/rss.xml", - "https://www.sentinelone.com/blog/rss/", - "https://www.bitdefender.com/blog/api/rss/labs/", - "https://www.welivesecurity.com/feed/", - "https://unit42.paloaltonetworks.com/feed/", - "https://mandiant.com/resources/blog/rss.xml", - "https://www.wired.com/feed/category/security/latest/rss", - "https://www.wired.com/feed/tag/ai/latest/rss", - "https://blog.google/threat-analysis-group/rss/", - "https://intezer.com/feed/", - ] + return DEFAULT_RSS_URLS.copy() From 3b5c9757302f8aadea91973b3f3f17392077c429 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 13:04:49 -0700 Subject: [PATCH 2/7] Lint cleanups --- examples/llm/vdb_upload/langchain.py | 4 ++-- examples/llm/vdb_upload/run.py | 3 +-- examples/llm/vdb_upload/vdb_utils.py | 13 ------------- 3 files changed, 3 insertions(+), 17 deletions(-) diff --git a/examples/llm/vdb_upload/langchain.py b/examples/llm/vdb_upload/langchain.py index f296e077d3..9b9b616bd3 100644 --- a/examples/llm/vdb_upload/langchain.py +++ b/examples/llm/vdb_upload/langchain.py @@ -20,7 +20,7 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores.milvus import Milvus -from examples.llm.vdb_upload.vdb_utils import build_rss_urls +from examples.llm.vdb_upload.vdb_utils import DEFAULT_RSS_URLS from morpheus.utils.logging_timer import log_time logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ def chain(model_name, save_cache): with log_time(msg="Seeding with chain took {duration} ms. {rate_per_sec} docs/sec", log_fn=logger.debug) as log: - loader = RSSFeedLoader(urls=build_rss_urls()) + loader = RSSFeedLoader(urls=DEFAULT_RSS_URLS.copy()) documents = loader.load() diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index 973f2cfcb0..c43ef91ed7 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -16,7 +16,6 @@ import os import click -from vdb_upload.vdb_utils import build_cli_configs from vdb_upload.vdb_utils import build_config from vdb_upload.vdb_utils import is_valid_service @@ -192,7 +191,7 @@ def pipeline(ctx: click.Context, **kwargs): type=click.Path(file_okay=True, dir_okay=False), help="Location to save the cache to", ) -def langchain(**kwargs) +def langchain(**kwargs): from .langchain import chain return chain(**kwargs) diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index ba5b49dd36..74aa1f9432 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -446,16 +446,3 @@ def build_config(vdb_conf_path: str | None, final_config['pipeline_config'] = build_pipeline_config(final_config['pipeline_config']) return final_config - - -def build_rss_urls() -> typing.List[str]: - """ - Builds a list of RSS feed URLs. - - Returns - ------- - typing.List[str] - A list of URLs as strings, each pointing to a different RSS feed. - """ - - return DEFAULT_RSS_URLS.copy() From c81f1bae66c35e7ee3fbc4f8964f334a073462c7 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 13:14:03 -0700 Subject: [PATCH 3/7] Update tests, merge_configs was removed, but merge_dicts still exists --- .../examples/llm/vdb_upload/test_vdb_utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/examples/llm/vdb_upload/test_vdb_utils.py b/tests/examples/llm/vdb_upload/test_vdb_utils.py index 38080f402a..e55950e4ef 100644 --- a/tests/examples/llm/vdb_upload/test_vdb_utils.py +++ b/tests/examples/llm/vdb_upload/test_vdb_utils.py @@ -28,29 +28,29 @@ def test_is_valid_service_with_mixed_case(import_vdb_update_utils_module): assert import_vdb_update_utils_module.is_valid_service(None, None, "MilVuS") == "milvus" -def test_merge_configs_non_overlapping(import_vdb_update_utils_module): +def test_merge_dicts_non_overlapping(import_vdb_update_utils_module): file_config = {"key1": "value1"} cli_config = {"key2": "value2"} expected = {"key1": "value1", "key2": "value2"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_overlapping(import_vdb_update_utils_module): +def test_merge_dicts_overlapping(import_vdb_update_utils_module): file_config = {"key1": "value1", "key2": "old_value"} cli_config = {"key2": "new_value"} expected = {"key1": "value1", "key2": "new_value"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_none_in_cli(import_vdb_update_utils_module): +def test_merge_dicts_none_in_cli(import_vdb_update_utils_module): file_config = {"key1": "value1", "key2": "value2"} cli_config = {"key2": None} - expected = {"key1": "value1", "key2": "value2"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + expected = {"key1": "value1", "key2": None} + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_empty(import_vdb_update_utils_module): +def test_merge_dicts_empty(import_vdb_update_utils_module): file_config = {} cli_config = {"key1": "value1"} expected = {"key1": "value1"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected From b646ba6d032a49064bdd5ca6e0d1ce202b8001f2 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 13:33:47 -0700 Subject: [PATCH 4/7] Fix config handling --- examples/llm/vdb_upload/vdb_utils.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 74aa1f9432..9923946798 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -65,7 +65,7 @@ "output_batch_size": 2048, "cache_dir": "./.cache/http", "stop_after_rec": 0, - "feed_input": DEFAULT_RSS_URLS, + "feed_input": DEFAULT_RSS_URLS.copy(), "strip_markup": True, "web_scraper_config": {} } @@ -264,7 +264,7 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool allowing for easy merging of the two. """ config = {} - source_configs = [] + source_config = {} source_type = cli_args.get('source_type', []) if 'rss' in source_type: if include_defaults: @@ -274,7 +274,7 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool _set_values_if_exists(rss_config['web_scraper_config'], cli_args, { - "content_chunking_size": "chunk_size", "enable_cache": "enable_cache" + "chunk_size": "content_chunking_size", "enable_cache": "enable_cache" }) _set_values_if_exists( rss_config, @@ -294,7 +294,7 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool if len(cli_args.get('feed_inputs', [])) > 0: rss_config['feed_input'] = cli_args['feed_inputs'] - source_configs.append({'type': 'rss', 'name': 'rss-cli', 'config': rss_config}) + source_config['rss'] = {'type': 'rss', 'name': 'rss-cli', 'config': rss_config} if 'filesystem' in source_type: fs_config = {"extractor_config": {}} @@ -314,9 +314,9 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool "watch": "run_indefinitely" }) - source_configs.append({'type': 'filesystem', 'name': 'filesystem-cli', 'config': fs_config}) + source_config['filesystem'] = {'type': 'filesystem', 'name': 'filesystem-cli', 'config': fs_config} - config['source_configs'] = source_configs + config['source_config'] = source_config embeddings_model_kwargs = {} if include_defaults: @@ -381,6 +381,8 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool if resource_name is not None: vdb_config["resource_schemas"] = {resource_name: None} + config['vdb_config'] = vdb_config + return config @@ -415,7 +417,11 @@ def build_config(vdb_conf_path: str | None, with open(vdb_conf_path, 'r', encoding='utf-8') as file: yaml_config = yaml.safe_load(file).get('vdb_pipeline', {}) + # Yaml specific transforms yaml_config['vdb_config'] = yaml_config.pop('vdb', {}) + sources = yaml_config.pop('sources', []) + yaml_config['source_config'] = {src['type']: src for src in sources} + else: yaml_config = {} @@ -425,6 +431,9 @@ def build_config(vdb_conf_path: str | None, final_config = merge_dicts(implicit_cli_config, yaml_config) final_config = merge_dicts(final_config, explicit_cli_config) + # Flatten the source configs into a list + final_config['source_config'] = list(final_config.pop('source_config').values()) + # Handle the resource schema separately, the reason is we need both the service type, resource name and the # embedding size to all be defined, some or all of these values could be defined at any level. vdb_config = final_config['vdb_config'] From 62c0ceaf898ef1903e5f43787a9064a7622aa0b1 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 13:55:34 -0700 Subject: [PATCH 5/7] Rename rss source --- examples/llm/vdb_upload/vdb_config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 406c4dced8..1a3085037d 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -31,7 +31,7 @@ vdb_pipeline: sources: - type: rss - name: "rss_cve" + name: "rss" config: batch_size: 128 # Number of rss feeds per batch cache_dir: "./.cache/http" From d21c5533dc1c7a3de851ec57e047d6cb36041085 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 14:00:50 -0700 Subject: [PATCH 6/7] Fix yaml to config name mappings --- examples/llm/vdb_upload/vdb_utils.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 9923946798..496df72797 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -138,6 +138,13 @@ } } +YAML_TO_CONFIG_MAPPING = { + 'embeddings': 'embeddings_config', + 'pipeline': 'pipeline_config', + 'tokenizer': 'tokenizer_config', + 'vdb': 'vdb_config' +} + def build_milvus_config(resource_schema_config: dict): schema_fields = [] @@ -294,7 +301,7 @@ def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool if len(cli_args.get('feed_inputs', [])) > 0: rss_config['feed_input'] = cli_args['feed_inputs'] - source_config['rss'] = {'type': 'rss', 'name': 'rss-cli', 'config': rss_config} + source_config['rss'] = {'type': 'rss', 'name': 'rss', 'config': rss_config} if 'filesystem' in source_type: fs_config = {"extractor_config": {}} @@ -418,9 +425,11 @@ def build_config(vdb_conf_path: str | None, yaml_config = yaml.safe_load(file).get('vdb_pipeline', {}) # Yaml specific transforms - yaml_config['vdb_config'] = yaml_config.pop('vdb', {}) + for (yaml_key, config_key) in YAML_TO_CONFIG_MAPPING.items(): + yaml_config[config_key] = yaml_config.pop(yaml_key, {}) + sources = yaml_config.pop('sources', []) - yaml_config['source_config'] = {src['type']: src for src in sources} + yaml_config['source_config'] = {src['name']: src for src in sources} else: yaml_config = {} From 6c588baabac494f8fdf19c401ed70f776517f258 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Fri, 12 Jul 2024 14:05:24 -0700 Subject: [PATCH 7/7] Remove stop_after_rec, no need to set default values --- examples/llm/vdb_upload/vdb_config.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 1a3085037d..35ff2facb5 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -75,7 +75,6 @@ vdb_pipeline: output_batch_size: 2048 # Number of chunked documents per output batch request_timeout_sec: 2.0 run_indefinitely: true - stop_after_rec: 64 strip_markup: true web_scraper_config: chunk_overlap: 51