Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix LLM Agents Kafka pipeline #1793

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/source/extra_info/known_issues.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,5 @@ limitations under the License.

- TrainAEStage fails with a Segmentation fault ([#1641](https://github.com/nv-morpheus/Morpheus/issues/1641))
- vdb_upload example pipeline triggers an internal error in Triton ([#1649](https://github.com/nv-morpheus/Morpheus/issues/1649))
- LLM Agents Kafka pipeline is broken ([#1791](https://github.com/nv-morpheus/Morpheus/issues/1791))

Refer to [open issues in the Morpheus project](https://github.com/nv-morpheus/Morpheus/issues)
10 changes: 8 additions & 2 deletions examples/llm/agents/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,6 @@ python examples/llm/main.py agents simple [OPTIONS]

### Run example (Kafka Pipeline):

> **Warning**: The Kafka Agents pipeline is currently broken [#1791](https://github.com/nv-morpheus/Morpheus/issues/1791)

The Kafka Example in the Morpheus LLM Agents demonstrates a streaming implementation, utilizing Kafka messages to
facilitate the near real-time processing of LLM queries. This example is similar to the Simple example but makes use of
a KafkaSourceStage to stream and retrieve messages from the Kafka topic.
Expand Down Expand Up @@ -202,6 +200,14 @@ python examples/llm/main.py agents kafka [OPTIONS]
- **Description**: The name of the model to use in OpenAI.
- **Default**: `gpt-3.5-turbo-instruct`

- `--bootstrap_servers TEXT`
- **Description**: The Kafka bootstrap servers to connect to; if undefined, the client will attempt to infer the bootstrap servers from the environment.
- **Default**: `auto`

- `--topic TEXT`
- **Description**: The Kafka topic to listen to for input messages.
- **Default**: `input`

- `--help`
- **Description**: Show the help message with options and commands details.

Expand Down
81 changes: 81 additions & 0 deletions examples/llm/agents/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAI

from morpheus.config import Config
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:
    """
    Create a LangChain zero-shot ReAct agent backed by an OpenAI completion model.

    The agent is given the SerpAPI web-search tool and the LLM math tool, so it can
    answer questions requiring a search followed by a calculation.

    Parameters
    ----------
    model_name : str
        Name of the OpenAI model to use (e.g. ``gpt-3.5-turbo-instruct``).

    Returns
    -------
    AgentExecutor
        A ready-to-run LangChain agent executor.
    """
    # temperature=0.0 keeps the agent's reasoning deterministic as possible
    openai_llm = OpenAI(model=model_name, temperature=0.0, client=None)

    agent_tools = load_tools(["serpapi", "llm-math"], llm=openai_llm)

    return initialize_agent(agent_tools,
                            openai_llm,
                            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
                            verbose=True)


def _build_engine(model_name: str) -> LLMEngine:
    """
    Assemble the Morpheus LLM engine used by both agent pipelines.

    Graph: an ``extracter`` node pulls the prompts out of the incoming control
    message, a ``agent`` node runs the LangChain agent on them, and a simple
    task handler writes the agent responses back onto the message.

    Parameters
    ----------
    model_name : str
        OpenAI model name forwarded to :func:`_build_agent_executor`.

    Returns
    -------
    LLMEngine
        The fully wired engine.
    """
    llm_engine = LLMEngine()

    # First node: extract the input prompts from the control message payload.
    llm_engine.add_node("extracter", node=ExtracterNode())

    # Second node: feed the extracted prompts to the LangChain agent.
    agent_node = LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name))
    llm_engine.add_node("agent", inputs=["/extracter"], node=agent_node)

    # Persist the agent output back onto the message for downstream stages.
    llm_engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

    return llm_engine


def build_common_pipeline(config: Config, pipe: LinearPipeline, task_payload: dict,
                          model_name: str) -> InMemorySinkStage:
    """
    Append the stages shared by the simple and Kafka agent pipelines.

    Must be called after ``pipe.set_source(...)``; stages are appended in order:
    deserialize -> source-rate monitor -> LLM engine -> in-memory sink ->
    agent-rate monitor.

    Parameters
    ----------
    config : Config
        Morpheus pipeline configuration.
    pipe : LinearPipeline
        Pipeline which already has its source stage set.
    task_payload : dict
        Payload attached to each ``llm_engine`` task by the deserialize stage.
    model_name : str
        OpenAI model name used to build the LLM engine.

    Returns
    -------
    InMemorySinkStage
        The sink stage, so callers can collect the pipeline results.
    """
    # Convert raw source messages into ControlMessages carrying the LLM task.
    deserialize = DeserializeStage(config,
                                   message_type=ControlMessage,
                                   task_type="llm_engine",
                                   task_payload=task_payload)
    pipe.add_stage(deserialize)

    pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

    pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

    sink_stage = pipe.add_stage(InMemorySinkStage(config))

    # delayed_start avoids counting time spent before the first agent result.
    pipe.add_stage(MonitorStage(config, description="Agent rate", unit="events", delayed_start=True))

    return sink_stage
62 changes: 10 additions & 52 deletions examples/llm/agents/kafka_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,22 @@
import logging
import time

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAIChat

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAIChat(model_name=model_name, model_kwargs={"temperature": 0.0}, client=None)

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor


def _build_engine(model_name: str) -> LLMEngine:
from .common import build_common_pipeline

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))

engine.add_task_handler(inputs=["/extracter"], handler=SimpleTaskHandler())

return engine
logger = logging.getLogger(__name__)


def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: int, model_name: str) -> float:
def pipeline(num_threads: int,
pipeline_batch_size: int,
model_max_batch_size: int,
model_name: str,
bootstrap_servers: str,
topic: str) -> float:
config = Config()
config.mode = PipelineModes.OTHER

Expand All @@ -78,18 +45,9 @@ def pipeline(num_threads: int, pipeline_batch_size: int, model_max_batch_size: i

pipe = LinearPipeline(config)

pipe.set_source(KafkaSourceStage(config, bootstrap_servers="auto", input_topic=["input"]))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

# pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

sink = pipe.add_stage(InMemorySinkStage(config))
pipe.set_source(KafkaSourceStage(config, bootstrap_servers=bootstrap_servers, input_topic=[topic]))

# pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True))
sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name)

start_time = time.time()

Expand Down
14 changes: 14 additions & 0 deletions examples/llm/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,20 @@ def simple(**kwargs):
default='gpt-3.5-turbo-instruct',
help="The name of the model to use in OpenAI",
)
@click.option(
"--bootstrap_servers",
required=True,
type=str,
default='auto',
help="The Kafka bootstrap servers to connect to",
)
@click.option(
"--topic",
required=True,
type=str,
default='input',
help="The Kafka topic to listen to for input messages",
)
def kafka(**kwargs):

from .kafka_pipeline import pipeline as _pipeline
Expand Down
54 changes: 3 additions & 51 deletions examples/llm/agents/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,17 @@
import logging
import time

from langchain.agents import AgentType
from langchain.agents import initialize_agent
from langchain.agents import load_tools
from langchain.agents.agent import AgentExecutor
from langchain.llms.openai import OpenAI

import cudf

from morpheus.config import Config
from morpheus.config import PipelineModes
from morpheus.llm import LLMEngine
from morpheus.llm.nodes.extracter_node import ExtracterNode
from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode
from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler
from morpheus.messages import ControlMessage
from morpheus.pipeline.linear_pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage
from morpheus.stages.llm.llm_engine_stage import LLMEngineStage
from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.utils.concat_df import concat_dataframes

logger = logging.getLogger(__name__)


def _build_agent_executor(model_name: str) -> AgentExecutor:

llm = OpenAI(model=model_name, temperature=0.0, client=None)

tools = load_tools(["serpapi", "llm-math"], llm=llm)

agent_executor = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

return agent_executor


def _build_engine(model_name: str) -> LLMEngine:
from .common import build_common_pipeline

engine = LLMEngine()

engine.add_node("extracter", node=ExtracterNode())

engine.add_node("agent",
inputs=[("/extracter")],
node=LangChainAgentNode(agent_executor=_build_agent_executor(model_name=model_name)))

engine.add_task_handler(inputs=["/agent"], handler=SimpleTaskHandler())

return engine
logger = logging.getLogger(__name__)


def pipeline(
Expand Down Expand Up @@ -95,16 +56,7 @@ def pipeline(

pipe.set_source(InMemorySourceStage(config, dataframes=source_dfs, repeat=repeat_count))

pipe.add_stage(
DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=completion_task))

pipe.add_stage(MonitorStage(config, description="Source rate", unit='questions'))

pipe.add_stage(LLMEngineStage(config, engine=_build_engine(model_name=model_name)))

sink = pipe.add_stage(InMemorySinkStage(config))

pipe.add_stage(MonitorStage(config, description="Upload rate", unit="events", delayed_start=True))
sink = build_common_pipeline(config=config, pipe=pipe, task_payload=completion_task, model_name=model_name)

start_time = time.time()

Expand Down
Loading