From 0ca4c7d7dbeecbefb72d9566433259e086ade023 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 12 Sep 2024 14:27:59 -0400 Subject: [PATCH 01/22] Add Pipeline Chat History models --- ...ipelinechathistory_pipelinechatmessages.py | 44 +++++++++++++++++++ apps/pipelines/models.py | 23 ++++++++++ 2 files changed, 67 insertions(+) create mode 100644 apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py diff --git a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py new file mode 100644 index 000000000..aca0154de --- /dev/null +++ b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py @@ -0,0 +1,44 @@ +# Generated by Django 5.1 on 2024-09-12 19:25 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('experiments', '0093_participant_name'), + ('pipelines', '0005_auto_20240802_0039'), + ] + + operations = [ + migrations.CreateModel( + name='PipelineChatHistory', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('type', models.CharField(choices=[('node', 'Node History'), ('named', 'Named History'), ('global', 'Global History')], max_length=10)), + ('name', models.CharField(db_index=True, max_length=128)), + ('session', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pipeline_chat_history', to='experiments.experimentsession')), + ], + options={ + 'ordering': ['-created_at'], + 'unique_together': {('session', 'type', 'name')}, + }, + ), + migrations.CreateModel( + name='PipelineChatMessages', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('message_type', models.CharField(choices=[('human', 'Human'), ('ai', 'AI'), ('system', 'System')], max_length=10)), + ('content', models.TextField()), + ('chat_history', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='messages', to='pipelines.pipelinechathistory')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 1a5abf018..7e57a039a 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -182,3 +182,26 @@ class PipelineEventInputs(models.TextChoices): FULL_HISTORY = "full_history", "Full History" HISTORY_LAST_SUMMARY = "history_last_summary", "History to last summary" LAST_MESSAGE = "last_message", "Last message" + + +class PipelineChatHistoryTypes(models.TextChoices): + NODE = "node", "Node History" + NAMED = "named", "Named History" + GLOBAL = "global", "Global History" + + +class PipelineChatHistory(BaseModel): + session = models.ForeignKey(ExperimentSession, on_delete=models.CASCADE, related_name="pipeline_chat_history") + + type = models.CharField(max_length=10, choices=PipelineChatHistoryTypes.choices) + name = models.CharField(max_length=128, db_index=True) # Either the name of the named history, or the node id + + class Meta: + unique_together = [("session", "type", "name")] + ordering = ["-created_at"] + + +class PipelineChatMessages(BaseModel): + chat_history = models.ForeignKey(PipelineChatHistory, 
on_delete=models.CASCADE, related_name="messages") + message_type = models.CharField(max_length=10, choices=ChatMessageType.choices) + content = models.TextField() From 86726ae7a7d999cfd85867a3993c65805e7de318 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 12 Sep 2024 15:17:50 -0400 Subject: [PATCH 02/22] Use ChatPromptTemplate --- apps/pipelines/nodes/nodes.py | 9 ++++++--- apps/pipelines/tests/test_runnable_builder.py | 11 ++++------- apps/utils/langchain.py | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index 3b6bfc951..ebd9e2f0e 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -88,11 +88,14 @@ class LLMResponseWithPrompt(LLMResponse): __human_name__ = "LLM response with prompt" source_material_id: SourceMaterialId | None = None - prompt: str = "You are a helpful assistant. Answer the user's query as best you can: {input}" + prompt: str = "You are a helpful assistant. Answer the user's query as best you can" def _process(self, input, state: PipelineState) -> PipelineState: - prompt = PromptTemplate.from_template(template=self.prompt) - context = self._get_context(input, state, prompt) + prompt = ChatPromptTemplate.from_messages([ + ("system", self.prompt), + MessagesPlaceholder("history", optional=True), + ("human", "{input}") + ]) chain = prompt | super().get_chat_model() output = chain.invoke(context, config=self._config) return output.content diff --git a/apps/pipelines/tests/test_runnable_builder.py b/apps/pipelines/tests/test_runnable_builder.py index 0a7b2e4ba..a946296de 100644 --- a/apps/pipelines/tests/test_runnable_builder.py +++ b/apps/pipelines/tests/test_runnable_builder.py @@ -219,10 +219,7 @@ def test_llm_with_prompt_response(get_llm_service, provider, pipeline, source_ma "llm_provider_id": provider.id, "llm_model": "fake-model", "source_material_id": source_material.id, - "prompt": ( - "Node 1: Use this {source_material} to answer questions about {participant_data}." - " {input}" - ), + "prompt": ("Node 1: Use this {source_material} to answer questions about {participant_data}."), }, }, "id": "llm-1", @@ -236,7 +233,7 @@ def test_llm_with_prompt_response(get_llm_service, provider, pipeline, source_ma "llm_provider_id": provider.id, "llm_model": "fake-model", "source_material_id": source_material.id, - "prompt": "Node 2: ({input})", + "prompt": "Node 2:", }, }, "id": "llm-2", @@ -250,8 +247,8 @@ def test_llm_with_prompt_response(get_llm_service, provider, pipeline, source_ma -1 ] expected_output = ( - f"Node 2: (Node 1: Use this {source_material.material} to answer questions " - f"about {participant_data.data}. {user_input})" + f"Node 2: Node 1: Use this {source_material.material} to answer questions " + f"about {participant_data.data}. 
{user_input}" ) assert output == expected_output diff --git a/apps/utils/langchain.py b/apps/utils/langchain.py index 05cc41a26..37077b42a 100644 --- a/apps/utils/langchain.py +++ b/apps/utils/langchain.py @@ -141,7 +141,7 @@ class FakeLlmEcho(FakeLlmSimpleTokenCount): def _call(self, messages: list[BaseMessage], *args, **kwargs) -> str | BaseMessage: self.calls.append(mock.call(messages, *args, **kwargs)) - return messages[-1] + return " ".join(message.content for message in messages) @contextmanager From b6bc111e8defe0fd88811cc9ebbbe29da9162cd0 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 12 Sep 2024 15:35:59 -0400 Subject: [PATCH 03/22] Add node_id to _process --- apps/pipelines/nodes/base.py | 2 +- apps/pipelines/nodes/nodes.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/pipelines/nodes/base.py b/apps/pipelines/nodes/base.py index 449f463b3..4f467e362 100644 --- a/apps/pipelines/nodes/base.py +++ b/apps/pipelines/nodes/base.py @@ -79,7 +79,7 @@ def process(self, node_id: str, incoming_edges: list, state: PipelineState, conf break else: # This is the first node in the graph input = state["messages"][-1] - output = self._process(input, state) + output = self._process(input, state, node_id) # Append the output to the state, otherwise do not change the state return ( PipelineState(messages=[output], outputs={node_id: output}) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index ebd9e2f0e..f750c52ee 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -33,7 +33,7 @@ class RenderTemplate(PipelineNode): __human_name__ = "Render a template" template_string: PipelineJinjaTemplate - def _process(self, input, state: PipelineState) -> PipelineState: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: def all_variables(in_): return {var: in_ for var in meta.find_undeclared_variables(env.parse(self.template_string))} @@ -78,7 +78,7 @@ def get_chat_model(self): class LLMResponse(PipelineNode, LLMResponseMixin): __human_name__ = "LLM response" - def _process(self, input, state: PipelineState) -> PipelineState: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: llm = self.get_chat_model() output = llm.invoke(input, config=self._config) return output.content @@ -90,7 +90,7 @@ class LLMResponseWithPrompt(LLMResponse): source_material_id: SourceMaterialId | None = None prompt: str = "You are a helpful assistant. 
Answer the user's query as best you can" - def _process(self, input, state: PipelineState) -> PipelineState: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: prompt = ChatPromptTemplate.from_messages([ ("system", self.prompt), MessagesPlaceholder("history", optional=True), @@ -140,7 +140,7 @@ class SendEmail(PipelineNode): recipient_list: str subject: str - def _process(self, input, state: PipelineState) -> PipelineState: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: send_email_from_pipeline.delay( recipient_list=self.recipient_list.split(","), subject=self.subject, message=input ) @@ -149,7 +149,7 @@ def _process(self, input, state: PipelineState) -> PipelineState: class Passthrough(PipelineNode): __human_name__ = "Do Nothing" - def _process(self, input, state: PipelineState) -> PipelineState: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: self.logger.debug(f"Returning input: '{input}' without modification", input=input, output=input) return input @@ -213,7 +213,7 @@ def _prompt_chain(self, reference_data): def extraction_chain(self, json_schema, reference_data): return self._prompt_chain(reference_data) | super().get_chat_model().with_structured_output(json_schema) - def _process(self, input, state: PipelineState) -> RunnableLambda: + def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: json_schema = self.to_json_schema(json.loads(self.data_schema)) reference_data = self.get_reference_data(state) prompt_token_count = self._get_prompt_token_count(reference_data, json_schema) From 86c7db9e27de138e1627dbaa42bab2860424cde2 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 12 Sep 2024 15:36:56 -0400 Subject: [PATCH 04/22] Save node history --- apps/chat/bots.py | 4 +- apps/pipelines/models.py | 3 + apps/pipelines/nodes/nodes.py | 70 ++++++++++++++----- apps/pipelines/nodes/types.py | 2 + apps/pipelines/tests/test_runnable_builder.py | 68 ++++++++++++++++++ 5 files changed, 129 insertions(+), 18 deletions(-) diff --git a/apps/chat/bots.py b/apps/chat/bots.py index 830db482e..af2894095 100644 --- a/apps/chat/bots.py +++ b/apps/chat/bots.py @@ -258,5 +258,7 @@ def __init__(self, session: ExperimentSession): self.session = session def process_input(self, user_input: str, save_input_to_history=True, attachments: list["Attachment"] | None = None): - output = self.experiment.pipeline.invoke(PipelineState(messages=[user_input]), self.session) + output = self.experiment.pipeline.invoke( + PipelineState(messages=[user_input], experiment_session=self.session), self.session + ) return output["messages"][-1] diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 7e57a039a..80b5dc15b 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -205,3 +205,6 @@ class PipelineChatMessages(BaseModel): chat_history = models.ForeignKey(PipelineChatHistory, on_delete=models.CASCADE, related_name="messages") message_type = models.CharField(max_length=10, choices=ChatMessageType.choices) content = models.TextField() + + def as_tuple(self): + return (self.message_type, self.content) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index f750c52ee..3ea4fa11e 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -5,6 +5,7 @@ from django.utils import timezone from jinja2 import meta from jinja2.sandbox import SandboxedEnvironment +from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder from 
langchain_core.messages import BaseMessage from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableLambda, RunnablePassthrough @@ -12,10 +13,14 @@ from pydantic import Field, create_model from apps.channels.models import ChannelPlatform -from apps.experiments.models import ParticipantData, SourceMaterial +from apps.chat.models import ChatMessageType +from apps.experiments.models import ExperimentSession, ParticipantData, SourceMaterial from apps.pipelines.exceptions import PipelineNodeBuildError +from apps.pipelines.models import PipelineChatHistory, PipelineChatHistoryTypes from apps.pipelines.nodes.base import PipelineNode, PipelineState from apps.pipelines.nodes.types import ( + HistoryName, + HistoryType, Keywords, LlmModel, LlmProviderId, @@ -33,7 +38,7 @@ class RenderTemplate(PipelineNode): __human_name__ = "Render a template" template_string: PipelineJinjaTemplate - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: def all_variables(in_): return {var: in_ for var in meta.find_undeclared_variables(env.parse(self.template_string))} @@ -59,6 +64,8 @@ class LLMResponseMixin: llm_provider_id: LlmProviderId llm_model: LlmModel llm_temperature: LlmTemperature = 1.0 + history_type: HistoryType = None + history_name: HistoryName | None = None def get_llm_service(self): from apps.service_providers.models import LlmProvider @@ -78,7 +85,7 @@ def get_chat_model(self): class LLMResponse(PipelineNode, LLMResponseMixin): __human_name__ = "LLM response" - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: llm = self.get_chat_model() output = llm.invoke(input, config=self._config) return output.content @@ -90,22 +97,24 @@ class LLMResponseWithPrompt(LLMResponse): source_material_id: SourceMaterialId | None = None prompt: str = "You are a helpful assistant. 
Answer the user's query as best you can" - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: - prompt = ChatPromptTemplate.from_messages([ - ("system", self.prompt), - MessagesPlaceholder("history", optional=True), - ("human", "{input}") - ]) + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: + prompt = ChatPromptTemplate.from_messages( + [("system", self.prompt), MessagesPlaceholder("history", optional=True), ("human", "{input}")] + ) + context = self._get_context(input, state, prompt, node_id) chain = prompt | super().get_chat_model() output = chain.invoke(context, config=self._config) + if self.history_type is not None: + self._save_history(state["experiment_session"], node_id, input, ChatMessageType.HUMAN) + self._save_history(state["experiment_session"], node_id, output.content, ChatMessageType.AI) return output.content - def _get_context(self, input, state: PipelineState, prompt: PromptTemplate): - session = state["experiment_session"] - context = {} + def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, node_id: str): + session: ExperimentSession = state["experiment_session"] + context = {"input": input} - if "input" in prompt.input_variables: - context["input"] = input + if self.history_type is not None: + context["history"] = self._get_history(session, node_id) if "source_material" in prompt.input_variables and self.source_material_id is None: raise PipelineNodeBuildError("No source material set, but the prompt expects it") @@ -120,6 +129,33 @@ def _get_context(self, input, state: PipelineState, prompt: PromptTemplate): return context + def _get_history(self, session: ExperimentSession, node_id: str): + if self.history_type == PipelineChatHistoryTypes.NAMED: + history_name = self.history_name + elif self.history_type == PipelineChatHistoryTypes.GLOBAL: + history_name = "global" + else: + history_name = node_id + + try: + history: PipelineChatHistory = session.pipeline_chat_history.get(type=self.history_type, name=history_name) + except PipelineChatHistory.DoesNotExist: + return [] + messages = history.messages.all() + return [message.as_tuple() for message in messages] + + def _save_history(self, session: ExperimentSession, node_id: str, content, message_type): + if self.history_type == PipelineChatHistoryTypes.NAMED: + history_name = self.history_name + elif self.history_type == PipelineChatHistoryTypes.GLOBAL: + history_name = "global" + else: + history_name = node_id + + history, _ = session.pipeline_chat_history.get_or_create(type=self.history_type, name=history_name) + message = history.messages.create(message_type=message_type, content=content) + return message + def _get_participant_data(self, session): if session.experiment_channel.platform == ChannelPlatform.WEB and session.participant.user is None: return "" @@ -140,7 +176,7 @@ class SendEmail(PipelineNode): recipient_list: str subject: str - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: send_email_from_pipeline.delay( recipient_list=self.recipient_list.split(","), subject=self.subject, message=input ) @@ -149,7 +185,7 @@ def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: class Passthrough(PipelineNode): __human_name__ = "Do Nothing" - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: 
self.logger.debug(f"Returning input: '{input}' without modification", input=input, output=input) return input @@ -213,7 +249,7 @@ def _prompt_chain(self, reference_data): def extraction_chain(self, json_schema, reference_data): return self._prompt_chain(reference_data) | super().get_chat_model().with_structured_output(json_schema) - def _process(self, input, state: PipelineState, node_id:str) -> PipelineState: + def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: json_schema = self.to_json_schema(json.loads(self.data_schema)) reference_data = self.get_reference_data(state) prompt_token_count = self._get_prompt_token_count(reference_data, json_schema) diff --git a/apps/pipelines/nodes/types.py b/apps/pipelines/nodes/types.py index 50f82e2e0..122fbb3e6 100644 --- a/apps/pipelines/nodes/types.py +++ b/apps/pipelines/nodes/types.py @@ -7,3 +7,5 @@ SourceMaterialId = TypeAliasType("SourceMaterialId", int) NumOutputs = TypeAliasType("NumOutputs", int) Keywords = TypeAliasType("Keywords", list) +HistoryType = TypeAliasType("HistoryType", str | None) +HistoryName = TypeAliasType("HistoryName", str) diff --git a/apps/pipelines/tests/test_runnable_builder.py b/apps/pipelines/tests/test_runnable_builder.py index a946296de..0b01eb673 100644 --- a/apps/pipelines/tests/test_runnable_builder.py +++ b/apps/pipelines/tests/test_runnable_builder.py @@ -635,6 +635,74 @@ def test_router_node(get_llm_service, provider, pipeline, experiment_session): assert output["messages"][-1] == "A z" +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_session): + service = build_fake_llm_echo_service() + get_llm_service.return_value = service + + data = { + "edges": [ + { + "id": "llm-1->llm-2", + "source": "llm-1", + "target": "llm-2", + "sourceHandle": "output", + "targetHandle": "input", + } + ], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "node", + "prompt": ("Node 1:"), + }, + }, + "id": "llm-1", + }, + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond again", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 2:", + # No history_type + }, + }, + "id": "llm-2", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) + + user_input = "The User Input" + output_1 = runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][ + -1 + ] + expected_saved_history = f"Node 1: {user_input}" + expected_output = f"Node 2: {expected_saved_history}" + assert output_1 == expected_output + + user_input_2 = "Saying more stuff" + output_2 = runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))[ + "messages" + ][-1] + expected_output = f"Node 2: Node 1: {expected_saved_history} {user_input_2}" + assert output_2 == expected_output + + @contextmanager def extract_structured_data_pipeline(provider, pipeline, llm=None): service = build_fake_llm_service(responses=[{"name": "John"}], token_counts=[0], fake_llm=llm) 
From 8ca9cca7b676468099238929f22cd0dd74133fe2 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Fri, 13 Sep 2024 21:28:13 -0400 Subject: [PATCH 05/22] Add pipeline history test --- apps/pipelines/tests/test_pipeline_history.py | 243 ++++++++++++++++++ apps/pipelines/tests/test_runnable_builder.py | 68 ----- apps/utils/langchain.py | 11 +- 3 files changed, 253 insertions(+), 69 deletions(-) create mode 100644 apps/pipelines/tests/test_pipeline_history.py diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py new file mode 100644 index 000000000..264da67b6 --- /dev/null +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -0,0 +1,243 @@ +from unittest import mock + +import pytest + +from apps.pipelines.flow import FlowNode +from apps.pipelines.graph import PipelineGraph +from apps.pipelines.models import PipelineChatHistory +from apps.pipelines.nodes.base import PipelineState +from apps.utils.factories.experiment import ( + ExperimentSessionFactory, +) +from apps.utils.factories.pipelines import PipelineFactory +from apps.utils.factories.service_provider_factories import LlmProviderFactory +from apps.utils.langchain import ( + FakeLlmEcho, + build_fake_llm_service, +) +from apps.utils.pytest import django_db_with_data + + +@pytest.fixture() +def provider(): + return LlmProviderFactory() + + +@pytest.fixture() +def pipeline(): + return PipelineFactory() + + +@pytest.fixture() +def experiment_session(): + return ExperimentSessionFactory() + + +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_session): + llm = FakeLlmEcho() + service = build_fake_llm_service(None, [0], llm) + get_llm_service.return_value = service + + data = { + "edges": [ + { + "id": "llm-1->llm-2", + "source": "llm-1", + "target": "llm-2", + "sourceHandle": "output", + "targetHandle": "input", + } + ], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "node", + "prompt": "Node 1:", + }, + }, + "id": "llm-1", + }, + { + "data": { + "id": "llm-2", + "label": "Get the robot to respond again", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 2:", + # No history_type + }, + }, + "id": "llm-2", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) + + user_input = "The User Input" + runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][-1] + expected_call_messages = [ + [("system", "Node 1:"), ("human", user_input)], + [("system", "Node 2:"), ("human", f"Node 1: {user_input}")], + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages + + history = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-1") + assert history.type == "node" + assert history.messages.count() == 2 + assert [m.as_tuple() for m in history.messages.all()] == [("human", user_input), ("ai", f"Node 1: {user_input}")] + + history_2 = 
PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").count() + assert history_2 == 0 + + user_input_2 = "Saying more stuff" + output_2 = runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))[ + "messages" + ][-1] + + expected_output = f"Node 2: Node 1: {user_input_2}" + assert output_2 == expected_output + + assert history.messages.count() == 4 + new_messages = list(reversed([m.as_tuple() for m in history.messages.order_by("-created_at")[:2]])) + assert new_messages == [("human", user_input_2), ("ai", f"Node 1: {user_input_2}")] + + history_2 = PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").count() + assert history_2 == 0 + + expected_call_messages = [ + [("system", "Node 1:"), ("human", user_input)], + [("system", "Node 2:"), ("human", f"Node 1: {user_input}")], + [ + ("system", "Node 1:"), + ("human", user_input), + ("ai", f"Node 1: {user_input}"), + ("human", user_input_2), + ], # History is inserted correctly for Node 1. + [("system", "Node 2:"), ("human", f"Node 1: {user_input_2}")], + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages + + +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_llm_with_multiple_node_histories(get_llm_service, provider, pipeline, experiment_session): + llm = FakeLlmEcho() + service = build_fake_llm_service(None, [0], llm) + get_llm_service.return_value = service + + data = { + "edges": [ + { + "id": "llm-1->llm-2", + "source": "llm-1", + "target": "llm-2", + "sourceHandle": "output", + "targetHandle": "input", + } + ], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "node", + "prompt": "Node 1:", + }, + }, + "id": "llm-1", + }, + { + "data": { + "id": "llm-2", + "label": "Get the robot to respond again", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 2:", + "history_type": "node", + }, + }, + "id": "llm-2", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) + + user_input = "The User Input" + output_1 = runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][ + -1 + ] + expected_output = f"Node 2: Node 1: {user_input}" + assert output_1 == expected_output + + history = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-1") + assert history.type == "node" + assert history.messages.count() == 2 + assert [m.as_tuple() for m in history.messages.all()] == [("human", user_input), ("ai", f"Node 1: {user_input}")] + + history_2 = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-2") + assert history_2.type == "node" + assert history_2.messages.count() == 2 + assert [m.as_tuple() for m in history_2.messages.all()] == [ + ("human", f"Node 1: {user_input}"), + ("ai", expected_output), + ] + + user_input_2 = "Saying more stuff" + output_2 = runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))[ + "messages" + ][-1] + 
expected_output = f"Node 2: Node 1: {user_input_2}" + assert output_2 == expected_output + + assert history.messages.count() == 4 + new_messages = list(reversed([m.as_tuple() for m in history.messages.order_by("-created_at")[:2]])) + assert new_messages == [("human", user_input_2), ("ai", f"Node 1: {user_input_2}")] + assert history_2.messages.count() == 4 + new_messages_2 = list(reversed([m.as_tuple() for m in history_2.messages.order_by("-created_at")[:2]])) + assert new_messages_2 == [("human", f"Node 1: {user_input_2}"), ("ai", output_2)] + + expected_call_messages = [ + [("system", "Node 1:"), ("human", user_input)], + [("system", "Node 2:"), ("human", f"Node 1: {user_input}")], + [ + ("system", "Node 1:"), + ("human", user_input), + ("ai", f"Node 1: {user_input}"), + ("human", user_input_2), + ], # History from node 1 is inserted + [ + ("system", "Node 2:"), + ("human", f"Node 1: {user_input}"), + ("ai", f"Node 2: Node 1: {user_input}"), + ("human", f"Node 1: {user_input_2}"), + ], # History from node 2 is inserted + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages diff --git a/apps/pipelines/tests/test_runnable_builder.py b/apps/pipelines/tests/test_runnable_builder.py index 0b01eb673..a946296de 100644 --- a/apps/pipelines/tests/test_runnable_builder.py +++ b/apps/pipelines/tests/test_runnable_builder.py @@ -635,74 +635,6 @@ def test_router_node(get_llm_service, provider, pipeline, experiment_session): assert output["messages"][-1] == "A z" -@django_db_with_data(available_apps=("apps.service_providers",)) -@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") -@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) -def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_session): - service = build_fake_llm_echo_service() - get_llm_service.return_value = service - - data = { - "edges": [ - { - "id": "llm-1->llm-2", - "source": "llm-1", - "target": "llm-2", - "sourceHandle": "output", - "targetHandle": "input", - } - ], - "nodes": [ - { - "data": { - "id": "llm-1", - "label": "Get the robot to respond", - "type": "LLMResponseWithPrompt", - "params": { - "llm_provider_id": provider.id, - "llm_model": "fake-model", - "history_type": "node", - "prompt": ("Node 1:"), - }, - }, - "id": "llm-1", - }, - { - "data": { - "id": "llm-1", - "label": "Get the robot to respond again", - "type": "LLMResponseWithPrompt", - "params": { - "llm_provider_id": provider.id, - "llm_model": "fake-model", - "prompt": "Node 2:", - # No history_type - }, - }, - "id": "llm-2", - }, - ], - } - pipeline.data = data - pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) - runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) - - user_input = "The User Input" - output_1 = runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][ - -1 - ] - expected_saved_history = f"Node 1: {user_input}" - expected_output = f"Node 2: {expected_saved_history}" - assert output_1 == expected_output - - user_input_2 = "Saying more stuff" - output_2 = runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))[ - "messages" - ][-1] - expected_output = f"Node 2: Node 1: {expected_saved_history} {user_input_2}" - assert output_2 == expected_output - - @contextmanager def extract_structured_data_pipeline(provider, pipeline, llm=None): service = build_fake_llm_service(responses=[{"name": 
"John"}], token_counts=[0], fake_llm=llm) diff --git a/apps/utils/langchain.py b/apps/utils/langchain.py index 37077b42a..4e439584c 100644 --- a/apps/utils/langchain.py +++ b/apps/utils/langchain.py @@ -140,8 +140,17 @@ class FakeLlmEcho(FakeLlmSimpleTokenCount): responses: list = [] def _call(self, messages: list[BaseMessage], *args, **kwargs) -> str | BaseMessage: + """Returns "{system_message} {user_message}" """ + self.calls.append(mock.call(messages, *args, **kwargs)) - return " ".join(message.content for message in messages) + + user_message = messages[-1].content + try: + system_message = next(message.content for message in messages if message.type == "system") + except StopIteration: + return user_message + + return f"{system_message} {user_message}" @contextmanager From 001d23165e89eb223fab74c50486261ec060aaf3 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Wed, 18 Sep 2024 21:10:58 -0400 Subject: [PATCH 06/22] Store messages as pairs --- ...ipelinechathistory_pipelinechatmessages.py | 6 ++--- apps/pipelines/models.py | 8 +++---- apps/pipelines/nodes/nodes.py | 16 ++++++------- apps/pipelines/tests/test_pipeline_history.py | 24 +++++++++---------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py index aca0154de..5fe41d409 100644 --- a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py +++ b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1 on 2024-09-12 19:25 +# Generated by Django 5.1 on 2024-09-18 18:02 import django.db.models.deletion from django.db import migrations, models @@ -33,8 +33,8 @@ class Migration(migrations.Migration): ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('created_at', models.DateTimeField(auto_now_add=True)), ('updated_at', models.DateTimeField(auto_now=True)), - ('message_type', models.CharField(choices=[('human', 'Human'), ('ai', 'AI'), ('system', 'System')], max_length=10)), - ('content', models.TextField()), + ('human_message', models.TextField()), + ('ai_message', models.TextField()), ('chat_history', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='messages', to='pipelines.pipelinechathistory')), ], options={ diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 80b5dc15b..b9e1ebc69 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -203,8 +203,8 @@ class Meta: class PipelineChatMessages(BaseModel): chat_history = models.ForeignKey(PipelineChatHistory, on_delete=models.CASCADE, related_name="messages") - message_type = models.CharField(max_length=10, choices=ChatMessageType.choices) - content = models.TextField() + human_message = models.TextField() + ai_message = models.TextField() - def as_tuple(self): - return (self.message_type, self.content) + def as_tuples(self): + return [(ChatMessageType.HUMAN.value, self.human_message), (ChatMessageType.AI.value, self.ai_message)] diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index 3ea4fa11e..ffd914e00 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -13,7 +13,6 @@ from pydantic import Field, create_model from apps.channels.models import ChannelPlatform -from apps.chat.models import ChatMessageType from apps.experiments.models import ExperimentSession, ParticipantData, 
SourceMaterial from apps.pipelines.exceptions import PipelineNodeBuildError from apps.pipelines.models import PipelineChatHistory, PipelineChatHistoryTypes @@ -105,8 +104,7 @@ def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: chain = prompt | super().get_chat_model() output = chain.invoke(context, config=self._config) if self.history_type is not None: - self._save_history(state["experiment_session"], node_id, input, ChatMessageType.HUMAN) - self._save_history(state["experiment_session"], node_id, output.content, ChatMessageType.AI) + self._save_history(state["experiment_session"], node_id, input, output.content) return output.content def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, node_id: str): @@ -134,6 +132,7 @@ def _get_history(self, session: ExperimentSession, node_id: str): history_name = self.history_name elif self.history_type == PipelineChatHistoryTypes.GLOBAL: history_name = "global" + # TODO else: history_name = node_id @@ -141,19 +140,20 @@ def _get_history(self, session: ExperimentSession, node_id: str): history: PipelineChatHistory = session.pipeline_chat_history.get(type=self.history_type, name=history_name) except PipelineChatHistory.DoesNotExist: return [] - messages = history.messages.all() - return [message.as_tuple() for message in messages] + message_pairs = history.messages.all() + return [message for message_pair in message_pairs for message in message_pair.as_tuples()] - def _save_history(self, session: ExperimentSession, node_id: str, content, message_type): + def _save_history(self, session: ExperimentSession, node_id: str, human_message: str, ai_message: str): if self.history_type == PipelineChatHistoryTypes.NAMED: history_name = self.history_name elif self.history_type == PipelineChatHistoryTypes.GLOBAL: - history_name = "global" + return + # TODO else: history_name = node_id history, _ = session.pipeline_chat_history.get_or_create(type=self.history_type, name=history_name) - message = history.messages.create(message_type=message_type, content=content) + message = history.messages.create(human_message=human_message, ai_message=ai_message) return message def _get_participant_data(self, session): diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py index 264da67b6..7f1171088 100644 --- a/apps/pipelines/tests/test_pipeline_history.py +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -98,8 +98,8 @@ def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_s history = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-1") assert history.type == "node" - assert history.messages.count() == 2 - assert [m.as_tuple() for m in history.messages.all()] == [("human", user_input), ("ai", f"Node 1: {user_input}")] + assert history.messages.count() == 1 + assert history.messages.first().as_tuples() == [("human", user_input), ("ai", f"Node 1: {user_input}")] history_2 = PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").count() assert history_2 == 0 @@ -112,8 +112,8 @@ def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_s expected_output = f"Node 2: Node 1: {user_input_2}" assert output_2 == expected_output - assert history.messages.count() == 4 - new_messages = list(reversed([m.as_tuple() for m in history.messages.order_by("-created_at")[:2]])) + assert history.messages.count() == 2 + new_messages = history.messages.last().as_tuples() assert new_messages == 
[("human", user_input_2), ("ai", f"Node 1: {user_input_2}")] history_2 = PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").count() @@ -197,13 +197,13 @@ def test_llm_with_multiple_node_histories(get_llm_service, provider, pipeline, e history = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-1") assert history.type == "node" - assert history.messages.count() == 2 - assert [m.as_tuple() for m in history.messages.all()] == [("human", user_input), ("ai", f"Node 1: {user_input}")] + assert history.messages.count() == 1 + assert history.messages.first().as_tuples() == [("human", user_input), ("ai", f"Node 1: {user_input}")] history_2 = PipelineChatHistory.objects.get(session=experiment_session.id, name="llm-2") assert history_2.type == "node" - assert history_2.messages.count() == 2 - assert [m.as_tuple() for m in history_2.messages.all()] == [ + assert history_2.messages.count() == 1 + assert history_2.messages.first().as_tuples() == [ ("human", f"Node 1: {user_input}"), ("ai", expected_output), ] @@ -215,11 +215,11 @@ def test_llm_with_multiple_node_histories(get_llm_service, provider, pipeline, e expected_output = f"Node 2: Node 1: {user_input_2}" assert output_2 == expected_output - assert history.messages.count() == 4 - new_messages = list(reversed([m.as_tuple() for m in history.messages.order_by("-created_at")[:2]])) + assert history.messages.count() == 2 + new_messages = history.messages.last().as_tuples() assert new_messages == [("human", user_input_2), ("ai", f"Node 1: {user_input_2}")] - assert history_2.messages.count() == 4 - new_messages_2 = list(reversed([m.as_tuple() for m in history_2.messages.order_by("-created_at")[:2]])) + assert history_2.messages.count() == 2 + new_messages_2 = history_2.messages.last().as_tuples() assert new_messages_2 == [("human", f"Node 1: {user_input_2}"), ("ai", output_2)] expected_call_messages = [ From 9030289499ad67e537f61291e4b539e3f6638cad Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Wed, 18 Sep 2024 21:39:24 -0400 Subject: [PATCH 07/22] Global history --- apps/pipelines/models.py | 4 +- apps/pipelines/nodes/nodes.py | 12 +- apps/pipelines/tests/test_pipeline_history.py | 114 ++++++++++++++++++ 3 files changed, 121 insertions(+), 9 deletions(-) diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index b9e1ebc69..a44902f35 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -98,15 +98,13 @@ def invoke( pipeline_run = self._create_pipeline_run(input, session) logging_callback = PipelineLoggingCallbackHandler(pipeline_run) - if save_run_to_history and session is not None: - self._save_message_to_history(session, input["messages"][-1], ChatMessageType.HUMAN) - logging_callback.logger.debug("Starting pipeline run", input=input["messages"][-1]) try: output = runnable.invoke(input, config=RunnableConfig(callbacks=[logging_callback])) output = PipelineState(**output).json_safe() pipeline_run.output = output if save_run_to_history and session is not None: + self._save_message_to_history(session, input["messages"][-1], ChatMessageType.HUMAN) self._save_message_to_history(session, output["messages"][-1], ChatMessageType.AI) finally: if pipeline_run.status == PipelineRunStatus.ERROR: diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index ffd914e00..f7752757c 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -128,11 +128,11 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, return 
context def _get_history(self, session: ExperimentSession, node_id: str): + if self.history_type == PipelineChatHistoryTypes.GLOBAL: + return session.chat.get_langchain_messages_until_summary() + if self.history_type == PipelineChatHistoryTypes.NAMED: history_name = self.history_name - elif self.history_type == PipelineChatHistoryTypes.GLOBAL: - history_name = "global" - # TODO else: history_name = node_id @@ -144,11 +144,11 @@ def _get_history(self, session: ExperimentSession, node_id: str): return [message for message_pair in message_pairs for message in message_pair.as_tuples()] def _save_history(self, session: ExperimentSession, node_id: str, human_message: str, ai_message: str): + if self.history_type == PipelineChatHistoryTypes.GLOBAL: + return + if self.history_type == PipelineChatHistoryTypes.NAMED: history_name = self.history_name - elif self.history_type == PipelineChatHistoryTypes.GLOBAL: - return - # TODO else: history_name = node_id diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py index 7f1171088..e572cada5 100644 --- a/apps/pipelines/tests/test_pipeline_history.py +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -241,3 +241,117 @@ def test_llm_with_multiple_node_histories(get_llm_service, provider, pipeline, e assert [ [(message.type, message.content) for message in call] for call in llm.get_call_messages() ] == expected_call_messages + + +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_global_history(get_llm_service, provider, pipeline, experiment_session): + llm = FakeLlmEcho() + service = build_fake_llm_service(None, [0], llm) + get_llm_service.return_value = service + + data = { + "edges": [ + { + "id": "llm-1->llm-2", + "source": "llm-1", + "target": "llm-2", + "sourceHandle": "output", + "targetHandle": "input", + } + ], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "global", + "prompt": "Node 1:", + }, + }, + "id": "llm-1", + }, + { + "data": { + "id": "llm-2", + "label": "Get the robot to respond again", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 2:", + "history_type": "node", + }, + }, + "id": "llm-2", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + pipeline.save() + + experiment = experiment_session.experiment + experiment.pipeline_id = pipeline.id + experiment.save() + + user_input = "The User Input" + output_1 = experiment.pipeline.invoke( + PipelineState(messages=[user_input], experiment_session=experiment_session), experiment_session + )["messages"][-1] + user_input_2 = "Saying more stuff" + output_2 = experiment.pipeline.invoke( + PipelineState(messages=[user_input_2], experiment_session=experiment_session), experiment_session + )["messages"][-1] + + user_input_3 = "Tell me something interesting" + experiment.pipeline.invoke( + PipelineState(messages=[user_input_3], experiment_session=experiment_session), experiment_session + ) + + expected_call_messages = [ + # First interaction with Node 1, no history yet + [("system", "Node 1:"), ("human", user_input)], + # First interaction with Node 2, no history yet + 
[("system", "Node 2:"), ("human", f"Node 1: {user_input}")], + # Second interaction with Node 1. The full output from the first run is inserted. + [ + ("system", "Node 1:"), + ("human", user_input), # Input into Node 1 from the first run. + ("ai", output_1), # The output from the whole pipeline from the first run + ("human", user_input_2), + ], + # Second interaction with Node 2. Only the history of Node 2 is inserted. + [ + ("system", "Node 2:"), + ("human", f"Node 1: {user_input}"), # Input into Node 2 from the first run. + ("ai", f"Node 2: Node 1: {user_input}"), # Output of Node 2 from the first run. + ("human", f"Node 1: {user_input_2}"), # Input into Node 2 for this interaction + ], + # Third interaction with Node 1. The full output from the previous runs is inserted. + [ + ("system", "Node 1:"), + ("human", user_input), + ("ai", output_1), + ("human", user_input_2), + ("ai", output_2), + ("human", user_input_3), + ], + # Third interaction with Node 2. Only the history of Node 2 is inserted. + [ + ("system", "Node 2:"), + ("human", f"Node 1: {user_input}"), + ("ai", f"Node 2: Node 1: {user_input}"), # Output of Node 2 from the first run. + ("human", f"Node 1: {user_input_2}"), + ("ai", f"Node 2: Node 1: {user_input_2}"), # Output of Node 2 from the second run. + ("human", f"Node 1: {user_input_3}"), + ], + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages From bc34b1beebd8a168df0eec799f70a20a437c2a98 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 19 Sep 2024 11:06:43 -0400 Subject: [PATCH 08/22] Fix value shadowing --- apps/pipelines/nodes/nodes.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index f7752757c..e4f41642e 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -10,7 +10,7 @@ from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableLambda, RunnablePassthrough from langchain_text_splitters import RecursiveCharacterTextSplitter -from pydantic import Field, create_model +from pydantic import BaseModel, Field, create_model from apps.channels.models import ChannelPlatform from apps.experiments.models import ExperimentSession, ParticipantData, SourceMaterial @@ -59,7 +59,7 @@ def all_variables(in_): return template.render(content) -class LLMResponseMixin: +class LLMResponseMixin(BaseModel): llm_provider_id: LlmProviderId llm_model: LlmModel llm_temperature: LlmTemperature = 1.0 @@ -206,8 +206,6 @@ def get_output_map(self): class RouterNode(Passthrough, LLMResponseMixin): __human_name__ = "Router" - llm_provider_id: LlmProviderId - llm_model: LlmModel prompt: str = "You are an extremely helpful router {input}" num_outputs: NumOutputs = 2 keywords: Keywords = [] From f3743321245b27090040eea68714a0970d1a4d97 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 19 Sep 2024 11:06:56 -0400 Subject: [PATCH 09/22] Don't save pipeline after creating nodes in factory --- apps/utils/factories/pipelines.py | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/utils/factories/pipelines.py b/apps/utils/factories/pipelines.py index 0395c5199..6592124c9 100644 --- a/apps/utils/factories/pipelines.py +++ b/apps/utils/factories/pipelines.py @@ -18,6 +18,7 @@ class Meta: class PipelineFactory(factory.django.DjangoModelFactory): class Meta: model = Pipeline + skip_postgeneration_save = True name = "Test Pipeline" data = { From 
4e0c8824f6f41d01e6782f7e623f544bd291d469 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 19 Sep 2024 11:23:12 -0400 Subject: [PATCH 10/22] Add test for named histories --- apps/pipelines/tests/test_pipeline_history.py | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py index e572cada5..88487cddf 100644 --- a/apps/pipelines/tests/test_pipeline_history.py +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -355,3 +355,126 @@ def test_global_history(get_llm_service, provider, pipeline, experiment_session) assert [ [(message.type, message.content) for message in call] for call in llm.get_call_messages() ] == expected_call_messages + + +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_llm_with_named_history(get_llm_service, provider, pipeline, experiment_session): + llm = FakeLlmEcho() + service = build_fake_llm_service(None, [0], llm) + get_llm_service.return_value = service + + data = { + "edges": [ + { + "id": "llm-1->llm-2", + "source": "llm-1", + "target": "llm-2", + "sourceHandle": "output", + "targetHandle": "input", + }, + { + "id": "llm-2->llm-3", + "source": "llm-2", + "target": "llm-3", + "sourceHandle": "output", + "targetHandle": "input", + }, + ], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "named", + "history_name": "history1", + "prompt": "Node 1:", + }, + }, + "id": "llm-1", + }, + { + "data": { + "id": "llm-2", + "label": "Get the robot to respond again", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 2:", + "history_type": "named", + "history_name": "history1", + }, + }, + "id": "llm-2", + }, + { + "data": { + "id": "llm-3", + "label": "No History", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "prompt": "Node 3:", + }, + }, + "id": "llm-3", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) + + user_input = "The User Input" + runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][-1] + user_input_2 = "Second User Input" + runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))["messages"][-1] + + expected_call_messages = [ + # First call to Node 1 + [("system", "Node 1:"), ("human", user_input)], + # Call to Node 2. Since we are using the same history as for Node 1, it's history is included. + [ + ("system", "Node 2:"), + ("human", user_input), # The input to Node 1 + ("ai", f"Node 1: {user_input}"), # The output from node 1 + ("human", f"Node 1: {user_input}"), # The input to Node 2 + ], + # First call to Node 3, there is no history here. + [("system", "Node 3:"), ("human", f"Node 2: Node 1: {user_input}")], + # Second call to Node 1. 
Includes the full history from node 1 and node 2 + [ + ("system", "Node 1:"), + ("human", user_input), # First input + ("ai", f"Node 1: {user_input}"), # The output of node 1 + ("human", f"Node 1: {user_input}"), # The input to Node 2 + ("ai", f"Node 2: Node 1: {user_input}"), # The output of Node 2 + ("human", user_input_2), # The second input to Node 1 + ], + # Second call to Node 2. Includes the full history from node 1 and node 2 + [ + ("system", "Node 2:"), + ("human", user_input), # First input + ("ai", f"Node 1: {user_input}"), # The output of node 1 + ("human", f"Node 1: {user_input}"), # The input to Node 2 + ("ai", f"Node 2: Node 1: {user_input}"), # The output of Node 2 + ("human", user_input_2), # The second input to Node 1 + ("ai", f"Node 1: {user_input_2}"), # The second output of Node 1 + ("human", f"Node 1: {user_input_2}"), # The second input to Node 2 (the output from Node 1) + ], + # Second Call to Node 3. Still no history is include, only the output of node 2 used as the input + [ + ("system", "Node 3:"), + ("human", f"Node 2: Node 1: {user_input_2}"), # The output from node 2 into node 3 + ], + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages From 60b8ecb5c72de0cb268bd3c3a08b78635d5ec4a7 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Fri, 20 Sep 2024 13:56:14 -0400 Subject: [PATCH 11/22] Add history type widgets --- .../javascript/apps/pipeline/PipelineNode.tsx | 29 +++++++++++++ assets/javascript/apps/pipeline/widgets.tsx | 42 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/assets/javascript/apps/pipeline/PipelineNode.tsx b/assets/javascript/apps/pipeline/PipelineNode.tsx index 0821d6934..b5bbceceb 100644 --- a/assets/javascript/apps/pipeline/PipelineNode.tsx +++ b/assets/javascript/apps/pipeline/PipelineNode.tsx @@ -5,6 +5,8 @@ import usePipelineStore from "./stores/pipelineStore"; import { InputParam } from "./types/nodeInputTypes"; import { NodeParams } from "./types/nodeParams"; import { + HistoryNameWidget, + HistoryTypeWidget, KeywordsWidget, LlmModelWidget, LlmProviderIdWidget, @@ -157,6 +159,33 @@ export function PipelineNode({ id, data, selected }: NodeProps) { ); } + case "HistoryType": { + return ( + <> +
History Type
+ + + ); + } + case "HistoryName": { + if (params["history_type"] != "named") { + return <>; + } + return ( + <> +
History Name
+ + + ); + } default: return ( <> diff --git a/assets/javascript/apps/pipeline/widgets.tsx b/assets/javascript/apps/pipeline/widgets.tsx index 002efb7ce..6a1116da7 100644 --- a/assets/javascript/apps/pipeline/widgets.tsx +++ b/assets/javascript/apps/pipeline/widgets.tsx @@ -164,3 +164,45 @@ export function SourceMaterialIdWidget({ ); } + +export function HistoryTypeWidget({ + inputParam, + value, + onChange, +}: { + inputParam: InputParam; + value: string | string[]; + onChange: ChangeEventHandler; +}) { + return ( + + ); +} + +export function HistoryNameWidget({ + inputParam, + value, + onChange, +}: { + inputParam: InputParam; + value: string | string[]; + onChange: ChangeEventHandler; +}) { + return ( + + ); +} From d02fd2a1307feab20ef3a3b0bf0e3ecc954bbafb Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Fri, 20 Sep 2024 15:23:19 -0400 Subject: [PATCH 12/22] Compress global history when necessary --- apps/pipelines/nodes/nodes.py | 16 ++++++++++---- apps/pipelines/nodes/types.py | 1 + .../javascript/apps/pipeline/PipelineNode.tsx | 19 +++++++++++++++++ assets/javascript/apps/pipeline/widgets.tsx | 21 +++++++++++++++++++ 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index e4f41642e..81da985fc 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -6,13 +6,14 @@ from jinja2 import meta from jinja2.sandbox import SandboxedEnvironment from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_core.messages import BaseMessage +from langchain_core.messages import BaseMessage, HumanMessage from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableLambda, RunnablePassthrough from langchain_text_splitters import RecursiveCharacterTextSplitter from pydantic import BaseModel, Field, create_model from apps.channels.models import ChannelPlatform +from apps.chat.conversation import compress_chat_history from apps.experiments.models import ExperimentSession, ParticipantData, SourceMaterial from apps.pipelines.exceptions import PipelineNodeBuildError from apps.pipelines.models import PipelineChatHistory, PipelineChatHistoryTypes @@ -24,6 +25,7 @@ LlmModel, LlmProviderId, LlmTemperature, + MaxTokenLimit, NumOutputs, PipelineJinjaTemplate, SourceMaterialId, @@ -65,6 +67,7 @@ class LLMResponseMixin(BaseModel): llm_temperature: LlmTemperature = 1.0 history_type: HistoryType = None history_name: HistoryName | None = None + max_token_limit: MaxTokenLimit = 8192 def get_llm_service(self): from apps.service_providers.models import LlmProvider @@ -112,7 +115,7 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, context = {"input": input} if self.history_type is not None: - context["history"] = self._get_history(session, node_id) + context["history"] = self._get_history(session, node_id, input) if "source_material" in prompt.input_variables and self.source_material_id is None: raise PipelineNodeBuildError("No source material set, but the prompt expects it") @@ -127,9 +130,14 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, return context - def _get_history(self, session: ExperimentSession, node_id: str): + def _get_history(self, session: ExperimentSession, node_id: str, input): if self.history_type == PipelineChatHistoryTypes.GLOBAL: - return session.chat.get_langchain_messages_until_summary() + return compress_chat_history( + chat=session.chat, + llm=self.get_chat_model(), + 
max_token_limit=self.max_token_limit, + input_messages=[HumanMessage(content=input)], + ) if self.history_type == PipelineChatHistoryTypes.NAMED: history_name = self.history_name diff --git a/apps/pipelines/nodes/types.py b/apps/pipelines/nodes/types.py index 122fbb3e6..02c3dd264 100644 --- a/apps/pipelines/nodes/types.py +++ b/apps/pipelines/nodes/types.py @@ -9,3 +9,4 @@ Keywords = TypeAliasType("Keywords", list) HistoryType = TypeAliasType("HistoryType", str | None) HistoryName = TypeAliasType("HistoryName", str) +MaxTokenLimit = TypeAliasType("MaxTokenLimit", int) diff --git a/assets/javascript/apps/pipeline/PipelineNode.tsx b/assets/javascript/apps/pipeline/PipelineNode.tsx index b5bbceceb..90b5e8991 100644 --- a/assets/javascript/apps/pipeline/PipelineNode.tsx +++ b/assets/javascript/apps/pipeline/PipelineNode.tsx @@ -10,6 +10,7 @@ import { KeywordsWidget, LlmModelWidget, LlmProviderIdWidget, + MaxTokenLimitWidget, SourceMaterialIdWidget, } from "./widgets"; import { NodeParameterValues } from "./types/nodeParameterValues"; @@ -71,6 +72,7 @@ export function PipelineNode({ id, data, selected }: NodeProps) { <>
Temperature
) { ); } + case "MaxTokenLimit": { + if (params["history_type"] != "global") { + return <>; + } + return ( + <> +
+ Maximum Token Limit +
+ + + ); + } default: return ( <> diff --git a/assets/javascript/apps/pipeline/widgets.tsx b/assets/javascript/apps/pipeline/widgets.tsx index 6a1116da7..bf53ab44f 100644 --- a/assets/javascript/apps/pipeline/widgets.tsx +++ b/assets/javascript/apps/pipeline/widgets.tsx @@ -206,3 +206,24 @@ export function HistoryNameWidget({ > ); } + +export function MaxTokenLimitWidget({ + inputParam, + value, + onChange, +}: { + inputParam: InputParam; + value: string | string[]; + onChange: ChangeEventHandler; +}) { + return ( + + ); +} From 3484dcf34d4de74e0d9e946e38c613baa66f373c Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Tue, 1 Oct 2024 11:21:49 -0400 Subject: [PATCH 13/22] Add new models to content types --- apps/teams/backends.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/teams/backends.py b/apps/teams/backends.py index d2b8edc74..ab9b0885c 100644 --- a/apps/teams/backends.py +++ b/apps/teams/backends.py @@ -70,7 +70,7 @@ def _get_group_permissions(self, user_obj): "syntheticvoice", ], "files": ["file"], - "pipelines": ["pipeline", "pipelinerun", "node"], + "pipelines": ["pipeline", "pipelinechathistory", "pipelinechatmessages", "pipelinerun", "node"], "service_providers": ["authprovider", "llmprovider", "voiceprovider", "messagingprovider", "traceprovider"], "teams": ["invitation", "membership", "team"], "annotations": ["tag", "customtaggeditem", "usercomment"], From 962f2366343d3044325f67107eb2b42508598bf5 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Tue, 1 Oct 2024 11:54:19 -0400 Subject: [PATCH 14/22] Add ability to select no history --- ...ipelinechathistory_pipelinechatmessages.py | 6 +-- apps/pipelines/models.py | 1 + apps/pipelines/nodes/nodes.py | 16 ++++-- apps/pipelines/nodes/types.py | 2 +- apps/pipelines/tests/test_pipeline_history.py | 50 +++++++++++++++++++ assets/javascript/apps/pipeline/widgets.tsx | 1 + 6 files changed, 67 insertions(+), 9 deletions(-) diff --git a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py index 5fe41d409..c5eb1796d 100644 --- a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py +++ b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1 on 2024-09-18 18:02 +# Generated by Django 5.1 on 2024-10-01 16:04 import django.db.models.deletion from django.db import migrations, models @@ -7,7 +7,7 @@ class Migration(migrations.Migration): dependencies = [ - ('experiments', '0093_participant_name'), + ('experiments', '0094_consentform_working_version_and_more'), ('pipelines', '0005_auto_20240802_0039'), ] @@ -18,7 +18,7 @@ class Migration(migrations.Migration): ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('created_at', models.DateTimeField(auto_now_add=True)), ('updated_at', models.DateTimeField(auto_now=True)), - ('type', models.CharField(choices=[('node', 'Node History'), ('named', 'Named History'), ('global', 'Global History')], max_length=10)), + ('type', models.CharField(choices=[('node', 'Node History'), ('named', 'Named History'), ('global', 'Global History'), ('none', 'No History')], max_length=10)), ('name', models.CharField(db_index=True, max_length=128)), ('session', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='pipeline_chat_history', to='experiments.experimentsession')), ], diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py 
index a44902f35..50f9b3d84 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -186,6 +186,7 @@ class PipelineChatHistoryTypes(models.TextChoices): NODE = "node", "Node History" NAMED = "named", "Named History" GLOBAL = "global", "Global History" + NONE = "none", "No History" class PipelineChatHistory(BaseModel): diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index 81da985fc..e6de0140f 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -65,7 +65,7 @@ class LLMResponseMixin(BaseModel): llm_provider_id: LlmProviderId llm_model: LlmModel llm_temperature: LlmTemperature = 1.0 - history_type: HistoryType = None + history_type: HistoryType = PipelineChatHistoryTypes.NONE history_name: HistoryName | None = None max_token_limit: MaxTokenLimit = 8192 @@ -106,15 +106,14 @@ def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: context = self._get_context(input, state, prompt, node_id) chain = prompt | super().get_chat_model() output = chain.invoke(context, config=self._config) - if self.history_type is not None: - self._save_history(state["experiment_session"], node_id, input, output.content) + self._save_history(state["experiment_session"], node_id, input, output.content) return output.content def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, node_id: str): session: ExperimentSession = state["experiment_session"] context = {"input": input} - if self.history_type is not None: + if self.history_type != PipelineChatHistoryTypes.NONE: context["history"] = self._get_history(session, node_id, input) if "source_material" in prompt.input_variables and self.source_material_id is None: @@ -130,7 +129,10 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, return context - def _get_history(self, session: ExperimentSession, node_id: str, input): + def _get_history(self, session: ExperimentSession, node_id: str, input) -> list: + if self.history_type == PipelineChatHistoryTypes.NONE: + return [] + if self.history_type == PipelineChatHistoryTypes.GLOBAL: return compress_chat_history( chat=session.chat, @@ -152,7 +154,11 @@ def _get_history(self, session: ExperimentSession, node_id: str, input): return [message for message_pair in message_pairs for message in message_pair.as_tuples()] def _save_history(self, session: ExperimentSession, node_id: str, human_message: str, ai_message: str): + if self.history_type == PipelineChatHistoryTypes.NONE: + return + if self.history_type == PipelineChatHistoryTypes.GLOBAL: + # Global History is saved outside of the node return if self.history_type == PipelineChatHistoryTypes.NAMED: diff --git a/apps/pipelines/nodes/types.py b/apps/pipelines/nodes/types.py index 02c3dd264..ad08bcaa0 100644 --- a/apps/pipelines/nodes/types.py +++ b/apps/pipelines/nodes/types.py @@ -7,6 +7,6 @@ SourceMaterialId = TypeAliasType("SourceMaterialId", int) NumOutputs = TypeAliasType("NumOutputs", int) Keywords = TypeAliasType("Keywords", list) -HistoryType = TypeAliasType("HistoryType", str | None) +HistoryType = TypeAliasType("HistoryType", str) HistoryName = TypeAliasType("HistoryName", str) MaxTokenLimit = TypeAliasType("MaxTokenLimit", int) diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py index 88487cddf..2d43970c6 100644 --- a/apps/pipelines/tests/test_pipeline_history.py +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -478,3 +478,53 @@ def 
test_llm_with_named_history(get_llm_service, provider, pipeline, experiment_ assert [ [(message.type, message.content) for message in call] for call in llm.get_call_messages() ] == expected_call_messages + + +@django_db_with_data(available_apps=("apps.service_providers",)) +@mock.patch("apps.service_providers.models.LlmProvider.get_llm_service") +@mock.patch("apps.pipelines.nodes.base.PipelineNode.logger", mock.Mock()) +def test_llm_with_no_history(get_llm_service, provider, pipeline, experiment_session): + llm = FakeLlmEcho() + service = build_fake_llm_service(None, [0], llm) + get_llm_service.return_value = service + + data = { + "edges": [], + "nodes": [ + { + "data": { + "id": "llm-1", + "label": "Get the robot to respond", + "type": "LLMResponseWithPrompt", + "params": { + "llm_provider_id": provider.id, + "llm_model": "fake-model", + "history_type": "none", + "prompt": "Node 1:", + }, + }, + "id": "llm-1", + }, + ], + } + pipeline.data = data + pipeline.set_nodes([FlowNode(**node) for node in data["nodes"]]) + runnable = PipelineGraph.build_runnable_from_pipeline(pipeline) + + user_input = "The User Input" + runnable.invoke(PipelineState(messages=[user_input], experiment_session=experiment_session))["messages"][-1] + user_input_2 = "Second User Input" + runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))["messages"][-1] + + expected_call_messages = [ + # First call to Node 1 + [("system", "Node 1:"), ("human", user_input)], + # Second call to Node 1. Includes no history. + [ + ("system", "Node 1:"), + ("human", user_input_2), # The second input to Node 1 + ], + ] + assert [ + [(message.type, message.content) for message in call] for call in llm.get_call_messages() + ] == expected_call_messages diff --git a/assets/javascript/apps/pipeline/widgets.tsx b/assets/javascript/apps/pipeline/widgets.tsx index bf53ab44f..88c02fd0d 100644 --- a/assets/javascript/apps/pipeline/widgets.tsx +++ b/assets/javascript/apps/pipeline/widgets.tsx @@ -181,6 +181,7 @@ export function HistoryTypeWidget({ onChange={onChange} value={value} > + From 72445d4572b68bbd14168420912257744e846f16 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Wed, 9 Oct 2024 23:19:40 -0400 Subject: [PATCH 15/22] Include full prompt in history compression --- apps/pipelines/nodes/nodes.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index 19f532c31..9551f6d11 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -6,7 +6,7 @@ from jinja2 import meta from jinja2.sandbox import SandboxedEnvironment from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder -from langchain_core.messages import BaseMessage, HumanMessage +from langchain_core.messages import BaseMessage from langchain_core.prompts import PromptTemplate from langchain_core.runnables import RunnableLambda, RunnablePassthrough from langchain_text_splitters import RecursiveCharacterTextSplitter @@ -105,6 +105,9 @@ def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: [("system", self.prompt), MessagesPlaceholder("history", optional=True), ("human", "{input}")] ) context = self._get_context(input, state, prompt, node_id) + if self.history_type != PipelineChatHistoryTypes.NONE: + input_messages = prompt.invoke(context).to_messages() + context["history"] = self._get_history(state["experiment_session"], node_id, input_messages) chain = prompt | super().get_chat_model() output = 
chain.invoke(context, config=self._config) self._save_history(state["experiment_session"], node_id, input, output.content) @@ -114,9 +117,6 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, session: ExperimentSession = state["experiment_session"] context = {"input": input} - if self.history_type != PipelineChatHistoryTypes.NONE: - context["history"] = self._get_history(session, node_id, input) - if "source_material" in prompt.input_variables and self.source_material_id is None: raise PipelineNodeBuildError("No source material set, but the prompt expects it") if "source_material" in prompt.input_variables and self.source_material_id: @@ -130,7 +130,7 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, return context - def _get_history(self, session: ExperimentSession, node_id: str, input) -> list: + def _get_history(self, session: ExperimentSession, node_id: str, input_messages: list) -> list: if self.history_type == PipelineChatHistoryTypes.NONE: return [] @@ -139,7 +139,7 @@ def _get_history(self, session: ExperimentSession, node_id: str, input) -> list: chat=session.chat, llm=self.get_chat_model(), max_token_limit=self.max_token_limit, - input_messages=[HumanMessage(content=input)], + input_messages=input_messages, ) if self.history_type == PipelineChatHistoryTypes.NAMED: From ebf875049806cb1142d23cfb087f6b47681c309c Mon Sep 17 00:00:00 2001 From: Simon Kelly Date: Thu, 10 Oct 2024 10:32:00 +0200 Subject: [PATCH 16/22] use strict equality Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- assets/javascript/apps/pipeline/PipelineNode.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/javascript/apps/pipeline/PipelineNode.tsx b/assets/javascript/apps/pipeline/PipelineNode.tsx index 13c14f597..778f48ea2 100644 --- a/assets/javascript/apps/pipeline/PipelineNode.tsx +++ b/assets/javascript/apps/pipeline/PipelineNode.tsx @@ -191,7 +191,7 @@ export function PipelineNode({ id, data, selected }: NodeProps) { ); } case "MaxTokenLimit": { - if (params["history_type"] != "global") { + if (params["history_type"] !== "global") { return <>; } return ( From 17357d89fe43f3f858c89f00bee15768bd76ec1c Mon Sep 17 00:00:00 2001 From: Simon Kelly Date: Thu, 10 Oct 2024 10:35:21 +0200 Subject: [PATCH 17/22] use exists instead of count Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- apps/pipelines/tests/test_pipeline_history.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/pipelines/tests/test_pipeline_history.py b/apps/pipelines/tests/test_pipeline_history.py index 2d43970c6..bae71677f 100644 --- a/apps/pipelines/tests/test_pipeline_history.py +++ b/apps/pipelines/tests/test_pipeline_history.py @@ -101,8 +101,7 @@ def test_llm_with_node_history(get_llm_service, provider, pipeline, experiment_s assert history.messages.count() == 1 assert history.messages.first().as_tuples() == [("human", user_input), ("ai", f"Node 1: {user_input}")] - history_2 = PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").count() - assert history_2 == 0 + assert not PipelineChatHistory.objects.filter(session=experiment_session.id, name="llm-2").exists() user_input_2 = "Saying more stuff" output_2 = runnable.invoke(PipelineState(messages=[user_input_2], experiment_session=experiment_session))[ From 01591ea12f8ac38568eba69a1cb880eb204a5f89 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 10 Oct 2024 
08:26:59 -0400 Subject: [PATCH 18/22] Use UniqueConstraint instead of unique_together --- ...pipelinechathistory_pipelinechatmessages_and_more.py} | 9 ++++++--- apps/pipelines/models.py | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) rename apps/pipelines/migrations/{0006_pipelinechathistory_pipelinechatmessages.py => 0006_pipelinechathistory_pipelinechatmessages_and_more.py} (85%) diff --git a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages_and_more.py similarity index 85% rename from apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py rename to apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages_and_more.py index c5eb1796d..d7d249ef6 100644 --- a/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages.py +++ b/apps/pipelines/migrations/0006_pipelinechathistory_pipelinechatmessages_and_more.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1 on 2024-10-01 16:04 +# Generated by Django 5.1 on 2024-10-10 12:26 import django.db.models.deletion from django.db import migrations, models @@ -7,7 +7,7 @@ class Migration(migrations.Migration): dependencies = [ - ('experiments', '0094_consentform_working_version_and_more'), + ('experiments', '0095_experiment_debug_mode_enabled'), ('pipelines', '0005_auto_20240802_0039'), ] @@ -24,7 +24,6 @@ class Migration(migrations.Migration): ], options={ 'ordering': ['-created_at'], - 'unique_together': {('session', 'type', 'name')}, }, ), migrations.CreateModel( @@ -41,4 +40,8 @@ class Migration(migrations.Migration): 'abstract': False, }, ), + migrations.AddConstraint( + model_name='pipelinechathistory', + constraint=models.UniqueConstraint(fields=('session', 'type', 'name'), name='unique_session_type_name'), + ), ] diff --git a/apps/pipelines/models.py b/apps/pipelines/models.py index 50f9b3d84..5ec387740 100644 --- a/apps/pipelines/models.py +++ b/apps/pipelines/models.py @@ -196,7 +196,9 @@ class PipelineChatHistory(BaseModel): name = models.CharField(max_length=128, db_index=True) # Either the name of the named history, or the node id class Meta: - unique_together = [("session", "type", "name")] + constraints = [ + models.UniqueConstraint(fields=("session", "type", "name"), name="unique_session_type_name"), + ] ordering = ["-created_at"] From 08a3600caf4d472a5de917f10c5b78fde4a6433a Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 10 Oct 2024 08:27:11 -0400 Subject: [PATCH 19/22] Update signature of base class --- apps/pipelines/nodes/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/pipelines/nodes/base.py b/apps/pipelines/nodes/base.py index 4f467e362..27d5bd243 100644 --- a/apps/pipelines/nodes/base.py +++ b/apps/pipelines/nodes/base.py @@ -87,7 +87,7 @@ def process(self, node_id: str, incoming_edges: list, state: PipelineState, conf else PipelineState(outputs={node_id: output}) ) - def _process(self, input: str, state: PipelineState) -> PipelineState: + def _process(self, input: str, state: PipelineState, node_id: str) -> PipelineState: """The method that executes node specific functionality""" raise NotImplementedError From f5ea07de67dc7dfee27a96b05cd7ea96ac7d5c01 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 10 Oct 2024 08:29:12 -0400 Subject: [PATCH 20/22] Javascript !== Python --- assets/javascript/apps/pipeline/PipelineNode.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/assets/javascript/apps/pipeline/PipelineNode.tsx 
b/assets/javascript/apps/pipeline/PipelineNode.tsx index 778f48ea2..b032950cf 100644 --- a/assets/javascript/apps/pipeline/PipelineNode.tsx +++ b/assets/javascript/apps/pipeline/PipelineNode.tsx @@ -176,7 +176,7 @@ export function PipelineNode({ id, data, selected }: NodeProps) { ); } case "HistoryName": { - if (params["history_type"] != "named") { + if (params["history_type"] !== "named") { return <>; } return ( From 5b81e86269b38ec7fe5222550e39242649e366a5 Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 10 Oct 2024 08:30:59 -0400 Subject: [PATCH 21/22] Use **kwargs instead of unused arguments --- apps/pipelines/nodes/base.py | 2 +- apps/pipelines/nodes/nodes.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/pipelines/nodes/base.py b/apps/pipelines/nodes/base.py index 27d5bd243..6159c5faf 100644 --- a/apps/pipelines/nodes/base.py +++ b/apps/pipelines/nodes/base.py @@ -79,7 +79,7 @@ def process(self, node_id: str, incoming_edges: list, state: PipelineState, conf break else: # This is the first node in the graph input = state["messages"][-1] - output = self._process(input, state, node_id) + output = self._process(input=input, state=state, node_id=node_id) # Append the output to the state, otherwise do not change the state return ( PipelineState(messages=[output], outputs={node_id: output}) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py index 9551f6d11..9f66d9e71 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -40,7 +40,7 @@ class RenderTemplate(PipelineNode): __human_name__ = "Render a template" template_string: PipelineJinjaTemplate - def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: + def _process(self, input, **kwargs) -> PipelineState: def all_variables(in_): return {var: in_ for var in meta.find_undeclared_variables(env.parse(self.template_string))} @@ -88,7 +88,7 @@ def get_chat_model(self): class LLMResponse(PipelineNode, LLMResponseMixin): __human_name__ = "LLM response" - def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: + def _process(self, input, **kwargs) -> PipelineState: llm = self.get_chat_model() output = llm.invoke(input, config=self._config) return output.content @@ -191,7 +191,7 @@ class SendEmail(PipelineNode): recipient_list: str subject: str - def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: + def _process(self, input, **kwargs) -> PipelineState: send_email_from_pipeline.delay( recipient_list=self.recipient_list.split(","), subject=self.subject, message=input ) @@ -264,7 +264,7 @@ def _prompt_chain(self, reference_data): def extraction_chain(self, json_schema, reference_data): return self._prompt_chain(reference_data) | super().get_chat_model().with_structured_output(json_schema) - def _process(self, input, state: PipelineState, node_id: str) -> PipelineState: + def _process(self, input, state: PipelineState, **kwargs) -> PipelineState: json_schema = self.to_json_schema(json.loads(self.data_schema)) reference_data = self.get_reference_data(state) prompt_token_count = self._get_prompt_token_count(reference_data, json_schema) From b6d8abc03e75407a82dd6b305dace1415f69f34e Mon Sep 17 00:00:00 2001 From: Farid Rener Date: Thu, 10 Oct 2024 08:35:54 -0400 Subject: [PATCH 22/22] Factor out history name --- apps/pipelines/nodes/nodes.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/apps/pipelines/nodes/nodes.py b/apps/pipelines/nodes/nodes.py 
index 9f66d9e71..476e313ab 100644 --- a/apps/pipelines/nodes/nodes.py +++ b/apps/pipelines/nodes/nodes.py @@ -130,6 +130,11 @@ def _get_context(self, input, state: PipelineState, prompt: ChatPromptTemplate, return context + def _get_history_name(self, node_id): + if self.history_type == PipelineChatHistoryTypes.NAMED: + return self.history_name + return node_id + def _get_history(self, session: ExperimentSession, node_id: str, input_messages: list) -> list: if self.history_type == PipelineChatHistoryTypes.NONE: return [] @@ -142,13 +147,10 @@ def _get_history(self, session: ExperimentSession, node_id: str, input_messages: input_messages=input_messages, ) - if self.history_type == PipelineChatHistoryTypes.NAMED: - history_name = self.history_name - else: - history_name = node_id - try: - history: PipelineChatHistory = session.pipeline_chat_history.get(type=self.history_type, name=history_name) + history: PipelineChatHistory = session.pipeline_chat_history.get( + type=self.history_type, name=self._get_history_name(node_id) + ) except PipelineChatHistory.DoesNotExist: return [] message_pairs = history.messages.all() @@ -162,12 +164,9 @@ def _save_history(self, session: ExperimentSession, node_id: str, human_message: # Global History is saved outside of the node return - if self.history_type == PipelineChatHistoryTypes.NAMED: - history_name = self.history_name - else: - history_name = node_id - - history, _ = session.pipeline_chat_history.get_or_create(type=self.history_type, name=history_name) + history, _ = session.pipeline_chat_history.get_or_create( + type=self.history_type, name=self._get_history_name(node_id) + ) message = history.messages.create(human_message=human_message, ai_message=ai_message) return message
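
Taken together, the commits above give LLM pipeline nodes four history modes: "node" (keyed by the node id), "named" (shared across any nodes that configure the same name), "global" (the session-wide chat, compressed to max_token_limit before it is sent to the model), and "none" (stateless). The sketch below is illustrative only: InMemoryHistoryStore, load_history and save_turn are hypothetical stand-ins for the PipelineChatHistory ORM queries and, in the "global" case, for apps.chat.conversation.compress_chat_history; it simply mirrors the dispatch logic that nodes.py converges on by the final commit.

    from collections import defaultdict

    NODE, NAMED, GLOBAL, NONE = "node", "named", "global", "none"


    class InMemoryHistoryStore:
        """Hypothetical stand-in for the PipelineChatHistory / PipelineChatMessages ORM layer."""

        def __init__(self):
            self._histories = defaultdict(list)  # (history_type, name) -> [(human, ai), ...]

        def append(self, history_type, name, human, ai):
            self._histories[(history_type, name)].append((human, ai))

        def messages(self, history_type, name):
            # Flatten stored pairs the way the nodes consume PipelineChatMessages.as_tuples().
            pairs = self._histories.get((history_type, name), [])
            return [m for human, ai in pairs for m in (("human", human), ("ai", ai))]


    def history_name(history_type, configured_name, node_id):
        # Mirrors _get_history_name: named histories share the configured name,
        # node-scoped histories are keyed by the node id.
        return configured_name if history_type == NAMED else node_id


    def load_history(store, history_type, configured_name, node_id):
        if history_type in (NONE, GLOBAL):
            # "none" keeps no state; "global" is read from the session chat and
            # compressed to the token budget, not from this per-node store.
            return []
        return store.messages(history_type, history_name(history_type, configured_name, node_id))


    def save_turn(store, history_type, configured_name, node_id, human, ai):
        if history_type in (NONE, GLOBAL):
            return  # global history is persisted outside the node
        store.append(history_type, history_name(history_type, configured_name, node_id), human, ai)


    if __name__ == "__main__":
        store = InMemoryHistoryStore()
        save_turn(store, NODE, None, "llm-1", "The User Input", "Node 1: The User Input")
        print(load_history(store, NODE, None, "llm-1"))
        # [('human', 'The User Input'), ('ai', 'Node 1: The User Input')]

Routing both the lookup and the save path through a single history_name helper is the same consolidation the last commit makes in nodes.py, so the key used to read a history can never drift from the key used to write it.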