Skip to content

Commit

Permalink
Add search only option
Browse files Browse the repository at this point in the history
  • Loading branch information
Raymo111 committed Jul 29, 2020
1 parent 8594a19 commit 142e671
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 79 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ The nickname to join the Kahoot with
-p, --pin
The game pin
-s --search
Search for a quiz without joining a Kahoot. Cancels nick and pin options.
-q, --quizName
The quiz's name
Expand Down
80 changes: 54 additions & 26 deletions kbot
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import argparse

import klib
import re
import sys


def nameOrID():
Expand All @@ -14,8 +15,8 @@ def nameOrID():
return False


def checkID(id):
res = re.search(r"(([a-zA-Z0-9]){8}(-([a-zA-Z0-9]){4}){3}-([a-zA-Z0-9]){12})", id)
def checkID(qID):
res = re.search(r"(([a-zA-Z0-9]){8}(-([a-zA-Z0-9]){4}){3}-([a-zA-Z0-9]){12})", qID)
if not res:
print('Invalid UUID. It must take the form x8-x4-x4-x4-x12, where x8 means 8 alphanumeric characters')
exit()
Expand All @@ -24,53 +25,80 @@ def checkID(id):

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-s', "--search",
help="Search for a quiz without joining a Kahoot. Cancels nick and pin options.",
action="store_true")
parser.add_argument('-d', "--debug", help="Output go brrrrrrrrrrrrr", action="store_true")
parser.add_argument('-e', "--email", help="The email used to login to create.kahoot.it")
parser.add_argument('-a', "--password", help="The corresponding password used to login to create.kahoot.it")
parser.add_argument('-n', "--nick", help="The nickname to join the Kahoot with")
parser.add_argument('-p', "--pin", help="The game pin")
parser.add_argument('-q', "--quizName", help="The quiz's name")
parser.add_argument('-i', "--quizID", help="The quiz's ID")
parser.add_argument('-m', "--maxCount", help="How many quizzes to look for when searching by name")
parser.add_argument('-d', "--debug", help="Debug mode on")
args = parser.parse_args()
email = args.email
password = args.password
nickname = args.nick
pin = args.pin
searchOnly = args.search
quizID = args.quizName
quizName = args.quizID
maxCount = args.maxCount
debug = args.debug

try:
if not nickname:
nickname = input('name > ')
if not pin:
pin = input('pin > ')
if quizID:
user = klib.Kahoot(pin, nickname, quizID=checkID(quizID), DEBUG=debug)
elif quizName:
if email and password:
user = klib.Kahoot(pin, nickname, quizName=quizName, maxCount=maxCount, DEBUG=debug)
if debug:
print("In debug mode: output will go brrrrrrrrrrrr")
else:
sys.tracebacklimit = 0
if searchOnly:
print("In searchOnly mode: kbot will not join a Kahoot")
if quizID:
user = klib.Kahoot(quizID=checkID(quizID), DEBUG=debug)
elif quizName:
if email and password:
user = klib.Kahoot(quizName=quizName, maxCount=maxCount, DEBUG=debug)
else:
print('Authentication required when searching for quizzes by name')
exit()
else:
print('Authentication required when searching for quizzes by name')
exit()
if email and password:
if nameOrID():
user = klib.Kahoot(quizName=input('quizName > '), maxCount=maxCount, DEBUG=debug)
else:
user = klib.Kahoot(quizID=checkID(input('quizID > ')), DEBUG=debug)
else:
user = klib.Kahoot(quizID=checkID(input('quizID > ')), DEBUG=debug)
else:
if email and password:
if nameOrID():
quizName = input('quizName > ')
user = klib.Kahoot(pin, nickname, quizName=quizName, maxCount=maxCount, DEBUG=debug)
if not nickname:
nickname = input('name > ')
if not pin:
pin = input('pin > ')
if quizID:
user = klib.Kahoot(pin=pin, nickname=nickname, quizID=checkID(quizID), DEBUG=debug)
elif quizName:
if email and password:
user = klib.Kahoot(pin=pin, nickname=nickname, quizName=quizName, maxCount=maxCount, DEBUG=debug)
else:
quizID = input('quizID > ')
validID = re.search(r"^([a-zA-Z0-9]){8}(-([a-zA-Z0-9]){4}){3}-([a-zA-Z0-9]){12}$", quizID)
user = klib.Kahoot(pin, nickname, quizID=checkID(quizID), DEBUG=debug)
print('Authentication required when searching for quizzes by name')
exit()
else:
quizID = input('quizID > ')
user = klib.Kahoot(pin, nickname, quizID=quizID, DEBUG=debug)
if email and password:
if nameOrID():
user = klib.Kahoot(pin=pin, nickname=nickname, quizName=input('quizName > '), maxCount=maxCount,
DEBUG=debug)
else:
user = klib.Kahoot(pin=pin, nickname=nickname, quizID=checkID(input('quizID > ')), DEBUG=debug)
else:
user = klib.Kahoot(pin=pin, nickname=nickname, quizID=checkID(input('quizID > ')), DEBUG=debug)
if email and password:
user.authenticate(email, password)
user.checkPin()
user.startGame()
if searchOnly:
user.search()
else:
user.checkPin()
user.startGame()
except KeyboardInterrupt:
print('EXITING...')
print("\nBYE!")
exit()
133 changes: 80 additions & 53 deletions klib.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json
import os
import re
import sys
import time

