From 270fb20d2e10fac138745c106def9182c81818a9 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 2 Jul 2024 15:49:49 -0700 Subject: [PATCH 1/5] Fix mis-named monitor stage --- examples/llm/agents/simple_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/llm/agents/simple_pipeline.py b/examples/llm/agents/simple_pipeline.py index 9b2b95c611..e398161c37 100644 --- a/examples/llm/agents/simple_pipeline.py +++ b/examples/llm/agents/simple_pipeline.py @@ -104,7 +104,7 @@ def pipeline( sink = pipe.add_stage(InMemorySinkStage(config)) - pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True)) + pipe.add_stage(MonitorStage(config, description="Agent rate", unit="events", delayed_start=True)) start_time = time.time() From dd49797d5e4d0f02111f5a5e23d550382ea2a197 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 2 Jul 2024 15:53:45 -0700 Subject: [PATCH 2/5] Move _build_engine and _build_agent_executor to be shared --- examples/llm/agents/common.py | 67 +++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 examples/llm/agents/common.py diff --git a/examples/llm/agents/common.py b/examples/llm/agents/common.py new file mode 100644 index 0000000000..a7ff541746 --- /dev/null +++ b/examples/llm/agents/common.py @@ -0,0 +1,67 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +import time + +from langchain.agents import AgentType +from langchain.agents import initialize_agent +from langchain.agents import load_tools +from langchain.agents.agent import AgentExecutor +from langchain.llms.openai import OpenAI + +import cudf + +from morpheus.config import Config +from morpheus.config import PipelineModes +from morpheus.llm import LLMEngine +from morpheus.llm.nodes.extracter_node import ExtracterNode +from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode +from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler +from morpheus.messages import ControlMessage +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.stages.general.monitor_stage import MonitorStage +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.llm.llm_engine_stage import LLMEngineStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +from morpheus.utils.concat_df import concat_dataframes + +logger = logging.getLogger(__name__) + + +def _build_agent_executor(model_name: str) -> AgentExecutor: + + llm = OpenAI(model=model_name, temperature=0.0, client=None) + + tools = load_tools(["serpapi", "llm-math"], llm=llm) + + agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True) + + return agent_executor + + +def _build_engine(model_name: str) -> LLMEngine: + + engine = LLMEngine() + + engine.add_node("extracter", node=ExtracterNode()) + + engine.add_node("agent", + inputs=[("/extracter")], + node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name))) + + engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler()) + + return engine From 040900696565e20a959b79626e883f8c0d197ae6 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 2 Jul 2024 16:59:41 -0700 Subject: [PATCH 3/5] WIP --- examples/llm/agents/common.py | 20 +++++++++ examples/llm/agents/kafka_pipeline.py | 62 +++++--------------------- examples/llm/agents/run.py | 14 ++++++ examples/llm/agents/simple_pipeline.py | 54 ++-------------------- 4 files changed, 47 insertions(+), 103 deletions(-) diff --git a/examples/llm/agents/common.py b/examples/llm/agents/common.py index a7ff541746..2aff4e7b44 100644 --- a/examples/llm/agents/common.py +++ b/examples/llm/agents/common.py @@ -65,3 +65,23 @@ def _build_engine(model_name: str) -> LLMEngine: engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler()) return engine + + +def build_common_pipeline(config: Config, pipe: LinearPipeline, task_payload: dict, + model_name: str) -> InMemorySinkStage: + """ + Construct the elements of the pipeline common to the simple and kafka agent pipelines. + This method should be called after the source stage has been set. 
+ """ + pipe.add_stage( + DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=task_payload)) + + pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions')) + + pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name))) + + sink = pipe.add_stage(InMemorySinkStage(config)) + + pipe.add_stage(MonitorStage(config, description="Agent rate", unit="events", delayed_start=True)) + + return sink diff --git a/examples/llm/agents/kafka_pipeline.py b/examples/llm/agents/kafka_pipeline.py index 91a8ee8add..4784a6dd24 100644 --- a/examples/llm/agents/kafka_pipeline.py +++ b/examples/llm/agents/kafka_pipeline.py @@ -15,55 +15,22 @@ import logging import time -from langchain.agents import AgentType -from langchain.agents import initialize_agent -from langchain.agents import load_tools -from langchain.agents.agent import AgentExecutor -from langchain.llms.openai import OpenAIChat - from morpheus.config import Config from morpheus.config import PipelineModes -from morpheus.llm import LLMEngine -from morpheus.llm.nodes.extracter_node import ExtracterNode -from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode -from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler -from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.input.kafka_source_stage import KafkaSourceStage -from morpheus.stages.llm.llm_engine_stage import LLMEngineStage -from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -from morpheus.stages.preprocess.deserialize_stage import DeserializeStage - -logger = logging.getLogger(__name__) - - -def _build_agent_executor(model_name: str) -> AgentExecutor: - - llm = OpenAIChat(model_name=model_name, model_kwargs={"temperature": 0.0}, client=None) - - tools = load_tools(["serpapi", "llm-math"], llm=llm) - - agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True) - - return agent_executor - -def _build_engine(model_name: str) -> LLMEngine: +from .common import build_common_pipeline - engine = LLMEngine() - - engine.add_node("extracter", node=ExtracterNode()) - - engine.add_node("agent", - inputs=[("/extracter")], - node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name))) - - engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler()) - - return engine +logger = logging.getLogger(__name__) -def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, model_name: str) -> float: +def pipeline(num_threads: int, + pipeline_batch_size: int, + model_max_batch_size: int, + model_name: str, + bootstrap_servers: str, + topic: str) -> float: config = Config() config.mode = PipelineModes.OTHER @@ -78,18 +45,9 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i pipe = LinearPipeline(config) - pipe.set_source(KafkaSourceStage(config, bootstrap_servers="auto", input_topic=["input"])) - - pipe.add_stage( - DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task)) - - # pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions')) - - pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name))) - - sink = pipe.add_stage(InMemorySinkStage(config)) + pipe.set_source(KafkaSourceStage(config, bootstrap_servers=bootstrap_servers, input_topic=[topic])) - # 
pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True)) + sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name) start_time = time.time() diff --git a/examples/llm/agents/run.py b/examples/llm/agents/run.py index 2c53f8c35d..6bec05ae43 100644 --- a/examples/llm/agents/run.py +++ b/examples/llm/agents/run.py @@ -91,6 +91,20 @@ def simple(**kwargs): default='gpt-3.5-turbo-instruct', help="The name of the model to use in OpenAI", ) +@click.option( + "--bootstrap_servers", + required=True, + type=str, + default='auto', + help="The Kafka bootstrap servers to connect to", +) +@click.option( + "--topic", + required=True, + type=str, + default='input', + help="The Kafka topic to listen to for input messages", +) def kafka(**kwargs): from .kafka_pipeline import pipeline as _pipeline diff --git a/examples/llm/agents/simple_pipeline.py b/examples/llm/agents/simple_pipeline.py index e398161c37..78bfc00039 100644 --- a/examples/llm/agents/simple_pipeline.py +++ b/examples/llm/agents/simple_pipeline.py @@ -15,56 +15,17 @@ import logging import time -from langchain.agents import AgentType -from langchain.agents import initialize_agent -from langchain.agents import load_tools -from langchain.agents.agent import AgentExecutor -from langchain.llms.openai import OpenAI - import cudf from morpheus.config import Config from morpheus.config import PipelineModes -from morpheus.llm import LLMEngine -from morpheus.llm.nodes.extracter_node import ExtracterNode -from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode -from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler -from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline -from morpheus.stages.general.monitor_stage import MonitorStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage -from morpheus.stages.llm.llm_engine_stage import LLMEngineStage -from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage -from morpheus.stages.preprocess.deserialize_stage import DeserializeStage from morpheus.utils.concat_df import concat_dataframes -logger = logging.getLogger(__name__) - - -def _build_agent_executor(model_name: str) -> AgentExecutor: - - llm = OpenAI(model=model_name, temperature=0.0, client=None) - - tools = load_tools(["serpapi", "llm-math"], llm=llm) - - agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True) - - return agent_executor - - -def _build_engine(model_name: str) -> LLMEngine: +from .common import build_common_pipeline - engine = LLMEngine() - - engine.add_node("extracter", node=ExtracterNode()) - - engine.add_node("agent", - inputs=[("/extracter")], - node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name))) - - engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler()) - - return engine +logger = logging.getLogger(__name__) def pipeline( @@ -95,16 +56,7 @@ def pipeline( pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count)) - pipe.add_stage( - DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task)) - - pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions')) - - pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name))) - - sink = pipe.add_stage(InMemorySinkStage(config)) - - 
pipe.add_stage(MonitorStage(config, description="Agent rate", unit="events", delayed_start=True)) + sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name) start_time = time.time() From f9facbfada7bc6dcc77b0c51ec50a072c0808a8d Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jul 2024 08:03:17 -0700 Subject: [PATCH 4/5] Remove unused imports --- examples/llm/agents/common.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/examples/llm/agents/common.py b/examples/llm/agents/common.py index 2aff4e7b44..15f5ce4f39 100644 --- a/examples/llm/agents/common.py +++ b/examples/llm/agents/common.py @@ -13,7 +13,6 @@ # limitations under the License. import logging -import time from langchain.agents import AgentType from langchain.agents import initialize_agent @@ -21,10 +20,7 @@ from langchain.agents.agent import AgentExecutor from langchain.llms.openai import OpenAI -import cudf - from morpheus.config import Config -from morpheus.config import PipelineModes from morpheus.llm import LLMEngine from morpheus.llm.nodes.extracter_node import ExtracterNode from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode @@ -32,11 +28,9 @@ from morpheus.messages import ControlMessage from morpheus.pipeline.linear_pipeline import LinearPipeline from morpheus.stages.general.monitor_stage import MonitorStage -from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.llm.llm_engine_stage import LLMEngineStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.utils.concat_df import concat_dataframes logger = logging.getLogger(__name__) From b580f6b89efd494d30fad6e19290b2eb1995b1cb Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 3 Jul 2024 08:14:18 -0700 Subject: [PATCH 5/5] Remove messages indicating the pipeline is broken, add documentation for new flags --- docs/source/extra_info/known_issues.md | 1 - examples/llm/agents/README.md | 10 ++++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/source/extra_info/known_issues.md b/docs/source/extra_info/known_issues.md index 674f1330bc..af9cb91077 100644 --- a/docs/source/extra_info/known_issues.md +++ b/docs/source/extra_info/known_issues.md @@ -19,6 +19,5 @@ limitations under the License. - TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/issues/1641)) - vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/issues/1649)) -- LLM Agents Kafka pipeline is broken ([#1791](https://github.com/nv-morpheus/Morpheus/issues/1791)) Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues) diff --git a/examples/llm/agents/README.md b/examples/llm/agents/README.md index fea3a5a98a..374e631918 100644 --- a/examples/llm/agents/README.md +++ b/examples/llm/agents/README.md @@ -157,8 +157,6 @@ python examples/llm/main.py agents simple [OPTIONS] ### Run example (Kafka Pipeline): -> **Warning**: The Kafka Agents pipeline is currently broken [#1791](https://github.com/nv-morpheus/Morpheus/issues/1791) - The Kafka Example in the Morpheus LLM Agents demonstrates an streaming implementation, utilizing Kafka messages to facilitate the near real-time processing of LLM queries. 
This example is similar to the Simple example but makes use of a KafkaSourceStage to stream and retrieve messages from the Kafka topic
 
@@ -202,6 +200,14 @@ python examples/llm/main.py agents kafka [OPTIONS]
   - **Description**: The name of the model to use in OpenAI.
   - **Default**: `gpt-3.5-turbo-instruct`
 
+- `--bootstrap_servers TEXT`
+  - **Description**: The Kafka bootstrap servers to connect to; if set to `auto` (the default), the client will attempt to infer the bootstrap servers from the environment.
+  - **Default**: `auto`
+
+- `--topic TEXT`
+  - **Description**: The Kafka topic to listen to for input messages.
+  - **Default**: `input`
+
 - `--help`
   - **Description**: Show the help message with options and commands details.
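
A minimal sketch of how the shared `build_common_pipeline()` helper introduced in `examples/llm/agents/common.py` is consumed after this series: the caller builds a `LinearPipeline`, sets whichever source stage it needs, and delegates the deserialize/monitor/LLM-engine/sink stages to the helper. This mirrors the refactored `simple_pipeline.py` above; the question text, the `completion_task` payload and the post-run handling of `sink.get_messages()` are illustrative assumptions, not copied verbatim from the patches.

```python
# Minimal sketch only -- mirrors the refactored simple_pipeline.py in this series.
# The question text, completion_task payload and sink handling are assumptions for
# illustration, not taken verbatim from the patches above.
import time

import cudf

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.utils.concat_df import concat_dataframes

from .common import build_common_pipeline


def run_example(model_name: str = "gpt-3.5-turbo-instruct", repeat_count: int = 1) -> float:
    config = Config()
    config.mode = PipelineModes.OTHER

    # One DataFrame of input questions (placeholder content).
    source_dfs = [cudf.DataFrame({"questions": ["What is 2 to the 10th power?"]})]

    # Task payload handed to the DeserializeStage inside build_common_pipeline().
    completion_task = {"task_type": "completion", "task_dict": {"input_keys": ["questions"]}}

    pipe = LinearPipeline(config)

    # Each caller only chooses its source; everything downstream is shared.
    pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))
    sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task,
                                 model_name=model_name)

    start_time = time.time()
    pipe.run()

    # Collect the agent responses accumulated by the in-memory sink.
    responses = concat_dataframes(sink.get_messages())
    print(responses)

    return time.time() - start_time
```

With this structure the simple and Kafka pipelines differ only in their `pipe.set_source(...)` call; the Kafka variant swaps in `KafkaSourceStage` with the new `--bootstrap_servers` and `--topic` flags.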