diff --git a/backend/danswer/agent_search/answer_query/graph_builder.py b/backend/danswer/agent_search/answer_query/graph_builder.py new file mode 100644 index 00000000000..bded80549b8 --- /dev/null +++ b/backend/danswer/agent_search/answer_query/graph_builder.py @@ -0,0 +1,100 @@ +from langgraph.graph import END +from langgraph.graph import START +from langgraph.graph import StateGraph + +from danswer.agent_search.answer_query.nodes.answer_check import answer_check +from danswer.agent_search.answer_query.nodes.answer_generation import answer_generation +from danswer.agent_search.answer_query.nodes.format_answer import format_answer +from danswer.agent_search.answer_query.states import AnswerQueryInput +from danswer.agent_search.answer_query.states import AnswerQueryOutput +from danswer.agent_search.answer_query.states import AnswerQueryState +from danswer.agent_search.expanded_retrieval.graph_builder import ( + expanded_retrieval_graph_builder, +) + + +def answer_query_graph_builder() -> StateGraph: + graph = StateGraph( + state_schema=AnswerQueryState, + input=AnswerQueryInput, + output=AnswerQueryOutput, + ) + + ### Add nodes ### + + expanded_retrieval = expanded_retrieval_graph_builder().compile() + graph.add_node( + node="expanded_retrieval_for_initial_decomp", + action=expanded_retrieval, + ) + graph.add_node( + node="answer_check", + action=answer_check, + ) + graph.add_node( + node="answer_generation", + action=answer_generation, + ) + graph.add_node( + node="format_answer", + action=format_answer, + ) + + ### Add edges ### + + graph.add_edge( + start_key=START, + end_key="expanded_retrieval_for_initial_decomp", + ) + graph.add_edge( + start_key="expanded_retrieval_for_initial_decomp", + end_key="answer_generation", + ) + graph.add_edge( + start_key="answer_generation", + end_key="answer_check", + ) + graph.add_edge( + start_key="answer_check", + end_key="format_answer", + ) + graph.add_edge( + start_key="format_answer", + end_key=END, + ) + + return graph + + +if __name__ == "__main__": + from danswer.db.engine import get_session_context_manager + from danswer.llm.factory import get_default_llms + from danswer.context.search.models import SearchRequest + + graph = answer_query_graph_builder() + compiled_graph = graph.compile() + primary_llm, fast_llm = get_default_llms() + search_request = SearchRequest( + query="Who made Excel and what other products did they make?", + ) + with get_session_context_manager() as db_session: + inputs = AnswerQueryInput( + search_request=search_request, + primary_llm=primary_llm, + fast_llm=fast_llm, + db_session=db_session, + query_to_answer="Who made Excel?", + ) + output = compiled_graph.invoke( + input=inputs, + # debug=True, + # subgraphs=True, + ) + print(output) + # for namespace, chunk in compiled_graph.stream( + # input=inputs, + # # debug=True, + # subgraphs=True, + # ): + # print(namespace) + # print(chunk) diff --git a/backend/danswer/agent_search/answer_query/nodes/answer_check.py b/backend/danswer/agent_search/answer_query/nodes/answer_check.py new file mode 100644 index 00000000000..ba9b541f2a2 --- /dev/null +++ b/backend/danswer/agent_search/answer_query/nodes/answer_check.py @@ -0,0 +1,30 @@ +from langchain_core.messages import HumanMessage +from langchain_core.messages import merge_message_runs + +from danswer.agent_search.answer_query.states import AnswerQueryState +from danswer.agent_search.answer_query.states import QACheckOutput +from danswer.agent_search.shared_graph_utils.prompts import BASE_CHECK_PROMPT + + +def 
answer_check(state: AnswerQueryState) -> QACheckOutput: + msg = [ + HumanMessage( + content=BASE_CHECK_PROMPT.format( + question=state["search_request"].query, + base_answer=state["answer"], + ) + ) + ] + + fast_llm = state["fast_llm"] + response = list( + fast_llm.stream( + prompt=msg, + ) + ) + + response_str = merge_message_runs(response, chunk_separator="")[0].content + + return QACheckOutput( + answer_quality=response_str, + ) diff --git a/backend/danswer/agent_search/answer_query/nodes/answer_generation.py b/backend/danswer/agent_search/answer_query/nodes/answer_generation.py new file mode 100644 index 00000000000..c33d0619638 --- /dev/null +++ b/backend/danswer/agent_search/answer_query/nodes/answer_generation.py @@ -0,0 +1,32 @@ +from langchain_core.messages import HumanMessage +from langchain_core.messages import merge_message_runs + +from danswer.agent_search.answer_query.states import AnswerQueryState +from danswer.agent_search.answer_query.states import QAGenerationOutput +from danswer.agent_search.shared_graph_utils.prompts import BASE_RAG_PROMPT +from danswer.agent_search.shared_graph_utils.utils import format_docs + + +def answer_generation(state: AnswerQueryState) -> QAGenerationOutput: + query = state["query_to_answer"] + docs = state["reordered_documents"] + + print(f"Number of verified retrieval docs: {len(docs)}") + + msg = [ + HumanMessage( + content=BASE_RAG_PROMPT.format(question=query, context=format_docs(docs)) + ) + ] + + fast_llm = state["fast_llm"] + response = list( + fast_llm.stream( + prompt=msg, + ) + ) + + answer_str = merge_message_runs(response, chunk_separator="")[0].content + return QAGenerationOutput( + answer=answer_str, + ) diff --git a/backend/danswer/agent_search/answer_query/nodes/format_answer.py b/backend/danswer/agent_search/answer_query/nodes/format_answer.py new file mode 100644 index 00000000000..117d157d08d --- /dev/null +++ b/backend/danswer/agent_search/answer_query/nodes/format_answer.py @@ -0,0 +1,16 @@ +from danswer.agent_search.answer_query.states import AnswerQueryOutput +from danswer.agent_search.answer_query.states import AnswerQueryState +from danswer.agent_search.answer_query.states import SearchAnswerResults + + +def format_answer(state: AnswerQueryState) -> AnswerQueryOutput: + return AnswerQueryOutput( + decomp_answer_results=[ + SearchAnswerResults( + query=state["query_to_answer"], + quality=state["answer_quality"], + answer=state["answer"], + documents=state["reordered_documents"], + ) + ], + ) diff --git a/backend/danswer/agent_search/answer_query/states.py b/backend/danswer/agent_search/answer_query/states.py new file mode 100644 index 00000000000..a7973cf59e0 --- /dev/null +++ b/backend/danswer/agent_search/answer_query/states.py @@ -0,0 +1,45 @@ +from typing import Annotated +from typing import TypedDict + +from pydantic import BaseModel + +from danswer.agent_search.core_state import PrimaryState +from danswer.agent_search.shared_graph_utils.operators import dedup_inference_sections +from danswer.context.search.models import InferenceSection + + +class SearchAnswerResults(BaseModel): + query: str + answer: str + quality: str + documents: Annotated[list[InferenceSection], dedup_inference_sections] + + +class QACheckOutput(TypedDict, total=False): + answer_quality: str + + +class QAGenerationOutput(TypedDict, total=False): + answer: str + + +class ExpandedRetrievalOutput(TypedDict): + reordered_documents: Annotated[list[InferenceSection], dedup_inference_sections] + + +class AnswerQueryState( + PrimaryState, + 
QACheckOutput, + QAGenerationOutput, + ExpandedRetrievalOutput, + total=True, +): + query_to_answer: str + + +class AnswerQueryInput(PrimaryState, total=True): + query_to_answer: str + + +class AnswerQueryOutput(TypedDict): + decomp_answer_results: list[SearchAnswerResults] diff --git a/backend/danswer/agent_search/core_state.py b/backend/danswer/agent_search/core_state.py new file mode 100644 index 00000000000..cc0057136eb --- /dev/null +++ b/backend/danswer/agent_search/core_state.py @@ -0,0 +1,15 @@ +from typing import TypedDict + +from sqlalchemy.orm import Session + +from danswer.context.search.models import SearchRequest +from danswer.llm.interfaces import LLM + + +class PrimaryState(TypedDict, total=False): + search_request: SearchRequest + primary_llm: LLM + fast_llm: LLM + # a single session for the entire agent search + # is fine if we are only reading + db_session: Session diff --git a/backend/danswer/agent_search/deep_answer/edges.py b/backend/danswer/agent_search/deep_answer/edges.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/danswer/agent_search/deep_answer/graph_builder.py b/backend/danswer/agent_search/deep_answer/graph_builder.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/danswer/agent_search/deep_answer/nodes/answer_generation.py b/backend/danswer/agent_search/deep_answer/nodes/answer_generation.py new file mode 100644 index 00000000000..99389eb4883 --- /dev/null +++ b/backend/danswer/agent_search/deep_answer/nodes/answer_generation.py @@ -0,0 +1,114 @@ +from typing import Any + +from langchain_core.messages import HumanMessage + +from danswer.agent_search.main.states import MainState +from danswer.agent_search.shared_graph_utils.prompts import COMBINED_CONTEXT +from danswer.agent_search.shared_graph_utils.prompts import MODIFIED_RAG_PROMPT +from danswer.agent_search.shared_graph_utils.utils import format_docs +from danswer.agent_search.shared_graph_utils.utils import normalize_whitespace + + +# aggregate sub questions and answers +def deep_answer_generation(state: MainState) -> dict[str, Any]: + """ + Generate answer + + Args: + state (messages): The current state + + Returns: + dict: The updated state with re-phrased question + """ + print("---DEEP GENERATE---") + + question = state["original_question"] + docs = state["deduped_retrieval_docs"] + + deep_answer_context = state["core_answer_dynamic_context"] + + print(f"Number of verified retrieval docs - deep: {len(docs)}") + + combined_context = normalize_whitespace( + COMBINED_CONTEXT.format( + deep_answer_context=deep_answer_context, formated_docs=format_docs(docs) + ) + ) + + msg = [ + HumanMessage( + content=MODIFIED_RAG_PROMPT.format( + question=question, combined_context=combined_context + ) + ) + ] + + # Grader + model = state["fast_llm"] + response = model.invoke(msg) + + return { + "deep_answer": response.content, + } + + +def final_stuff(state: MainState) -> dict[str, Any]: + """ + Invokes the agent model to generate a response based on the current state. Given + the question, it will decide to retrieve using the retriever tool, or simply end. 
+ + Args: + state (messages): The current state + + Returns: + dict: The updated state with the agent response appended to messages + """ + print("---FINAL---") + + messages = state["log_messages"] + time_ordered_messages = [x.pretty_repr() for x in messages] + time_ordered_messages.sort() + + print("Message Log:") + print("\n".join(time_ordered_messages)) + + initial_sub_qas = state["initial_sub_qas"] + initial_sub_qa_list = [] + for initial_sub_qa in initial_sub_qas: + if initial_sub_qa["sub_answer_check"] == "yes": + initial_sub_qa_list.append( + f' Question:\n {initial_sub_qa["sub_question"]}\n --\n Answer:\n {initial_sub_qa["sub_answer"]}\n -----' + ) + + initial_sub_qa_context = "\n".join(initial_sub_qa_list) + + base_answer = state["base_answer"] + + print(f"Final Base Answer:\n{base_answer}") + print("--------------------------------") + print(f"Initial Answered Sub Questions:\n{initial_sub_qa_context}") + print("--------------------------------") + + if not state.get("deep_answer"): + print("No Deep Answer was required") + return {} + + deep_answer = state["deep_answer"] + sub_qas = state["sub_qas"] + sub_qa_list = [] + for sub_qa in sub_qas: + if sub_qa["sub_answer_check"] == "yes": + sub_qa_list.append( + f' Question:\n {sub_qa["sub_question"]}\n --\n Answer:\n {sub_qa["sub_answer"]}\n -----' + ) + + sub_qa_context = "\n".join(sub_qa_list) + + print(f"Final Base Answer:\n{base_answer}") + print("--------------------------------") + print(f"Final Deep Answer:\n{deep_answer}") + print("--------------------------------") + print("Sub Questions and Answers:") + print(sub_qa_context) + + return {} diff --git a/backend/danswer/agent_search/deep_answer/nodes/deep_decomp.py b/backend/danswer/agent_search/deep_answer/nodes/deep_decomp.py new file mode 100644 index 00000000000..d61357640a0 --- /dev/null +++ b/backend/danswer/agent_search/deep_answer/nodes/deep_decomp.py @@ -0,0 +1,78 @@ +import json +import re +from datetime import datetime +from typing import Any + +from langchain_core.messages import HumanMessage + +from danswer.agent_search.main.states import MainState +from danswer.agent_search.shared_graph_utils.prompts import DEEP_DECOMPOSE_PROMPT +from danswer.agent_search.shared_graph_utils.utils import format_entity_term_extraction +from danswer.agent_search.shared_graph_utils.utils import generate_log_message + + +def decompose(state: MainState) -> dict[str, Any]: + """ """ + + node_start_time = datetime.now() + + question = state["original_question"] + base_answer = state["base_answer"] + + # get the entity term extraction dict and properly format it + entity_term_extraction_dict = state["retrieved_entities_relationships"][ + "retrieved_entities_relationships" + ] + + entity_term_extraction_str = format_entity_term_extraction( + entity_term_extraction_dict + ) + + initial_question_answers = state["initial_sub_qas"] + + addressed_question_list = [ + x["sub_question"] + for x in initial_question_answers + if x["sub_answer_check"] == "yes" + ] + failed_question_list = [ + x["sub_question"] + for x in initial_question_answers + if x["sub_answer_check"] == "no" + ] + + msg = [ + HumanMessage( + content=DEEP_DECOMPOSE_PROMPT.format( + question=question, + entity_term_extraction_str=entity_term_extraction_str, + base_answer=base_answer, + answered_sub_questions="\n - ".join(addressed_question_list), + failed_sub_questions="\n - ".join(failed_question_list), + ), + ) + ] + + # Grader + model = state["fast_llm"] + response = model.invoke(msg) + + cleaned_response = 
re.sub(r"```json\n|\n```", "", response.pretty_repr()) + parsed_response = json.loads(cleaned_response) + + sub_questions_dict = {} + for sub_question_nr, sub_question_dict in enumerate( + parsed_response["sub_questions"] + ): + sub_question_dict["answered"] = False + sub_question_dict["verified"] = False + sub_questions_dict[sub_question_nr] = sub_question_dict + + return { + "decomposed_sub_questions_dict": sub_questions_dict, + "log_messages": generate_log_message( + message="deep - decompose", + node_start_time=node_start_time, + graph_start_time=state["graph_start_time"], + ), + } diff --git a/backend/danswer/agent_search/deep_answer/nodes/entity_term_extraction.py b/backend/danswer/agent_search/deep_answer/nodes/entity_term_extraction.py new file mode 100644 index 00000000000..e369707ee5c --- /dev/null +++ b/backend/danswer/agent_search/deep_answer/nodes/entity_term_extraction.py @@ -0,0 +1,40 @@ +import json +import re +from typing import Any + +from langchain_core.messages import HumanMessage +from langchain_core.messages import merge_message_runs + +from danswer.agent_search.main.states import MainState +from danswer.agent_search.shared_graph_utils.prompts import ENTITY_TERM_PROMPT +from danswer.agent_search.shared_graph_utils.utils import format_docs + + +def entity_term_extraction(state: MainState) -> dict[str, Any]: + """Extract entities and terms from the question and context""" + + question = state["original_question"] + docs = state["deduped_retrieval_docs"] + + doc_context = format_docs(docs) + + msg = [ + HumanMessage( + content=ENTITY_TERM_PROMPT.format(question=question, context=doc_context), + ) + ] + fast_llm = state["fast_llm"] + # Grader + llm_response_list = list( + fast_llm.stream( + prompt=msg, + ) + ) + llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content + + cleaned_response = re.sub(r"```json\n|\n```", "", llm_response) + parsed_response = json.loads(cleaned_response) + + return { + "retrieved_entities_relationships": parsed_response, + } diff --git a/backend/danswer/agent_search/deep_answer/nodes/sub_qa_level_aggregator.py b/backend/danswer/agent_search/deep_answer/nodes/sub_qa_level_aggregator.py new file mode 100644 index 00000000000..d384dc51380 --- /dev/null +++ b/backend/danswer/agent_search/deep_answer/nodes/sub_qa_level_aggregator.py @@ -0,0 +1,30 @@ +from typing import Any + +from danswer.agent_search.main.states import MainState + + +# aggregate sub questions and answers +def sub_qa_level_aggregator(state: MainState) -> dict[str, Any]: + sub_qas = state["sub_qas"] + + dynamic_context_list = [ + "Below you will find useful information to answer the original question:" + ] + checked_sub_qas = [] + + for core_answer_sub_qa in sub_qas: + question = core_answer_sub_qa["sub_question"] + answer = core_answer_sub_qa["sub_answer"] + verified = core_answer_sub_qa["sub_answer_check"] + + if verified == "yes": + dynamic_context_list.append( + f"Question:\n{question}\n\nAnswer:\n{answer}\n\n---\n\n" + ) + checked_sub_qas.append({"sub_question": question, "sub_answer": answer}) + dynamic_context = "\n".join(dynamic_context_list) + + return { + "core_answer_dynamic_context": dynamic_context, + "checked_sub_qas": checked_sub_qas, + } diff --git a/backend/danswer/agent_search/deep_answer/nodes/sub_qa_manager.py b/backend/danswer/agent_search/deep_answer/nodes/sub_qa_manager.py new file mode 100644 index 00000000000..11167cd04bc --- /dev/null +++ b/backend/danswer/agent_search/deep_answer/nodes/sub_qa_manager.py @@ -0,0 +1,19 @@ +from 
typing import Any + +from danswer.agent_search.main.states import MainState + + +def sub_qa_manager(state: MainState) -> dict[str, Any]: + """ """ + + sub_questions_dict = state["decomposed_sub_questions_dict"] + + sub_questions = {} + + for sub_question_nr, sub_question_dict in sub_questions_dict.items(): + sub_questions[sub_question_nr] = sub_question_dict["sub_question"] + + return { + "sub_questions": sub_questions, + "num_new_question_iterations": 0, + } diff --git a/backend/danswer/agent_search/deep_answer/states.py b/backend/danswer/agent_search/deep_answer/states.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/danswer/agent_search/expanded_retrieval/edges.py b/backend/danswer/agent_search/expanded_retrieval/edges.py new file mode 100644 index 00000000000..b4b8a88ffb7 --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/edges.py @@ -0,0 +1,44 @@ +from collections.abc import Hashable + +from langchain_core.messages import HumanMessage +from langchain_core.messages import merge_message_runs +from langgraph.types import Send + +from danswer.agent_search.expanded_retrieval.nodes.doc_retrieval import RetrieveInput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalInput +from danswer.agent_search.shared_graph_utils.prompts import REWRITE_PROMPT_MULTI +from danswer.llm.interfaces import LLM + + +def parallel_retrieval_edge(state: ExpandedRetrievalInput) -> list[Send | Hashable]: + print(f"parallel_retrieval_edge state: {state.keys()}") + + # This should be better... + question = state.get("query_to_answer") or state["search_request"].query + llm: LLM = state["fast_llm"] + + msg = [ + HumanMessage( + content=REWRITE_PROMPT_MULTI.format(question=question), + ) + ] + llm_response_list = list( + llm.stream( + prompt=msg, + ) + ) + llm_response = merge_message_runs(llm_response_list, chunk_separator="")[0].content + + print(f"llm_response: {llm_response}") + + rewritten_queries = llm_response.split("\n") + + print(f"rewritten_queries: {rewritten_queries}") + + return [ + Send( + "doc_retrieval", + RetrieveInput(query_to_retrieve=query, **state), + ) + for query in rewritten_queries + ] diff --git a/backend/danswer/agent_search/expanded_retrieval/graph_builder.py b/backend/danswer/agent_search/expanded_retrieval/graph_builder.py new file mode 100644 index 00000000000..4c8f421eb79 --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/graph_builder.py @@ -0,0 +1,88 @@ +from langgraph.graph import END +from langgraph.graph import START +from langgraph.graph import StateGraph + +from danswer.agent_search.expanded_retrieval.edges import parallel_retrieval_edge +from danswer.agent_search.expanded_retrieval.nodes.doc_reranking import doc_reranking +from danswer.agent_search.expanded_retrieval.nodes.doc_retrieval import doc_retrieval +from danswer.agent_search.expanded_retrieval.nodes.doc_verification import ( + doc_verification, +) +from danswer.agent_search.expanded_retrieval.nodes.verification_kickoff import ( + verification_kickoff, +) +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalInput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalOutput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalState + + +def expanded_retrieval_graph_builder() -> StateGraph: + graph = StateGraph( + state_schema=ExpandedRetrievalState, + input=ExpandedRetrievalInput, + output=ExpandedRetrievalOutput, + ) + + ### Add nodes ### + + graph.add_node( + 
node="doc_retrieval", + action=doc_retrieval, + ) + graph.add_node( + node="verification_kickoff", + action=verification_kickoff, + ) + graph.add_node( + node="doc_verification", + action=doc_verification, + ) + graph.add_node( + node="doc_reranking", + action=doc_reranking, + ) + + ### Add edges ### + + graph.add_conditional_edges( + source=START, + path=parallel_retrieval_edge, + path_map=["doc_retrieval"], + ) + graph.add_edge( + start_key="doc_retrieval", + end_key="verification_kickoff", + ) + graph.add_edge( + start_key="doc_verification", + end_key="doc_reranking", + ) + graph.add_edge( + start_key="doc_reranking", + end_key=END, + ) + + return graph + + +if __name__ == "__main__": + from danswer.db.engine import get_session_context_manager + from danswer.llm.factory import get_default_llms + from danswer.context.search.models import SearchRequest + + graph = expanded_retrieval_graph_builder() + compiled_graph = graph.compile() + primary_llm, fast_llm = get_default_llms() + search_request = SearchRequest( + query="Who made Excel and what other products did they make?", + ) + with get_session_context_manager() as db_session: + inputs = ExpandedRetrievalInput( + search_request=search_request, + primary_llm=primary_llm, + fast_llm=fast_llm, + db_session=db_session, + query_to_answer="Who made Excel?", + ) + for thing in compiled_graph.stream(inputs, debug=True): + print(thing) diff --git a/backend/danswer/agent_search/expanded_retrieval/nodes/doc_reranking.py b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_reranking.py new file mode 100644 index 00000000000..a92c6a59093 --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_reranking.py @@ -0,0 +1,11 @@ +from danswer.agent_search.expanded_retrieval.states import DocRerankingOutput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalState + + +def doc_reranking(state: ExpandedRetrievalState) -> DocRerankingOutput: + print(f"doc_reranking state: {state.keys()}") + + verified_documents = state["verified_documents"] + reranked_documents = verified_documents + + return DocRerankingOutput(reranked_documents=reranked_documents) diff --git a/backend/danswer/agent_search/expanded_retrieval/nodes/doc_retrieval.py b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_retrieval.py new file mode 100644 index 00000000000..b98de7f8d3b --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_retrieval.py @@ -0,0 +1,47 @@ +from danswer.agent_search.expanded_retrieval.states import DocRetrievalOutput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalState +from danswer.context.search.models import InferenceSection +from danswer.context.search.models import SearchRequest +from danswer.context.search.pipeline import SearchPipeline +from danswer.db.engine import get_session_context_manager + + +class RetrieveInput(ExpandedRetrievalState): + query_to_retrieve: str + + +def doc_retrieval(state: RetrieveInput) -> DocRetrievalOutput: + # def doc_retrieval(state: RetrieveInput) -> Command[Literal["doc_verification"]]: + """ + Retrieve documents + + Args: + state (dict): The current graph state + + Returns: + state (dict): New key added to state, documents, that contains retrieved documents + """ + print(f"doc_retrieval state: {state.keys()}") + + state["query_to_retrieve"] + + documents: list[InferenceSection] = [] + llm = state["primary_llm"] + fast_llm = state["fast_llm"] + # db_session = state["db_session"] + query_to_retrieve = 
state["search_request"].query + with get_session_context_manager() as db_session1: + documents = SearchPipeline( + search_request=SearchRequest( + query=query_to_retrieve, + ), + user=None, + llm=llm, + fast_llm=fast_llm, + db_session=db_session1, + ).reranked_sections + + print(f"retrieved documents: {len(documents)}") + return DocRetrievalOutput( + retrieved_documents=documents, + ) diff --git a/backend/danswer/agent_search/expanded_retrieval/nodes/doc_verification.py b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_verification.py new file mode 100644 index 00000000000..ab2907efd13 --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/nodes/doc_verification.py @@ -0,0 +1,60 @@ +from langchain_core.messages import HumanMessage +from langchain_core.messages import merge_message_runs + +from danswer.agent_search.expanded_retrieval.states import DocVerificationOutput +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalState +from danswer.agent_search.shared_graph_utils.models import BinaryDecision +from danswer.agent_search.shared_graph_utils.prompts import VERIFIER_PROMPT +from danswer.context.search.models import InferenceSection + + +class DocVerificationInput(ExpandedRetrievalState, total=True): + doc_to_verify: InferenceSection + + +def doc_verification(state: DocVerificationInput) -> DocVerificationOutput: + """ + Check whether the document is relevant for the original user question + + Args: + state (VerifierState): The current state + + Returns: + dict: ict: The updated state with the final decision + """ + + print(f"doc_verification state: {state.keys()}") + + original_query = state["search_request"].query + doc_to_verify = state["doc_to_verify"] + document_content = doc_to_verify.combined_content + + msg = [ + HumanMessage( + content=VERIFIER_PROMPT.format( + question=original_query, document_content=document_content + ) + ) + ] + + fast_llm = state["fast_llm"] + response = list( + fast_llm.stream( + prompt=msg, + ) + ) + + response_string = merge_message_runs(response, chunk_separator="")[0].content + # Convert string response to proper dictionary format + decision_dict = {"decision": response_string.lower()} + formatted_response = BinaryDecision.model_validate(decision_dict) + + print(f"Verdict: {formatted_response.decision}") + + verified_documents = [] + if formatted_response.decision == "yes": + verified_documents.append(doc_to_verify) + + return DocVerificationOutput( + verified_documents=verified_documents, + ) diff --git a/backend/danswer/agent_search/expanded_retrieval/nodes/verification_kickoff.py b/backend/danswer/agent_search/expanded_retrieval/nodes/verification_kickoff.py new file mode 100644 index 00000000000..56e96b8f9fa --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/nodes/verification_kickoff.py @@ -0,0 +1,27 @@ +from typing import Literal + +from langgraph.types import Command +from langgraph.types import Send + +from danswer.agent_search.expanded_retrieval.nodes.doc_verification import ( + DocVerificationInput, +) +from danswer.agent_search.expanded_retrieval.states import ExpandedRetrievalState + + +def verification_kickoff( + state: ExpandedRetrievalState, +) -> Command[Literal["doc_verification"]]: + print(f"verification_kickoff state: {state.keys()}") + + documents = state["retrieved_documents"] + return Command( + update={}, + goto=[ + Send( + node="doc_verification", + arg=DocVerificationInput(doc_to_verify=doc, **state), + ) + for doc in documents + ], + ) diff --git 
a/backend/danswer/agent_search/expanded_retrieval/prompts.py b/backend/danswer/agent_search/expanded_retrieval/prompts.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/backend/danswer/agent_search/expanded_retrieval/states.py b/backend/danswer/agent_search/expanded_retrieval/states.py new file mode 100644 index 00000000000..73b0c8713eb --- /dev/null +++ b/backend/danswer/agent_search/expanded_retrieval/states.py @@ -0,0 +1,36 @@ +from typing import Annotated +from typing import TypedDict + +from danswer.agent_search.core_state import PrimaryState +from danswer.agent_search.shared_graph_utils.operators import dedup_inference_sections +from danswer.context.search.models import InferenceSection + + +class DocRetrievalOutput(TypedDict, total=False): + retrieved_documents: Annotated[list[InferenceSection], dedup_inference_sections] + + +class DocVerificationOutput(TypedDict, total=False): + verified_documents: Annotated[list[InferenceSection], dedup_inference_sections] + + +class DocRerankingOutput(TypedDict, total=False): + reranked_documents: Annotated[list[InferenceSection], dedup_inference_sections] + + +class ExpandedRetrievalState( + PrimaryState, + DocRetrievalOutput, + DocVerificationOutput, + DocRerankingOutput, + total=True, +): + query_to_answer: str + + +class ExpandedRetrievalInput(PrimaryState, total=True): + query_to_answer: str + + +class ExpandedRetrievalOutput(TypedDict): + reordered_documents: Annotated[list[InferenceSection], dedup_inference_sections] diff --git a/backend/danswer/agent_search/main/edges.py b/backend/danswer/agent_search/main/edges.py new file mode 100644 index 00000000000..0f9f6c09131 --- /dev/null +++ b/backend/danswer/agent_search/main/edges.py @@ -0,0 +1,61 @@ +from collections.abc import Hashable + +from langgraph.types import Send + +from danswer.agent_search.answer_query.states import AnswerQueryInput +from danswer.agent_search.main.states import MainState + + +def parallelize_decompozed_answer_queries(state: MainState) -> list[Send | Hashable]: + return [ + Send( + "answer_query", + AnswerQueryInput( + **state, + query_to_answer=query, + ), + ) + for query in state["initial_decomp_queries"] + ] + + +# def continue_to_answer_sub_questions(state: QAState) -> Union[Hashable, list[Hashable]]: +# # Routes re-written queries to the (parallel) retrieval steps +# # Notice the 'Send()' API that takes care of the parallelization +# return [ +# Send( +# "sub_answers_graph", +# ResearchQAState( +# sub_question=sub_question["sub_question_str"], +# sub_question_nr=sub_question["sub_question_nr"], +# graph_start_time=state["graph_start_time"], +# primary_llm=state["primary_llm"], +# fast_llm=state["fast_llm"], +# ), +# ) +# for sub_question in state["sub_questions"] +# ] + + +# def continue_to_deep_answer(state: QAState) -> Union[Hashable, list[Hashable]]: +# print("---GO TO DEEP ANSWER OR END---") + +# base_answer = state["base_answer"] + +# question = state["original_question"] + +# BASE_CHECK_MESSAGE = [ +# HumanMessage( +# content=BASE_CHECK_PROMPT.format(question=question, base_answer=base_answer) +# ) +# ] + +# model = state["fast_llm"] +# response = model.invoke(BASE_CHECK_MESSAGE) + +# print(f"CAN WE CONTINUE W/O GENERATING A DEEP ANSWER? 
- {response.pretty_repr()}") + +# if response.pretty_repr() == "no": +# return "decompose" +# else: +# return "end" diff --git a/backend/danswer/agent_search/main/graph_builder.py b/backend/danswer/agent_search/main/graph_builder.py new file mode 100644 index 00000000000..6a282639e97 --- /dev/null +++ b/backend/danswer/agent_search/main/graph_builder.py @@ -0,0 +1,98 @@ +from langgraph.graph import END +from langgraph.graph import START +from langgraph.graph import StateGraph + +from danswer.agent_search.answer_query.graph_builder import answer_query_graph_builder +from danswer.agent_search.expanded_retrieval.graph_builder import ( + expanded_retrieval_graph_builder, +) +from danswer.agent_search.main.edges import parallelize_decompozed_answer_queries +from danswer.agent_search.main.nodes.base_decomp import main_decomp_base +from danswer.agent_search.main.nodes.generate_initial_answer import ( + generate_initial_answer, +) +from danswer.agent_search.main.states import MainInput +from danswer.agent_search.main.states import MainState + + +def main_graph_builder() -> StateGraph: + graph = StateGraph( + state_schema=MainState, + input=MainInput, + ) + + ### Add nodes ### + + graph.add_node( + node="base_decomp", + action=main_decomp_base, + ) + answer_query_subgraph = answer_query_graph_builder().compile() + graph.add_node( + node="answer_query", + action=answer_query_subgraph, + ) + expanded_retrieval_subgraph = expanded_retrieval_graph_builder().compile() + graph.add_node( + node="expanded_retrieval", + action=expanded_retrieval_subgraph, + ) + graph.add_node( + node="generate_initial_answer", + action=generate_initial_answer, + ) + + ### Add edges ### + graph.add_edge( + start_key=START, + end_key="expanded_retrieval", + ) + + graph.add_edge( + start_key=START, + end_key="base_decomp", + ) + graph.add_conditional_edges( + source="base_decomp", + path=parallelize_decompozed_answer_queries, + path_map=["answer_query"], + ) + graph.add_edge( + start_key=["answer_query", "expanded_retrieval"], + end_key="generate_initial_answer", + ) + graph.add_edge( + start_key="generate_initial_answer", + end_key=END, + ) + + return graph + + +if __name__ == "__main__": + from danswer.db.engine import get_session_context_manager + from danswer.llm.factory import get_default_llms + from danswer.context.search.models import SearchRequest + + graph = main_graph_builder() + compiled_graph = graph.compile() + primary_llm, fast_llm = get_default_llms() + search_request = SearchRequest( + query="If i am familiar with the function that I need, how can I type it into a cell?", + ) + with get_session_context_manager() as db_session: + inputs = MainInput( + search_request=search_request, + primary_llm=primary_llm, + fast_llm=fast_llm, + db_session=db_session, + ) + for thing in compiled_graph.stream( + input=inputs, + # stream_mode="debug", + # debug=True, + subgraphs=True, + ): + # print(thing) + print() + print() diff --git a/backend/danswer/agent_search/main/nodes/base_decomp.py b/backend/danswer/agent_search/main/nodes/base_decomp.py new file mode 100644 index 00000000000..ce3dbd10818 --- /dev/null +++ b/backend/danswer/agent_search/main/nodes/base_decomp.py @@ -0,0 +1,31 @@ +from langchain_core.messages import HumanMessage + +from danswer.agent_search.main.states import BaseDecompOutput +from danswer.agent_search.main.states import MainState +from danswer.agent_search.shared_graph_utils.prompts import INITIAL_DECOMPOSITION_PROMPT +from danswer.agent_search.shared_graph_utils.utils import 
clean_and_parse_list_string + + +def main_decomp_base(state: MainState) -> BaseDecompOutput: + question = state["search_request"].query + + msg = [ + HumanMessage( + content=INITIAL_DECOMPOSITION_PROMPT.format(question=question), + ) + ] + + # Get the rewritten queries in a defined format + model = state["fast_llm"] + response = model.invoke(msg) + + content = response.pretty_repr() + list_of_subquestions = clean_and_parse_list_string(content) + + decomp_list: list[str] = [ + sub_question["sub_question"].strip() for sub_question in list_of_subquestions + ] + + return BaseDecompOutput( + initial_decomp_queries=decomp_list, + ) diff --git a/backend/danswer/agent_search/main/nodes/generate_initial_answer.py b/backend/danswer/agent_search/main/nodes/generate_initial_answer.py new file mode 100644 index 00000000000..92d70c062db --- /dev/null +++ b/backend/danswer/agent_search/main/nodes/generate_initial_answer.py @@ -0,0 +1,53 @@ +from langchain_core.messages import HumanMessage + +from danswer.agent_search.main.states import InitialAnswerOutput +from danswer.agent_search.main.states import MainState +from danswer.agent_search.shared_graph_utils.prompts import INITIAL_RAG_PROMPT +from danswer.agent_search.shared_graph_utils.utils import format_docs + + +def generate_initial_answer(state: MainState) -> InitialAnswerOutput: + print("---GENERATE INITIAL---") + + question = state["search_request"].query + docs = state["documents"] + + decomp_answer_results = state["decomp_answer_results"] + + good_qa_list: list[str] = [] + + _SUB_QUESTION_ANSWER_TEMPLATE = """ + Sub-Question:\n - {sub_question}\n --\nAnswer:\n - {sub_answer}\n\n + """ + for decomp_answer_result in decomp_answer_results: + if ( + decomp_answer_result.quality.lower() == "yes" + and len(decomp_answer_result.answer) > 0 + and decomp_answer_result.answer != "I don't know" + ): + good_qa_list.append( + _SUB_QUESTION_ANSWER_TEMPLATE.format( + sub_question=decomp_answer_result.query, + sub_answer=decomp_answer_result.answer, + ) + ) + + sub_question_answer_str = "\n\n------\n\n".join(good_qa_list) + + msg = [ + HumanMessage( + content=INITIAL_RAG_PROMPT.format( + question=question, + context=format_docs(docs), + answered_sub_questions=sub_question_answer_str, + ) + ) + ] + + # Grader + model = state["fast_llm"] + response = model.invoke(msg) + answer = response.pretty_repr() + + print(answer) + return InitialAnswerOutput(initial_answer=answer) diff --git a/backend/danswer/agent_search/main/states.py b/backend/danswer/agent_search/main/states.py new file mode 100644 index 00000000000..e6124f5a5eb --- /dev/null +++ b/backend/danswer/agent_search/main/states.py @@ -0,0 +1,37 @@ +from operator import add +from typing import Annotated +from typing import TypedDict + +from danswer.agent_search.answer_query.states import SearchAnswerResults +from danswer.agent_search.core_state import PrimaryState +from danswer.agent_search.shared_graph_utils.operators import dedup_inference_sections +from danswer.context.search.models import InferenceSection + + +class BaseDecompOutput(TypedDict, total=False): + initial_decomp_queries: list[str] + + +class InitialAnswerOutput(TypedDict, total=False): + initial_answer: str + + +class MainState( + PrimaryState, + BaseDecompOutput, + InitialAnswerOutput, + total=True, +): + documents: Annotated[list[InferenceSection], dedup_inference_sections] + decomp_answer_results: Annotated[list[SearchAnswerResults], add] + + +class MainInput(PrimaryState, total=True): + pass + + +class MainOutput(TypedDict): + """ + This is 
not used because defining the output only matters for filtering the output of + a .invoke() call but we are streaming so we just yield the entire state. + """ diff --git a/backend/danswer/agent_search/run_graph.py b/backend/danswer/agent_search/run_graph.py new file mode 100644 index 00000000000..6cdd0653778 --- /dev/null +++ b/backend/danswer/agent_search/run_graph.py @@ -0,0 +1,27 @@ +from danswer.agent_search.primary_graph.graph_builder import build_core_graph +from danswer.llm.answering.answer import AnswerStream +from danswer.llm.interfaces import LLM +from danswer.tools.tool import Tool + + +def run_graph( + query: str, + llm: LLM, + tools: list[Tool], +) -> AnswerStream: + graph = build_core_graph() + + inputs = { + "original_query": query, + "messages": [], + "tools": tools, + "llm": llm, + } + compiled_graph = graph.compile() + output = compiled_graph.invoke(input=inputs) + yield from output + + +if __name__ == "__main__": + pass + # run_graph("What is the capital of France?", llm, []) diff --git a/backend/danswer/agent_search/shared_graph_utils/models.py b/backend/danswer/agent_search/shared_graph_utils/models.py new file mode 100644 index 00000000000..162d651fe51 --- /dev/null +++ b/backend/danswer/agent_search/shared_graph_utils/models.py @@ -0,0 +1,12 @@ +from typing import Literal + +from pydantic import BaseModel + + +# Pydantic models for structured outputs +class RewrittenQueries(BaseModel): + rewritten_queries: list[str] + + +class BinaryDecision(BaseModel): + decision: Literal["yes", "no"] diff --git a/backend/danswer/agent_search/shared_graph_utils/operators.py b/backend/danswer/agent_search/shared_graph_utils/operators.py new file mode 100644 index 00000000000..f6e5c91ebda --- /dev/null +++ b/backend/danswer/agent_search/shared_graph_utils/operators.py @@ -0,0 +1,9 @@ +from danswer.context.search.models import InferenceSection +from danswer.llm.answering.prune_and_merge import _merge_sections + + +def dedup_inference_sections( + list1: list[InferenceSection], list2: list[InferenceSection] +) -> list[InferenceSection]: + deduped = _merge_sections(list1 + list2) + return deduped diff --git a/backend/danswer/agent_search/shared_graph_utils/prompts.py b/backend/danswer/agent_search/shared_graph_utils/prompts.py new file mode 100644 index 00000000000..a3eeba29fb9 --- /dev/null +++ b/backend/danswer/agent_search/shared_graph_utils/prompts.py @@ -0,0 +1,427 @@ +REWRITE_PROMPT_MULTI_ORIGINAL = """ \n + Please convert an initial user question into a 2-3 more appropriate short and pointed search queries for retrievel from a + document store. Particularly, try to think about resolving ambiguities and make the search queries more specific, + enabling the system to search more broadly. + Also, try to make the search queries not redundant, i.e. not too similar! \n\n + Here is the initial question: + \n ------- \n + {question} + \n ------- \n + Formulate the queries separated by '--' (Do not say 'Query 1: ...', just write the querytext): """ + +REWRITE_PROMPT_MULTI = """ \n + Please create a list of 2-3 sample documents that could answer an original question. Each document + should be about as long as the original question. \n + Here is the initial question: + \n ------- \n + {question} + \n ------- \n + Formulate the sample documents separated by '--' (Do not say 'Document 1: ...', just write the text): """ + +BASE_RAG_PROMPT = """ \n + You are an assistant for question-answering tasks. Use the context provided below - and only the + provided context - to answer the question. 
If you don't know the answer or if the provided context is
+    empty, just say "I don't know". Do not use your internal knowledge!
+
+    Again, only use the provided context and do not use your internal knowledge! If you cannot answer the
+    question based on the context, say "I don't know". It is a matter of life and death that you do NOT
+    use your internal knowledge, just the provided information!
+
+    Use three sentences maximum and keep the answer concise.
+    \nQuestion:\n {question} \nContext:\n {context} \n\n
+    \n\n
+    Answer:"""

+BASE_CHECK_PROMPT = """ \n
+    Please check whether 1) the suggested answer seems to fully address the original question AND 2) the
+    original question requests a simple, factual answer, and there are no ambiguities, judgements,
+    aggregations, or any other complications that may require extra context. (I.e., if the question is
+    somewhat addressed, but the answer would benefit from more context, then answer with 'no'.)
+
+    Please only answer with 'yes' or 'no' \n
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+    Here is the proposed answer:
+    \n ------- \n
+    {base_answer}
+    \n ------- \n
+    Please answer with yes or no:"""

+VERIFIER_PROMPT = """ \n
+    Please check whether the document seems to be relevant for answering the question. Please
+    only answer with 'yes' or 'no' \n
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+    Here is the document text:
+    \n ------- \n
+    {document_content}
+    \n ------- \n
+    Please answer with yes or no:"""

+INITIAL_DECOMPOSITION_PROMPT_BASIC = """ \n
+    Please decompose an initial user question into not more than 4 appropriate sub-questions that help to
+    answer the original question. The purpose for this decomposition is to isolate individual entities
+    (i.e., 'compare sales of company A and company B' -> 'what are sales for company A' + 'what are sales
+    for company B'), split ambiguous terms (i.e., 'what is our success with company A' -> 'what are our
+    sales with company A' + 'what is our market share with company A' + 'is company A a reference customer
+    for us'), etc. Each sub-question should realistically be answerable by a good RAG system. \n
+
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+
+    Please formulate your answer as a list of sub-questions:
+
+    Answer:
+    """

+REWRITE_PROMPT_SINGLE = """ \n
+    Please convert an initial user question into a more appropriate search query for retrieval from a
+    document store. \n
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+
+    Formulate the query: """

+MODIFIED_RAG_PROMPT = """You are an assistant for question-answering tasks. Use the context provided below
+    - and only this context - to answer the question. If you don't know the answer, just say "I don't know".
+    Use three sentences maximum and keep the answer concise.
+    Also pay particular attention to the sub-questions and their answers, as they may enrich the answer.
+    Again, only use the provided context and do not use your internal knowledge! If you cannot answer the
+    question based on the context, say "I don't know". It is a matter of life and death that you do NOT
+    use your internal knowledge, just the provided information!
+
+    \nQuestion: {question}
+    \nContext: {combined_context} \n
+
+    Answer:"""

+ORIG_DEEP_DECOMPOSE_PROMPT = """ \n
+    An initial user question needs to be answered. An initial answer has been provided but it wasn't quite
+    good enough. 
Also, some sub-questions had been answered and this information has been used to provide
+    the initial answer. Some other sub-questions may have been suggested based on little knowledge, but they
+    were not directly answerable. Also, some entities, relationships and terms are given to you so that
+    you have an idea of what the available data looks like.
+
+    Your role is to generate 3-5 new sub-questions that would help to answer the initial question,
+    considering:
+
+    1) The initial question
+    2) The initial answer that was found to be unsatisfactory
+    3) The sub-questions that were answered
+    4) The sub-questions that were suggested but not answered
+    5) The entities, relationships and terms that were extracted from the context
+
+    The individual questions should be answerable by a good RAG system.
+    So a good idea would be to use the sub-questions to resolve ambiguities and/or to separate the
+    question for different entities that may be involved in the original question, but in a way that does
+    not duplicate questions that were already tried.
+
+    Additional Guidelines:
+    - The sub-questions should be specific to the question and provide richer context for the question,
+    resolve ambiguities, or address shortcomings of the initial answer
+    - Each sub-question - when answered - should be relevant for the answer to the original question
+    - The sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any
+    other complications that may require extra context.
+    - The sub-questions MUST have the full context of the original question so that each can be executed by
+    a RAG system independently without the original question available
+    (Example:
+    - initial question: "What is the capital of France?"
+    - bad sub-question: "What is the name of the river there?"
+    - good sub-question: "What is the name of the river that flows through Paris?"
+    - For each sub-question, please provide a short explanation for why it is a good sub-question. So
+    generate a list of dictionaries with the following format:
+    [{{"sub_question": , "explanation": , "search_term": }}, ...]
+
+    \n\n
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+
+    Here is the initial sub-optimal answer:
+    \n ------- \n
+    {base_answer}
+    \n ------- \n
+
+    Here are the sub-questions that were answered:
+    \n ------- \n
+    {answered_sub_questions}
+    \n ------- \n
+
+    Here are the sub-questions that were suggested but not answered:
+    \n ------- \n
+    {failed_sub_questions}
+    \n ------- \n
+
+    And here are the entities, relationships and terms extracted from the context:
+    \n ------- \n
+    {entity_term_extraction_str}
+    \n ------- \n
+
+    Please generate the list of good, fully contextualized sub-questions that would help to address the
+    main question. Again, please find questions that are NOT overlapping too much with the already answered
+    sub-questions or those that already were suggested and failed.
+    In other words - what can we try in addition to what has been tried so far?
+
+    Please think through it step by step and then generate the list of json dictionaries with the following
+    format:
+
+    {{"sub_questions": [{{"sub_question": ,
+       "explanation": ,
+       "search_term": }},
+       ...]}} """

+DEEP_DECOMPOSE_PROMPT = """ \n
+    An initial user question needs to be answered. An initial answer has been provided but it wasn't quite
+    good enough. Also, some sub-questions had been answered and this information has been used to provide
+    the initial answer. 
Some other sub-questions may have been suggested based on little knowledge, but they
+    were not directly answerable. Also, some entities, relationships and terms are given to you so that
+    you have an idea of what the available data looks like.
+
+    Your role is to generate 4-6 new sub-questions that would help to answer the initial question,
+    considering:
+
+    1) The initial question
+    2) The initial answer that was found to be unsatisfactory
+    3) The sub-questions that were answered
+    4) The sub-questions that were suggested but not answered
+    5) The entities, relationships and terms that were extracted from the context
+
+    The individual questions should be answerable by a good RAG system.
+    So a good idea would be to use the sub-questions to resolve ambiguities and/or to separate the
+    question for different entities that may be involved in the original question, but in a way that does
+    not duplicate questions that were already tried.
+
+    Additional Guidelines:
+    - The sub-questions should be specific to the question and provide richer context for the question,
+    resolve ambiguities, or address shortcomings of the initial answer
+    - Each sub-question - when answered - should be relevant for the answer to the original question
+    - The sub-questions should be free from comparisons, ambiguities, judgements, aggregations, or any
+    other complications that may require extra context.
+    - The sub-questions MUST have the full context of the original question so that each can be executed by
+    a RAG system independently without the original question available
+    (Example:
+    - initial question: "What is the capital of France?"
+    - bad sub-question: "What is the name of the river there?"
+    - good sub-question: "What is the name of the river that flows through Paris?"
+    - For each sub-question, please also provide a search term that can be used to retrieve relevant
+    documents from a document store.
+    \n\n
+    Here is the initial question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+
+    Here is the initial sub-optimal answer:
+    \n ------- \n
+    {base_answer}
+    \n ------- \n
+
+    Here are the sub-questions that were answered:
+    \n ------- \n
+    {answered_sub_questions}
+    \n ------- \n
+
+    Here are the sub-questions that were suggested but not answered:
+    \n ------- \n
+    {failed_sub_questions}
+    \n ------- \n
+
+    And here are the entities, relationships and terms extracted from the context:
+    \n ------- \n
+    {entity_term_extraction_str}
+    \n ------- \n
+
+    Please generate the list of good, fully contextualized sub-questions that would help to address the
+    main question. Again, please find questions that are NOT overlapping too much with the already answered
+    sub-questions or those that already were suggested and failed.
+    In other words - what can we try in addition to what has been tried so far?
+
+    Generate the list of json dictionaries with the following format:
+
+    {{"sub_questions": [{{"sub_question": ,
+       "search_term": }},
+       ...]}} """

+DECOMPOSE_PROMPT = """ \n
+    For an initial user question, please generate 5-10 individual sub-questions whose answers would help
+    \n to answer the initial question. The individual questions should be answerable by a good RAG system.
+    So a good idea would be to \n use the sub-questions to resolve ambiguities and/or to separate the
+    question for different entities that may be involved in the original question. 
+ + In order to arrive at meaningful sub-questions, please also consider the context retrieved from the + document store, expressed as entities, relationships and terms. You can also think about the types + mentioned in brackets + + Guidelines: + - The sub-questions should be specific to the question and provide richer context for the question, + and or resolve ambiguities + - Each sub-question - when answered - should be relevant for the answer to the original question + - The sub-questions should be free from comparisions, ambiguities,judgements, aggregations, or any + other complications that may require extra context. + - The sub-questions MUST have the full context of the original question so that it can be executed by + a RAG system independently without the original question available + (Example: + - initial question: "What is the capital of France?" + - bad sub-question: "What is the name of the river there?" + - good sub-question: "What is the name of the river that flows through Paris?" + - For each sub-question, please provide a short explanation for why it is a good sub-question. So + generate a list of dictionaries with the following format: + [{{"sub_question": , "explanation": , "search_term": }}, ...] + + \n\n + Here is the initial question: + \n ------- \n + {question} + \n ------- \n + + And here are the entities, relationships and terms extracted from the context: + \n ------- \n + {entity_term_extraction_str} + \n ------- \n + + Please generate the list of good, fully contextualized sub-questions that would help to address the + main question. Don't be too specific unless the original question is specific. + Please think through it step by step and then generate the list of json dictionaries with the following + format: + {{"sub_questions": [{{"sub_question": , + "explanation": , + "search_term": }}, + ...]}} """ + +#### Consolidations +COMBINED_CONTEXT = """------- + Below you will find useful information to answer the original question. First, you see a number of + sub-questions with their answers. This information should be considered to be more focussed and + somewhat more specific to the original question as it tries to contextualized facts. + After that will see the documents that were considered to be relevant to answer the original question. + + Here are the sub-questions and their answers: + \n\n {deep_answer_context} \n\n + \n\n Here are the documents that were considered to be relevant to answer the original question: + \n\n {formated_docs} \n\n + ---------------- + """ + +SUB_QUESTION_EXPLANATION_RANKER_PROMPT = """------- + Below you will find a question that we ultimately want to answer (the original question) and a list of + motivations in arbitrary order for generated sub-questions that are supposed to help us answering the + original question. The motivations are formatted as : . + (Again, the numbering is arbitrary and does not necessarily mean that 1 is the most relevant + motivation and 2 is less relevant.) + + Please rank the motivations in order of relevance for answering the original question. Also, try to + ensure that the top questions do not duplicate too much, i.e. that they are not too similar. + Ultimately, create a list with the motivation numbers where the number of the most relevant + motivations comes first. 
+ + Here is the original question: + \n\n {original_question} \n\n + \n\n Here is the list of sub-question motivations: + \n\n {sub_question_explanations} \n\n + ---------------- + + Please think step by step and then generate the ranked list of motivations. + + Please format your answer as a json object in the following format: + {{"reasonning": , + "ranked_motivations": }} + """ + + +INITIAL_DECOMPOSITION_PROMPT = """ \n + Please decompose an initial user question into 2 or 3 appropriate sub-questions that help to + answer the original question. The purpose for this decomposition is to isolate individulal entities + (i.e., 'compare sales of company A and company B' -> 'what are sales for company A' + 'what are sales + for company B'), split ambiguous terms (i.e., 'what is our success with company A' -> 'what are our + sales with company A' + 'what is our market share with company A' + 'is company A a reference customer + for us'), etc. Each sub-question should be realistically be answerable by a good RAG system. \n + + For each sub-question, please also create one search term that can be used to retrieve relevant + documents from a document store. + + Here is the initial question: + \n ------- \n + {question} + \n ------- \n + + Please formulate your answer as a list of json objects with the following format: + + [{{"sub_question": , "search_term": }}, ...] + + Answer: + """ + +INITIAL_RAG_PROMPT = """ \n + You are an assistant for question-answering tasks. Use the information provided below - and only the + provided information - to answer the provided question. + + The information provided below consists of: + 1) a number of answered sub-questions - these are very important(!) and definitely should be + considered to answer the question. + 2) a number of documents that were also deemed relevant for the question. + + If you don't know the answer or if the provided information is empty or insufficient, just say + "I don't know". Do not use your internal knowledge! + + Again, only use the provided informationand do not use your internal knowledge! It is a matter of life + and death that you do NOT use your internal knowledge, just the provided information! + + Try to keep your answer concise. + + And here is the question and the provided information: + \n + \nQuestion:\n {question} + + \nAnswered Sub-questions:\n {answered_sub_questions} + + \nContext:\n {context} \n\n + \n\n + + Answer:""" + +ENTITY_TERM_PROMPT = """ \n + Based on the original question and the context retieved from a dataset, please generate a list of + entities (e.g. companies, organizations, industries, products, locations, etc.), terms and concepts + (e.g. sales, revenue, etc.) that are relevant for the question, plus their relations to each other. 
+
+    \n\n
+    Here is the original question:
+    \n ------- \n
+    {question}
+    \n ------- \n
+    And here is the context retrieved:
+    \n ------- \n
+    {context}
+    \n ------- \n
+
+    Please format your answer as a JSON object in the following format:
+
+    {{"retrieved_entities_relationships": {{
+        "entities": [{{
+            "entity_name": <entity name>,
+            "entity_type": <entity type>
+        }}],
+        "relationships": [{{
+            "name": <relationship name>,
+            "type": <relationship type>,
+            "entities": [<related entity name 1>, <related entity name 2>]
+        }}],
+        "terms": [{{
+            "term_name": <term name>,
+            "term_type": <term type>,
+            "similar_to": <list of similar terms>
+        }}]
+    }}
+    }}
+    """
diff --git a/backend/danswer/agent_search/shared_graph_utils/utils.py b/backend/danswer/agent_search/shared_graph_utils/utils.py
new file mode 100644
index 00000000000..24c505ac585
--- /dev/null
+++ b/backend/danswer/agent_search/shared_graph_utils/utils.py
@@ -0,0 +1,101 @@
+import ast
+import json
+import re
+from collections.abc import Sequence
+from datetime import datetime
+from datetime import timedelta
+from typing import Any
+
+from danswer.context.search.models import InferenceSection
+
+
+def normalize_whitespace(text: str) -> str:
+    """Normalize whitespace in text to single spaces and strip leading/trailing whitespace."""
+    import re
+
+    return re.sub(r"\s+", " ", text.strip())
+
+
+# Post-processing
+def format_docs(docs: Sequence[InferenceSection]) -> str:
+    return "\n\n".join(doc.combined_content for doc in docs)
+
+
+def clean_and_parse_list_string(json_string: str) -> list[dict]:
+    # Remove any prefixes/labels before the actual JSON content
+    json_string = re.sub(r"^.*?(?=\[)", "", json_string, flags=re.DOTALL)
+
+    # Remove markdown code block markers and any newline prefixes
+    cleaned_string = re.sub(r"```json\n|\n```", "", json_string)
+    cleaned_string = cleaned_string.replace("\\n", " ").replace("\n", " ")
+    cleaned_string = " ".join(cleaned_string.split())
+
+    # Try parsing with json.loads first, fall back to ast.literal_eval
+    try:
+        return json.loads(cleaned_string)
+    except json.JSONDecodeError:
+        try:
+            return ast.literal_eval(cleaned_string)
+        except (ValueError, SyntaxError) as e:
+            raise ValueError(f"Failed to parse JSON string: {cleaned_string}") from e
+
+
+def clean_and_parse_json_string(json_string: str) -> dict[str, Any]:
+    # Remove markdown code block markers and any newline prefixes
+    cleaned_string = re.sub(r"```json\n|\n```", "", json_string)
+    cleaned_string = cleaned_string.replace("\\n", " ").replace("\n", " ")
+    cleaned_string = " ".join(cleaned_string.split())
+    # Parse the cleaned string into a Python dictionary
+    return json.loads(cleaned_string)
+
+
+def format_entity_term_extraction(entity_term_extraction_dict: dict[str, Any]) -> str:
+    entities = entity_term_extraction_dict["entities"]
+    terms = entity_term_extraction_dict["terms"]
+    relationships = entity_term_extraction_dict["relationships"]
+
+    entity_strs = ["\nEntities:\n"]
+    for entity in entities:
+        entity_str = f"{entity['entity_name']} ({entity['entity_type']})"
+        entity_strs.append(entity_str)
+
+    entity_str = "\n - ".join(entity_strs)
+
+    relationship_strs = ["\n\nRelationships:\n"]
+    for relationship in relationships:
+        relationship_str = f"{relationship['name']} ({relationship['type']}): {relationship['entities']}"
+        relationship_strs.append(relationship_str)
+
+    relationship_str = "\n - ".join(relationship_strs)
+
+    term_strs = ["\n\nTerms:\n"]
+    for term in terms:
+        term_str = f"{term['term_name']} ({term['term_type']}): similar to {term['similar_to']}"
+        term_strs.append(term_str)
+
+    term_str = "\n - ".join(term_strs)
+
+    return "\n".join(entity_strs + relationship_strs +
term_strs) + + +def _format_time_delta(time: timedelta) -> str: + seconds_from_start = f"{((time).seconds):03d}" + microseconds_from_start = f"{((time).microseconds):06d}" + return f"{seconds_from_start}.{microseconds_from_start}" + + +def generate_log_message( + message: str, + node_start_time: datetime, + graph_start_time: datetime | None = None, +) -> str: + current_time = datetime.now() + + if graph_start_time is not None: + graph_time_str = _format_time_delta(current_time - graph_start_time) + else: + graph_time_str = "N/A" + + node_time_str = _format_time_delta(current_time - node_start_time) + + return f"{graph_time_str} ({node_time_str} s): {message}" diff --git a/backend/danswer/tools/message.py b/backend/danswer/tools/message.py index b0259c29b2a..6d261a8bf11 100644 --- a/backend/danswer/tools/message.py +++ b/backend/danswer/tools/message.py @@ -25,6 +25,9 @@ class ToolCallSummary(BaseModel__v1): tool_call_request: AIMessage tool_call_result: ToolMessage + class Config: + arbitrary_types_allowed = True + def tool_call_tokens( tool_call_summary: ToolCallSummary, llm_tokenizer: BaseTokenizer diff --git a/backend/danswer/utils/timing.py b/backend/danswer/utils/timing.py index 0d4eb7a14d4..4aa2a8e5483 100644 --- a/backend/danswer/utils/timing.py +++ b/backend/danswer/utils/timing.py @@ -33,12 +33,12 @@ def wrapped_func(*args: Any, **kwargs: Any) -> Any: elapsed_time_str = f"{elapsed_time:.3f}" log_name = func_name or func.__name__ args_str = f" args={args} kwargs={kwargs}" if include_args else "" - final_log = f"{log_name}{args_str} took {elapsed_time_str} seconds" - if debug_only: - logger.debug(final_log) - else: - # These are generally more important logs so the level is a bit higher - logger.notice(final_log) + f"{log_name}{args_str} took {elapsed_time_str} seconds" + # if debug_only: + # logger.debug(final_log) + # else: + # # These are generally more important logs so the level is a bit higher + # logger.notice(final_log) if not print_only: optional_telemetry( diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt index 8a13bb8a74f..01a99c975fd 100644 --- a/backend/requirements/default.txt +++ b/backend/requirements/default.txt @@ -26,9 +26,14 @@ huggingface-hub==0.20.1 jira==3.5.1 jsonref==1.1.0 trafilatura==1.12.2 -langchain==0.1.17 -langchain-core==0.1.50 -langchain-text-splitters==0.0.1 +langchain==0.3.7 +langchain-core==0.3.24 +langchain-openai==0.2.9 +langchain-text-splitters==0.3.2 +langchainhub==0.1.21 +langgraph==0.2.59 +langgraph-checkpoint==2.0.5 +langgraph-sdk==0.1.44 litellm==1.53.1 lxml==5.3.0 lxml_html_clean==0.2.2
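
A small usage sketch, not part of the patch: the new INITIAL_DECOMPOSITION_PROMPT is formatted with a user question, the resulting prompt would normally be sent to an LLM, and the model's reply can then be parsed with the clean_and_parse_list_string helper added above. Two assumptions are made here: the prompt constants are assumed to live in danswer.agent_search.shared_graph_utils.prompts (alongside the new utils.py), and raw_response is a hand-written stand-in for a model reply, since no LLM is actually called.

from danswer.agent_search.shared_graph_utils.prompts import INITIAL_DECOMPOSITION_PROMPT
from danswer.agent_search.shared_graph_utils.utils import clean_and_parse_list_string

# Fill the decomposition prompt with an example question; this string would
# normally be sent to the LLM.
prompt = INITIAL_DECOMPOSITION_PROMPT.format(
    question="How did company A's sales compare to company B's sales last year?"
)

# Hand-written stand-in for an LLM reply. Models often wrap JSON in ```json
# fences; clean_and_parse_list_string strips the fences and any leading text
# before parsing.
raw_response = """```json
[{"sub_question": "What were company A's sales last year?", "search_term": "company A sales"},
 {"sub_question": "What were company B's sales last year?", "search_term": "company B sales"}]
```"""

sub_questions = clean_and_parse_list_string(raw_response)
for entry in sub_questions:
    print(entry["sub_question"], "->", entry["search_term"])

Dict-shaped outputs (e.g. the format requested by ENTITY_TERM_PROMPT) would presumably go through clean_and_parse_json_string in the same way.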