try:
Expand All @@ -18,7 +17,7 @@


class Kahoot:
def __init__(self, pin, nickname, quizName=None, quizID=None, maxCount=None, DEBUG=None):
def __init__(self, pin=None, nickname=None, quizName=None, quizID=None, maxCount=None, DEBUG=None):
self.pin = pin
self.nickname = nickname
self.quizName = quizName
Expand All @@ -30,15 +29,23 @@ def __init__(self, pin, nickname, quizName=None, quizID=None, maxCount=None, DEB
self.answers = None
self.colors = {0: "RED", 1: "BLUE", 2: "YELLOW", 3: "GREEN"}
self.maxCount = maxCount if maxCount else 50
self.lookup = None
self.loadCodes()
self.DEBUG = DEBUG;
if not DEBUG:
sys.tracebacklimit = 0
self.sessionID = None
self.sessionToken = None
self.DEBUG = DEBUG
self.loop = asyncio.get_event_loop()

def error(self, err):
raise KahootError(err)

def gracefulExit(self):
exit()

def _check_auth(f):
def wrapper(self, *args, **kwargs):
if not self.authToken:
raise KahootError('You must be authenticated to use this method.')
self.error('You must be authenticated to use this method.')
return f(self, *args, **kwargs)

return wrapper
Expand All @@ -49,16 +56,21 @@ def authenticate(self, email, password):
response = self.client.post(url, json=data,
headers={'Content-Type': 'application/json', "x-kahoot-login-gate": "enabled"})
if response.status_code == 401:
raise KahootError("Invalid Email or Password.")
self.error("Invalid Email or Password.")
elif response.status_code == 200:
print('AUTHENTICATED')
self.authToken = response.json()["access_token"]
else:
raise KahootError("Login error %d", response.status_code)
self.error(f"Login error {response.status_code}")

def startGame(self):
loop = asyncio.get_event_loop()
loop.run_until_complete(self._play())
self.loop.run_until_complete(self._play())

def search(self):
self.loop.run_until_complete(self._search())

async def _search(self):
self.answers = await self.findAnswers(searchOnly=1)

async def _play(self):
url = f'wss://play.kahoot.it/cometd/{self.pin}/{self.sessionID}'
Expand All @@ -74,10 +86,12 @@ async def _play(self):
tFADone = 0
if self.quizID:
self.answers = await self.findAnswers()
if self.answers:
print(f'ANSWERS RECEIVED')
async for rawMessage in client:
message = rawMessage['data']
if 'error' in message:
raise KahootError(message['description'])
self.error(message['description'])
if 'id' in message:
data = json.loads(message['content'])
kind = ''
Expand All @@ -93,6 +107,8 @@ async def _play(self):
quizAnswers = data['quizQuestionAnswers']
if not self.answers:
self.answers = await self.findAnswers(exceptedAnswers=quizAnswers)
if self.answers:
print(f'ANSWERS RECEIVED')
elif kind == 'START_QUESTION':
print('------', data['questionIndex'] + 1, '------')
if data['gameBlockType'] != 'quiz':
Expand All @@ -110,10 +126,10 @@ async def _play(self):
pass
elif kind == 'RESET_CONTROLLER':
print("RESET_CONTROLLER")
exit()
self.gracefulExit()
elif kind == 'GAME_OVER':
print("Game over, if you didn't win the winner is hacking!")
exit()
self.gracefulExit()
if not (tFADone and kind == 'RESET_TWO_FACTOR_AUTH'):
print(kind.replace('_', ' '))

Expand All @@ -137,18 +153,17 @@ async def sendAnswer(self, choice):
{"content": choiceInfo, "gameid": self.pin, "host": "kahoot.it", "type": "message",
"id": 45})

def printAnswers(self, resp, url):
print("If the questions are randomized, go to " + url + "to get the answers yourself.") # TODO: output answers

async def getQuiz(self, url, exceptedAnswers=None, actualAnswers=None):
async def getQuiz(self, url, exceptedAnswers=None, actualAnswers=None, searchOnly=None):
if self.DEBUG:
print(url)
if self.authToken:
resp = self.client.get(url, headers={'Authorization': f'Bearer {self.authToken}'})
else:
resp = self.client.get(url)
if resp.status_code == 400:
raise KahootError("Invalid UUID.")
self.error("Invalid UUID.")
if resp.status_code != 200:
raise KahootError("Something went wrong finding answers.")
self.error("Something went wrong finding answers.")
if exceptedAnswers and actualAnswers:
if actualAnswers == len(exceptedAnswers):
isCorrectQuiz = True
Expand All @@ -158,22 +173,19 @@ async def getQuiz(self, url, exceptedAnswers=None, actualAnswers=None):
break
if isCorrectQuiz:
print("QUIZ FOUND")
self.printAnswers(resp.json(), url)
return resp.json()
else:
print("Wrong question types")
else:
print("Wrong num of expected answers")
else:
print("No excepted answers")
self.printAnswers(resp.json(), url)
print("Here you go:" if searchOnly else "No excepted answers")
return resp.json()

