From 9f874964d54f9141468e29e859315813b8d48304 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Oct 2023 12:14:02 -0700 Subject: [PATCH 1/5] Add docstrings, lazily import AgentExecutor only on type-checking --- morpheus/llm/nodes/langchain_agent_node.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/morpheus/llm/nodes/langchain_agent_node.py b/morpheus/llm/nodes/langchain_agent_node.py index 12b2b239c8..3ae71692b6 100644 --- a/morpheus/llm/nodes/langchain_agent_node.py +++ b/morpheus/llm/nodes/langchain_agent_node.py @@ -14,18 +14,28 @@ import asyncio import logging - -from langchain.agents import AgentExecutor +import typing from morpheus.llm import LLMContext from morpheus.llm import LLMNodeBase logger = logging.getLogger(__name__) +if typing.TYPE_CHECKING: + from langchain.agents import AgentExecutor + class LangChainAgentNode(LLMNodeBase): + """ + Executes a LangChain agent in an LLMEngine + + Parameters + ---------- + agent_executor : AgentExecutor + The agent executor to use to execute. + """ - def __init__(self, agent_executor: AgentExecutor): + def __init__(self, agent_executor: "AgentExecutor"): super().__init__() self._agent_executor = agent_executor From 05c4afc26e6fb0f060fd701a22a9ff3aa09b081a Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Oct 2023 12:15:04 -0700 Subject: [PATCH 2/5] Add type hints --- morpheus/llm/nodes/langchain_agent_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/morpheus/llm/nodes/langchain_agent_node.py b/morpheus/llm/nodes/langchain_agent_node.py index 3ae71692b6..3fbb018df0 100644 --- a/morpheus/llm/nodes/langchain_agent_node.py +++ b/morpheus/llm/nodes/langchain_agent_node.py @@ -45,7 +45,7 @@ def __init__(self, agent_executor: "AgentExecutor"): def get_input_names(self): return self._input_names - async def _run_single(self, **kwargs): + async def _run_single(self, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]: all_lists = all(isinstance(v, list) for v in kwargs.values()) @@ -68,7 +68,7 @@ async def _run_single(self, **kwargs): # We are not dealing with a list, so run single return await self._agent_executor.arun(**kwargs) - async def execute(self, context: LLMContext): + async def execute(self, context: LLMContext) -> LLMContext: input_dict = context.get_inputs() From f0a4a7233386000a4b902066b14c912a3287ea42 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Oct 2023 14:31:41 -0700 Subject: [PATCH 3/5] Tests for LangChainAgentNode --- tests/llm/nodes/test_langchain_agent_node.py | 66 ++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/llm/nodes/test_langchain_agent_node.py diff --git a/tests/llm/nodes/test_langchain_agent_node.py b/tests/llm/nodes/test_langchain_agent_node.py new file mode 100644 index 0000000000..47f345e188 --- /dev/null +++ b/tests/llm/nodes/test_langchain_agent_node.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +import pytest + +from _utils.llm import execute_node +from morpheus.llm import LLMNodeBase +from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode + + +@pytest.fixture(name="mock_agent_executor") +def mock_agent_executor_fixture(): + mock_agent_ex = mock.MagicMock() + mock_agent_ex.return_value = mock_agent_ex + mock_agent_ex.input_keys = ["prompt"] + mock_agent_ex.arun = mock.AsyncMock() + return mock_agent_ex + + +def test_constructor(mock_agent_executor: mock.MagicMock): + node = LangChainAgentNode(agent_executor=mock_agent_executor) + assert isinstance(node, LLMNodeBase) + + +def test_get_input_names(mock_agent_executor: mock.MagicMock): + node = LangChainAgentNode(agent_executor=mock_agent_executor) + assert node.get_input_names() == ["prompt"] + + +@pytest.mark.parametrize( + "values,arun_return,expected_output,expected_calls", + [({ + 'prompt': "prompt1" + }, list(range(3)), list(range(3)), [mock.call(prompt="prompt1")]), + ({ + 'a': ['b', 'c', 'd'], 'c': ['d', 'e', 'f'], 'e': ['f', 'g', 'h'] + }, + list(range(3)), [list(range(3))] * 3, + [mock.call(a='b', c='d', e='f'), mock.call(a='c', c='e', e='g'), mock.call(a='d', c='f', e='h')])], + ids=["not-lists", "all-lists"]) +def test_execute( + mock_agent_executor: mock.MagicMock, + values: dict, + arun_return: list, + expected_output: list, + expected_calls: list[mock.call], +): + mock_agent_executor.arun.return_value = arun_return + + node = LangChainAgentNode(agent_executor=mock_agent_executor) + assert execute_node(node, **values) == expected_output + mock_agent_executor.arun.assert_has_calls(expected_calls) From d47cbe4a745d96b6569005f5a2df3c30b71b1597 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Oct 2023 14:37:28 -0700 Subject: [PATCH 4/5] Move mock_agent_executor fixture to conftest --- tests/llm/nodes/conftest.py | 9 +++++++++ tests/llm/nodes/test_langchain_agent_node.py | 9 --------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/llm/nodes/conftest.py b/tests/llm/nodes/conftest.py index 7be47d2850..b92e06ec3c 100644 --- a/tests/llm/nodes/conftest.py +++ b/tests/llm/nodes/conftest.py @@ -25,3 +25,12 @@ def mock_llm_client_fixture(): mock_client.return_value = mock_client mock_client.generate_batch_async = mock.AsyncMock() return mock_client + + +@pytest.fixture(name="mock_agent_executor") +def mock_agent_executor_fixture(): + mock_agent_ex = mock.MagicMock() + mock_agent_ex.return_value = mock_agent_ex + mock_agent_ex.input_keys = ["prompt"] + mock_agent_ex.arun = mock.AsyncMock() + return mock_agent_ex diff --git a/tests/llm/nodes/test_langchain_agent_node.py b/tests/llm/nodes/test_langchain_agent_node.py index 47f345e188..29cd7f3548 100644 --- a/tests/llm/nodes/test_langchain_agent_node.py +++ b/tests/llm/nodes/test_langchain_agent_node.py @@ -22,15 +22,6 @@ from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode -@pytest.fixture(name="mock_agent_executor") -def mock_agent_executor_fixture(): - mock_agent_ex = mock.MagicMock() - mock_agent_ex.return_value = mock_agent_ex - mock_agent_ex.input_keys = ["prompt"] - mock_agent_ex.arun = mock.AsyncMock() - return mock_agent_ex - - def test_constructor(mock_agent_executor: mock.MagicMock): node = LangChainAgentNode(agent_executor=mock_agent_executor) assert isinstance(node, LLMNodeBase) From b6168e5a66e43f0a82b047b9014fb06f1e045469 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Tue, 31 Oct 2023 14:52:55 -0700 Subject: [PATCH 5/5] Add end-to-end pipeline test --- .../nodes/test_langchain_agent_node_pipe.py | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 tests/llm/nodes/test_langchain_agent_node_pipe.py diff --git a/tests/llm/nodes/test_langchain_agent_node_pipe.py b/tests/llm/nodes/test_langchain_agent_node_pipe.py new file mode 100644 index 0000000000..39e1824cea --- /dev/null +++ b/tests/llm/nodes/test_langchain_agent_node_pipe.py @@ -0,0 +1,65 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +import pytest + +from _utils import assert_results +from _utils.dataset_manager import DatasetManager +from morpheus.config import Config +from morpheus.llm import LLMEngine +from morpheus.llm.nodes.extracter_node import ExtracterNode +from morpheus.llm.nodes.langchain_agent_node import LangChainAgentNode +from morpheus.llm.task_handlers.simple_task_handler import SimpleTaskHandler +from morpheus.messages import ControlMessage +from morpheus.pipeline.linear_pipeline import LinearPipeline +from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage +from morpheus.stages.llm.llm_engine_stage import LLMEngineStage +from morpheus.stages.output.compare_dataframe_stage import CompareDataFrameStage +from morpheus.stages.preprocess.deserialize_stage import DeserializeStage + + +def _build_engine(mock_agent_executor: mock.MagicMock) -> LLMEngine: + engine = LLMEngine() + engine.add_node("extracter", node=ExtracterNode()) + engine.add_node("chain", inputs=["/extracter"], node=LangChainAgentNode(agent_executor=mock_agent_executor)) + engine.add_task_handler(inputs=["/chain"], handler=SimpleTaskHandler()) + + return engine + + +@pytest.mark.use_python +def test_pipeline(config: Config, dataset_cudf: DatasetManager, mock_agent_executor: mock.MagicMock): + input_df = dataset_cudf["filter_probs.csv"] + expected_df = input_df.copy(deep=True) + + mock_agent_executor.arun.return_value = 'frogs' + expected_df['response'] = 'frogs' + expected_calls = [mock.call(prompt=x) for x in expected_df['v3'].values_host] + + task_payload = {"task_type": "llm_engine", "task_dict": {"input_keys": ['v3']}} + + pipe = LinearPipeline(config) + pipe.set_source(InMemorySourceStage(config, dataframes=[input_df])) + pipe.add_stage( + DeserializeStage(config, message_type=ControlMessage, task_type="llm_engine", task_payload=task_payload)) + pipe.add_stage(LLMEngineStage(config, engine=_build_engine(mock_agent_executor))) + sink = pipe.add_stage(CompareDataFrameStage(config, compare_df=expected_df)) + + pipe.run() + + assert_results(sink.get_results()) + mock_agent_executor.arun.assert_has_calls(expected_calls)