From 4d6e794f916889e7d664d073b794b37862ee837c Mon Sep 17 00:00:00 2001 From: MSec Date: Tue, 29 Oct 2024 04:33:34 +0000 Subject: [PATCH] Moved non-bot setup specific methods to Utils class --- tests/test_custom_bot.py | 4 +- tests/test_main.py | 145 +----------------------- tests/utils/test_utils.py | 36 +++++- twitchrce/__init__.py | 0 twitchrce/api/twitch/twitch_api_auth.py | 3 +- twitchrce/main.py | 112 +++++------------- twitchrce/utils/utils.py | 86 ++++++++++++++ 7 files changed, 159 insertions(+), 227 deletions(-) delete mode 100644 twitchrce/__init__.py diff --git a/tests/test_custom_bot.py b/tests/test_custom_bot.py index 28833bf..851ed1c 100644 --- a/tests/test_custom_bot.py +++ b/tests/test_custom_bot.py @@ -7,7 +7,7 @@ @mock_aws @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_get_app_token(mocker): - from twitchrce.main import get_app_token + from twitchrce.utils.utils import Utils # Mock the response from client_credentials_grant_flow client_credentials_grant_flow_response = { @@ -25,6 +25,6 @@ def test_get_app_token(mocker): ) event_loop = asyncio.get_event_loop() - access_token = event_loop.run_until_complete(get_app_token()) + access_token = event_loop.run_until_complete(Utils.get_app_token()) assert access_token == "1234567890abcdef1234567890abcdef" mock_client_credentials_grant_flow.assert_awaited_once() diff --git a/tests/test_main.py b/tests/test_main.py index c4429d5..cc8f991 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -3,7 +3,6 @@ import boto3 import pytest -from botocore.exceptions import NoCredentialsError, PartialCredentialsError from moto import mock_aws from twitchrce.config import bot_config @@ -43,142 +42,6 @@ def set_environment_variables(monkeypatch): monkeypatch.setenv("VIRUS_TOTAL_API_KEY", "xyz") -@pytest.mark.asyncio -@pytest.mark.filterwarnings("ignore::DeprecationWarning") -async def test_get_app_token(mocker): - from twitchrce.main import get_app_token - - mock_client_credentials_grant_flow_response = { - "access_token": "access_token_xyz789", - "expires_in": 12345, - "token_type": "bearer", - } - - mock_client_credentials_grant_flow = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.client_credentials_grant_flow" - ) - mock_client_credentials_grant_flow.return_value = ( - mock_client_credentials_grant_flow_response - ) - - access_token = await get_app_token() - assert access_token == "access_token_xyz789" - mock_client_credentials_grant_flow.assert_awaited_once() - - -@pytest.mark.asyncio -async def test_check_valid_token_is_valid(mocker): - from twitchrce.main import check_valid_token - - mock_validate_token = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.validate_token" - ) - mock_validate_token.return_value = True - - mock_user = {} - - is_valid_token = await check_valid_token(user=mock_user) - assert bool(is_valid_token) - mock_validate_token.assert_awaited_once() - - -@mock_aws -def test_check_valid_token_is_invalid(mocker): - from twitchrce.main import check_valid_token - - BOT_CONFIG = bot_config.BotConfig().get_bot_config() - - mock_validate_token = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.validate_token" - ) - mock_validate_token.side_effect = [False, True] - - mock_refresh_access_token = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.refresh_access_token" - ) - mock_refresh_access_token.return_value = MOCK_REFRESH_ACCESS_TOKEN_RESPONSE_SUCCESS - - mock_dynamodb = boto3.resource( - "dynamodb", region_name=BOT_CONFIG.get("aws").get("region_name") - ) - mock_table_name = "user" - mock_dynamodb.create_table( - TableName=mock_table_name, - KeySchema=[ - {"AttributeName": "id", "KeyType": "HASH"}, # Partition key - ], - AttributeDefinitions=[ - {"AttributeName": "id", "AttributeType": "S"}, # String - ], - ProvisionedThroughput={ - "ReadCapacityUnits": 5, - "WriteCapacityUnits": 5, - }, - ) - mock_table = mock_dynamodb.Table(mock_table_name) - mock_table.put_item( - Item={ - "id": "123456", - "access_token": "access_token_abc123", - "client_id": BOT_CONFIG.get("twitch").get("client_id"), - "expires_in": 12345, - "login": "username_bot", - "refresh_token": "refresh_token_abc123", - } - ) - mock_user_table = mocker.patch("twitchrce.main.user_table") - mock_user_table.return_value = mock_table - - mock_user = {"id": "123", "login": "username_bot"} - - event_loop = asyncio.get_event_loop() - is_valid_token = event_loop.run_until_complete(check_valid_token(user=mock_user)) - assert bool(is_valid_token) - assert mock_validate_token.await_count == 2 - mock_validate_token.assert_awaited() - mock_refresh_access_token.assert_awaited_once() - - -@pytest.mark.asyncio -async def test_refresh_user_token_no_credentials(mocker): - from twitchrce.main import refresh_user_token - - mock_refresh_access_token = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.refresh_access_token" - ) - mock_refresh_access_token.return_value = MOCK_REFRESH_ACCESS_TOKEN_RESPONSE_SUCCESS - - mock_user_table_update_item = mocker.patch("twitchrce.main.user_table.update_item") - mock_user_table_update_item.side_effect = ( - NoCredentialsError() - ) # simulate missing AWS credentials - - mock_user = {"id": "123", "login": "username_bot"} - - with pytest.raises(NoCredentialsError): - await refresh_user_token(mock_user) - - -@pytest.mark.asyncio -async def test_refresh_user_token_partial_credentials(mocker): - from twitchrce.main import refresh_user_token - - mock_refresh_access_token = mocker.patch( - "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.refresh_access_token" - ) - mock_refresh_access_token.return_value = MOCK_REFRESH_ACCESS_TOKEN_RESPONSE_SUCCESS - - mock_user_table_update_item = mocker.patch("twitchrce.main.user_table.update_item") - mock_user_table_update_item.side_effect = PartialCredentialsError( - provider="mock_provider", cred_var="mock_cred_var" - ) # simulate missing AWS credentials - - mock_user = {"id": "123", "login": "username_bot"} - - with pytest.raises(PartialCredentialsError): - await refresh_user_token(mock_user) - - @mock_aws def test_main(mocker, capfd): from twitchrce.main import setup_bot @@ -277,7 +140,7 @@ def test_setup_bot_if_bot_user_has_no_access_token_should_raise_value_error(mock BOT_CONFIG = bot_config.BotConfig().get_bot_config() - mock_get_app_token = mocker.patch("twitchrce.main.get_app_token") + mock_get_app_token = mocker.patch("twitchrce.utils.utils.Utils.get_app_token") mock_get_app_token.return_value = "access_token_abc123" mock_dynamodb = boto3.resource( @@ -325,7 +188,7 @@ def test_setup_bot_if_db_has_no_bot_user_should_raise_value_error(mocker): BOT_CONFIG = bot_config.BotConfig().get_bot_config() - mock_get_app_token = mocker.patch("twitchrce.main.get_app_token") + mock_get_app_token = mocker.patch("twitchrce.utils.utils.Utils.get_app_token") mock_get_app_token.return_value = "access_token_abc123" mock_dynamodb = boto3.resource( @@ -364,7 +227,7 @@ def test_setup_bot_if_bot_user_has_access_token_but_describe_instances_has_no_re BOT_CONFIG = bot_config.BotConfig().get_bot_config() - mock_get_app_token = mocker.patch("twitchrce.main.get_app_token") + mock_get_app_token = mocker.patch("twitchrce.utils.utils.Utils.get_app_token") mock_get_app_token.return_value = "access_token_abc123" mock_dynamodb = boto3.resource( @@ -408,7 +271,7 @@ def test_setup_bot_if_bot_user_has_access_token_but_describe_instances_has_no_re ), ] - mock_check_valid_token = mocker.patch("twitchrce.main.check_valid_token") + mock_check_valid_token = mocker.patch("twitchrce.utils.utils.Utils.check_valid_token") mock_check_valid_token.return_value = True mock_describe_instances_response = {} diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index d42cbb4..846897c 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -3,7 +3,41 @@ from twitchrce.utils.utils import Utils -# Test cases for the redact_secret_string method +@pytest.mark.asyncio +@pytest.mark.filterwarnings("ignore::DeprecationWarning") +async def test_get_app_token(mocker): + mock_client_credentials_grant_flow_response = { + "access_token": "access_token_xyz789", + "expires_in": 12345, + "token_type": "bearer", + } + + mock_client_credentials_grant_flow = mocker.patch( + "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.client_credentials_grant_flow" + ) + mock_client_credentials_grant_flow.return_value = ( + mock_client_credentials_grant_flow_response + ) + + access_token = await Utils.get_app_token() + assert access_token == "access_token_xyz789" + mock_client_credentials_grant_flow.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_check_valid_token_is_valid(mocker): + mock_validate_token = mocker.patch( + "twitchrce.api.twitch.twitch_api_auth.TwitchApiAuth.validate_token" + ) + mock_validate_token.return_value = True + + mock_user = {} + + is_valid_token = await Utils().check_valid_token(user=mock_user) + assert bool(is_valid_token) + mock_validate_token.assert_awaited_once() + + @pytest.mark.parametrize( "secret_string, visible_chars, expected", [ diff --git a/twitchrce/__init__.py b/twitchrce/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/twitchrce/api/twitch/twitch_api_auth.py b/twitchrce/api/twitch/twitch_api_auth.py index 85d2246..10bded6 100644 --- a/twitchrce/api/twitch/twitch_api_auth.py +++ b/twitchrce/api/twitch/twitch_api_auth.py @@ -69,7 +69,7 @@ async def obtain_access_token(self, code: str, redirect_uri: str) -> dict: Parameter Required? Type Description client_id Yes String Your app’s registered client ID. client_secret Yes String Your app’s registered client secret. - code Yes String The code that the /authorize response returned in the code query parameter. + code Yes String The code that the /authorize response returned to the code query parameter. grant_type Yes String Must be set to authorization_code. redirect_uri Yes URI Your app’s registered redirect URI. """ @@ -128,6 +128,7 @@ async def refresh_access_token(self, refresh_token: str): status = resp.status data = await resp.json() if status == 400: + from twitchrce.utils.utils import Utils logger.error( f"{Fore.RED}Refresh of user oauth access_token using refresh_token [{Fore.MAGENTA}" f"{Utils.redact_secret_string(refresh_token)}{Fore.RED}] has FAILED!.{Style.RESET_ALL}" diff --git a/twitchrce/main.py b/twitchrce/main.py index 27d8a01..214368b 100644 --- a/twitchrce/main.py +++ b/twitchrce/main.py @@ -4,13 +4,13 @@ import boto3 import nest_asyncio -from botocore.exceptions import NoCredentialsError, PartialCredentialsError from colorama import Fore, Style from twitchio import AuthenticationError from twitchrce.api.twitch.twitch_api_auth import TwitchApiAuth from twitchrce.config import bot_config from twitchrce.custom_bot import CustomBot +from twitchrce.utils.utils import Utils nest_asyncio.apply() @@ -22,65 +22,6 @@ logger = logging.getLogger(__name__) -async def get_app_token() -> str: - """Uses the bots' client id and secret to generate a new application token via client credentials grant flow""" - client_creds_grant_flow = await TwitchApiAuth().client_credentials_grant_flow() - logger.info( - f"{Fore.LIGHTWHITE_EX}Updated {Fore.LIGHTCYAN_EX}app access token{Fore.LIGHTWHITE_EX}!{Style.RESET_ALL}" - ) - return client_creds_grant_flow["access_token"] - - -# TODO: Replace with lambda calls -async def check_valid_token(user: any) -> bool: - """ - Asynchronously checks if a user's access token is valid. If the token is invalid, - attempts to refresh the token and validates it again. - - Args: - user (any): A user object or dictionary containing the user's access token - under the key "access_token". - - Returns: - bool: True if the user's access token is valid after validation or refresh; - False if it remains invalid. - """ - is_valid_token = await TwitchApiAuth().validate_token( - access_token=user.get("access_token") - ) - if not is_valid_token: - access_token = await refresh_user_token(user=user) - is_valid_token = await TwitchApiAuth().validate_token(access_token=access_token) - return is_valid_token - - -# TODO: Replace with lambda calls -async def refresh_user_token(user: any) -> str: - auth_result = await TwitchApiAuth().refresh_access_token( - refresh_token=user.get("refresh_token") - ) - try: - # Insert the item - user_table.update_item( - Key={"id": user.get("id")}, - UpdateExpression="set access_token=:a, refresh_token=:r, expires_in=:e", - ExpressionAttributeValues={ - ":a": auth_result.get("access_token"), - ":r": auth_result.get("refresh_token"), - ":e": auth_result.get("expires_in"), - }, - ReturnValues="UPDATED_NEW", - ) - logger.info( - f"{Fore.LIGHTWHITE_EX}Updated access_token and refresh_token for user {Fore.LIGHTCYAN_EX}{user['login']}" - f"{Fore.LIGHTWHITE_EX}!{Style.RESET_ALL}" - ) - except (NoCredentialsError, PartialCredentialsError) as error: - logger.error("Credentials not available") - raise error - return auth_result.get("access_token") - - """ ██████ ██████ ████████ ██ ███ ██ ██ ████████ ██ ██ ██ ██ ██ ██ ████ ██ ██ ██ @@ -90,13 +31,18 @@ async def refresh_user_token(user: any) -> str: Start the pubsub client for the Twitch channel """ -bot_config = bot_config.BotConfig() +config = bot_config.BotConfig() +region_name = config.get_bot_config().get("aws").get("region_name") + +# Database dynamodb = boto3.resource( - "dynamodb", region_name=bot_config.get_bot_config().get("aws").get("region_name") + "dynamodb", region_name=region_name ) user_table = dynamodb.Table("MSecBot_User") + +# Worker ec2 = boto3.client( - "ec2", region_name=bot_config.get_bot_config().get("aws").get("region_name") + "ec2", region_name=region_name ) @@ -104,7 +50,8 @@ async def setup_bot() -> CustomBot: splash = { "name": "TwitchRCE", "version": "v1.0.0", - "description": f"{Fore.LIGHTWHITE_EX}TwitchRCE is an advanced bot for interacting with Twitch's PubSub, EventSub and API services.{Style.RESET_ALL}", + "description": f"""{Fore.LIGHTWHITE_EX}TwitchRCE is an advanced bot for interacting with Twitch's PubSub, + EventSub and API services.{Style.RESET_ALL}""", "project_url": f"{Fore.LIGHTWHITE_EX}Project URL: https://github.com/minimike86/TwitchRCE{Style.RESET_ALL}", "copyright": f"{Fore.LIGHTWHITE_EX}Copyright (c) 2024 MSec @minimike86.{Style.RESET_ALL}", } @@ -128,7 +75,7 @@ async def setup_bot() -> CustomBot: {splash['copyright']} {Fore.RED} THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE - WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS + WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. @@ -141,7 +88,7 @@ async def setup_bot() -> CustomBot: asyncio.set_event_loop(loop) # fetch bot app token - app_token = loop.run_until_complete(get_app_token()) + _app_token = loop.run_until_complete(Utils().get_app_token()) scope = ( "user:read:chat user:write:chat moderator:read:suspicious_users moderator:read:chatters " @@ -163,20 +110,20 @@ async def setup_bot() -> CustomBot: "analytics:read:games analytics:read:extensions" ) api_gateway_invoke_url = ( - bot_config.get_bot_config() + config.get_bot_config() .get("aws") .get("api_gateway") .get("api_gateway_invoke_url") ) api_gateway_route = ( - bot_config.get_bot_config() + config.get_bot_config() .get("aws") .get("api_gateway") .get("api_gateway_route") ) redirect_uri = f"{api_gateway_invoke_url}{api_gateway_route}" authorization_url = ( - f"https://id.twitch.tv/oauth2/authorize?client_id={bot_config.get_bot_config().get('twitch').get('client_id')}" + f"https://id.twitch.tv/oauth2/authorize?client_id={config.get_bot_config().get('twitch').get('client_id')}" f"&force_verify=true" f"&redirect_uri={redirect_uri}" f"&response_type=code" @@ -185,22 +132,22 @@ async def setup_bot() -> CustomBot: ) # fetch bot user token (refresh it if needed) - bot_user = None + _bot_user = None try: response = user_table.get_item( Key={ "id": int( - bot_config.get_bot_config() + config.get_bot_config() .get("twitch") .get("bot_auth") .get("bot_user_id") ) } ) - bot_user = response.get("Item") + _bot_user = response.get("Item") # the bot user has no twitch access token stored in db so can't use chat programmatically - if not bot_user.get("access_token"): + if not _bot_user.get("access_token"): # Send URL to stdout allows the user to grant the oauth flow and store an access token in the db # TODO: Deduplicate code logger.error( @@ -217,9 +164,9 @@ async def setup_bot() -> CustomBot: else: # the bot user has a twitch access token stored in db so check its actually valid else refresh it - is_valid = loop.run_until_complete(check_valid_token(user=bot_user)) + is_valid = loop.run_until_complete(Utils().check_valid_token(user=_bot_user)) if is_valid: - bot_config.BOT_OAUTH_TOKEN = bot_user.get("access_token") + config.BOT_OAUTH_TOKEN = _bot_user.get("access_token") except AttributeError: # database doesn't have an item for the bot_user_id provided @@ -227,7 +174,8 @@ async def setup_bot() -> CustomBot: # Send URL to stdout allows the user to grant the oauth flow and store an access token in the db # TODO: Deduplicate code logger.error( - f"{Fore.CYAN}Failed to get bot user object for {Fore.MAGENTA}{bot_config.get_bot_config().get('twitch').get('bot_user_id')}{Fore.CYAN}!" + f"{Fore.CYAN}Failed to get bot user object for " + f"{Fore.MAGENTA}{config.get_bot_config().get('twitch').get('bot_user_id')}{Fore.CYAN}!" f"{Style.RESET_ALL}" ) logger.info( @@ -246,18 +194,18 @@ async def setup_bot() -> CustomBot: public_url = None # Create a bot from your twitchapi client credentials - bot = CustomBot(bot_config) + custom_bot = CustomBot(config) # Start the pubsub client for the Twitch channel - if bot_config.get_bot_config().get("bot_features").get("enable_psclient"): - bot.loop.run_until_complete(bot.__psclient_init__()) + if config.get_bot_config().get("bot_features").get("enable_psclient"): + custom_bot.loop.run_until_complete(custom_bot.__psclient_init__()) # Start the eventsub client for the Twitch channel - if bot_config.get_bot_config().get("bot_features").get("enable_esclient"): + if config.get_bot_config().get("bot_features").get("enable_esclient"): if public_url: - bot.loop.run_until_complete(bot.__esclient_init__()) + custom_bot.loop.run_until_complete(custom_bot.__esclient_init__()) - return bot + return custom_bot if __name__ == "__main__": diff --git a/twitchrce/utils/utils.py b/twitchrce/utils/utils.py index bf4134d..5f1d6cd 100644 --- a/twitchrce/utils/utils.py +++ b/twitchrce/utils/utils.py @@ -1,5 +1,91 @@ +import logging + +import boto3 +from botocore.exceptions import NoCredentialsError, PartialCredentialsError +from colorama import Fore, Style +from moto.dynamodb.exceptions import ResourceNotFoundException + +from twitchrce.config import bot_config + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)-8s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + class Utils: + @staticmethod + async def get_app_token() -> str: + """Uses the bots' client id and secret to generate a new application token via client credentials grant flow""" + from twitchrce.api.twitch.twitch_api_auth import TwitchApiAuth + client_creds_grant_flow = await TwitchApiAuth().client_credentials_grant_flow() + logger.info( + f"{Fore.LIGHTWHITE_EX}Updated {Fore.LIGHTCYAN_EX}app access token{Fore.LIGHTWHITE_EX}!{Style.RESET_ALL}" + ) + return client_creds_grant_flow["access_token"] + + @staticmethod + async def refresh_user_token(user: any) -> str: + # TODO: Replace with lambda calls + from twitchrce.api.twitch.twitch_api_auth import TwitchApiAuth + auth_result = await TwitchApiAuth().refresh_access_token( + refresh_token=user.get("refresh_token") + ) + try: + config = bot_config.BotConfig() + region_name = config.get_bot_config().get("aws").get("region_name") + dynamodb = boto3.resource( + "dynamodb", region_name=region_name + ) + user_table = dynamodb.Table("MSecBot_User") + user_table.update_item( + Key={"id": user.get("id")}, + UpdateExpression="set access_token=:a, refresh_token=:r, expires_in=:e", + ExpressionAttributeValues={ + ":a": auth_result.get("access_token"), + ":r": auth_result.get("refresh_token"), + ":e": auth_result.get("expires_in"), + }, + ReturnValues="UPDATED_NEW", + ) + logger.info( + f"{Fore.LIGHTWHITE_EX}Updated access_token and refresh_token for user {Fore.LIGHTCYAN_EX}{user['login']}" + f"{Fore.LIGHTWHITE_EX}!{Style.RESET_ALL}" + ) + except ResourceNotFoundException as resource_error: + logger.error("ResourceNotFoundException", resource_error) + raise resource_error + except (NoCredentialsError, PartialCredentialsError) as credential_error: + logger.error("Credentials Error", credential_error) + raise credential_error + return auth_result.get("access_token") + + async def check_valid_token(self, user: any) -> bool: + # TODO: Replace with lambda calls + """ + Asynchronously checks if a user's access token is valid. If the token is invalid, + attempts to refresh the token and validates it again. + + Args: + user (any): A user object or dictionary containing the user's access token + under the key "access_token". + + Returns: + bool: True if the user's access token is valid after validation or refresh; + False if it remains invalid. + """ + from twitchrce.api.twitch.twitch_api_auth import TwitchApiAuth + is_valid_token = await TwitchApiAuth().validate_token( + access_token=user.get("access_token") + ) + if not is_valid_token: + access_token = await self.refresh_user_token(user=user) + is_valid_token = await TwitchApiAuth().validate_token(access_token=access_token) + return is_valid_token + @staticmethod def redact_secret_string(secret_string: str, visible_chars: int = 4) -> str: """