diff --git a/old_src/agent/__init__.py b/old_src/agent/__init__.py deleted file mode 100644 index a0994e9..0000000 --- a/old_src/agent/__init__.py +++ /dev/null @@ -1,347 +0,0 @@ -import aiohttp -import time - -from telebot import types as telebot_types - -from .utils import calculate_number_of_tokens -from .tools import ToolExecutor -from .prompt_manager import PromptManager -from database import AsyncDatabase -from logger import MessageSpan - - -class Agent: - """ - A Chat Bot that generates informed prompts based on the current conversation history. - """ - - def __init__(self, agent_config: dict): - # Instance Configuration - self.model_name = agent_config["model"]["name"] - self.model_api_url = agent_config["model"]["api_url"] - self.model_engine = agent_config["model"]["engine"] - - # Model Parameters - self.max_completion_tokens = agent_config["model"]["max_completion_tokens"] - self.max_prompt_tokens = agent_config["model"]["max_prompt_tokens"] - self.temperature = agent_config["model"]["temperature"] - self.sampler_order = agent_config["model"]["sampler_order"] - self.top_p = agent_config["model"]["top_p"] - self.top_k = agent_config["model"]["top_k"] - self.stop_sequences = agent_config["chat_ml"]["stop_sequences"] - - # Agent Behavior - self.max_completion_tries = agent_config["agent"]["max_completion_tries"] - self.max_self_recurse_depth = agent_config["agent"]["max_self_recurse_depth"] - - # Initialize an empty Map to track open context slots on the server - self.model_chat_slots = {} - # Initialize the Tool Executor - self.tool_executor = ToolExecutor() - # Initialize the Prompt Manager - self.prompt_manager = PromptManager(agent_config) - - # State Helpers - - def name(self): - """ - Return the name of the chat bot - """ - return self.prompt_manager.name - - def set_name(self, name: str): - """ - Set the persona name for the chat bot - """ - self.prompt_manager.set_name(name) - - async def clear_chat(self, chat_id: str): - """ - Clear the chat from the model's available context - """ - if chat_id in self.model_chat_slots: - session, _ = self.model_chat_slots[chat_id] - await session.close() - del self.model_chat_slots[chat_id] - - async def clear_all_chats(self): - """ - Close all open sessions - """ - for session, _ in self.model_chat_slots.values(): - await session.close() - self.model_chat_slots = {} - - # Where the magic happens - - async def build_prompt( - self, - message: telebot_types.Message, - database: AsyncDatabase, - span: MessageSpan, - batch_size: int = 10, - offset: int = 1, - ) -> tuple[str, int]: - """ - Build the most up to date prompt for the chat given the current conversation history. - Max out at `token_limit` tokens. - - - chat: the chat to build the log for - - token_limit: the maximum number of tokens the log can use - - batch_size: the number of messages to pull at a time - """ - - # Keep track of how many tokens we're using - used_tokens = 0 - token_limit = self.max_prompt_tokens - - chat = message.chat - chat_id = chat.id - - # Generate our system prompt - system_prompt, system_prompt_tokens = self.prompt_manager.system_prompt( - message.chat, token_limit=token_limit - ) - - # Generate our agent prompt for the model to complete on - prompt_response, prompt_response_tokens = self.prompt_manager.prompt_response( - message=message, token_limit=token_limit - system_prompt_tokens - ) - - # Now start filling in the chat log with as many tokens as we can - basic_prompt_tokens = system_prompt_tokens + prompt_response_tokens - used_tokens += basic_prompt_tokens - chat_log_lines = [] - offset = 1 - done = False - try: - # Keep pulling messages until we're done or we've used up all our tokens - while not done and used_tokens < token_limit: - # Get `batch_size` messages from the chat history - messages = await database.get_chat_last_messages( - chat_id, batch_size, offset, span=span - ) - # Construct condition on whether to break due to no more messages - # If set we won't re-enter the loop - done = messages is None or len(messages) < batch_size - - # Iterate over the messages we've pulled - for message in messages: - line = self.prompt_manager.chat_message(message) - # Calculate the number of tokens this line would use - additional_tokens = calculate_number_of_tokens(line) - - # Break if this would exceed our token limit before appending to the log - if used_tokens + additional_tokens > token_limit: - done = True - # Break out of the for loop. - # Since we're 'done' we won't continue the while loop - break - - # Update our used tokens count - used_tokens += additional_tokens - - # Actually append the line to the log - chat_log_lines.append(line) - - # Update our offset - offset += batch_size - - # Now build our prompt in reverse - chat_log = "" - for line in reversed(chat_log_lines): - chat_log = f"{chat_log}{line}" - - return f"{system_prompt}{chat_log}{prompt_response}", used_tokens - except Exception as e: - # Log the error, but return the portion of the prompt we've built so far - span.warn( - f"Agent::build_prompt(): error building prompt: {str(e)}. Returning partial prompt." - ) - return f"{system_prompt}{prompt_response}", basic_prompt_tokens - - async def complete( - self, prompt: str, chat_id: int, span: MessageSpan - ) -> tuple[str, int]: - """ - Complete on a prompt against our model within a given number of tries. - - Returns a str containing the prompt's completion. - """ - - if chat_id in self.model_chat_slots: - session, slot_id = self.model_chat_slots[chat_id] - else: - session = aiohttp.ClientSession() - slot_id = -1 - - params = { - "prompt": prompt, - "temperature": self.temperature, - "top_p": self.top_p, - "top_k": self.top_k, - } - - # Update the parameters based on the model engine - if self.model_engine == "llamacpp": - params.update( - { - "n_predict": self.max_completion_tokens, - # NOTE: for now just set both of these - "id_slot": slot_id, - "slot_id": slot_id, - "typical_p": 1, - "tfs_z": 1, - "stop": self.stop_sequences, - "cache_prompt": True, - "use_default_badwordsids": False, - } - ) - else: - raise Exception("Agent::complete(): unsupported model engine") - - max_tries = self.max_completion_tries - tries = 0 - errors = [] - compounded_result = "" - last_result = "" - while tries < max_tries: - span.debug( - f"Agent::complete(): attempt: {tries} | slot_id: {slot_id}", - ) - span.debug( - f"Agent::complete(): compounded result: {compounded_result}", - ) - # Append the compound result to the prompt - params["prompt"] = f"{params['prompt']}{last_result}" - try: - async with session.post(self.model_api_url, json=params) as response: - if response.status == 200: - # Read the response - response_data = await response.json() - - # Get the slot id from the response - if "id_slot" in response_data: - slot_id = response_data["id_slot"] - elif "slot_id" in response_data: - slot_id = response_data["slot_id"] - self.model_chat_slots[chat_id] = session, slot_id - - # Determine if we're stopped - stopped = ( - response_data["stopped_eos"] - or response_data["stopped_word"] - ) - last_result = response_data["content"] - compounded_result = f"{compounded_result}{last_result}" - token_count = calculate_number_of_tokens(compounded_result) - # If we're stopped, return the compounded result - if stopped: - return compounded_result, token_count - # Otherwise, check if we've exceeded the token limit, and return if so - elif token_count > self.max_completion_tokens: - span.warn("Agent::complete(): Exceeded token limit") - return compounded_result, token_count - else: - raise Exception( - f"Agent::complete(): Non 200 status code: {response.status}" - ) - except Exception as e: - span.warn(f"Agent::complete(): Error completing prompt: {str(e)}") - errors.append(e) - finally: - tries += 1 - # If we get here, we've failed to complete the prompt after max_tries - raise Exception(f"Agent::complete(): Failed to complete prompt: {errors}") - - # TODO: split out the response yielding from rendering the response - async def yield_response( - self, message: telebot_types.Message, database: AsyncDatabase, span: MessageSpan - ): - """ - Yield a response from the agent given it's current state and the message it's responding to. - - Yield a string containing the response. - """ - start = time.time() - span.info("Agent::yield_response()") - - chat_id = message.chat.id - max_self_recurse_depth = self.max_self_recurse_depth - - # Build the prompt - prompt, used_tokens = await self.build_prompt(message, database, span) - - span.info(f"Agent::yield_response(): prompt used tokens: {used_tokens}") - span.debug("Agent::yield_response(): prompt built: " + prompt) - - try: - # Keep track of the tokens we've seen out of completion - completion_tokens = 0 - # Keep track of the tokens we generate from the completion - recursive_tokens = 0 - # Keep track of the depth of self recursion - self_recurse_depth = 0 - while self_recurse_depth < max_self_recurse_depth: - # Complete and determine the tokens used - completion, used_tokens = await self.complete(prompt, chat_id, span) - completion_tokens += used_tokens - - span.debug("Agent::yield_response(): completion: " + completion) - - tool_message = None - try: - # Attempt to parse a tool call from the completion - tool_message = self.tool_executor.handle_completion( - completion, self_recurse_depth, span - ) - - span.debug( - f"Agent::yield_response(): tool_message: {tool_message}", - ) - except Exception as e: - span.warn( - "Error handling tools message: " + str(e), - ) - raise e - finally: - # If there's nothing to do, return the completion - if tool_message is None: - total_time = time.time() - start - # Log - # - completion tokens -- how many tokens the model generated - # - recursive tokens -- how many tokens we generated due to recursion - # - depth -- how deep we recursed - # - time -- how long we spent - span.info( - f"Agent::yield_response(): completion tokens: {completion_tokens} | recursive tokens: {recursive_tokens} | depth: {self_recurse_depth} | time: {total_time}" - ) - yield completion - return - - self_recurse_depth += 1 - - if self_recurse_depth >= max_self_recurse_depth: - span.warn( - "Agent::yield_response(): Function call depth exceeded", - ) - raise Exception("Function call depth exceeded") - - # Build up the new prompt and recurse - tool_message = self.prompt_manager.tool_message(tool_message) - # Keep track of the tokens we generate from the tool message - recursive_tokens += calculate_number_of_tokens(tool_message) - - # TODO: I should probably check that the token limit isn't exceeded here - # For now, just let it go through no matter how big the recursion gets - prompt, _ = self.prompt_manager.prompt_response( - f"{prompt}{completion}{tool_message}", token_limit=-1 - ) - - span.debug( - f"Agent::yield_response(): continuing recursion on prompt: {prompt}" - ) - - yield "Gathering some more information to help you better..." - except Exception as e: - raise e diff --git a/old_src/agent/prompt_manager.py b/old_src/agent/prompt_manager.py deleted file mode 100644 index 564a24a..0000000 --- a/old_src/agent/prompt_manager.py +++ /dev/null @@ -1,170 +0,0 @@ -import datetime -from pydantic import BaseModel -import yaml -import json - -from .utils import calculate_number_of_tokens, fmt_chat_details, fmt_msg_user_name -from .tools import get_tools, FunctionCall - -from telebot import types as telebot_types -import database - - -class SystemPromptSchema(BaseModel): - """ - Description of the agent's system prompt - """ - - # Role of the agent within the chat - # Configurable by: - # name - the username of the agent - # date - the current date - Role: str - # The Stated Objective of the agent - Objective: str - # The tools available to the agent - # Configurable by: - # tools - a formatted list of tools available to the agent - # tool_schema - json schema for calling tools - # max_self_recurse_depth - the maximum depth of recursion the agent can use - Tools: str - - -class PromptManager: - """ - PromptManager is responsible for formatting prompts based on simple data types and schemas - It should most definitely not be responsible for interacting with the model or application state - """ - - def __init__(self, agent_config: dict): - # Chat ML Configuration - self.user_prepend = agent_config["chat_ml"]["user_prepend"] - self.user_append = agent_config["chat_ml"]["user_append"] - self.stop_sequences = agent_config["chat_ml"]["stop_sequences"] - self.line_separator = "\n" - - self.tools = get_tools() - self.tool_schema = json.loads(FunctionCall.schema_json()) - self.max_self_recurse_depth = agent_config["agent"]["max_self_recurse_depth"] - - # Persona Configuration and Templates - self.name = "chat-bot" - with open(agent_config["agent"]["system_prompt_template"], "r") as f: - yaml_content = yaml.safe_load(f) - self.system_prompt_schema = SystemPromptSchema( - Role=yaml_content.get("Role", ""), - Objective=yaml_content.get("Objective", ""), - Tools=yaml_content.get("Tools", ""), - ) - - def set_name(self, name: str): - """ - Set the persona name for the chat bot - """ - self.name = name - - def tool_message(self, tool_message: str) -> str: - """ - Format a tool message into a proper chat log line - - tool: the tool message to format - Returns a string representing the chat log line - """ - # NOTE: this is a lil hacky, but in every context we generate this message, we're appeninfing - # to a completed response, so add a user append at the beginning - return f"{self.line_separator}{self.user_append}{self.line_separator}{self.user_prepend}tool{self.line_separator}{tool_message}{self.user_append}{self.line_separator}" - - def chat_message(self, message: telebot_types.Message | database.Message) -> str: - """ - Format either a telebot message or a database message into a chat log line - - - message: the message to format -- either a telebot.types.Message or a database.Message - - Returns a string representing the chat log line - """ - - from_user_name = fmt_msg_user_name(message.from_user) - is_reply = message.reply_to_message is not None - - sender = from_user_name - if is_reply: - to_user_name = fmt_msg_user_name(message.reply_to_message.from_user) - sender = f"{from_user_name} (in reply to {to_user_name})" - return f"{self.user_prepend}{sender}{self.line_separator}{message.text}{self.line_separator}{self.user_append}{self.line_separator}" - - def prompt_response( - self, - message: telebot_types.Message | database.Message | str | None = None, - text: str = "", - token_limit: int = 2048, - ) -> tuple[str, int]: - """ - Prompt a simple response from the model: - - message: the message to prompt a response from (optional) - - text: text to start the model off on (optional) - - token_limit: the maximum number of tokens the prompt can use - - Returns a tuple of (prompt, used_tokens) - """ - - base = "" - if message is not None and isinstance( - message, (telebot_types.Message, database.Message) - ): - base = self.chat_message(message) - elif message is not None and isinstance(message, str): - base = message - - prompt = f"{base}{self.user_prepend}{self.name}{self.line_separator}{text}" - - used_tokens = calculate_number_of_tokens(prompt) - - if token_limit == -1: - return prompt, used_tokens - elif used_tokens > token_limit: - raise Exception("prompt_response(): prompt exceeds token limit") - - return prompt, used_tokens - - def system_prompt( - self, - chat: telebot_types.Chat, - token_limit: int = 2048, - ) -> tuple[str, int]: - """ - Build a system prompt for a specific chat - - - chat: the chat to build the system prompt for - - token_limit: the maximum number of tokens the prompt can use - - Returns a tuple of (system_prompt, used_tokens) - """ - - # Format the date as a human readble string to give to the system - # Day of the week, Month Day, Year @ Hour:Minute:Second - # ex. "Monday, January 1, 2021 @ 12:00:00" - date = datetime.datetime.now().strftime("%A, %B %d, %Y @ %H:%M:%S") - variables = { - "date": date, - "name": self.name, - "chat_details": fmt_chat_details(chat), - "tools": self.tools, - "tool_schema": self.tool_schema, - "max_self_recurse_depth": self.max_self_recurse_depth, - } - - system_prompt = "" - for _, value in self.system_prompt_schema.dict().items(): - formatted_value = value.format(**variables) - formatted_value = formatted_value.replace("\n", " ") - system_prompt += f"{formatted_value}" - - system_prompt = f"{self.user_prepend}SYSTEM{self.line_separator}{system_prompt}{self.user_append}{self.line_separator}" - - # Update our used tokens count - used_tokens = calculate_number_of_tokens(system_prompt) - - # Check if we're over our token limit - if used_tokens > token_limit: - raise Exception("build_system_prompt(): system prompt exceeds token limit") - - return system_prompt, used_tokens diff --git a/old_src/agent/tools/__init__.py b/old_src/agent/tools/__init__.py deleted file mode 100644 index dc40cc4..0000000 --- a/old_src/agent/tools/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .schema import FunctionCall -from .functions import get_tools -from .executor import ToolExecutor - -__all__ = ["get_tools", "FunctionCall", "ToolExecutor"] diff --git a/old_src/agent/tools/executor.py b/old_src/agent/tools/executor.py deleted file mode 100644 index 4d276a6..0000000 --- a/old_src/agent/tools/executor.py +++ /dev/null @@ -1,85 +0,0 @@ -import os -from os import sys - -from .validator import ( - validate_function_call_schema, -) -from .utils import ( - validate_and_extract_tool_calls, -) -from .functions import ( - get_tools, -) -from logger import MessageSpan - -# NOTE: the sys path append is necessary to import the functions module -sys.path.append(os.path.join(os.path.dirname(__file__), ".")) -import functions - - -class ToolExecutor: - def __init__(self): - self.tools = get_tools() - - def process_completion_and_validate(self, completion): - validation, tool_calls, error_message = validate_and_extract_tool_calls( - completion - ) - - if validation: - return tool_calls, error_message - else: - tool_calls = None - return tool_calls, error_message - - def execute_function_call(self, tool_call): - function_name = tool_call.get("name") - function_to_call = getattr(functions, function_name, None) - function_args = tool_call.get("arguments", {}) - - if not function_to_call: - raise ValueError(f"Function {function_name} not found") - - function_response = function_to_call(*function_args.values()) - results_dict = f'{{"name": "{function_name}", "content": {function_response}}}' - return results_dict - - def handle_completion( - self, completion: str, depth: int, span: MessageSpan, line_separator="\n" - ) -> str | None: - try: - tool_calls, error_message = self.process_completion_and_validate(completion) - tool_message = "Current call depth: " + str(depth) + line_separator - if tool_calls: - for tool_call in tool_calls: - validation, message = validate_function_call_schema( - tool_call, self.tools - ) - if validation: - span.info( - f"Function call {tool_call.get('name')} is valid | arguments: {tool_call.get('arguments')}" - ) - try: - function_response = self.execute_function_call(tool_call) - tool_message += f"{line_separator}{function_response}{line_separator}{line_separator}" - span.info( - f"Function call {tool_call.get('name')} executed successfully with response: {function_response}" - ) - except Exception as e: - span.error( - f"Error executing function call {tool_call.get('name')}: {e}" - ) - tool_name = tool_call.get("name") - tool_message += f"{line_separator}There was an error when executing the function: {tool_name}{line_separator}Here's the error traceback: {e}{line_separator}Please call this function again with correct arguments within XML tags {line_separator}{line_separator}" - else: - span.warn(f"Function call {tool_call.get('name')} is invalid") - tool_name = tool_call.get("name") - tool_message += f"{line_separator}There was an error validating function call against function signature: {tool_name}{line_separator}Here's the error traceback: {message}{line_separator}Please call this function again with correct arguments within XML tags {line_separator}{line_separator}" - return tool_message - elif error_message: - span.error(f"Error parsing function calls: {error_message}") - tool_message += f"{line_separator}There was an error parsing function calls{line_separator}Here's the error stack trace: {error_message}{line_separator}Please call the function again with correct syntax within XML tags {line_separator}{line_separator}" - return tool_message - return None - except Exception as e: - raise e diff --git a/old_src/agent/tools/functions.py b/old_src/agent/tools/functions.py deleted file mode 100644 index 97e76e3..0000000 --- a/old_src/agent/tools/functions.py +++ /dev/null @@ -1,233 +0,0 @@ -import requests -import collections.abc -#hyper needs the four following aliases to be done manually. -collections.Iterable = collections.abc.Iterable -collections.Mapping = collections.abc.Mapping -collections.MutableSet = collections.abc.MutableSet -collections.MutableMapping = collections.abc.MutableMapping - -from hyper.contrib import HTTP20Adapter - -import yfinance as yf - -from typing import List -from duckduckgo_search import DDGS -from langchain.tools import tool -from langchain_core.utils.function_calling import convert_to_openai_tool - - -from config import Config -CONFIG = Config() - -@tool -def google_search(query): - """ Search google results. - Args: - query (str): The query to search for. - Returns: - list: A list of dictionaries containing the title, link, snippet, and other information about the search results.""" - - r = requests.get("https://www.searchapi.io/api/v1/search", - params = { - "q": query, - "engine": "google", - "api_key": CONFIG.searchapi_token - }) - - results = r.json() - organic_results = results.get("organic_results") - for result in organic_results: - if "favicon" in result: - del result["favicon"] - if "snippet_highlighted_words" in result: - del result["snippet_highlighted_words"] - return organic_results - -@tool -def get_text_from_page(url): - """ Get the text from a page. - Args: - url (str): The URL of the page to get the text from. - Returns: - str: The text from the page.""" - r = requests.get(url) - text = r.text - # now we scrape all the html and crap to keep only the text - from bs4 import BeautifulSoup - soup = BeautifulSoup(text, 'html.parser') - # kill all script and style elements - for script in soup(["script", "style"]): - script.decompose() # rip it out - # get text - text = soup.get_text() - # break into lines and remove leading and trailing space on each - lines = (line.strip() for line in text.splitlines()) - # break multi-headlines into a line each - chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) - # drop blank lines - text = '\n'.join(chunk for chunk in chunks if chunk) - return text - -@tool -def duckduckgo_get_answer(query): - """ Get an answer from DuckDuckGo's Instant Answer API. - Args: - query (str): The query to search for. - Returns: - dict: A dictionary containing the answer, answer type, abstract, abstract source, abstract URL, definition, definition source, definition URL, and image. - """ - - session = requests.Session() - session.mount('https://', HTTP20Adapter()) - r = session.get("https://api.duckduckgo.com", - params = { - "q": query, - "format": "json", - "no_html": 1, - "skip_disambig": 1 - }) - output = r.json() - print(output) - return { - "answer": output.get("Answer"), - "answer_type": output.get("AnswerType"), - "abstract": output.get("Abstract"), - "abstract_text": output.get("AbstractText"), - "abstract_source": output.get("AbstractSource"), - "abstract_url": output.get("AbstractURL"), - "definition": output.get("Definition"), - "definition_source": output.get("DefinitionSource"), - "definition_url": output.get("DefinitionURL"), - "image": output.get("Image"), - } - - -# @tool -# def duckduckgo_search_text(query: str) -> dict: -# """ -# Search DuckDuckGo for the top result of a given text query. -# Use when probing for general information, or when a user requests a web search. -# Args: -# query (str): The query to search for. -# Returns: -# dict: the top 5 results from DuckDuckGo. If an error occurs, an exception is returns within the "error" key. -# """ -# try: -# search = DDGS() -# results = search.text(query, max_results=5) -# return {"results": results} -# except Exception as e: -# return {"error": str(e)} - - -# @tool -# def duckduckgo_search_answer(query: str) -> dict: -# """ -# Search DuckDuckGo for the top answer of a given question. -# Use when trying to answer a specific question that is outside the scope of the model's knowledge base. -# Args: -# query (str): The question to search for. -# Returns: -# dict: the top answer from DuckDuckGo. If an error occurs, an exception is returns within the "error" key. -# """ -# try: -# search = DDGS() -# results = search.answers(query) -# return {"results": results} -# except Exception as e: -# return {"error": str(e)} - - -@tool -def get_current_stock_price(symbol: str) -> float | None: - """ - Get the current stock price for a given symbol. - - Args: - symbol (str): The stock symbol. - - Returns: - float: The current stock price, or None if an error occurs. - """ - try: - stock = yf.Ticker(symbol) - # Use "regularMarketPrice" for regular market hours, or "currentPrice" for pre/post market - current_price = stock.info.get( - "regularMarketPrice", stock.info.get("currentPrice") - ) - return current_price if current_price else None - except Exception as _e: - return None - - -@tool -def get_current_cryptocurrency_price_usd(symbol: str) -> dict | None: - """ - Get current price of a cryptocurrency in USD. - - Args: - symbol (str): The non-truncated cryptocurrency name to get the price of in USD (e.g. "bitcoin", "ethereum", "solana", "aleph", etc.). - - Returns: - dict: The price of the cryptocurrency in the form {"coin": {"usd": }}, or None if an error occurs. - """ - - url = ( - f"https://api.coingecko.com/api/v3/simple/price?ids={symbol}&vs_currencies=usd" - ) - response = requests.get(url) - if response.status_code == 200: - output = response.json() - # CoinGecko returns an empty dictionary if the coin doesn't exist -- fun :upside_down_face: - if output == {}: - return None - return output - else: - return None - -@tool -def get_cryptocurrency_info(symbol: str) -> dict | None: - """ - Get the informations about a cryptocurrency. It includes where it's listed, - in what categories it is, the socials urls (twitter, telegram, homepage...), the market data as well in USD (includes information like the market cap, price, atl, ath...). - """ - - url = ( - f"https://api.coingecko.com/api/v3/coins/{symbol}?localization=false&tickers=false&market_data=true&community_data=false&developer_data=false&sparkline=false" - ) - response = requests.get(url) - if response.status_code == 200: - output = response.json() - if 'market_data' in output: - # let's keep only the usd price on all values that have currencies inside market_data - for key, value in output['market_data'].items(): - if isinstance(value, dict): - if 'usd' in value: - output['market_data'][key] = value['usd'] - # CoinGecko returns an empty dictionary if the coin doesn't exist -- fun :upside_down_face: - if output == {}: - return None - return output - else: - return None - - -def get_tools() -> List[dict]: - """ - Get our available tools as OpenAPI-compatible tools - """ - - # Register Functions Here - functions = [ - # duckduckgo_search_text, - # duckduckgo_search_answer, - google_search, - get_text_from_page, - duckduckgo_get_answer, - get_current_stock_price, - get_current_cryptocurrency_price_usd, - get_cryptocurrency_info - ] - - tools = [convert_to_openai_tool(f) for f in functions] - return tools diff --git a/old_src/agent/tools/schema.py b/old_src/agent/tools/schema.py deleted file mode 100644 index 1874b1a..0000000 --- a/old_src/agent/tools/schema.py +++ /dev/null @@ -1,23 +0,0 @@ -from pydantic import BaseModel -from typing import Dict, Literal, Optional - -class FunctionCall(BaseModel): - arguments: dict - """ - The arguments to call the function with, as generated by the model in JSON - format. Note that the model does not always generate valid JSON, and may - hallucinate parameters not defined by your function schema. Validate the - arguments in your code before calling your function. - """ - - name: str - """The name of the function to call.""" - -class FunctionDefinition(BaseModel): - name: str - description: Optional[str] = None - parameters: Optional[Dict[str, object]] = None - -class FunctionSignature(BaseModel): - function: FunctionDefinition - type: Literal["function"] diff --git a/old_src/agent/tools/utils.py b/old_src/agent/tools/utils.py deleted file mode 100644 index 7b56bf3..0000000 --- a/old_src/agent/tools/utils.py +++ /dev/null @@ -1,66 +0,0 @@ -import json -import ast -# import xml.etree.ElementTree as ET - - - -def parse_json_garbage(s): - s = s[next(idx for idx, c in enumerate(s) if c in "{["):] - try: - return json.loads(s) - except json.JSONDecodeError as e: - return json.loads(s[:e.pos]) - - -def validate_and_extract_tool_calls(agent_response): - validation_result = False - tool_calls = [] - error_message = None - - # Extract JSON data - try: - json_data = parse_json_garbage(agent_response) - if 'arguments' and 'name' in json_data: - tool_calls.append(json_data) - validation_result = True - except Exception as e: - # error_message = f"Cannot extract JSON data: {e}" - pass - - # try: - # # wrap content in root element - # xml_root_element = f"{agent_response}" - # root = ET.fromstring(xml_root_element) - - # # extract JSON data - # for element in root.findall(".//tool_call"): - # json_data = None - # try: - # json_text = element.text.strip() - # try: - # # Prioritize json.loads for better error handling - # json_data = json.loads(json_text) - # except json.JSONDecodeError as json_err: - # try: - # # Fallback to ast.literal_eval if json.loads fails - # json_data = ast.literal_eval(json_text) - # except (SyntaxError, ValueError) as eval_err: - # error_message = ( - # f"JSON parsing failed with both json.loads and ast.literal_eval:\n" - # f"- JSON Decode Error: {json_err}\n" - # f"- Fallback Syntax/Value Error: {eval_err}\n" - # f"- Problematic JSON text: {json_text}" - # ) - # continue - # except Exception as e: - # error_message = f"Cannot strip text: {e}" - - # if json_data is not None: - # tool_calls.append(json_data) - # validation_result = True - - # except ET.ParseError as err: - # error_message = f"XML Parse Error: {err}" - - # Return default values if no valid data is extracted - return validation_result, tool_calls, error_message diff --git a/old_src/agent/tools/validator.py b/old_src/agent/tools/validator.py deleted file mode 100644 index d3de5ac..0000000 --- a/old_src/agent/tools/validator.py +++ /dev/null @@ -1,151 +0,0 @@ -import ast -import json -from jsonschema import validate -from pydantic import ValidationError -from .schema import FunctionCall, FunctionSignature - - -def validate_function_call_schema(call, signatures): - try: - call_data = FunctionCall(**call) - except ValidationError as e: - return False, str(e) - - for signature in signatures: - try: - signature_data = FunctionSignature(**signature) - if signature_data.function.name == call_data.name: - # Validate types in function arguments - for arg_name, arg_schema in signature_data.function.parameters.get( - "properties", {} - ).items(): - if arg_name in call_data.arguments: - call_arg_value = call_data.arguments[arg_name] - if call_arg_value: - try: - validate_argument_type( - arg_name, call_arg_value, arg_schema - ) - except Exception as arg_validation_error: - return False, str(arg_validation_error) - - # Check if all required arguments are present - required_arguments = signature_data.function.parameters.get( - "required", [] - ) - result, missing_arguments = check_required_arguments( - call_data.arguments, required_arguments - ) - if not result: - return False, f"Missing required arguments: {missing_arguments}" - - return True, None - except Exception as e: - # Handle validation errors for the function signature - return False, str(e) - - # No matching function signature found - return False, f"No matching function signature found for function: {call_data.name}" - - -def check_required_arguments(call_arguments, required_arguments): - missing_arguments = [arg for arg in required_arguments if arg not in call_arguments] - return not bool(missing_arguments), missing_arguments - - -def validate_enum_value(arg_name, arg_value, enum_values): - if arg_value not in enum_values: - raise Exception( - f"Invalid value '{arg_value}' for parameter {arg_name}. Expected one of {', '.join(map(str, enum_values))}" - ) - - -def validate_argument_type(arg_name, arg_value, arg_schema): - arg_type = arg_schema.get("type", None) - if arg_type: - if arg_type == "string" and "enum" in arg_schema: - enum_values = arg_schema["enum"] - if None not in enum_values and enum_values != []: - try: - validate_enum_value(arg_name, arg_value, enum_values) - except Exception as e: - # Propagate the validation error message - raise Exception(f"Error validating function call: {e}") - - python_type = get_python_type(arg_type) - if not isinstance(arg_value, python_type): - raise Exception( - f"Type mismatch for parameter {arg_name}. Expected: {arg_type}, Got: {type(arg_value)}" - ) - - -def get_python_type(json_type): - type_mapping = { - "string": str, - "number": (int, float), - "integer": int, - "boolean": bool, - "array": list, - "object": dict, - "null": type(None), - } - return type_mapping[json_type] - - -def validate_json_data(json_object, json_schema): - valid = False - error_message = None - result_json = None - - try: - # Attempt to load JSON using json.loads - try: - result_json = json.loads(json_object) - except json.decoder.JSONDecodeError: - # If json.loads fails, try ast.literal_eval - try: - result_json = ast.literal_eval(json_object) - except (SyntaxError, ValueError) as e: - try: - result_json = extract_json_from_markdown(json_object) - except Exception as e: - error_message = f"JSON decoding error: {e}" - inference_logger.info( - f"Validation failed for JSON data: {error_message}" - ) - return valid, result_json, error_message - - # Return early if both json.loads and ast.literal_eval fail - if result_json is None: - error_message = "Failed to decode JSON data" - inference_logger.info(f"Validation failed for JSON data: {error_message}") - return valid, result_json, error_message - - # Validate each item in the list against schema if it's a list - if isinstance(result_json, list): - for index, item in enumerate(result_json): - try: - validate(instance=item, schema=json_schema) - inference_logger.info( - f"Item {index+1} is valid against the schema." - ) - except ValidationError as e: - error_message = f"Validation failed for item {index+1}: {e}" - break - else: - # Default to validation without list - try: - validate(instance=result_json, schema=json_schema) - except ValidationError as e: - error_message = f"Validation failed: {e}" - - except Exception as e: - error_message = f"Error occurred: {e}" - - if error_message is None: - valid = True - inference_logger.info("JSON data is valid against the schema.") - else: - inference_logger.info(f"Validation failed for JSON data: {error_message}") - - return valid, result_json, error_message diff --git a/old_src/agent/utils.py b/old_src/agent/utils.py deleted file mode 100644 index 6e43f5a..0000000 --- a/old_src/agent/utils.py +++ /dev/null @@ -1,41 +0,0 @@ -import sys -import nltk - -from telebot import types as telebot_types - -sys.path.append("..") -import database - - -def calculate_number_of_tokens(line: str): - """ - Determine the token length of a line of text - """ - tokens = nltk.word_tokenize(line) - return len(tokens) - - -def fmt_chat_details(chat: telebot_types.Chat, line_separator="\n"): - """ - Construct appropriate chat details for the model prompt - - Errors: - If the provided chat is neither a private nor group chat. - """ - - # TODO: some of these are only called on .getChat, these might not be available. Verify! - if chat.type in ["private"]: - return f"""Private Chat Details:{line_separator}Username: {chat.username or ""}{line_separator}First Name: {chat.first_name or ""}{line_separator}Last Name: {chat.last_name or ""}{line_separator}Bio: {chat.bio or ""}""" - - elif chat.type in ["group", "supergroup"]: - return f"""Group Chat Details:{line_separator}Title: {chat.title or ""}{line_separator}Description: {chat.description or ""}{line_separator}Members: {chat.active_usernames or ""}""" - else: - raise Exception("chat_details(): chat is neither private nor group") - - -def fmt_msg_user_name(user: database.User | telebot_types.User): - """ - Determine the appropriate identifier to which associate a user with - the chat context - """ - return user.username or ((user.first_name or "") + " " + (user.last_name or "")) diff --git a/scripts/dev.sh b/scripts/dev.sh index 04b29ce..491ee2c 100755 --- a/scripts/dev.sh +++ b/scripts/dev.sh @@ -5,7 +5,7 @@ source venv/bin/activate source .env export DATABASE_PATH=:memory: export DEBUG=True -python3 src/main.py +python3 -m src.main # Deactivate the virtual environment deactivate diff --git a/scripts/run.sh b/scripts/run.sh index 396e868..2eda137 100755 --- a/scripts/run.sh +++ b/scripts/run.sh @@ -15,7 +15,7 @@ if [ -z "$LOG_PATH" ]; then fi export DEBUG=False -python3 src/main.py >/dev/null 2>&1 +python3 -m src.main >/dev/null 2>&1 # Deactivate the virtual environment deactivate diff --git a/src/commands/clear.py b/src/commands/clear.py index c2f1aa9..c8e1399 100644 --- a/src/commands/clear.py +++ b/src/commands/clear.py @@ -17,7 +17,6 @@ async def clear_command_handler(message: Message): # Clear the chat history from the database and the agent chat_id = message.chat.id await config.DATABASE.clear_chat_history(chat_id) - await config.AGENT.clear_chat(chat_id) # TODO: this is not defined # Send a message to the user acknowledging the clear await config.BOT.edit_message_text( @@ -25,9 +24,7 @@ async def clear_command_handler(message: Message): message_id=reply.message_id, text="Chat history cleared.", ) - - # Ok - return None except Exception as e: span.error(f"Error handling /clear command: {e}") + finally: return None diff --git a/src/commands/help.py b/src/commands/help.py index 8c69e53..c3a0999 100644 --- a/src/commands/help.py +++ b/src/commands/help.py @@ -16,9 +16,7 @@ async def help_command_handler(message: Message): for command, description in config.BOT_COMMANDS: help_text += f"/{command} - {description}\n" await config.BOT.reply_to(message, help_text) - - # Ok - return None except Exception as e: span.error(f"Error handling /help command: {e}") + finally: return None diff --git a/src/commands/message.py b/src/commands/message.py index 8919874..fbf8a0f 100644 --- a/src/commands/message.py +++ b/src/commands/message.py @@ -1,10 +1,19 @@ -from telebot.types import Message +import telebot.types as telebot_types +from libertai_agents.interfaces.messages import Message as LibertaiMessage +from libertai_agents.interfaces.messages import MessageRoleEnum from src.config import config -from src.utils.telegram import get_mentions_in_message +from src.utils.telegram import ( + get_formatted_message_content, + get_formatted_username, + get_mentions_in_message, +) +# Max number of messages we will pass +MESSAGES_NUMBER = 50 -async def text_message_handler(message: Message): + +async def text_message_handler(message: telebot_types.Message): """ Handle all text messages. Use the agent to construct an informed response @@ -13,6 +22,8 @@ async def text_message_handler(message: Message): span = config.LOGGER.get_span(message) span.info("Received text message") + reply: telebot_types.Message | None = None + try: chat_id = message.chat.id @@ -46,35 +57,46 @@ async def text_message_handler(message: Message): result = "I'm thinking..." reply = await config.BOT.reply_to(message, result) - # TODO: Implement rendering logic here based on the code and content - # Attempt to reply to the message - try: - async for content in config.AGENT.yield_response( - message, config.DATABASE, span - ): # TODO: not defined - # Check for an updated response, otherwise just do nothing - if content != result: - result = content - # Update the message - reply = await config.BOT.edit_message_text( - chat_id=chat_id, message_id=reply.message_id, text=result - ) - except Exception as e: - # Attempt to edit the message to indicate an error + messages: list[LibertaiMessage] = [] + + chat_history = await config.DATABASE.get_chat_last_messages( + chat_id, MESSAGES_NUMBER, span=span + ) + # Iterate over the messages we've pulled + for chat_msg in chat_history: + message_username = get_formatted_username(chat_msg.from_user) + message_content = get_formatted_message_content(chat_msg) + # TODO: support multiple users with names + role = ( + MessageRoleEnum.assistant + if message_username == get_formatted_username(config.BOT_INFO) + else MessageRoleEnum.user + ) + messages.append(LibertaiMessage(role=role, content=message_content)) + + # TODO: pass system prompt with chat details when libertai-agents new version released + async for response_msg in config.AGENT.generate_answer(messages): + if response_msg.content != result: + result = response_msg.content + # Update the message + reply = await config.BOT.edit_message_text( + chat_id=chat_id, message_id=reply.message_id, text=result + ) + + except Exception as e: + span.error(f"Error handling text message: {e}") + # Attempt to edit the message to indicate an error + + if reply is not None: reply = await config.BOT.edit_message_text( chat_id=message.chat.id, message_id=reply.message_id, text="I'm sorry, I got confused. Please try again.", ) - # Raise the error up to our handler - raise e - finally: - # Attempt to update the message history to reflect the final response + finally: + # Attempt to update the message history to reflect the final response + if reply is not None: await config.DATABASE.add_message( reply, use_edit_date=True, reply_to_message_id=message.message_id ) - - except Exception as e: - span.error(f"Error handling text message: {e}") - finally: return None diff --git a/src/utils/database.py b/src/utils/database.py index bdc118d..720fafb 100644 --- a/src/utils/database.py +++ b/src/utils/database.py @@ -149,7 +149,7 @@ async def get_chat_last_messages( limit: int = 10, offset: int = 0, span: MessageSpan | None = None, - ): + ) -> list[Message]: """ Get the last messages in a chat in batches diff --git a/src/utils/telegram.py b/src/utils/telegram.py index 633a5b9..803751b 100644 --- a/src/utils/telegram.py +++ b/src/utils/telegram.py @@ -1,4 +1,4 @@ -from telebot.types import Message +from telebot.types import Message, User def get_mentions_in_message(message: Message) -> list[str]: @@ -12,3 +12,25 @@ def get_mentions_in_message(message: Message) -> list[str]: mention_text = message.text[entity.offset : entity.offset + entity.length] mentions.append(mention_text) return mentions + + +def get_formatted_username(user: User) -> str: + """ + Determine the appropriate identifier to which associate a user with + the chat context + """ + return user.username or f'{user.first_name or ""} {user.last_name or ""}' + + +def get_formatted_message_content(message: Message) -> str: + """ + Format either a telebot message into a string representing its content + """ + sender = get_formatted_username(message.from_user) + is_reply = message.reply_to_message is not None + + if is_reply: + reply_to_username = get_formatted_username(message.reply_to_message.from_user) + sender = f"{sender} (in reply to {reply_to_username})" + + return f"{sender}\n{message.text}" diff --git a/templates/system.yaml b/templates/system.yaml deleted file mode 100644 index f1158ab..0000000 --- a/templates/system.yaml +++ /dev/null @@ -1,13 +0,0 @@ -# Relevant personal bio -# {name} - the username of the telegram bot -# {date} - today's date -# {chat_details} - the details of the chat the bot is participating in -Role: | - You are a function calling AI Chat Assistant running on a decentralized LLM. - Your role is to assist chat participants with their questions and concerns, using the resources available to you. - You can call only one function at a time and analyse data you get from function response. - You are provided with function signatures within XML tags. - The current date is: {date}. - You will be addressed as {name}, bot, chat-bot, or assistant. - You will be participating in the following chat: - {chat_details}