async def searchQuiz(self, exceptedAnswers=None):
async def findAnswers(self, exceptedAnswers=None, searchOnly=None):
if self.quizID:
url = f'https://create.kahoot.it/rest/kahoots/{self.quizID}'
quiz = await self.getQuiz(url=url, exceptedAnswers=exceptedAnswers)
return quiz
return self.parseAnswers(await self.getQuiz(url=url, exceptedAnswers=exceptedAnswers), self.DEBUG)
elif self.quizName:
url = 'https://create.kahoot.it/rest/kahoots/'
params = {'query': self.quizName, 'cursor': 0, 'limit': self.maxCount, 'topics': '', 'grades': '',
Expand All @@ -185,18 +197,50 @@ async def searchQuiz(self, exceptedAnswers=None):
else:
resp = self.client.get(url, params=params)
if resp.status_code != 200:
raise KahootError("Something went wrong searching quizzes.")
self.error("Something went wrong searching quizzes.")
quizzes = resp.json()['entities']
print(f'{len(quizzes)} matching quizzes found')
for quiz in quizzes:
print(f"Checking {quiz['card']['title']}...", end=" ")
url = f'https://create.kahoot.it/rest/kahoots/{quiz["card"]["uuid"]}'
rightQuiz = await self.getQuiz(url=url, exceptedAnswers=exceptedAnswers,
actualAnswers=quiz['card']['number_of_questions'])
if rightQuiz:
return rightQuiz
# Otherwise Panic
raise KahootError("No quiz found. (private?)")
for q in quizzes:
if searchOnly:
if not re.match(r'y(es)?', input(f"Check '{q['card']['title']}'? [y/N] ").lower()):
continue
else:
print(f"Checking {q['card']['title']}...", end=" ")
url = f'https://create.kahoot.it/rest/kahoots/{q["card"]["uuid"]}'
quiz = await self.getQuiz(url=url, exceptedAnswers=exceptedAnswers,
actualAnswers=q['card']['number_of_questions'], searchOnly=searchOnly)
if searchOnly:
self.parseAnswers(quiz, self.DEBUG)
elif quiz:
return self.parseAnswers(quiz, self.DEBUG)
if not quiz:
self.error("No quiz found. (private?)")

@staticmethod
def parseAnswers(quiz, debug=None):
answers = []
if debug:
print(quiz)
for question in quiz['questions']:
foundAnswer = False
if question['type'] != 'quiz':
answers.append({'NOT A': 'QUESTION'})
continue
for i, choice in enumerate(question['choices']):
if choice['correct'] and not foundAnswer:
foundAnswer = True
answers.append({'question': question['question'], 'index': i, 'answer': choice['answer']})
Kahoot.printAnswers(quiz, answers)
return answers

@staticmethod
def printAnswers(quiz, answers):
# print("If the questions are randomized, go to " + url + "to get the answers yourself.")
print(f"Title: {quiz['title']}")
print(f"Creator: {quiz['creator_username']}")
print(f"Desc: {quiz['description']}")
for q in answers:
print(f"{q['question']}\n\t{q['answer']}")

@staticmethod
def _remove_emojis(text):
Expand All @@ -213,30 +257,13 @@ def _remove_emojis(text):
def _similar(a, b):
return SequenceMatcher(None, a, b).ratio()

async def findAnswers(self, exceptedAnswers=None):
quizProperties = await self.searchQuiz(exceptedAnswers)
answers = []
if self.DEBUG:
print(quizProperties)
for question in quizProperties['questions']:
foundAnswer = False
if question['type'] != 'quiz':
answers.append({'NOT A': 'QUESTION'})
continue
for i, choice in enumerate(question['choices']):
if choice['correct'] and not foundAnswer:
foundAnswer = True
answers.append({'question': question['question'], 'index': i, 'answer': choice['answer']})
print(f'ANSWERS RECEIVED')
return answers

def checkPin(self):
assert type(self.pin) == str
currentTime = int(time.time())
url = f"https://play.kahoot.it/reserve/session/{self.pin}/?{currentTime}"
resp = self.client.get(url)
if resp.status_code != 200:
raise KahootError(f"Pin {self.pin} does not exist.")
self.error(f"Pin {self.pin} does not exist.")
self.sessionToken = resp.headers['x-kahoot-session-token']
self.sessionID = self.solveChallenge(resp.json()["challenge"])

Expand Down

0 comments on commit 142e671

Please sign in to comment.