diff --git a/cookbook/slackbot/api.py b/cookbook/slackbot/api.py index 3bf4585aa..f9eb96df9 100644 --- a/cookbook/slackbot/api.py +++ b/cookbook/slackbot/api.py @@ -50,7 +50,9 @@ async def handle_message(payload: SlackPayload, db: Database): return Completed(message="Message too long", name="SKIPPED") if re.search(BOT_MENTION, user_message) and payload.authorizations: - logger.info(f"Processing message in thread {thread_ts}") + logger.info( + f"Processing message in thread {thread_ts}\nUser message: {cleaned_message}" + ) conversation = await db.get_thread_messages(thread_ts) user_context = build_user_context( @@ -58,7 +60,7 @@ async def handle_message(payload: SlackPayload, db: Database): user_question=cleaned_message, ) - result = await task(agent.run)( + result = await flow(agent.run)( user_prompt=cleaned_message, message_history=conversation, deps=user_context, diff --git a/cookbook/slackbot/core.py b/cookbook/slackbot/core.py index c2446f629..132c2dcc7 100644 --- a/cookbook/slackbot/core.py +++ b/cookbook/slackbot/core.py @@ -20,7 +20,10 @@ from raggy.documents import Document from raggy.vectorstores.tpuf import TurboPuffer, multi_query_tpuf from search import ( + explore_module_offerings, get_latest_prefect_release_notes, + review_common_3x_gotchas, + review_top_level_prefect_api, search_controlflow_docs, search_prefect_2x_docs, search_prefect_3x_docs, @@ -38,11 +41,14 @@ USE TOOLS REPEATEDLY to gather context from the docs, github issues or other tools. Any notes you take about the user will be automatically stored for your next interaction with them. Assume no knowledge of Prefect syntax without reading docs. ALWAYS include relevant links from tool outputs. +Always review the top level API of Prefect before offering code examples to avoid offering fake imports. + Generally, follow this pattern while generating each response: 1) If user offers info about their stack or objectives -> store relevant facts and continue to following steps 2) Use tools to gather context about Prefect concepts related to their question -3) Compile relevant facts and context into a single, CONCISE answer -4) If user asks a follow-up question, repeat steps 2-3 +3) Review the top level API of Prefect and drill into submodules that may be related to the user's question +4) Compile relevant facts and context into a single, CONCISE answer +5) If user asks a follow-up question, repeat steps 2-3 NEVER reference features, syntax, imports or env vars that you do not explicitly find in the docs. If not explicitly stated, assume that the user is using Prefect 3.x and vocalize this assumption. If asked an ambiguous question, simply state what you know about the user and your capabilities.""" @@ -187,6 +193,9 @@ def create_agent( search_prefect_3x_docs, search_controlflow_docs, read_github_issues, + review_top_level_prefect_api, + explore_module_offerings, + review_common_3x_gotchas, ], deps_type=UserContext, ) diff --git a/cookbook/slackbot/modules.py b/cookbook/slackbot/modules.py new file mode 100644 index 000000000..783dbd18d --- /dev/null +++ b/cookbook/slackbot/modules.py @@ -0,0 +1,190 @@ +import importlib +import inspect +import pkgutil +from types import ModuleType +from typing import Any + + +class ModuleTreeExplorer: + def __init__(self, root_module_path: str, max_depth: int = 2): + """ + Initialize the module tree explorer with a root module path. + + Args: + root_module_path: String representing the root module (e.g., 'prefect.runtime') + max_depth: Maximum depth to explore in the module tree (default: 2) + """ + self.root_module_path = root_module_path + self.max_depth = max_depth + self.tree = {} + + def _import_module(self, module_path: str) -> ModuleType | None: + """Safely import a module. + + Args: + module_path: String representing the module path (e.g., 'prefect.runtime') + """ + try: + return importlib.import_module(module_path) + except (ImportError, TypeError) as e: + print(f"Warning: Could not import {module_path}: {e}") + return None + + def _is_defined_in_module(self, item: Any, current_module: str) -> bool: + """Check if an item is defined in the current module. + + Args: + item: The item to check + current_module: The current module path + """ + try: + if inspect.ismodule(item): + return False + + # Get the module where this item is defined + if hasattr(item, "__module__"): + return item.__module__ == current_module + + return False + except Exception: + return False + + def _get_module_public_api( + self, module: ModuleType, module_path: str | None = None + ) -> dict[str, list[str]]: + """Get the public API of a module. + + Args: + module: The module to get the public API of + module_path: The path of the module (e.g., 'prefect.runtime') + """ + api = {"all": [], "classes": [], "functions": [], "constants": []} + + try: + if hasattr(module, "__all__"): + api["all"] = list(module.__all__) + items = { + name: getattr(module, name, None) + for name in module.__all__ + if hasattr(module, name) + } + else: + # Get non-underscore attributes + items = { + name: getattr(module, name, None) + for name in dir(module) + if not name.startswith("_") + } + + # Categorize items that we can safely inspect and are defined in our module + module_name = module_path or module.__name__ + for name, item in items.items(): + try: + if item is not None and self._is_defined_in_module( + item, module_name + ): + if inspect.isclass(item): + api["classes"].append(name) + elif inspect.isfunction(item): + api["functions"].append(name) + elif not inspect.ismodule(item): + api["constants"].append(name) + except (TypeError, ValueError): + # Skip items we can't properly inspect + continue + + except Exception as e: + print(f"Warning: Error inspecting module {module.__name__}: {e}") + + return api + + def _explore_submodules( + self, module: ModuleType, current_depth: int = 0 + ) -> dict[str, Any]: + """Recursively explore submodules and their APIs. + + Args: + module: The module to explore + current_depth: The current depth of the exploration + """ + result = { + "api": self._get_module_public_api(module, module.__name__), + "submodules": {}, + } + + if current_depth < self.max_depth: + try: + if hasattr(module, "__path__"): + for _, name, _ in pkgutil.iter_modules(module.__path__): + try: + full_name = f"{module.__name__}.{name}" + submodule = self._import_module(full_name) + if submodule: + result["submodules"][name] = self._explore_submodules( + submodule, current_depth + 1 + ) + except Exception as e: + print(f"Warning: Error exploring submodule {name}: {e}") + continue + except Exception as e: + print( + f"Warning: Error accessing module path for {module.__name__}: {e}" + ) + + return result + + def explore(self) -> dict[str, Any]: + """Explore the module tree starting from the root module. + + Returns: + dict[str, Any]: The explored module tree + """ + root_module = self._import_module(self.root_module_path) + if root_module: + self.tree = self._explore_submodules(root_module) + return self.tree + + def get_tree_string( + self, tree: dict[str, Any] | None = None, prefix: str = "", is_last: bool = True + ) -> str: + """Generate the module tree as a string in a hierarchical format. + + Args: + tree: The module tree to generate a string for + prefix: The prefix to use for the tree + is_last: Whether the current module is the last in its parent + """ + lines = [] + if tree is None: + tree = self.tree + lines.append(f"📦 {self.root_module_path}") + + api = tree.get("api", {}) + indent = " " if is_last else "│ " + + # Add public API categories + if api.get("all"): + lines.append( + f"{prefix}{'└── ' if is_last else '├── '}📜 __all__: {', '.join(api['all'])}" + ) + + for category, items in api.items(): + if category != "all" and items: + lines.append( + f"{prefix}{'└── ' if is_last and not tree['submodules'] else '├── '}" + f"{'🔷' if category == 'classes' else '⚡' if category == 'functions' else '📌'} " + f"{category}: {', '.join(sorted(items))}" + ) + + # Add submodules + submodules = tree.get("submodules", {}) + for idx, (name, subtree) in enumerate(submodules.items()): + is_last_module = idx == len(submodules) - 1 + lines.append(f"{prefix}{'└── ' if is_last_module else '├── '}📦 {name}") + lines.extend( + self.get_tree_string( + subtree, prefix + indent, is_last_module + ).splitlines() + ) + + return "\n".join(lines) diff --git a/cookbook/slackbot/search.py b/cookbook/slackbot/search.py index 6cae13d90..a10b2eacf 100644 --- a/cookbook/slackbot/search.py +++ b/cookbook/slackbot/search.py @@ -1,79 +1,55 @@ -import asyncio -from typing import Annotated, TypedDict - import httpx import turbopuffer as tpuf -from prefect import flow, task +from modules import ModuleTreeExplorer +from prefect import task from prefect.blocks.system import Secret -from prefect.cache_policies import NONE -from pydantic import AnyUrl, Field -from pydantic_ai import Agent, RunContext -from pydantic_ai.settings import ModelSettings from raggy.vectorstores.tpuf import multi_query_tpuf -Observation = Annotated[str, Field(description="A single observation")] - - -class MainPoints(TypedDict): - main_points: list[Observation] - relevant_links: list[AnyUrl] +@task +def review_top_level_prefect_api() -> str: + """ + Review the available submodules and the top-level API of Prefect. + """ + explorer = ModuleTreeExplorer("prefect", max_depth=0) + explorer.explore() + return explorer.get_tree_string() -class DocsAgentContext(TypedDict): - namespace: str - user_objective: str +@task +def review_common_3x_gotchas() -> list[str]: + """If needed, review common sources of confusion for Prefect 3.x users.""" + return [ + ".map and .submit are always synchronous, even if the underlying function is asynchronous", + "futures returned by .map can be resolved together, like integers = double.map(range(10)).result()", + "futures must be resolved by passing them to another task, returning them or manually calling .result() or .wait()", + "agents are replaced by workers in prefect 3.x, work pools replace the infra blocks from prefect.infrastructure", + "prefect 3.x uses pydantic 2 and server data from prefect 2.x is not compatible with 3.x", + ] -docs_agent = Agent[DocsAgentContext, MainPoints]( - "openai:gpt-4o", - model_settings=ModelSettings(temperature=0), - system_prompt=( - "Summarize the query results into main points. " - "Use the search tool to narrow in on terms related to the user's objective." - ), - result_type=MainPoints, - deps_type=DocsAgentContext, -) +@task +def explore_module_offerings(module_path: str, max_depth: int = 1) -> str: + """ + Explore and return the public API tree of a specific module and its submodules as a string. -@docs_agent.tool # type: ignore -@task(cache_policy=NONE) -def expanded_search( - ctx: RunContext[DocsAgentContext], - queries: list[str], -) -> str: - """Expand a single given query to explore different facets of the query - that are relevant to the user's objective. + Args: + module_path: String representing the module path (e.g., 'prefect.runtime') + max_depth: Maximum depth to explore in the module tree (default: 2) - For example, given a question like: - "how to get task run id from flow run" + Returns: + str: A formatted string representation of the module tree - You might use the following queries: - - "how to get task run id from flow run" - - "retrieving metadata about the parent runtime context" - - "what metadata is stored in Prefect?" - - "what is available client side versus server side?" + Example: + >>> explore_module_tree('prefect.runtime', max_depth=0) """ - return multi_query_tpuf( - queries + [ctx.deps["user_objective"]], - namespace=ctx.deps["namespace"], - n_results=10, - ) - - -@flow(flow_run_name="run docs agent with {queries} in {namespace}") -async def run_docs_agent( - queries: list[str], namespace: str, user_objective: str -) -> str: - result = await docs_agent.run( - user_prompt="\n\n".join(queries), - deps={"namespace": namespace, "user_objective": user_objective}, - ) - return f"{result.data['main_points']}\n\n{result.data['relevant_links']}" + explorer = ModuleTreeExplorer(module_path, max_depth) + explorer.explore() + return explorer.get_tree_string() @task -def search_prefect_2x_docs(queries: list[str], hypothesized_user_objective: str) -> str: +def search_prefect_2x_docs(queries: list[str]) -> str: """Searches the Prefect documentation for the given queries. It is best to use more than one, short query to get the best results. @@ -89,13 +65,11 @@ def search_prefect_2x_docs(queries: list[str], hypothesized_user_objective: str) if not tpuf.api_key: tpuf.api_key = Secret.load("tpuf-api-key", _sync=True).get() # type: ignore - return asyncio.run( - run_docs_agent(queries, "prefect-2", hypothesized_user_objective) - ) + return multi_query_tpuf(queries, namespace="prefect-2", n_results=5) @task -def search_prefect_3x_docs(queries: list[str], hypothesized_user_objective: str) -> str: +def search_prefect_3x_docs(queries: list[str]) -> str: """Searches the Prefect documentation for the given queries. It is best to use more than one, short query to get the best results. @@ -110,24 +84,18 @@ def search_prefect_3x_docs(queries: list[str], hypothesized_user_objective: str) if not tpuf.api_key: tpuf.api_key = Secret.load("tpuf-api-key", _sync=True).get() # type: ignore - return asyncio.run( - run_docs_agent(queries, "prefect-3", hypothesized_user_objective) - ) + return multi_query_tpuf(queries, namespace="prefect-3", n_results=5) @task -def search_controlflow_docs( - queries: list[str], hypothesized_user_objective: str -) -> str: +def search_controlflow_docs(queries: list[str]) -> str: """Searches the ControlFlow documentation for the given queries. ControlFlow is an agentic framework built on top of Prefect 3.x. It is best to use more than one, short query to get the best results. """ - return asyncio.run( - run_docs_agent(queries, "controlflow", hypothesized_user_objective) - ) + return multi_query_tpuf(queries, namespace="controlflow", n_results=5) @task