diff --git a/mindsdb/api/executor/datahub/datanodes/mindsdb_tables.py b/mindsdb/api/executor/datahub/datanodes/mindsdb_tables.py index 9fe8cc5d4d7..5dc0d5bd4f5 100644 --- a/mindsdb/api/executor/datahub/datanodes/mindsdb_tables.py +++ b/mindsdb/api/executor/datahub/datanodes/mindsdb_tables.py @@ -291,6 +291,7 @@ class ChatbotsTable(MdbTable): "PARAMS", "IS_RUNNING", "LAST_ERROR", + "WEBHOOK_TOKEN", ] @classmethod diff --git a/mindsdb/api/http/initialize.py b/mindsdb/api/http/initialize.py index fde90446971..390068b424a 100644 --- a/mindsdb/api/http/initialize.py +++ b/mindsdb/api/http/initialize.py @@ -38,6 +38,7 @@ from mindsdb.api.http.namespaces.tree import ns_conf as tree_ns from mindsdb.api.http.namespaces.views import ns_conf as views_ns from mindsdb.api.http.namespaces.util import ns_conf as utils_ns +from mindsdb.api.http.namespaces.webhooks import ns_conf as webhooks_ns from mindsdb.interfaces.database.integrations import integration_controller from mindsdb.interfaces.database.database import DatabaseController from mindsdb.interfaces.file.file_controller import FileController @@ -246,6 +247,7 @@ def root_index(path): api.add_namespace(ns) api.add_namespace(default_ns) api.add_namespace(auth_ns) + api.add_namespace(webhooks_ns) @api.errorhandler(Exception) def handle_exception(e): diff --git a/mindsdb/api/http/namespaces/configs/webhooks.py b/mindsdb/api/http/namespaces/configs/webhooks.py new file mode 100644 index 00000000000..bbea6f6f225 --- /dev/null +++ b/mindsdb/api/http/namespaces/configs/webhooks.py @@ -0,0 +1,3 @@ +from flask_restx import Namespace + +ns_conf = Namespace('webhooks', description='API to receive messages from bots') diff --git a/mindsdb/api/http/namespaces/webhooks.py b/mindsdb/api/http/namespaces/webhooks.py new file mode 100644 index 00000000000..d7e849d28bd --- /dev/null +++ b/mindsdb/api/http/namespaces/webhooks.py @@ -0,0 +1,29 @@ +from flask import request +from flask_restx import Resource + +from mindsdb.api.http.namespaces.configs.webhooks import ns_conf +from mindsdb.interfaces.chatbot.chatbot_controller import ChatBotController +from mindsdb.metrics.metrics import api_endpoint_metrics + + +# Stores the memory of the various chat-bots mapped by their webhook tokens. +# This is required because each time a new request is made, a new instance of the ChatBotTask is created. +# This causes the memory to be lost. +chat_bot_memory = {} + + +@ns_conf.route('/chatbots/') +class ChatbotWebhooks(Resource): + @ns_conf.doc('chatbots_webhook') + @api_endpoint_metrics('POST', '/webhooks/chatbots/') + def post(self, webhook_token: str) -> None: + """ + This endpoint is used to receive messages posted by bots from different platforms. + + Args: + webhook_token (str): The token of the webhook. It is used to uniquely identify the webhook. + """ + request_data = request.json + + chat_bot_controller = ChatBotController() + return chat_bot_controller.on_webhook(webhook_token, request_data, chat_bot_memory) diff --git a/mindsdb/integrations/handlers/ms_teams_handler/ms_graph_api_teams_client.py b/mindsdb/integrations/handlers/ms_teams_handler/ms_graph_api_teams_client.py deleted file mode 100644 index 46ed582f611..00000000000 --- a/mindsdb/integrations/handlers/ms_teams_handler/ms_graph_api_teams_client.py +++ /dev/null @@ -1,318 +0,0 @@ -from typing import Text, List, Dict, Optional -from mindsdb.integrations.utilities.handlers.api_utilities.microsoft.ms_graph_api_utilities import MSGraphAPIBaseClient - - -class MSGraphAPITeamsClient(MSGraphAPIBaseClient): - """ - The Microsoft Graph API client for the Microsoft Teams handler. - This client is used for accessing the Microsoft Teams specific endpoints of the Microsoft Graph API. - Several common methods for submitting requests, fetching data, etc. are inherited from the base class. - """ - - def _get_group_ids(self) -> List[Text]: - """ - Get all group IDs related to Microsoft Teams. - - Returns - ------- - List[Text] - The group IDs. - """ - - if not self._group_ids: - api_url = self._get_api_url("groups") - # only get the id and resourceProvisioningOptions fields - params = {"$select": "id,resourceProvisioningOptions"} - groups = self._get_response_value_unsafe(self._make_request(api_url, params=params)) - # filter out only the groups that are related to Microsoft Teams - self._group_ids = [item["id"] for item in groups if "Team" in item["resourceProvisioningOptions"]] - - return self._group_ids - - def _get_channel_ids(self, group_id: Text) -> List[Text]: - """ - Get all channel IDs related to a group. - - Parameters - ---------- - group_id : Text - The ID of the group. - - Returns - ------- - List[Text] - The channel IDs for that group. - """ - - api_url = self._get_api_url(f"teams/{group_id}/channels") - channels_ids = self._get_response_value_unsafe(self._make_request(api_url)) - - return channels_ids - - def get_channel(self, group_id: Text, channel_id: Text) -> Dict: - """ - Get a channel by its ID and the ID of the group that it belongs to. - - Parameters - ---------- - group_id : str - The ID of the group that the channel belongs to. - - channel_id : str - The ID of the channel. - - Returns - ------- - Dict - The channel data. - """ - - api_url = self._get_api_url(f"teams/{group_id}/channels/{channel_id}") - channel = self._make_request(api_url) - # add the group ID to the channel data - channel.update({"teamId": group_id}) - - last_message = self.get_last_channel_message(group_id, channel_id) - channel["lastMessagePreview_id"] = last_message.get("id") - - return channel - - def get_channels(self) -> List[Dict]: - """ - Get all channels. - - Returns - ------- - List[Dict] - The channels data. - """ - - channels = [] - for group_id in self._get_group_ids(): - for group_channels in self._fetch_data(f"teams/{group_id}/channels", pagination=False): - for group_channel in group_channels: - # add the group ID to the channel data - group_channel.update({"teamId": group_id}) - - last_message = self.get_last_channel_message(group_id, group_channel["id"]) - group_channel["lastMessagePreview_id"] = last_message.get("id") - - channels.extend(group_channels) - - return channels - - def get_channel_message(self, group_id: Text, channel_id: Text, message_id: Text) -> Dict: - """ - Get a channel message by its ID and the IDs of the group and channel that it belongs to. - - Parameters - ---------- - group_id : str - The ID of the group that the channel belongs to. - - channel_id : str - The ID of the channel that the message belongs to. - - message_id : str - The ID of the message. - - Returns - ------- - Dict - The channel message data. - """ - - api_url = self._get_api_url(f"teams/{group_id}/channels/{channel_id}/messages/{message_id}") - message = self._make_request(api_url) - - return message - - def get_channel_messages(self) -> List[Dict]: - """ - Get all channel messages. - - Returns - ------- - List[Dict] - The channel messages data. - """ - - channel_messages = [] - for group_id in self._get_group_ids(): - for channel_id in self._get_channel_ids(group_id): - for messages in self._fetch_data(f"teams/{group_id}/channels/{channel_id['id']}/messages"): - channel_messages.extend(messages) - - return channel_messages - - def get_last_channel_message(self, group_id: Text, channel_id: Text) -> Dict: - """ - Get the last message in a channel. - - Parameters - ---------- - group_id : Text - The ID of the group that the channel belongs to. - - channel_id : Text - The ID of the channel. - - Returns - ------- - Dict - The last message data. - """ - - api_url = self._get_api_url(f"teams/{group_id}/channels/{channel_id}/messages") - # get the last message only - messages = self._get_response_value_unsafe(self._make_request(api_url, params={"$top": 1})) - - return messages[0] if messages else {} - - def send_channel_message(self, group_id: Text, channel_id: Text, message: Text, subject: Optional[Text] = None) -> None: - """ - Send a message to a channel. - - Parameters - ---------- - group_id : Text - The ID of the group that the channel belongs to. - - channel_id : Text - The ID of the channel. - - message : Text - The message to send. - - subject : Text, Optional - The subject of the message. - """ - - api_url = self._get_api_url(f"teams/{group_id}/channels/{channel_id}/messages") - data = { - "subject": subject, - "body": { - "content": message - } - } - - self._make_request(api_url, data=data, method="POST") - - def get_chat(self, chat_id: Text) -> Dict: - """ - Get a chat by its ID. - - Parameters - ---------- - chat_id : str - The ID of the chat. - """ - - api_url = self._get_api_url(f"chats/{chat_id}") - # expand the response with the last message preview - chat = self._make_request(api_url, params={"$expand": "lastMessagePreview"}) - - return chat - - def get_chats(self) -> List[Dict]: - """ - Get all chats. - - Returns - ------- - List[Dict] - The chats data. - """ - - chats = [] - for chat in self._fetch_data("chats", params={"$expand": "lastMessagePreview"}): - chats.extend(chat) - - return chats - - def get_chat_message(self, chat_id: Text, message_id: Text) -> Dict: - """ - Get a chat message by its ID and the ID of the chat that it belongs to. - - Parameters - ---------- - chat_id : str - The ID of the chat that the message belongs to. - - message_id : str - The ID of the message. - - Returns - ------- - Dict - The chat message data. - """ - - api_url = self._get_api_url(f"chats/{chat_id}/messages/{message_id}") - message = self._make_request(api_url) - - return message - - def get_chat_messages(self, chat_id: Text) -> List[Dict]: - """ - Get all chat messages for a chat. - - Parameters - ---------- - chat_id : str - The ID of the chat. - - Returns - ------- - List[Dict] - The chat messages data. - """ - - chat_messages = [] - for messages in self._fetch_data(f"chats/{chat_id}/messages"): - chat_messages.extend(messages) - - return chat_messages - - def get_all_chat_messages(self) -> List[Dict]: - """ - Get all chat messages for all chats. - - Returns - ------- - List[Dict] - The chat messages data. - """ - - chat_messages = [] - for chat_id in [chat["id"] for chat in self.get_chats()]: - for messages in self._fetch_data(f"chats/{chat_id}/messages"): - chat_messages.extend(messages) - - return chat_messages - - def send_chat_message(self, chat_id: Text, message: Text, subject: Optional[Text] = None) -> None: - """ - Send a message to a chat. - - Parameters - ---------- - chat_id : Text - The ID of the chat. - - message : Text - The message to send. - - subject : Text, Optional - The subject of the message. - """ - - api_url = self._get_api_url(f"chats/{chat_id}/messages") - data = { - "subject": subject, - "body": { - "content": message - } - } - - self._make_request(api_url, data=data, method="POST") diff --git a/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_handler.py b/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_handler.py index 7082481f9f1..0934a134eba 100644 --- a/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_handler.py +++ b/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_handler.py @@ -1,16 +1,17 @@ -from typing import Text, Dict +from typing import Text, Dict, Callable +from botbuilder.schema import Activity, ActivityTypes +from botbuilder.schema import ChannelAccount +from botframework.connector import ConnectorClient +from botframework.connector.auth import MicrosoftAppCredentials from mindsdb.utilities import log from mindsdb_sql import parse_sql -from mindsdb.integrations.utilities.handlers.auth_utilities import MSGraphAPIAuthManager -from mindsdb.integrations.handlers.ms_teams_handler.settings import ms_teams_handler_config -from mindsdb.integrations.handlers.ms_teams_handler.ms_graph_api_teams_client import MSGraphAPITeamsClient from mindsdb.integrations.libs.response import ( HandlerStatusResponse as StatusResponse, ) from mindsdb.integrations.libs.api_handler import APIChatHandler -from mindsdb.integrations.handlers.ms_teams_handler.ms_teams_tables import ChannelsTable, ChannelMessagesTable, ChatsTable, ChatMessagesTable +from mindsdb.interfaces.chatbot.types import ChatBotMessage logger = log.getLogger(__name__) @@ -29,58 +30,37 @@ def __init__(self, name: str, **kwargs): name (str): name of particular handler instance **kwargs: arbitrary keyword arguments. """ - super().__init__(name) connection_data = kwargs.get("connection_data", {}) self.connection_data = connection_data - self.handler_storage = kwargs['handler_storage'] self.kwargs = kwargs self.connection = None self.is_connected = False - channels_data = ChannelsTable(self) - self._register_table("channels", channels_data) - - channel_messages_data = ChannelMessagesTable(self) - self._register_table("channel_messages", channel_messages_data) - - chats_data = ChatsTable(self) - self._register_table("chats", chats_data) - - chat_messages_data = ChatMessagesTable(self) - self._register_table("chat_messages", chat_messages_data) + self.service_url = None + self.channel_id = None + self.bot_id = None + self.conversation_id = None - def connect(self) -> MSGraphAPITeamsClient: + def connect(self) -> MicrosoftAppCredentials: """ Set up the connection required by the handler. Returns ------- - MSGraphAPITeamsClient - Client object for accessing the Microsoft Graph API. + MicrosoftAppCredentials + Client object for interacting with the Microsoft Teams app. """ - - if self.is_connected and self.connection.check_connection(): + if self.is_connected: return self.connection - # initialize the auth manager for the Microsoft Graph API - ms_graph_api_auth_manager = MSGraphAPIAuthManager( - handler_storage=self.handler_storage, - scopes=self.connection_data.get('scopes', ms_teams_handler_config.DEFAULT_SCOPES), - client_id=self.connection_data["client_id"], - client_secret=self.connection_data["client_secret"], - tenant_id=self.connection_data["tenant_id"], - code=self.connection_data.get('code') + self.connection = MicrosoftAppCredentials( + app_id=self.connection_data["client_id"], + password=self.connection_data["client_secret"] ) - # get access token from the auth manager for the Microsoft Graph API - access_token = ms_graph_api_auth_manager.get_access_token() - - # pass the access token to the client for access to the Microsoft Graph API - self.connection = MSGraphAPITeamsClient(access_token) - self.is_connected = True return self.connection @@ -94,14 +74,11 @@ def check_connection(self) -> StatusResponse: StatusResponse Response object with the status of the connection. """ - response = StatusResponse(False) try: - connection = self.connect() - connection.check_connection() + self.connect() response.success = True - response.copy_storage = True except Exception as e: logger.error(f'Error connecting to Microsoft Teams: {e}!') response.success = False @@ -125,7 +102,6 @@ def native_query(self, query: Text) -> StatusResponse: StatusResponse Response object with the result of the query. """ - ast = parse_sql(query, dialect="mindsdb") return self.query(ast) @@ -140,41 +116,10 @@ def get_chat_config(self) -> Dict: Dict Configuration for the chatbot. """ - params = { 'polling': { - 'type': 'message_count', - }, - 'tables': [ - { - 'polling': { - 'table': 'chats', - 'chat_id_col': 'id', - 'count_col': 'lastMessagePreview_id' - }, - 'chat_table': { - 'name': 'chat_messages', - 'chat_id_col': 'chatId', - 'username_col': 'from_user_displayName', - 'text_col': 'body_content', - 'time_col': 'createdDateTime', - } - }, - { - 'polling': { - 'table': 'channels', - 'chat_id_col': ['teamId', 'id'], - 'count_col': 'lastMessagePreview_id' - }, - 'chat_table': { - 'name': 'channel_messages', - 'chat_id_col': ['channelIdentity_teamId', 'channelIdentity_channelId'], - 'username_col': 'from_user_displayName', - 'text_col': 'body_content', - 'time_col': 'createdDateTime', - } - } - ] + 'type': 'webhook' + } } return params @@ -188,9 +133,61 @@ def get_my_user_name(self) -> Text: ------- Text Name of the signed in user. - """ + """ + return None + + def on_webhook(self, request: Dict, callback: Callable) -> None: + """ + Handle a webhook request. - connection = self.connect() - user_profile = connection.get_user_profile() - - return user_profile['displayName'] \ No newline at end of file + Parameters + ---------- + request: Dict + The incoming webhook request. + + callback: Callable + Callback function to call after parsing the request. + """ + self.service_url = request["serviceUrl"] + self.channel_id = request["channelId"] + self.bot_id = request["from"]["id"] + self.conversation_id = request["conversation"]["id"] + + chat_bot_message = ChatBotMessage( + ChatBotMessage.Type.DIRECT, + text=request["text"], + user=request["from"]["id"], + destination=request["recipient"]["id"] + ) + + callback( + chat_id=request['conversation']['id'], + message=chat_bot_message + ) + + def respond(self, message: ChatBotMessage) -> None: + """ + Send a response to the chatbot. + + Parameters + ---------- + message: ChatBotMessage + The message to send. + """ + credentials = self.connect() + + connector = ConnectorClient(credentials, base_url=self.service_url) + connector.conversations.send_to_conversation( + self.conversation_id, + Activity( + type=ActivityTypes.message, + channel_id=self.channel_id, + recipient=ChannelAccount( + id=message.destination + ), + from_property=ChannelAccount( + id=self.bot_id + ), + text=message.text + ) + ) diff --git a/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_tables.py b/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_tables.py deleted file mode 100644 index 09a8d6d8381..00000000000 --- a/mindsdb/integrations/handlers/ms_teams_handler/ms_teams_tables.py +++ /dev/null @@ -1,630 +0,0 @@ -import pandas as pd -from typing import Text, List, Dict, Any - -from mindsdb.utilities import log -from mindsdb_sql.parser import ast -from mindsdb.integrations.libs.api_handler import APITable - -from mindsdb.integrations.handlers.ms_teams_handler.settings import ms_teams_handler_config - -from mindsdb.integrations.utilities.handlers.query_utilities.insert_query_utilities import INSERTQueryParser -from mindsdb.integrations.utilities.handlers.query_utilities.select_query_utilities import SELECTQueryParser, SELECTQueryExecutor - -logger = log.getLogger(__name__) - - -class ChatsTable(APITable): - """ - The Microsoft Teams Chats Table implementation. - """ - - def select(self, query: ast.Select) -> pd.DataFrame: - """ - Pulls data from the "GET /chats" and "GET /chats/{chat_id} Microsoft Graph API endpoints. - - Parameters - ---------- - query : ast.Select - Given SQL SELECT query. - - Returns - ------- - pd.DataFrame - Microsoft Teams Chats matching the query. - - Raises - ------ - ValueError - If the query contains an unsupported target (column). - - NotImplementedError - If the query contains an unsupported condition. - """ - - select_statement_parser = SELECTQueryParser( - query, - 'chats', - self.get_columns() - ) - - selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - - # only the = operator is supported for the id column - id = None - for op, arg1, arg2 in where_conditions: - if arg1 == 'id': - if op == "=": - id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for id column.") - - chats_df = pd.json_normalize(self.get_chats(id), sep='_') - - # if the id is given, remove the id column from the where conditions as it has already been evaluated in the API call (get_chats) - if id: - where_conditions = [where_condition for where_condition in where_conditions if where_condition[1] not in ['id']] - - select_statement_executor = SELECTQueryExecutor( - chats_df, - selected_columns, - where_conditions, - order_by_conditions, - result_limit if query.limit else None - ) - - chats_df = select_statement_executor.execute_query() - - return chats_df - - def get_chats(self, chat_id: Text = None) -> List[Dict[Text, Any]]: - """ - Calls the API client to get the chats from the Microsoft Graph API. - - Parameters - ---------- - chat_id: Text - The chat id to get the chat from. - - Returns - ------- - List[Dict[Text, Any]] - The chats from the Microsoft Graph API. - """ - - api_client = self.handler.connect() - - # if the chat_id is given, get the chat with that id from the API - if chat_id: - chats = [api_client.get_chat(chat_id)] - # if the chat_id is not given, get all the chats - else: - chats = api_client.get_chats() - - for chat in chats: - last_message_preview = chat.get("lastMessagePreview") - - # keep only the lastMessagePreview_id and lastMessagePreview_createdDateTime columns - if last_message_preview: - chat["lastMessagePreview_id"] = last_message_preview.get("id") - chat["lastMessagePreview_createdDateTime"] = last_message_preview.get("createdDateTime") - del chat["lastMessagePreview"] - del chat["lastMessagePreview@odata.context"] - - return chats - - def get_columns(self) -> List[Text]: - """ - Returns the columns of the Chats Table. - - Returns - ------- - List[Text] - The columns of the Chats Table. - """ - - return ms_teams_handler_config.CHATS_TABLE_COLUMNS - -class ChatMessagesTable(APITable): - """ - The Microsoft Teams Chat Messages Table implementation. - """ - - def select(self, query: ast.Select) -> pd.DataFrame: - """ - Pulls data from the "GET /chats/{chat_id}/messages" and "GET /chats/{chat_id}/messages/{message_id}" Microsoft Graph API endpoints. - - Parameters - ---------- - query : ast.Select - Given SQL SELECT query. - - Returns - ------- - pd.DataFrame - Microsoft Teams Chat Messages matching the query. - - Raises - ------ - ValueError - If the query contains an unsupported target (column). - - NotImplementedError - If the query contains an unsupported condition. - """ - - select_statement_parser = SELECTQueryParser( - query, - 'chat_messages', - self.get_columns() - ) - - selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - - # only the = operator is supported for the id and chatId columns - chat_id, message_id = None, None - for op, arg1, arg2 in where_conditions: - if arg1 == 'id': - if op == "=": - message_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for id column.") - - if arg1 == 'chatId': - if op == "=": - chat_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for chatId column.") - - messages_df = pd.json_normalize(self.get_messages(chat_id, message_id), sep='_') - - # if both chat_id and message_id are given, remove the id and chatId columns from the where conditions as they have already been evaluated in the API call (get_messages) - if chat_id and message_id: - where_conditions = [where_condition for where_condition in where_conditions if where_condition[1] not in ['id', 'chatId']] - # if only the chat_id is given, remove the chatId column from the where conditions as it has already been evaluated in the API call (get_messages) - elif chat_id: - where_conditions = [where_condition for where_condition in where_conditions if where_condition[1] not in ['chatId']] - - select_statement_executor = SELECTQueryExecutor( - messages_df, - selected_columns, - where_conditions, - order_by_conditions, - result_limit if query.limit else None - ) - - messages_df = select_statement_executor.execute_query() - - return messages_df - - def get_messages(self, chat_id: Text = None, message_id: Text = None) -> List[Dict[Text, Any]]: - """ - Calls the API client to get the messages from the Microsoft Graph API. - If all parameters are None, it will return all the messages from all the chats. - If only the chat_id is given, it will return all the messages from that chat. - - Parameters - ---------- - chat_id: Text - The chat id to get the messages from. - - message_id: Text - The message id to get the message from. - - Returns - ------- - List[Dict[Text, Any]] - The messages from the Microsoft Graph API. - """ - - api_client = self.handler.connect() - - # if both chat_id and message_id are given, get the message with that id from the API - if message_id and chat_id: - chat_messages = [api_client.get_chat_message(chat_id, message_id)] - # if only the chat_id is given, get all the messages from that chat - elif chat_id: - chat_messages = api_client.get_chat_messages(chat_id) - # if no parameters are given or only the message_id is given, get all the messages from all the chats - else: - chat_messages = api_client.get_all_chat_messages() - - return chat_messages - - def insert(self, query: ast.Insert) -> None: - """ - Inserts data into the "POST /chats/{chat_id}/messages" Microsoft Graph API endpoint. - - Parameters - ---------- - query : ast.Insert - Given SQL INSERT query. - - Returns - ------- - None - - Raises - ------ - UnsupportedColumnException - If the query contains an unsupported column. - - MandatoryColumnException - If the query is missing a mandatory column. - - ColumnCountMismatchException - If the number of columns does not match the number of values. - """ - - # only the chatId, subject and body_content columns are supported - # chatId and body_content are mandatory - insert_statement_parser = INSERTQueryParser( - query, - supported_columns=["chatId", "subject", "body_content"], - mandatory_columns=["chatId", "body_content"], - ) - - messages_to_send = insert_statement_parser.parse_query() - - self.send_messages(messages_to_send) - - def send_messages(self, messages_to_send: List[Dict[Text, Any]]) -> None: - """ - Calls the API client to send the messages to the Microsoft Graph API. - - Parameters - ---------- - messages_to_send: List[Dict[Text, Any]] - The messages to send to the Microsoft Graph API. - - Returns - ------- - None - """ - - api_client = self.handler.connect() - - # send each message through the API to the chat with the given id - for message in messages_to_send: - api_client.send_chat_message( - chat_id=message["chatId"], - message=message["body_content"], - subject=message.get("subject") - ) - - def get_columns(self) -> List[Text]: - """ - Returns the columns of the Chat Messages Table. - - Returns - ------- - List[Text] - The columns of the Chat Messages Table. - """ - - return ms_teams_handler_config.CHAT_MESSAGES_TABLE_COLUMNS - - class ChatMessageRepliesTable(APITable): - """ - The Microsoft Chat Message Replies Table implementation. - """ - pass - -class ChannelsTable(APITable): - """ - The Microsoft Channels Table implementation. - """ - - def select(self, query: ast.Select) -> pd.DataFrame: - """ - Pulls data from the "GET /teams/{group_id}/channels" and "GET teams/{group_id}/channels/{channel_id}" Microsoft Graph API endpoints. - - Parameters - ---------- - query : ast.Select - Given SQL SELECT query. - - Returns - ------- - pd.DataFrame - Microsoft Teams Channels matching the query. - - Raises - ------ - ValueError - If the query contains an unsupported target (column). - - NotImplementedError - If the query contains an unsupported condition. - """ - - select_statement_parser = SELECTQueryParser( - query, - 'channels', - self.get_columns() - ) - - selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - - # only the = operator is supported for the id and teamId columns - channel_id, team_id = None, None - for op, arg1, arg2 in where_conditions: - if arg1 == 'id': - if op == "=": - channel_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for id column.") - - if arg1 == 'teamId': - if op == "=": - team_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for teamId column.") - - channels_df = pd.json_normalize(self.get_channels(channel_id, team_id)) - - # if both channel_id and team_id are given, remove the id and teamId columns from the where conditions as they have already been evaluated in the API call (get_channels) - if channel_id and team_id: - where_conditions = [where_condition for where_condition in where_conditions if where_condition[1] not in ['id', 'teamId']] - - select_statement_executor = SELECTQueryExecutor( - channels_df, - selected_columns, - where_conditions, - order_by_conditions, - result_limit if query.limit else None - ) - - channels_df = select_statement_executor.execute_query() - - return channels_df - - def get_channels(self, channel_id: Text = None, team_id: Text = None) -> List[Dict[Text, Any]]: - """ - Calls the API client to get the channels from the Microsoft Graph API. - If all parameters are None, it will return all the channels from all the teams. - - Parameters - ---------- - channel_id: Text - The channel id to get the channel from. - - team_id: Text - The team id to get the channels from. - - Returns - ------- - List[Dict[Text, Any]] - The channels from the Microsoft Graph API. - """ - - api_client = self.handler.connect() - - # if both channel_id and team_id are given, get the channel with that id from the API - if channel_id and team_id: - return [api_client.get_channel(team_id, channel_id)] - # if no parameter are given or only the team_id is given, get all the channels - else: - return api_client.get_channels() - - def get_columns(self) -> List[Text]: - """ - Returns the columns of the Channels Table. - - Returns - ------- - List[Text] - The columns of the Channels Table. - """ - - return ms_teams_handler_config.CHANNELS_TABLE_COLUMNS - -class ChannelMessagesTable(APITable): - """ - The Microsoft Teams Channel Messages Table implementation. - """ - - def select(self, query: ast.Select) -> pd.DataFrame: - """ - Pulls data from the "GET /teams/{group_id}/channels/{channel_id}/messages" and "GET /teams/{group_id}/channels/{channel_id}/messages/{message_id}" Microsoft Graph API endpoints. - - Parameters - ---------- - query: ast.Select - Given SQL SELECT query. - - Returns - ------- - pd.DataFrame - Microsoft Teams Channel Messages matching the query. - - Raises - ------ - ValueError - If the query contains an unsupported target (column). - - NotImplementedError - If the query contains an unsupported condition. - """ - - select_statement_parser = SELECTQueryParser( - query, - 'channel_messages', - self.get_columns() - ) - - selected_columns, where_conditions, order_by_conditions, result_limit = select_statement_parser.parse_query() - - # only the = operator is supported for the id, channelIdentity_teamId and channelIdentity_channelId columns - team_id, channel_id, message_id = None, None, None - for op, arg1, arg2 in where_conditions: - if arg1 == 'id': - if op == "=": - message_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for id column.") - - if arg1 == 'channelIdentity_teamId': - if op == "=": - team_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for id column.") - - if arg1 == 'channelIdentity_channelId': - if op == "=": - channel_id = arg2 - else: - raise NotImplementedError("Only '=' operator is supported for teamId column.") - - messages_df = pd.json_normalize(self.get_messages(team_id, channel_id, message_id), sep='_') - - # if all parameters are given, remove the id, channelIdentity_teamId and channelIdentity_channelId columns from the where conditions as they have already been evaluated in the API call (get_messages) - if team_id and channel_id and message_id: - where_conditions = [where_condition for where_condition in where_conditions if where_condition[1] not in ['id', 'channelIdentity_teamId', 'channelIdentity_channelId']] - - select_statement_executor = SELECTQueryExecutor( - messages_df, - selected_columns, - where_conditions, - order_by_conditions, - result_limit if query.limit else None - ) - - messages_df = select_statement_executor.execute_query() - - return messages_df - - def get_messages(self, team_id: Text = None, channel_id: Text = None, message_id: Text = None) -> List[Dict[Text, Any]]: - """ - Calls the API client to get the messages from the Microsoft Graph API. - If all parameters are None, it will return all the messages from all the channels from all the teams. - - Parameters - ---------- - team_id: Text - The team id to get the messages from. - - channel_id: Text - The channel id to get the messages from. - - message_id: Text - The message id to get the message from. - """ - - api_client = self.handler.connect() - - # if all parameters are given, get the message with that id from that channel from that team from the API - if message_id and channel_id and team_id: - channel_message = api_client.get_channel_message(team_id, channel_id, message_id) - # add the missing eventDetail attribute to the channel message - channel_message['eventDetail'] = { - '@odata.type': None, - 'visibleHistoryStartDateTime': None, - 'members': None, - 'channelId': None, - 'channelDisplayName': None, - 'initiator': { - 'application': { - '@odata.type': None, - 'id': None, - 'displayName': None, - 'applicationIdentityType': None - }, - 'device': None, - 'user': { - '@odata.type': None, - 'id': None, - 'displayName': None, - 'userIdentityType': None, - 'tenantId': None - } - } - } - - return [channel_message] - # for any other combination of parameters, get all the messages from all the channels from all the teams - else: - return api_client.get_channel_messages() - - def insert(self, query: ast.Insert) -> None: - """ - Inserts data into the "POST /teams/{group_id}/channels/{channel_id}/messages" Microsoft Graph API endpoint. - - Parameters - ---------- - query : ast.Insert - Given SQL INSERT query. - - Returns - ------- - None - - Raises - ------ - UnsupportedColumnException - If the query contains an unsupported column. - - MandatoryColumnException - If the query is missing a mandatory column. - - ColumnCountMismatchException - If the number of columns does not match the number of values. - """ - - # only the channelIdentity_teamId, channelIdentity_channelId, subject and body_content columns are supported - # channelIdentity_teamId and channelIdentity_channelId are mandatory - insert_statement_parser = INSERTQueryParser( - query, - supported_columns=["channelIdentity_teamId", "channelIdentity_channelId", "subject", "body_content"], - mandatory_columns=["channelIdentity_teamId", "channelIdentity_channelId", "body_content"], - ) - - messages_to_send = insert_statement_parser.parse_query() - - self.send_messages(messages_to_send) - - def send_messages(self, messages_to_send: List[Dict[Text, Any]]) -> None: - """ - Calls the API client to send the messages to the Microsoft Graph API. - - Parameters - ---------- - messages_to_send: List[Dict[Text, Any]] - The messages to send to the Microsoft Graph API. - - Returns - ------- - None - - Raises - ------ - None - """ - - api_client = self.handler.connect() - - # send each message through the API to the channel with the given id from the team with the given id - for message in messages_to_send: - api_client.send_channel_message( - group_id=message["channelIdentity_teamId"], - channel_id=message["channelIdentity_channelId"], - message=message["body_content"], - subject=message.get("subject") - ) - - def get_columns(self) -> List[Text]: - """ - Returns the columns of the Channel Messages Table. - - Returns - ------- - list - The columns of the Channel Messages Table. - """ - - return ms_teams_handler_config.CHANNEL_MESSAGES_TABLE_COLUMNS - -class ChannelMessageRepliesTable(APITable): - """ - The Microsoft Teams Channel Message Replies Table implementation. - """ - pass diff --git a/mindsdb/integrations/handlers/ms_teams_handler/requirements.txt b/mindsdb/integrations/handlers/ms_teams_handler/requirements.txt index 0f0cb6a9ecf..e7440fcfadf 100644 --- a/mindsdb/integrations/handlers/ms_teams_handler/requirements.txt +++ b/mindsdb/integrations/handlers/ms_teams_handler/requirements.txt @@ -1 +1,2 @@ -pydantic-settings >= 2.1.0 \ No newline at end of file +botframework-connector +botbuilder-schema \ No newline at end of file diff --git a/mindsdb/integrations/handlers/ms_teams_handler/settings.py b/mindsdb/integrations/handlers/ms_teams_handler/settings.py deleted file mode 100644 index 24f0e22a0d4..00000000000 --- a/mindsdb/integrations/handlers/ms_teams_handler/settings.py +++ /dev/null @@ -1,360 +0,0 @@ -from typing import List -from pydantic_settings import BaseSettings - - -class MSTeamsHandlerConfig(BaseSettings): - """ - Settings for the Microsoft Teams handler. - - Attributes - ---------- - DEFAULT_SCOPES: List - Default scopes for querying the Microsoft Graph API. - - CHATS_TABLE_COLUMNS: List - Columns for the chats table. - - CHAT_MESSAGES_TABLE_COLUMNS: List - Columns for the chat messages table. - - CHANNELS_TABLE_COLUMNS: List - Columns for the channels table. - - CHANNEL_MESSAGES_TABLE_COLUMNS: List - Columns for the channel messages table. - - TEST_CHAT_DATA: Dict - Test data for the chats table. - - TEST_CHATS_DATA: List[Dict] - Test data for the chats table. - - TEST_CHAT_MESSAGE_DATA: Dict - Test data for the chat messages table. - - TEST_CHAT_MESSAGES_DATA: List[Dict] - Test data for the chat messages table. - - TEST_CHANNEL_DATA: Dict - Test data for the channels table. - - TEST_CHANNELS_DATA: List[Dict] - Test data for the channels table. - - TEST_CHANNEL_MESSAGE_DATA: Dict - Test data for the channel messages table. - - TEST_CHANNEL_MESSAGES_DATA: List[Dict] - Test data for the channel messages table. - - TEST_GROUP_DATA: dict - - TEST_CHANNEL_ID_DATA: dict - """ - - DEFAULT_SCOPES: List = [ - "https://graph.microsoft.com/.default", - ] - - CHATS_TABLE_COLUMNS: List = [ - "id", - "topic", - "createdDateTime", - "lastUpdatedDateTime", - "chatType", - "webUrl", - "tenantId", - "onlineMeetingInfo", - "viewpoint_isHidden", - "viewpoint_lastMessageReadDateTime", - "lastMessagePreview_id", - "lastMessagePreview_createdDateTime", - ] - - CHAT_MESSAGES_TABLE_COLUMNS: List = [ - "id", - "replyToId", - "etag", - "messageType", - "createdDateTime", - "lastModifiedDateTime", - "lastEditedDateTime", - "deletedDateTime", - "subject", - "summary", - "chatId", - "importance", - "locale", - "webUrl", - "channelIdentity", - "policyViolation", - "attachments", - "mentions", - "reactions", - "from_application", - "from_device", - "from_user_@odata.type", - "from_user_id", - "from_user_displayName", - "from_user_userIdentityType", - "from_user_tenantId", - "body_contentType", - "body_content" - ] - - CHANNELS_TABLE_COLUMNS: List = [ - "id", - "createdDateTime", - "displayName", - "description", - "isFavoriteByDefault", - "email", - "tenantId", - "webUrl", - "membershipType", - "teamId", - "lastMessagePreview_id" - ] - - CHANNEL_MESSAGES_TABLE_COLUMNS: List = [ - "id", - "replyToId", - "etag", - "messageType", - "createdDateTime", - "lastModifiedDateTime", - "lastEditedDateTime", - "deletedDateTime", - "subject", - "summary", - "chatId", - "importance", - "locale", - "webUrl", - "policyViolation", - "attachments", - "mentions", - "reactions", - "from_application", - "from_device", - "from_user_@odata.type", - "from_user_id", - "from_user_displayName", - "from_user_userIdentityType", - "from_user_tenantId", - "body_contentType", - "body_content", - "channelIdentity_teamId", - "channelIdentity_channelId", - "eventDetail_@odata.type", - "eventDetail_visibleHistoryStartDateTime", - "eventDetail_members", - "eventDetail_initiator_device", - "eventDetail_initiator_application_@odata.type", - "eventDetail_initiator_application_id", - "eventDetail_initiator_application_displayName", - "eventDetail_initiator_application_applicationIdentityType", - "eventDetail_channelId", - "eventDetail_channelDisplayName", - "eventDetail_initiator_user_@odata.type", - "eventDetail_initiator_user_id", - "eventDetail_initiator_user_displayName", - "eventDetail_initiator_user_userIdentityType", - "eventDetail_initiator_user_tenantId" - ] - - TEST_CHAT_DATA: dict = { - '@odata.context': 'test_context', - 'id': 'test_id', - 'topic': None, - 'createdDateTime': '2023-11-20T10:25:19.553Z', - 'lastUpdatedDateTime': '2023-11-20T10:25:19.553Z', - 'chatType': 'oneOnOne', - 'webUrl': 'https://teams.test', - 'tenantId': 'test_tenant_id', - 'onlineMeetingInfo': None, - 'viewpoint': { - 'isHidden': False, - 'lastMessageReadDateTime': '2023-12-08T17:09:34.214Z' - }, - 'lastMessagePreview@odata.context': 'https://graph.test', - 'lastMessagePreview': { - 'id': '1702055374214', - 'createdDateTime': '2023-12-08T17:09:34.214Z', - 'isDeleted': False, - 'messageType': 'message', - 'eventDetail': None, - 'body': { - 'contentType': 'text', - 'content': '\n\nTest message.' - }, - 'from': { - 'application': None, - 'device': None, - 'user': {} - } - } - } - - TEST_CHATS_DATA: List[dict] = [TEST_CHAT_DATA] - - TEST_CHAT_MESSAGE_DATA: dict = { - '@odata.context': 'test_context', - 'id': 'test_id', - 'replyToId': None, - 'etag': 'test_etag', - 'messageType': 'message', - 'createdDateTime': '2023-12-08T17:09:22.241Z', - 'lastModifiedDateTime': '2023-12-08T17:09:22.241Z', - 'lastEditedDateTime': None, - 'deletedDateTime': None, - 'subject': None, - 'summary': None, - 'chatId': 'test_chat_id', - 'importance': 'normal', - 'locale': 'en-us', - 'webUrl': None, - 'channelIdentity': None, - 'policyViolation': None, - 'attachments': [], - 'mentions': [], - 'reactions': [], - 'from': { - 'application': None, - 'device': None, - 'user': { - '@odata.type': 'test_type', - 'id': 'test_user_id', - 'displayName': 'test_user_display_name', - 'userIdentityType': 'aadUser', - 'tenantId': 'test_tenant_id' - } - }, - 'body': { - 'contentType': 'text', - 'content': '\n\nTest message.' - }, - 'eventDetail': { - '@odata.type': 'test_type', - 'visibleHistoryStartDateTime': '2023-12-08T17:09:22.241Z', - 'members': [], - 'initiator': { - 'device': None, - 'application': None, - 'user': { - '@odata.type': 'test_type', - 'id': 'test_user_id', - 'displayName': 'test_user_display_name', - 'userIdentityType': 'aadUser', - 'tenantId': 'test_tenant_id' - } - }, - } - } - - TEST_CHAT_MESSAGES_DATA: List[dict] = [TEST_CHAT_MESSAGE_DATA] - - TEST_CHANNEL_DATA: dict = { - '@odata.context': 'test_context', - 'id': 'test_id', - 'createdDateTime': '2023-11-17T22:54:33.055Z', - 'displayName': 'test_display_name', - 'description': None, - 'isFavoriteByDefault': None, - 'email': 'test@test.com', - 'tenantId': 'test_tenant_id', - 'webUrl': 'https://teams.test', - 'membershipType': 'standard', - 'teamId': 'test_team_id' - } - - TEST_CHANNELS_DATA: List[dict] = [TEST_CHANNEL_DATA] - - TEST_CHANNEL_MESSAGE_DATA: dict = { - '@odata.context': 'test_context', - 'id': 'test_id', - 'replyToId': None, - 'etag': 'test_etag', - 'messageType': 'message', - 'createdDateTime': '2023-11-30T16:52:50.18Z', - 'lastModifiedDateTime': '2023-11-30T16:52:50.18Z', - 'lastEditedDateTime': None, - 'deletedDateTime': None, - 'subject': 'Test Subject', - 'summary': None, - 'chatId': None, - 'importance': - 'normal', - 'locale': 'en-us', - 'webUrl': 'https://teams.test', - 'policyViolation': None, - 'attachments': [], - 'mentions': [], - 'reactions': [], - 'from': { - 'application': None, - 'device': None, - 'user': { - '@odata.type': 'test_type', - 'id': 'test_user_id', - 'displayName': 'test_user_display_name', - 'userIdentityType': 'aadUser', - 'tenantId': 'test_tenant_id' - } - }, - 'body': { - 'contentType': 'text', - 'content': '\n\nTest message.' - }, - 'channelIdentity': { - 'teamId': 'test_team_id', - 'channelId': 'test_channel_id' - }, - 'eventDetail': { - '@odata.type': 'test_type', - 'visibleHistoryStartDateTime': '2023-11-30T16:52:50.18Z', - 'members': [], - 'initiator': { - 'device': None, - 'application': { - '@odata.type': 'test_type', - 'id': 'test_app_id', - 'displayName': 'test_app_display_name', - 'applicationIdentityType': 'bot' - }, - 'user': { - '@odata.type': 'test_type', - 'id': 'test_user_id', - 'displayName': 'test_user_display_name', - 'userIdentityType': 'aadUser', - 'tenantId': 'test_tenant_id' - } - }, - 'channelId': 'test_channel_id', - 'channelDisplayName': 'test_channel_display_name' - } - } - - TEST_CHANNEL_MESSAGES_DATA: List[dict] = [TEST_CHANNEL_MESSAGE_DATA] - - TEST_GROUP_DATA: dict = { - '@odata.context': 'test_context', - 'value': [ - { - 'id': 'test_team_id', - 'resourceProvisioningOptions': ['Team'], - } - ] - } - - TEST_CHANNEL_ID_DATA: dict = { - '@odata.context': 'test_context', - '@odata.count': 1, - 'value': [ - { - 'id': 'test_channel_id', - } - ] - } - -ms_teams_handler_config = MSTeamsHandlerConfig() \ No newline at end of file diff --git a/mindsdb/interfaces/chatbot/chatbot_controller.py b/mindsdb/interfaces/chatbot/chatbot_controller.py index f32471f13c9..20ea6a34809 100644 --- a/mindsdb/interfaces/chatbot/chatbot_controller.py +++ b/mindsdb/interfaces/chatbot/chatbot_controller.py @@ -1,6 +1,7 @@ from typing import Dict, List from mindsdb.interfaces.agents.agents_controller import AgentsController +from mindsdb.interfaces.chatbot.chatbot_task import ChatBotTask from mindsdb.interfaces.database.projects import ProjectController from mindsdb.interfaces.storage import db @@ -48,6 +49,42 @@ def get_chatbot(self, chatbot_name: str, project_name: str = 'mindsdb') -> db.Ch db.Tasks.company_id == ctx.company_id, ) + return self._get_chatbot(query, project) + + def get_chatbot_by_id(self, chatbot_id: int) -> db.ChatBots: + ''' + Gets a chatbot by id. + + Parameters: + chatbot_id (int): The id of the chatbot + + Returns: + bot (db.ChatBots): The database chatbot object + ''' + + query = db.session.query( + db.ChatBots, db.Tasks + ).join( + db.Tasks, db.ChatBots.id == db.Tasks.object_id + ).filter( + db.ChatBots.id == chatbot_id, + db.Tasks.object_type == self.OBJECT_TYPE, + db.Tasks.company_id == ctx.company_id, + ) + + return self._get_chatbot(query) + + def _get_chatbot(self, query, project: db.Project = None) -> db.ChatBots: + ''' + Gets a chatbot by query. + + Parameters: + query: The query to get the chatbot + + Returns: + bot (db.ChatBots): The database chatbot object + ''' + query_result = query.first() if query_result is None: return None @@ -65,7 +102,7 @@ def get_chatbot(self, chatbot_name: str, project_name: str = 'mindsdb') -> db.Ch bot_obj = { 'id': bot.id, 'name': bot.name, - 'project': project_name, + 'project': project.name if project else self.project_controller.get(bot.project_id).name, 'agent': agent_obj, 'database_id': bot.database_id, # TODO remove in future 'database': database_names.get(bot.database_id, '?'), @@ -74,7 +111,9 @@ def get_chatbot(self, chatbot_name: str, project_name: str = 'mindsdb') -> db.Ch 'created_at': bot.created_at, 'is_running': task.active, 'last_error': task.last_error, + 'webhook_token': bot.webhook_token, } + return bot_obj def get_chatbots(self, project_name: str = 'mindsdb') -> List[dict]: @@ -132,6 +171,7 @@ def get_chatbots(self, project_name: str = 'mindsdb') -> List[dict]: 'created_at': bot.created_at, 'is_running': task.active, 'last_error': task.last_error, + 'webhook_token': bot.webhook_token, } ) @@ -227,7 +267,8 @@ def update_chatbot( agent_name: str = None, database_id: int = None, is_running: bool = None, - params: Dict[str, str] = None): + params: Dict[str, str] = None, + webhook_token: str = None) -> db.ChatBots: ''' Updates a chatbot in the database, creating it if it doesn't already exist. @@ -290,6 +331,10 @@ def update_chatbot( existing_params = existing_chatbot_rec.params or {} params.update(existing_params) existing_chatbot_rec.params = params + + if webhook_token is not None: + existing_chatbot_rec.webhook_token = webhook_token + db.session.commit() return existing_chatbot_rec @@ -321,3 +366,41 @@ def delete_chatbot(self, chatbot_name: str, project_name: str = 'mindsdb'): db.session.delete(bot_rec) db.session.commit() + + def on_webhook(self, webhook_token: str, request: dict, chat_bot_memory: dict): + """ + Handles incoming webhook requests. + Finds the chat bot associated with the webhook token and passes the request to the chat bot task. + + Args: + webhook_token (str): The token to uniquely identify the webhook. + request (dict): The incoming webhook request. + chat_bot_memory (dict): The memory of the various chat-bots mapped by their webhook tokens. + """ + query = db.session.query( + db.ChatBots, db.Tasks + ).join( + db.Tasks, db.ChatBots.id == db.Tasks.object_id + ).filter( + db.ChatBots.webhook_token == webhook_token, + db.Tasks.object_type == self.OBJECT_TYPE, + db.Tasks.company_id == ctx.company_id, + ) + result = query.first() + + chat_bot, task = result if result is not None else (None, None) + + if chat_bot is None: + raise Exception(f"No chat bot exists for webhook token: {webhook_token}") + + if not task.active: + raise Exception(f"Chat bot is not running: {chat_bot.name}") + + chat_bot_task = ChatBotTask(task_id=task.id, object_id=chat_bot.id) + + if webhook_token in chat_bot_memory: + chat_bot_task.set_memory(chat_bot_memory[webhook_token]) + else: + chat_bot_memory[webhook_token] = chat_bot_task.get_memory() + + chat_bot_task.on_webhook(request) diff --git a/mindsdb/interfaces/chatbot/chatbot_task.py b/mindsdb/interfaces/chatbot/chatbot_task.py index 5264274e114..de0a135fa25 100644 --- a/mindsdb/interfaces/chatbot/chatbot_task.py +++ b/mindsdb/interfaces/chatbot/chatbot_task.py @@ -9,8 +9,8 @@ from mindsdb.utilities import log -from .polling import MessageCountPolling, RealtimePolling -from .memory import DBMemory, HandlerMemory +from .polling import MessageCountPolling, RealtimePolling, WebhookPolling +from .memory import BaseMemory, DBMemory, HandlerMemory from .chatbot_executor import MultiModeBotExecutor, BotExecutor, AgentExecutor from .types import ChatBotMessage @@ -23,22 +23,23 @@ class ChatBotTask(BaseTask): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.bot_id = self.object_id - self.agent_id = None self.session = SessionController() - def run(self, stop_event): - - # TODO check deleted, raise errors - # TODO checks on delete predictor / project/ integration - bot_record = db.ChatBots.query.get(self.bot_id) self.base_model_name = bot_record.model_name - self.agent_id = bot_record.agent_id self.project_name = db.Project.query.get(bot_record.project_id).name self.project_datanode = self.session.datahub.get(self.project_name) + self.agent_id = bot_record.agent_id + if self.agent_id is not None: + self.bot_executor_cls = AgentExecutor + elif self.bot_params.get('modes') is None: + self.bot_executor_cls = BotExecutor + else: + self.bot_executor_cls = MultiModeBotExecutor + database_name = db.Integration.query.get(bot_record.database_id).name self.chat_handler = self.session.integration_controller.get_data_handler(database_name) @@ -49,8 +50,6 @@ def run(self, stop_event): self.bot_params = bot_record.params or {} chat_params = self.chat_handler.get_chat_config() - self.bot_params['bot_username'] = self.chat_handler.get_my_user_name() - polling = chat_params['polling']['type'] if polling == 'message_count': chat_params = chat_params['tables'] if 'tables' in chat_params else [chat_params] @@ -60,22 +59,29 @@ def run(self, stop_event): elif polling == 'realtime': self.chat_pooling = RealtimePolling(self, chat_params) self.memory = DBMemory(self, chat_params) + + elif polling == 'webhook': + self.chat_pooling = WebhookPolling(self, chat_params) + self.memory = DBMemory(self, chat_params) + else: raise Exception(f"Not supported polling: {polling}") - if self.agent_id is not None: - self.bot_executor_cls = AgentExecutor - elif self.bot_params.get('modes') is None: - self.bot_executor_cls = BotExecutor - else: - self.bot_executor_cls = MultiModeBotExecutor + self.bot_params['bot_username'] = self.chat_handler.get_my_user_name() + + def run(self, stop_event): + + # TODO check deleted, raise errors + # TODO checks on delete predictor / project/ integration self.chat_pooling.run(stop_event) - def on_message(self, chat_memory, message: ChatBotMessage, table_name=None): + def on_message(self, message: ChatBotMessage, chat_id=None, chat_memory=None, table_name=None): + if not chat_id and chat_memory: + raise Exception('chat_id or chat_memory should be provided') try: - self._on_message(chat_memory, message, table_name) + self._on_message(message, chat_id, chat_memory, table_name) except (SystemExit, KeyboardInterrupt): raise except Exception: @@ -83,9 +89,10 @@ def on_message(self, chat_memory, message: ChatBotMessage, table_name=None): logger.error(error) self.set_error(str(error)) - def _on_message(self, chat_memory, message: ChatBotMessage, table_name=None): + def _on_message(self, message: ChatBotMessage, chat_id, chat_memory, table_name=None): # add question to history # TODO move it to realtime pooling + chat_memory = chat_memory if chat_memory else self.memory.get_chat(chat_id, table_name=table_name) chat_memory.add_to_history(message) logger.debug(f'>>chatbot {chat_memory.chat_id} in: {message.text}') @@ -111,3 +118,31 @@ def _on_message(self, chat_memory, message: ChatBotMessage, table_name=None): # send to history chat_memory.add_to_history(response_message) + + def on_webhook(self, request: dict) -> None: + """ + Handle incoming webhook requests. + Passes the request to the chat handler along with the callback method. + + Args: + request (dict): The incoming webhook request. + """ + self.chat_handler.on_webhook(request, self.on_message) + + def get_memory(self) -> BaseMemory: + """ + Get the memory of the chatbot task. + + Returns: + BaseMemory: The memory of the chatbot task. + """ + return self.memory + + def set_memory(self, memory: BaseMemory) -> None: + """ + Set the memory of the chatbot task. + + Args: + memory (BaseMemory): The memory to set for the chatbot task. + """ + self.memory = memory diff --git a/mindsdb/interfaces/chatbot/polling.py b/mindsdb/interfaces/chatbot/polling.py index 8775788591c..ec38a8f4e19 100644 --- a/mindsdb/interfaces/chatbot/polling.py +++ b/mindsdb/interfaces/chatbot/polling.py @@ -1,3 +1,5 @@ +import secrets +import threading import time from mindsdb_sql.parser.ast import Identifier, Select, Insert @@ -68,7 +70,7 @@ def run(self, stop_event): message = None if message: - self.chat_task.on_message(chat_memory, message, table_name=chat_params["chat_table"]["name"]) + self.chat_task.on_message(message, chat_memory=chat_memory, table_name=chat_params["chat_table"]["name"]) except Exception as e: logger.error(e) @@ -156,8 +158,7 @@ def _callback(self, row, key): chat_id = row[t_params["chat_id_col"]] - chat_memory = self.chat_task.memory.get_chat(chat_id) - self.chat_task.on_message(chat_memory, message) + self.chat_task.on_message(message, chat_id=chat_id) def run(self, stop_event): t_params = self.params["chat_table"] @@ -168,3 +169,47 @@ def run(self, stop_event): # def send_message(self, message: ChatBotMessage): # # self.chat_task.chat_handler.realtime_send(message) + + +class WebhookPolling(BasePolling): + """ + Polling class for handling webhooks. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self, stop_event: threading.Event) -> None: + """ + Run the webhook polling. + Check if a webhook token is set for the chatbot. If not, generate a new one. + Then, do nothing, as the webhook is handled by a task instantiated for each request. + + Args: + stop_event (threading.Event): Event to stop the polling. + """ + # If a webhook token is not set for the chatbot, generate a new one. + from mindsdb.interfaces.chatbot.chatbot_controller import ChatBotController + + chat_bot_controller = ChatBotController() + chat_bot = chat_bot_controller.get_chatbot_by_id(self.chat_task.object_id) + + if not chat_bot["webhook_token"]: + chat_bot_controller.update_chatbot( + chatbot_name=chat_bot["name"], + project_name=chat_bot["project"], + webhook_token=secrets.token_urlsafe(32), + ) + + # Do nothing, as the webhook is handled by a task instantiated for each request. + stop_event.wait() + + def send_message(self, message: ChatBotMessage, table_name: str = None) -> None: + """ + Send a message (response) to the chatbot. + Pass the message to the chatbot handler to respond. + + Args: + message (ChatBotMessage): The message to send. + table_name (str): The name of the table to send the message to. Defaults to None. + """ + self.chat_task.chat_handler.respond(message) diff --git a/mindsdb/interfaces/storage/db.py b/mindsdb/interfaces/storage/db.py index f9b5db08599..386c194858e 100644 --- a/mindsdb/interfaces/storage/db.py +++ b/mindsdb/interfaces/storage/db.py @@ -351,6 +351,7 @@ class ChatBots(Base): DateTime, default=datetime.datetime.now, onupdate=datetime.datetime.now ) created_at = Column(DateTime, default=datetime.datetime.now) + webhook_token = Column(String) def as_dict(self) -> Dict: return { @@ -360,6 +361,7 @@ def as_dict(self) -> Dict: "agent_id": self.agent_id, "model_name": self.model_name, "params": self.params, + "webhook_token": self.webhook_token, "created_at": self.created_at, "database_id": self.database_id, } diff --git a/mindsdb/migrations/versions/2024-10-07_6c57ed39a82b_added_webhook_token_to_chat_bots.py b/mindsdb/migrations/versions/2024-10-07_6c57ed39a82b_added_webhook_token_to_chat_bots.py new file mode 100644 index 00000000000..2189edbc259 --- /dev/null +++ b/mindsdb/migrations/versions/2024-10-07_6c57ed39a82b_added_webhook_token_to_chat_bots.py @@ -0,0 +1,27 @@ +"""added webhook_token to chat_bots + +Revision ID: 6c57ed39a82b +Revises: 8e17ff6b75e9 +Create Date: 2024-10-07 16:40:14.141878 + +""" +from alembic import op +import sqlalchemy as sa +import mindsdb.interfaces.storage.db # noqa + + +# revision identifiers, used by Alembic. +revision = '6c57ed39a82b' +down_revision = '8e17ff6b75e9' +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table('chat_bots', schema=None) as batch_op: + batch_op.add_column(sa.Column('webhook_token', sa.VARCHAR(), nullable=True)) + + +def downgrade(): + with op.batch_alter_table('chat_bots', schema=None) as batch_op: + batch_op.drop_column('webhook_token') diff --git a/tests/scripts/check_requirements.py b/tests/scripts/check_requirements.py index 5a76c930d11..55b84beecee 100644 --- a/tests/scripts/check_requirements.py +++ b/tests/scripts/check_requirements.py @@ -123,6 +123,8 @@ def get_requirements_from_file(path): "auto-ts": ["auto_ts"], "llama-index-readers-web": ["llama_index"], "llama-index-embeddings-openai": ["llama_index"], + "botframework-connector": ["botframework"], + "botbuilder-schema": ["botbuilder"], } # We use this to exit with a non-zero status code if any check fails