From 743ce09312dba778dea8130659f18732385d79bc Mon Sep 17 00:00:00 2001 From: MSec Date: Sun, 3 Nov 2024 03:46:52 +0000 Subject: [PATCH] updated cogs to use config and fixed access to restricted attribute uses of bot._http --- twitchrce/api/twitch/twitch_api_auth.py | 5 +- twitchrce/cogs/rce.py | 144 ++++++++++++------------ twitchrce/cogs/user_cog.py | 14 +-- twitchrce/cogs/vip.py | 101 +++++++---------- twitchrce/custom_bot.py | 136 +++++++++++----------- twitchrce/esclient.py | 14 +-- twitchrce/main.py | 18 +-- twitchrce/psclient.py | 78 +++++++++++-- twitchrce/utils/utils.py | 3 +- 9 files changed, 274 insertions(+), 239 deletions(-) diff --git a/twitchrce/api/twitch/twitch_api_auth.py b/twitchrce/api/twitch/twitch_api_auth.py index 7b45e66..2159098 100644 --- a/twitchrce/api/twitch/twitch_api_auth.py +++ b/twitchrce/api/twitch/twitch_api_auth.py @@ -174,11 +174,12 @@ async def validate_token(access_token: str) -> bool: return True async def get_users( - self, access_token: str, ids=Optional[int], logins=Optional[str] + self, access_token: str, _ids=Optional[int], _logins=Optional[str] ): """ https://dev.twitch.tv/docs/authentication/validate-tokens/ - WARNING Twitch periodically conducts audits to discover applications that are not validating access tokens hourly as required. + WARNING Twitch periodically conducts audits to discover applications that are not validating access tokens + hourly as required. """ url = "https://api.twitch.tv/helix/users" headers = { diff --git a/twitchrce/cogs/rce.py b/twitchrce/cogs/rce.py index 1fd371b..78fb5ef 100644 --- a/twitchrce/cogs/rce.py +++ b/twitchrce/cogs/rce.py @@ -1,3 +1,4 @@ +import logging import re import shlex import subprocess @@ -8,53 +9,58 @@ import twitchio from colorama import Fore, Style -from twitchio import errors +from custom_bot import CustomBot +from twitchio import User, errors, PartialUser, PartialChatter, Chatter from twitchio.ext import commands, pubsub -from twitchrce import custom_bot -from twitchrce.config import bot_config +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s | %(levelname)-8s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) class RCECog(commands.Cog): - def __init__(self, bot: custom_bot.CustomBot): + def __init__(self, bot: CustomBot): self.bot = bot @commands.Cog.event() async def event_message(self, message: twitchio.Message): if message.echo: return - # print('RCECog: ', message.author.name, message.content) + logger.info("RCECog: ", message.author.name, message.content) @commands.command(aliases=["cmd"]) async def exec(self, ctx: commands.Context): # get channel broadcaster - broadcaster = await self.bot._http.get_users(ids=[], logins=[ctx.channel.name]) - user_access_token_resultset = self.bot.database.fetch_user_access_token( - broadcaster_id=self.bot.user_id - ) + broadcaster: User = ( + await self.bot.fetch_users(ids=[], logins=[ctx.channel.name]) + )[0] if ctx.message.content == "!exec --help": await ctx.send( """exec: !exec [whatever /bin/bash commands you want to mess with the streamer]: - This will run (mostly) un-sanitised bash commands on the streamers machine. rm -rf for the win.""" + This will run (mostly) un-sanitised bash commands on the streamers machine. rm -rf for the win.""" ) - # only broadcaster can run exec commands - # TODO: allow mods to run exec commands - elif ( - int(ctx.author.id) == int(broadcaster[0]["id"]) - or int(ctx.author.id) == 125444292 - ): + # only broadcaster or bot can run exec commands + elif ctx.author.id == broadcaster.id or ctx.author.id == self.bot.bot_user.id: # grab the arbitrary bash command(s) without the bot prefix + cmd = None if ctx.message.content[:5] == "!exec": cmd = re.sub( - rf"^{self.bot._prefix}{ctx.command.name}", "", ctx.message.content + rf"^{self.bot.get_prefix(message=ctx.message)}{ctx.command.name}", + "", + ctx.message.content, ).strip() else: for alias in ctx.command.aliases: cmd = re.sub( - rf"^{self.bot._prefix}{alias}", "", ctx.message.content + rf"^{self.bot.get_prefix(message=ctx.message)}{alias}", + "", + ctx.message.content, ).strip() # strip operators @@ -73,7 +79,10 @@ async def exec(self, ctx: commands.Context): command1 = shlex.split(cmd1) command2 = shlex.split(cmd2) pass - if command1[0] in settings.CMD_ALLOW_LIST and command2[0] == "grep": + if ( + command1[0] in self.bot.config.CMD_ALLOW_LIST + and command2[0] == "grep" + ): proc1 = Popen( command1[:4], shell=False, @@ -93,7 +102,7 @@ async def exec(self, ctx: commands.Context): """if input has no pipes run the command""" command = shlex.split(cmd) pass - if command[0] in settings.CMD_ALLOW_LIST: + if command[0] in self.bot.config.CMD_ALLOW_LIST: proc = Popen( command, shell=False, stdin=PIPE, stdout=PIPE, stderr=PIPE ) @@ -106,10 +115,9 @@ async def exec(self, ctx: commands.Context): """post the stdout to chat""" stdout = f"stdout: {stdout.decode()}" try: - await self.bot._http.post_chat_announcement( - token=user_access_token_resultset["access_token"], - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, message=f"{textwrap.shorten(stdout, width=500)}", color="green", ) @@ -119,10 +127,9 @@ async def exec(self, ctx: commands.Context): """post the stderr to chat""" stderr = f"stderr: {stderr.decode()}" try: - await self.bot._http.post_chat_announcement( - token=user_access_token_resultset["access_token"], - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, message=f"{textwrap.shorten(stderr, width=500)}", color="orange", ) @@ -131,12 +138,15 @@ async def exec(self, ctx: commands.Context): else: """post message to chat informing they tried to run a command that wasn't in the allow list""" - error_msg = f"Nice try {ctx.author.display_name} but the command(s) in `{cmd}` are not in the allow list!" + # noinspection DuplicatedCode + error_msg = ( + f"Nice try {ctx.author.display_name} but the command(s) in '{cmd}' " + f"are not in the allow list!" + ) try: - await self.bot._http.post_chat_announcement( - token=user_access_token_resultset["access_token"], - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, message=f"{textwrap.shorten(error_msg, width=500)}", color="orange", ) @@ -147,12 +157,15 @@ async def exec(self, ctx: commands.Context): await ctx.channel.send("TimeoutError occurred") except subprocess.TimeoutExpired: """post message to chat informing they tried to run a command that took too long to run""" - error_msg = f"Nice try {ctx.author.display_name} but the command(s) in `{cmd}` took too long to run!" + # noinspection DuplicatedCode + error_msg = ( + f"Nice try {ctx.author.display_name} but the command(s) in '{cmd}' " + f"took too long to run!" + ) try: - await self.bot._http.post_chat_announcement( - token=user_access_token_resultset["access_token"], - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, message=f"{textwrap.shorten(error_msg, width=500)}", color="orange", ) @@ -167,25 +180,12 @@ async def exec(self, ctx: commands.Context): except RuntimeError: await ctx.channel.send("An exception occurred") - async def killmyshell( + async def kill_my_shell( self, - broadcaster_id: int, - author_login: str, + broadcaster: User | PartialUser, + chatter: Chatter | PartialChatter, event: pubsub.PubSubChannelPointsMessage, ): - # get channel broadcaster - broadcaster = await self.bot._http.get_users( - ids=[str(broadcaster_id)], logins=[] - ) - broadcaster_access_token_resultset = self.bot.database.fetch_user_access_token( - broadcaster_id=broadcaster[0]["id"] - ) - broadcaster_access_token = broadcaster_access_token_resultset["access_token"] - mod_access_token_resultset = self.bot.database.fetch_user_access_token( - broadcaster_id=self.bot.user_id - ) - mod_access_token = mod_access_token_resultset["access_token"] - cmd1 = "echo $(xwininfo -tree -root | grep qterminal | head -n 1)" proc_id = ( subprocess.check_output(cmd1, shell=True).decode().split(" ")[0].strip() @@ -195,22 +195,20 @@ async def killmyshell( cmd2 = f"xkill -id {proc_id}" result = subprocess.check_output(cmd2, shell=True) result = ( - f"{Fore.RED}{author_login} just killed {broadcaster[0]['display_name']}'s shell. " + f"{Fore.RED}{chatter.display_name} just killed {broadcaster.name}'s shell. " + f"{Fore.MAGENTA}stdout: {result.decode()}{Style.RESET_ALL}" ) try: - await self.bot._http.update_reward_redemption_status( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), + await self.bot.update_reward_redemption_status( + broadcaster=broadcaster, reward_id=event.id, custom_reward_id=event.reward.id, status=True, ) print(f"{textwrap.shorten(result, width=500)}") - await self.bot._http.post_chat_announcement( - token=mod_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, message=f"{textwrap.shorten(result, width=500)}", color="green", ) @@ -224,24 +222,26 @@ async def killmyshell( ) else: try: - await self.bot._http.update_reward_redemption_status( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), + await self.bot.update_reward_redemption_status( + broadcaster=broadcaster, reward_id=event.id, custom_reward_id=event.reward.id, status=False, ) print( - f"{Fore.RED}Unlucky {author_login} but there are no terminals open to kill{Style.RESET_ALL}" + f"{Fore.RED}Unlucky {chatter.display_name} but there are no terminals open to kill{Style.RESET_ALL}" + ) + message = ( + f"Unlucky {chatter.display_name} there's no terminals open to kill so your channel points have " + f"been refunded" ) - await self.bot._http.post_chat_announcement( - token=mod_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, - message=f"Unlucky {author_login} there are no terminals open to kill; your channel points have been refunded", + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, + message=message, color="orange", ) except errors.AuthenticationError: print( - f"{Fore.RED}Unlucky {author_login} but there are no terminals open to kill{Style.RESET_ALL}" + f"{Fore.RED}Unlucky {chatter.display_name} but there are no terminals open to kill{Style.RESET_ALL}" ) diff --git a/twitchrce/cogs/user_cog.py b/twitchrce/cogs/user_cog.py index 809f555..2f5ed97 100644 --- a/twitchrce/cogs/user_cog.py +++ b/twitchrce/cogs/user_cog.py @@ -21,48 +21,48 @@ async def event_message(self, message: twitchio.Message): async def stairsthetrashman1(self, ctx: commands.Context): """type !stairsthetrashman or !ohlook""" if ctx.author.display_name.lower() in ["stairsthetrashman", "msec"]: - sound_file = "/home/kali/Music/ohlook.mp3" + _sound_file = "/home/kali/Music/ohlook.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["stairs2", "because"]) async def stairsthetrashman2(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["stairsthetrashman", "msec"]: - sound_file = "/home/kali/Music/Because_Im_mexican.mp3" + _sound_file = "/home/kali/Music/Because_Im_mexican.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["stairs3", "sonofagun"]) async def stairsthetrashman3(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["stairsthetrashman", "msec"]: - sound_file = "/home/kali/Music/yousonofagun.mp3" + _sound_file = "/home/kali/Music/yousonofagun.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["alh4zr3d", "alhashes"]) async def alh4zr3d1(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["alh4zr3d", "msec"]: - sound_file = "/home/kali/Music/al-fully-erect.mp3" + _sound_file = "/home/kali/Music/al-fully-erect.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["tibs", "0xtib3rius", "sofgood"]) async def tibs1(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["0xtib3rius", "msec"]: - sound_file = "/home/kali/Music/tibs1_sofgood.mp3" + _sound_file = "/home/kali/Music/tibs1_sofgood.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["trshpuppy", "sbacunt"]) async def trshpuppy1(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["trshpuppy", "msec"]: - sound_file = "/home/kali/Music/trashpuppy_stop-being-a-cunt.mp3" + _sound_file = "/home/kali/Music/trashpuppy_stop-being-a-cunt.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) @commands.command(aliases=["fcamerayo"]) async def trshpuppy2(self, ctx: commands.Context): if ctx.author.display_name.lower() in ["trshpuppy", "msec"]: - sound_file = "/home/kali/Music/trashpuppy_fucking-camera-yo.mp3" + _sound_file = "/home/kali/Music/trashpuppy_fucking-camera-yo.mp3" # data, samplerate = sf.read(sound_file) # self.bot.sd.play(data, samplerate) diff --git a/twitchrce/cogs/vip.py b/twitchrce/cogs/vip.py index a6ac372..41ee641 100644 --- a/twitchrce/cogs/vip.py +++ b/twitchrce/cogs/vip.py @@ -1,12 +1,22 @@ +import logging + import twitchio +from twitchio import PartialUser, User, Chatter, PartialChatter from twitchio.ext import commands, pubsub -from twitchrce import custom_bot +from custom_bot import CustomBot + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s | %(levelname)-8s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) class VIPCog(commands.Cog): - def __init__(self, bot: custom_bot.CustomBot): + def __init__(self, bot: CustomBot): self.bot = bot @commands.Cog.event() @@ -17,88 +27,55 @@ async def event_message(self, message: twitchio.Message): async def add_channel_vip( self, - channel_id: int, - author_id: str, - author_login: str, + broadcaster: User | PartialUser, + chatter: Chatter | PartialChatter, event: pubsub.PubSubChannelPointsMessage, ): - broadcaster = await self.bot._http.get_users(ids=[channel_id], logins=[]) - broadcaster_access_token_resultset = self.bot.database.fetch_user_access_token( - broadcaster_id=broadcaster[0]["id"] - ) - broadcaster_access_token: str = broadcaster_access_token_resultset[ - "access_token" - ] - mod_access_token_resultset = self.bot.database.fetch_user_access_token( - broadcaster_id=self.bot.user_id - ) - mod_access_token: str = mod_access_token_resultset["access_token"] - - # Get list of channel mods - mods = await self.bot._http.get_channel_moderators( - token=broadcaster_access_token, broadcaster_id=str(broadcaster[0]["id"]) - ) - mods_user_ids = [str(mod["user_id"]) for mod in mods] - - # Get list of channel vips - vips = await self.bot._http.get_channel_vips( - token=broadcaster_access_token, broadcaster_id=str(broadcaster[0]["id"]) - ) - vips_user_ids = [str(vip["user_id"]) for vip in vips] - - if author_id in mods_user_ids: + if chatter.is_mod: """Check if the redeemer is already a moderator and abort""" - print(f"{author_login} is already a MOD") - await self.bot._http.update_reward_redemption_status( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), + logger.info(f"{chatter.display_name} is already a MOD") + await self.bot.update_reward_redemption_status( + broadcaster=broadcaster, reward_id=event.id, custom_reward_id=event.reward.id, status=False, ) - await self.bot._http.post_chat_announcement( - token=mod_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, - message=f"{author_login} is already a MOD; your channel points have been refunded", + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, + message=f"{chatter.display_name} is already a MOD your channel points have been refunded", color="orange", ) - elif author_id in vips_user_ids: + + elif chatter.is_vip: """Check if the redeemer is already a VIP and abort""" - print(f"{author_login} is already a VIP") - await self.bot._http.update_reward_redemption_status( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), + logger.info(f"{chatter.display_name} is already a VIP") + await self.bot.update_reward_redemption_status( + broadcaster=broadcaster, reward_id=event.id, custom_reward_id=event.reward.id, status=False, ) - await self.bot._http.post_chat_announcement( - token=mod_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, - message=f"{author_login} is already a VIP; your channel points have been refunded", + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, + message=f"{chatter.display_name} is already a VIP your channel points have been refunded", color="orange", ) else: """Add redeemer as a VIP, and auto-fulfill the redemption""" - await self.bot._http.post_channel_vip( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - user_id=author_id, - ) - await self.bot._http.update_reward_redemption_status( - token=broadcaster_access_token, - broadcaster_id=str(broadcaster[0]["id"]), + broadcaster_token = "broadcaster_token" # TODO: Get actual broadcaster_token from database + await broadcaster.add_channel_vip(token=broadcaster_token, user_id=int(chatter.id)) + await self.bot.update_reward_redemption_status( + broadcaster=broadcaster, reward_id=event.id, custom_reward_id=event.reward.id, status=True, ) - await self.bot._http.post_chat_announcement( - token=mod_access_token, - broadcaster_id=str(broadcaster[0]["id"]), - moderator_id=self.bot.user_id, - message=f"Welcome {author_login} to the VIP family!", + await self.bot.post_chat_announcement( + broadcaster=broadcaster, + moderator=self.bot.bot_user, + message=f"Welcome {chatter.display_name} to the VIP family!", color="green", ) diff --git a/twitchrce/custom_bot.py b/twitchrce/custom_bot.py index dd1be5a..1ad37fe 100644 --- a/twitchrce/custom_bot.py +++ b/twitchrce/custom_bot.py @@ -4,23 +4,26 @@ import re from typing import List, Optional, Union +import boto3 import twitchio +from botocore.exceptions import ClientError from colorama import Fore, Style -from twitchio import PartialUser, User, Unauthorized, HTTPException -from twitchio.ext import commands, eventsub, pubsub +from twitchio import HTTPException, PartialUser, Unauthorized, User +from twitchio.ext import commands, eventsub from twitchio.ext.eventsub import ( + ChannelCharityDonationData, ChannelRaidData, ChannelSubscribeData, ChannelSubscriptionGiftData, - StreamOnlineData, StreamOfflineData, - ChannelCharityDonationData, + StreamOnlineData, ) +from utils.utils import Utils + from twitchrce.api.virustotal.virus_total_api import VirusTotalApiClient from twitchrce.config.bot_config import BotConfig from twitchrce.esclient import CustomEventSubClient from twitchrce.psclient import CustomPubSubClient -from utils.utils import Utils logging.basicConfig( level=logging.DEBUG, @@ -58,19 +61,34 @@ def __init__(self, config: BotConfig): if self.config.get_bot_config().get("bot_features").get("enable_psclient"): for channel in self.config.BOT_INITIAL_CHANNELS: self.ps_client = CustomPubSubClient( + bot=self, users_channel_id=int(channel.get("id")), bot_oauth_token=self.bot_oauth_token, ) if self.config.get_bot_config().get("bot_features").get("enable_esclient"): - eventsub_public_url = None # TODO: Get EC2 Instance Public DNS URL - if eventsub_public_url: - self.es_client: CustomEventSubClient = CustomEventSubClient( - client=self, - webhook_secret="some_secret_string", - callback_route=f"{eventsub_public_url}", - bot_oauth_token=self.bot_oauth_token, - ) + eventsub_public_url = None + try: + region_name = self.config.get_bot_config().get("aws").get("region_name") + ec2 = boto3.client("ec2", region_name=region_name) + # TODO: Don't hardcode InstanceIds + response = ec2.describe_instances(InstanceIds=["i-0100638f13e5451d8"]) + if response.get("Reservations"): + public_dns_name = ( + response.get("Reservations")[0] + .get("Instances")[0] + .get("PublicDnsName") + ) + eventsub_public_url = f"https://{public_dns_name}" + if eventsub_public_url: + self.es_client: CustomEventSubClient = CustomEventSubClient( + client=self, + webhook_secret="some_secret_string", + callback_route=f"{eventsub_public_url}", + bot_oauth_token=self.bot_oauth_token, + ) + except ClientError as client_error: + logger.error(msg=client_error) # TODO: Make persistent self.death_count = {} @@ -85,7 +103,7 @@ def __init__(self, config: BotConfig): ): from twitchrce.cogs.rce import RCECog - self.add_cog(RCECog(self)) + self.add_cog(RCECog(bot=self)) if ( self.config.get_bot_config() @@ -96,7 +114,7 @@ def __init__(self, config: BotConfig): ): from twitchrce.cogs.vip import VIPCog - self.add_cog(VIPCog(self)) + self.add_cog(VIPCog(bot=self)) @self.event() async def event_error(error: Exception, data: Optional[str] = None): @@ -154,37 +172,6 @@ async def event_part(user): ) pass - @self.event() - async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage): - # Log redemption request - reward: CustomReward, user: PartialUser - logger.info( - f"{Fore.RED}[PubSub][ChannelPoints]: {event.reward.id}, {event.reward.title}, {event.reward.cost} | " - f"User: {event.user.id}, {event.user.name}{Style.RESET_ALL}" - ) - - # Check if reward can be redeemed at this time - if not event.reward.paused and event.reward.enabled: - """We have to check redemption names as id's are randomly allocated when redemption is added""" - - if event.reward.title == "Kill My Shell": - # noinspection PyTypeChecker - rce_cog: RCECog = self.cogs["RCECog"] - await rce_cog.killmyshell( - broadcaster_id=event.channel_id, - author_login=event.user.name, - event=event, - ) - - if event.reward.title == "VIP": - # noinspection PyTypeChecker - vip_cog: VIPCog = self.cogs["VIPCog"] - await vip_cog.add_channel_vip( - channel_id=event.channel_id, - author_id=event.user.id, - author_login=event.user.name, - event=event, - ) - @self.event() async def event_eventsub_notification_followV2( payload: eventsub.NotificationEvent, @@ -784,7 +771,9 @@ async def event_message(self, message: twitchio.Message): """Handle commands overriding the default `event_message`.""" await self.handle_commands(message) - async def add_kill_my_shell_redemption_reward(self, broadcaster: PartialUser): + async def add_kill_my_shell_redemption_reward( + self, broadcaster: User | PartialUser + ): """ Adds channel point redemption that immediately closes the last terminal window that was opened without warning """ @@ -807,7 +796,7 @@ async def add_kill_my_shell_redemption_reward(self, broadcaster: PartialUser): f"{Fore.RED}Added {Fore.MAGENTA}'Kill My Shell'{Fore.RED} channel point redemption.{Style.RESET_ALL}" ) - async def add_vip_auto_redemption_reward(self, broadcaster: PartialUser): + async def add_vip_auto_redemption_reward(self, broadcaster: User | PartialUser): """Adds channel point redemption that adds the user to the VIP list automatically""" vips = await self._http.get_channel_vips( token=self.config.BOT_OAUTH_TOKEN, broadcaster_id=broadcaster.id, first=100 @@ -831,7 +820,7 @@ async def add_vip_auto_redemption_reward(self, broadcaster: PartialUser): f"{Fore.RED}Added {Fore.MAGENTA}'VIP'{Fore.RED} channel point redemption.{Style.RESET_ALL}" ) - async def delete_all_custom_rewards(self, broadcaster: PartialUser): + async def delete_all_custom_rewards(self, broadcaster: User | PartialUser): """deletes all custom rewards (API limits deletes to those created by the bot) Requires a user access token that includes the channel:manage:redemptions scope. """ @@ -858,10 +847,25 @@ async def delete_all_custom_rewards(self, broadcaster: PartialUser): f"[{Fore.MAGENTA}title={reward['title']}{Fore.RED}]{Style.RESET_ALL}" ) + async def update_reward_redemption_status( + self, + broadcaster: User | PartialUser, + reward_id: str, + custom_reward_id: str, + status: bool, + ) -> None: + self._http.update_reward_redemption_status( + token=self.bot_oauth_token, + broadcaster_id=broadcaster.id, + reward_id=reward_id, + custom_reward_id=custom_reward_id, + status=status, + ) + async def announce_shoutout( self, ctx: Optional[commands.Context], - broadcaster: PartialUser, + broadcaster: User | PartialUser, channel: any, color: str, ): @@ -889,9 +893,9 @@ async def announce_shoutout( error_count = 0 try: await self.post_chat_announcement( - broadcaster_id=broadcaster.id, + broadcaster=broadcaster, message="".join(message), - moderator_id=self.bot_user.id, # Moderator ID must match the user ID in the user access token. + moderator=self.bot_user, color=color, ) @@ -932,24 +936,24 @@ async def announce_shoutout( async def post_chat_announcement( self, - broadcaster_id: str, - moderator_id: str, + broadcaster: User | PartialUser, + moderator: User | PartialUser, message: str, color: str, ): """Post a shoutout announcement to chat; color = blue, green, orange, purple, or primary""" logger.info( f"{Fore.LIGHTWHITE_EX}Trying to send chat announcement as " - f"Broadcaster ID: [{Fore.LIGHTRED_EX}{broadcaster_id}{Fore.LIGHTWHITE_EX}], " - f"Moderator ID: [{Fore.LIGHTGREEN_EX}{moderator_id}{Fore.LIGHTWHITE_EX}], " + f"Broadcaster ID: [{Fore.LIGHTRED_EX}{broadcaster.id}{Fore.LIGHTWHITE_EX}], " + f"Moderator ID: [{Fore.LIGHTGREEN_EX}{moderator.id}{Fore.LIGHTWHITE_EX}], " f"and using token: [{Fore.LIGHTMAGENTA_EX}OAuth {Utils().redact_secret_string(self.bot_oauth_token)}" f"{Fore.LIGHTWHITE_EX}].{Style.RESET_ALL}" ) try: await self._http.post_chat_announcement( token=self.bot_oauth_token, - broadcaster_id=broadcaster_id, - moderator_id=moderator_id, + broadcaster_id=broadcaster.id, + moderator_id=moderator.id, message=message, color=color, ) @@ -972,28 +976,28 @@ async def send_shoutout( """ @commands.command(aliases=["enablesounds"]) - async def soundson(self, ctx: commands.Context): + async def sounds_on(self, ctx: commands.Context): from cogs.sounds_cog import SoundsCog self.add_cog(SoundsCog(self)) await ctx.send(f"Sound Commands Enabled!") @commands.command(aliases=["disablesounds"]) - async def soundsoff(self, ctx: commands.Context): + async def sounds_off(self, ctx: commands.Context): from cogs.sounds_cog import SoundsCog self.remove_cog(SoundsCog(self).name) await ctx.send(f"Sound Commands Disabled!") - @commands.command(aliases=["enableusercommands"]) - async def usercommandson(self, ctx: commands.Context): + @commands.command() + async def user_commands_on(self, ctx: commands.Context): from cogs.user_cog import UserCog self.add_cog(UserCog(self)) await ctx.send(f"User Commands Enabled!") - @commands.command(aliases=["disableusercommands"]) - async def usercommandsoff(self, ctx: commands.Context): + @commands.command() + async def user_commands_off(self, ctx: commands.Context): from cogs.user_cog import UserCog self.remove_cog(UserCog(self).name) @@ -1207,7 +1211,9 @@ async def virustotal(self, ctx: commands.Context): else: # Delete the chat message try: - broadcaster_id: int = (await self.fetch_users(names=[ctx.channel.name]))[0].id + broadcaster_id: int = ( + await self.fetch_users(names=[ctx.channel.name]) + )[0].id logger.debug( f"{Fore.LIGHTWHITE_EX}Deleting chat message. " f"{Fore.LIGHTRED_EX}token{Fore.LIGHTWHITE_EX}: " diff --git a/twitchrce/esclient.py b/twitchrce/esclient.py index ef642cf..c6f2a22 100644 --- a/twitchrce/esclient.py +++ b/twitchrce/esclient.py @@ -133,15 +133,15 @@ async def subscribe_channel_events(self, broadcaster, moderator): async def delete_all_event_subscriptions(self): """before registering new event subscriptions remove old event subs""" - self.client._http.token = self.bot_oauth_token - self.client._http.__init__(client=self, token=self.bot_oauth_token) + self._http.token = self.bot_oauth_token + self._http.__init__(client=self, token=self.bot_oauth_token) try: - es_subs = await self.client._http.get_subscriptions() + es_subs = await self._http.get_subscriptions() print( f"{Fore.RED}Found {Fore.MAGENTA}{len(es_subs)}{Fore.RED} event subscription(s).{Style.RESET_ALL}" ) for es_sub in es_subs: - await self.client._http.delete_subscription(es_sub) + await self._http.delete_subscription(es_sub) print( f"{Fore.RED}Deleting the event subscription with id: " f"{Fore.MAGENTA}{es_sub.id}{Fore.RED}.{Style.RESET_ALL}" @@ -152,8 +152,8 @@ async def delete_all_event_subscriptions(self): async def delete_event_subscriptions(self, broadcasters: List[User]): """before registering new event subscriptions remove old event subs""" - self.client._http.__init__(client=self, token=self.bot_oauth_token) - es_subs = await self.client._http.get_subscriptions() + self._http.__init__(client=self, token=self.bot_oauth_token) + es_subs = await self._http.get_subscriptions() print( f"{Fore.RED}Found {Fore.MAGENTA}{len(es_subs)}{Fore.RED} event subscription(s).{Style.RESET_ALL}" ) @@ -166,7 +166,7 @@ async def delete_event_subscriptions(self, broadcasters: List[User]): and int(es_sub.condition["to_broadcaster_user_id"]) == broadcasters[0].id ): - await self.client._http.delete_subscription(es_sub) + await self._http.delete_subscription(es_sub) print( f"{Fore.RED}Deleting the event subscription with id: " f"{Fore.MAGENTA}{es_sub.id}{Fore.RED} for channel " diff --git a/twitchrce/main.py b/twitchrce/main.py index 60b6e0d..ea7f4e6 100644 --- a/twitchrce/main.py +++ b/twitchrce/main.py @@ -37,9 +37,6 @@ dynamodb = boto3.resource("dynamodb", region_name=region_name) user_table = dynamodb.Table("MSecBot_User") -# Worker -ec2 = boto3.client("ec2", region_name=region_name) - async def setup_bot() -> CustomBot: splash = { @@ -173,14 +170,6 @@ async def setup_bot() -> CustomBot: "Bot user is not in the database. Authenticate to get an access token!" ) - response = ec2.describe_instances( - InstanceIds=["i-0100638f13e5451d8"] - ) # TODO: Don't hardcode InstanceIds - if response.get("Reservations"): - public_url = f"https://{response.get('Reservations')[0].get('Instances')[0].get('PublicDnsName')}" - else: - public_url = None - # Create a bot from your twitchapi client credentials custom_bot = CustomBot(config) custom_bot.loop.run_until_complete(custom_bot.__ainit__()) @@ -191,8 +180,7 @@ async def setup_bot() -> CustomBot: # Start the eventsub client for the Twitch channel if config.get_bot_config().get("bot_features").get("enable_esclient"): - if public_url: - custom_bot.loop.run_until_complete(custom_bot.__esclient_init__()) + custom_bot.loop.run_until_complete(custom_bot.__esclient_init__()) return custom_bot @@ -201,5 +189,5 @@ async def setup_bot() -> CustomBot: try: bot = asyncio.run(setup_bot()) bot.run() - except AuthenticationError as error: - logger.error(msg=error) + except AuthenticationError as auth_error: + logger.error(msg=auth_error) diff --git a/twitchrce/psclient.py b/twitchrce/psclient.py index 20ea146..8a588a3 100644 --- a/twitchrce/psclient.py +++ b/twitchrce/psclient.py @@ -1,10 +1,23 @@ +import logging from typing import List import twitchio +from twitchio.ext.commands import Cog + +from cogs.rce import RCECog as RCE_Cog +from cogs.vip import VIPCog as VIP_Cog from colorama import Fore, Style -from twitchio import Client +from custom_bot import CustomBot +from twitchio import Client, User, Chatter, PartialChatter from twitchio.ext import pubsub +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s | %(levelname)-8s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + class CustomPubSubClient(Client): """ @@ -16,8 +29,9 @@ class CustomPubSubClient(Client): https://twitchio.dev/en/stable/exts/pubsub.html """ - def __init__(self, users_channel_id: int, bot_oauth_token: str): + def __init__(self, bot: CustomBot, users_channel_id: int, bot_oauth_token: str): super().__init__(token=bot_oauth_token) + self.bot = bot self.client = twitchio.Client(token=bot_oauth_token) self.users_channel_id: int = users_channel_id self.users_oauth_token: str = bot_oauth_token @@ -34,10 +48,55 @@ async def event_pubsub_bits(event: pubsub.PubSubBitsMessage): @self.client.event() async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage): - print( - f"Channel Points redeemed by {event.user.name} for {event.reward.title}" + # Log redemption request - reward: CustomReward, user: PartialUser + logger.info( + f"{Fore.RED}[PubSub][ChannelPoints]: {event.reward.id}, {event.reward.title}, {event.reward.cost} | " + f"User: {event.user.id}, {event.user.name}{Style.RESET_ALL}" ) - pass # do stuff on channel point redemptions + + # Check if reward can be redeemed at this time + if not event.reward.paused and event.reward.enabled: + """We have to check redemption names as id's are randomly allocated when redemption is added""" + + if event.reward.title == "Kill My Shell" and ( + self.bot.config.get_bot_config() + .get("bot_features") + .get("cogs") + .get("rce_cog") + .get("enable_rce_cog") + ): + broadcaster: User = ( + await self.fetch_users(ids=[event.channel_id]) + )[0] + chatter: Chatter | PartialChatter = broadcaster.channel.get_chatter( + name=event.user.name + ) + cog: RCE_Cog | Cog = self.bot.get_cog(name="RCE_Cog") + await cog.kill_my_shell( + broadcaster=broadcaster, + chatter=chatter, + event=event, + ) + + if event.reward.title == "VIP" and ( + self.bot.config.get_bot_config() + .get("bot_features") + .get("cogs") + .get("vip_cog") + .get("enable_vip_cog") + ): + broadcaster: User = ( + await self.fetch_users(ids=[event.channel_id]) + )[0] + chatter: Chatter | PartialChatter = broadcaster.channel.get_chatter( + name=event.user.name + ) + cog: VIP_Cog | Cog = self.bot.get_cog(name="VIP_Cog") + await cog.add_channel_vip( + broadcaster=broadcaster, + chatter=chatter, + event=event, + ) async def start_pubsub(self): # Start listening to supported PubSub topics for channel points and bits @@ -51,11 +110,14 @@ async def start_pubsub(self): class CustomPubSubPool(pubsub.PubSubPool): async def auth_fail_hook(self, topics: List[pubsub.Topic]): """ - This function is a coroutine. This is a hook that can be overridden in a subclass. From this hook, you can refresh expired tokens (or prompt a user for new ones), and resubscribe to the events. - The topics will not be automatically resubscribed to. You must do it yourself by calling subscribe_topics() with the topics after obtaining new tokens. + This function is a coroutine. This is a hook that can be overridden in a subclass. From this hook, you can + refresh expired tokens (or prompt a user for new ones), and resubscribe to the events. + The topics will not be automatically resubscribed to. You must do it yourself by calling subscribe_topics() + with the topics after obtaining new tokens. Parameters: - - topics (List[Topic]): The topics that have been de-authorized. Typically, these will all contain the same token. + - topics (List[Topic]): The topics that have been de-authorized. + Typically, these will all contain the same token. https://twitchio.dev/en/latest/exts/pubsub.html#twitchio.ext.pubsub.PubSubPool.auth_fail_hook """ diff --git a/twitchrce/utils/utils.py b/twitchrce/utils/utils.py index d91e1cb..1e7b084 100644 --- a/twitchrce/utils/utils.py +++ b/twitchrce/utils/utils.py @@ -52,7 +52,8 @@ async def refresh_user_token(user: any) -> str: ReturnValues="UPDATED_NEW", ) logger.info( - f"{Fore.LIGHTWHITE_EX}Updated access_token and refresh_token for user {Fore.LIGHTCYAN_EX}{user['login']}" + f"{Fore.LIGHTWHITE_EX}Updated access_token and refresh_token for user " + f"{Fore.LIGHTCYAN_EX}{user['login']}" f"{Fore.LIGHTWHITE_EX}!{Style.RESET_ALL}" ) except ResourceNotFoundException as resource_error: