From 3e7d95d33f8db50b869397a2d29cce9e8c5cbebc Mon Sep 17 00:00:00 2001 From: Gianni Amato Date: Wed, 18 Jan 2023 18:09:37 +0100 Subject: [PATCH] 6.1.0 --- CHANGELOG.md | 8 + README.md | 76 +++- knockpy/knockpy.py | 822 ++++++++--------------------------- knockpy/lib/extraargs.py | 47 ++ knockpy/lib/logo.py | 13 + knockpy/lib/output.py | 195 +++++++++ knockpy/lib/report.py | 100 +++++ knockpy/lib/request.py | 33 ++ knockpy/lib/scan.py | 49 +++ knockpy/lib/wordlists.py | 127 ++++++ knockpy/remote/api_censys.py | 48 ++ knockpy/remote/crtsh.py | 43 ++ requirements.txt | 3 +- setup.py | 2 +- 14 files changed, 913 insertions(+), 653 deletions(-) create mode 100644 knockpy/lib/extraargs.py create mode 100644 knockpy/lib/logo.py create mode 100644 knockpy/lib/output.py create mode 100644 knockpy/lib/report.py create mode 100644 knockpy/lib/request.py create mode 100644 knockpy/lib/scan.py create mode 100644 knockpy/lib/wordlists.py create mode 100644 knockpy/remote/api_censys.py create mode 100644 knockpy/remote/crtsh.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d2a9cbb..15b2b2a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,12 @@ # Changelog + +6.1.0 - 2023-01-18 +--------- +- added plugin test --plugin-test +- added new plugin: api_censys, certsh +- fixed plugin scan loop +- optimized code structure + 6.0.0 - 2023-01-15 --------- - added silent mode --silent diff --git a/README.md b/README.md index d0a536b..1932e13 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Knock Subdomain Scan v6.0.0 +# Knock Subdomain Scan v6.1.0 Knockpy is a portable and modular python3 tool designed to quickly enumerate subdomains on a target domain through **passive reconnaissance** and **dictionary scan**. @@ -27,6 +27,8 @@ python3 knockpy.py domain.com * [Plot report](#plot-report---plot) * [Module](#module) * [Plugin](#plugin) + * [Write your own plugin](#write-your-own-plugin) + * [Plugin test](#plugin-test---plugin-test) * [License](#license) --- @@ -331,7 +333,7 @@ results = knockpy.Scanning.start("domain.com", params) # Plugin -#### Write your own plugin +### Write your own plugin The plugins are situated in ```remote``` folder. If you want to write your own plugin it's important to pay attention to some precautions: @@ -375,6 +377,76 @@ def get(domain): return result ``` +### Plugin test ```--plugin-test``` + +```bash +$ knockpy domain.com --plugin-test +``` + +In this example, the output shows errors ```'error': True``` in three plugins because they need the API key. + +```bash +{ + 'api_shodan.py': { + 'time': '00:00:03', + 'match': 0, + 'error': True + }, + 'certspotter.py': { + 'time': '00:00:00', + 'match': 9, + 'error': False + }, + 'rapiddns.py': { + 'time': '00:00:00', + 'match': 44, + 'error': False + }, + 'hackertarget.py': { + 'time': '00:00:00', + 'match': 9, + 'error': False + }, + 'crtsh.py': { + 'time': '00:00:19', + 'match': 10, + 'error': False + }, + 'api_censys.py': { + 'time': '00:00:03', + 'match': 0, + 'error': True + }, + 'webarchive.py': { + 'time': '00:00:04', + 'match': 4, + 'error': False + }, + 'api_virustotal.py': { + 'time': '00:00:03', + 'match': 0, + 'error': True + }, + 'alienvault.py': { + 'time': '00:00:01', + 'match': 11, + 'error': False + }, + '_results': { + 'time': '00:00:37', + 'plugins': { + 'count': 9, + 'list': ['api_shodan.py', 'certspotter.py', 'rapiddns.py', 'hackertarget.py', ...], + 'error': ['api_shodan.py', 'api_censys.py', 'api_virustotal.py'] + }, + 'subdomains': { + 'count': 52, + 'list': ['admin', 'cloud', 'www', 'mail', 'calendar', 'contact', 'ftp', .....] + } + } +} + +``` --- # License diff --git a/knockpy/knockpy.py b/knockpy/knockpy.py index d6bc1cd..8e45019 100644 --- a/knockpy/knockpy.py +++ b/knockpy/knockpy.py @@ -1,15 +1,10 @@ #!/usr/bin/python3 # -*- coding: utf-8 -*- -from importlib.machinery import SourceFileLoader from argparse import RawTextHelpFormatter -from colorama import Fore, Style import concurrent.futures -import colorama import argparse -import socket -from knockpy.lib import dns_socket -import requests +from knockpy.lib import output, request, wordlists, report, scan, extraargs, logo import random import time import json @@ -17,9 +12,7 @@ import re import os -# socket timeout -timeout = 3 -if hasattr(socket, "setdefaulttimeout"): socket.setdefaulttimeout(timeout) +__version__ = "6.1.0" user_agent = [ "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0", @@ -29,430 +22,95 @@ ] _ROOT = os.path.abspath(os.path.dirname(__file__)) - -# resolve requests: DNS, HTTP, HTTPS -class Request(): - def dns(target): - try: - if dns: # use the specified DNS - return dns_socket._gethostbyname_ex(target, dns) - return socket.gethostbyname_ex(target) - except: - return [] - - def https(url): - headers = {"user-agent": useragent} - try: - resp = requests.get("https://"+url, headers=headers, timeout=timeout) - return [resp.status_code, resp.headers["Server"] if "Server" in resp.headers.keys() else ""] - except: - return [] - - def http(url): - headers = {"user-agent": useragent} - try: - resp = requests.get("http://"+url, headers=headers, timeout=timeout) - return [resp.status_code, resp.headers["Server"] if "Server" in resp.headers.keys() else ""] - except: - return [] - -# get and purge local/remote wordlist -class Wordlist(): - # get local dictionary - def local(filename): - try: - filename = os.path.join(_ROOT, "", filename) - wlist = open(filename,'r').read().split("\n") - except: - if not silent_mode: Output.progressPrint("wordlist not found: {filename}".format(filename=filename)) - return [] - #sys.exit("wordlist not found: {filename}".format(filename=filename)) - return filter(None, wlist) - - # get remote wordlist using plugin - def remotescan(domain): - result = [] - - # plugin directory - dir_plugins = _ROOT + '{sep}remote'.format(sep=os.sep) - - for (dir_plugins, dir_names, plugins) in os.walk(dir_plugins): - for plugin in plugins: - - # filter for .py scripts and exclude __init__.py file - if plugin.endswith('.py') and plugin != '__init__.py': - plugin_path = os.path.join(dir_plugins, plugin) - - try: - # load module - foo = SourceFileLoader(plugin, plugin_path).load_module() - if not silent_mode: - Output.progressPrint('') # print empty line - Output.progressPrint(plugin) # print name of the module - - # get module's result - plugin_result = foo.get(domain) - - # add subdomains - result = result + plugin_result - except: - # print plugin error and sleep 3 secs. - if not silent_mode: - Output.progressPrint("error plugin -> "+plugin) - time.sleep(3) - continue - - result = list(set([r.lower() for r in result])) - subdomains = [item.replace('.'+domain, '') for item in result] - return subdomains - - # purge wordlist - def purge(wordlist): - return [word for word in wordlist if word and re.match("[a-z0-9\.-]", word)] - - # get wordlist local and/or remote - def get(domain): - local, remote = [], [] - - if not no_local: - local = list(Wordlist.local(local_wordlist)) - - if not no_remote: - remote = list(Wordlist.remotescan(domain)) - - return local, remote - -# manage terminal output -class Output(): - # print progressbar - def progressPrint(text): - if not text: text = " "*80 - text_dim = Style.DIM + text + Style.RESET_ALL - sys.stdout.write("%s\r" % text_dim) - sys.stdout.flush() - sys.stdout.write("\r") - - # colorize line - def colorizeHeader(text, count, sep): - newText = Style.BRIGHT + Fore.YELLOW + text + Style.RESET_ALL - _count = str(len(count)) if isinstance(count, list) else str(count) - - newCount = Style.BRIGHT + Fore.CYAN + _count + Style.RESET_ALL - - if len(count) == 0: - newText = Style.DIM + text + Style.RESET_ALL - newCount = Style.DIM + _count + Style.RESET_ALL - newSep = " " + Fore.MAGENTA + sep + Style.RESET_ALL - - return newText + newCount + newSep - - # print wordlist and target information - def headerPrint(local, remote, domain): - """ - local: 0 | remote: 270 - - Wordlist: 270 | Target: domain.com | Ip: 123.123.123.123 - """ - - line = Output.colorizeHeader("local: ", local, "| ") - line += Output.colorizeHeader("remote: ", remote, "\n") - line += "\n" - line += Output.colorizeHeader("Wordlist: ", local + remote, "| ") - - req = Request.dns(domain) - if req != []: - ip_req = req[2][0] - ip = ip_req if req else "" - else: - ip = "None" - - line += Output.colorizeHeader("Target: ", domain, "| ") - line += Output.colorizeHeader("Ip: ", ip, "\n") - - return line - - # print header before of match-line (linePrint) - def headerBarPrint(time_start, max_len): - """ - 21:57:55 - - Ip address Subdomain Real hostname - --------------- ----------------------- ---------------------------- - """ - - # time_start - line = Style.BRIGHT - line += time.strftime("%H:%M:%S", time.gmtime(time_start)) + "\n\n" - - # spaces - spaceIp = " " * (16 - len("Ip address")) - spaceSub = " " * ((max_len + 1) - len("Subdomain")) - - # dns only - if no_http: - line += "Ip address" +spaceIp+ "Subdomain" +spaceSub+ "Real hostname" + "\n" - line += Style.RESET_ALL - line += "-" * 15 + " " + "-" * max_len + " " + "-" * max_len - - # http - else: - spaceCode = " " * (5 - len("Code")) - spaceServ = " " * ((max_len + 1) - len("Server")) - line += "Ip address" +spaceIp+ "Code" +spaceCode+ "Subdomain" +spaceSub+ "Server" +spaceServ+ "Real hostname" + "\n" - line += Style.RESET_ALL - line += "-" * 15 + " " + "-" * 4 + " " + "-" * max_len + " " + "-" * max_len + " " + "-" * max_len - - return line - - # change json for different scan: dns or dns + http - def jsonizeRequestData(req, target): - if len(req) == 3: - subdomain, aliasList, ipList = req - domain = subdomain if subdomain != target else "" - - data = { - "target": target, - "domain": domain, - "alias": aliasList, - "ipaddr": ipList - } - elif len(req) == 5: - subdomain, aliasList, ipList, code, server = req - domain = subdomain if subdomain != target else "" - - data = { - "target": target, - "domain": domain, - "alias": aliasList, - "ipaddr": ipList, - "code": code, - "server": server - } +_local_wordlist = _ROOT + "{sep}local{sep}wordlist.txt".format(sep=os.sep) +_remote_plugin_folder = _ROOT + '{sep}remote'.format(sep=os.sep) +_user_agent = random.choice(user_agent) + +# default params +_params = { + "no_local": False, # [bool] local wordlist ignore --no-local + "no_remote": False, # [bool] remote wordlist ignore --no-remote + "no_scan": False, # [bool] scanning ignore, show wordlist --no-scan + "no_http": False, # [bool] http requests ignore --no-http + "no_http_code": [], # [list] http code list to ignore --no-http-code 404 + "no_ip": [], # [list] ip address to ignore --no-ip 127.0.0.1 + "dns": "", # [str] use custom DNS ex. 8.8.8.8 --dns 8.8.8.8 + "timeout": 3, # [int] timeout in seconds -t 5 + "threads": 30, # [int] threads num -th 50 + "useragent": _user_agent, # [str] use a custom user agent --user-agent Mozilla + "wordlist": _local_wordlist, # [str] path to custom wordlist -w file.txt + "silent_mode": False, # [bool] silent mode --silent (--silent csv) + "output_folder": "knockpy_report", # [str] report folder -o /folder/ + "save_output": True, # [bool] save report -o false to disable it + "plugin_test": False, # [bool] test plugin --plugin-test + "plugin_folder": _remote_plugin_folder # [str] plugin folder (no via arg) +} + +# check for valid domain +def is_valid_domain(domain): + if not isinstance(domain, str) or not re.match("[a-z0-9\.-]", domain) or domain.startswith(("http", "www.")): + return False + return True + +""" +# module to import in python script: + +from knockpy import knockpy + +# return json results +results = knockpy.Scanning.start("domain.com", params) +""" +class Scanning: + def start(domain, params=False): + # params validation + if not params: + params = _params else: - data = {} - - return data + for param in _params.keys(): + value = _params[param] + if param not in params: + params.update({param: value}) - # print match-line while it's working - def linePrint(data, max_len): - """ - 123.123.123.123 click.domain.com click.virt.s6.exactdomain.com - """ - - # just a fix, print space if not domain - _domain = " "*max_len if not data["domain"] else data["domain"] - - # case dns only - if len(data.keys()) == 4: - spaceIp = " " * (16 - len(data["ipaddr"][0])) - spaceSub = " " * ((max_len + 1) - len(data["target"])) - _target = Style.BRIGHT + Fore.CYAN + data["target"] + Style.RESET_ALL if data["alias"] else data["target"] - line = data["ipaddr"][0] +spaceIp+ _target +spaceSub+ _domain - - # case dns +http - elif len(data.keys()) == 6: - data["server"] = data["server"][:max_len] - - spaceIp = " " * (16 - len(data["ipaddr"][0])) - spaceSub = " " * ((max_len + 1) - len(data["target"])) - spaceCode = " " * (5 - len(str(data["code"]))) - spaceServer = " " * ((max_len + 1) - len(data["server"])) - - if data["code"] == 200: - _code = Style.BRIGHT + Fore.GREEN + str(data["code"]) + Style.RESET_ALL - _target = Style.BRIGHT + Fore.GREEN + data["target"] + Style.RESET_ALL - elif str(data["code"]).startswith("4"): - _code = Style.BRIGHT + Fore.MAGENTA + str(data["code"]) + Style.RESET_ALL - _target = Style.BRIGHT + Fore.MAGENTA + data["target"] + Style.RESET_ALL - elif str(data["code"]).startswith("5"): - _code = Style.BRIGHT + Fore.RED + str(data["code"]) + Style.RESET_ALL - _target = Style.BRIGHT + Fore.RED + data["target"] + Style.RESET_ALL - else: - _code = str(data["code"]) - _target = Style.BRIGHT + Fore.CYAN + data["target"] + Style.RESET_ALL if data["domain"] else data["target"] - - line = data["ipaddr"][0] +spaceIp+ _code +spaceCode+ _target +spaceSub+ data["server"] +spaceServer+ _domain - - return line - - # print footer at the end after match-line (linePrint) - def footerPrint(time_end, time_start, results): - """ - 21:58:06 + if params["no_local"] and params["no_remote"]: # case no wordlist + return None + + # global flags by default + params["silent_mode"] = "json" + params["output_folder"] = False - Ip address: 122 | Subdomain: 93 | elapsed time: 00:00:11 - """ + # get wordlist + if params["plugin_test"]: + return wordlists.get(domain, params) - Output.progressPrint("") - elapsed_time = time_end - time_start - line = Style.BRIGHT - line += "\n" - line += time.strftime("%H:%M:%S", time.gmtime(time_end)) - line += "\n\n" - line += Style.RESET_ALL - - ipList = [] - for i in results.keys(): - for ii in results[i]["ipaddr"]: - ipList.append(ii) - - line += Output.colorizeHeader("Ip address: ", list(set(ipList)), "| ") - line += Output.colorizeHeader("Subdomain: ", list(results.keys()), "| ") - line += Output.colorizeHeader("elapsed time: ", time.strftime("%H:%M:%S", time.gmtime(elapsed_time)), "\n") - - return line - - # create json file - def write_json(path, json_data): - f = open(path, "w") - f.write(json.dumps(json_data, indent=4)) - f.close() - - # create csv file - def write_csv(path, csv_data): - f = open(path, "w") - f.write(csv_data) - f.close() - -class Report(): - # import json file - def load_json(report): - try: - report_json = json.load(open(report)) - del report_json["_meta"] - return report_json - except: - sys.exit("report not found or invalid json") - - # save output and add _meta to json file - def save(results, domain, time_start, time_end, len_wordlist): - _meta = { - "name": "knockpy", - "version": Start.__version__, - "time_start": time_start, - "time_end": time_end, - "domain": domain, - "wordlist": len_wordlist - } + local, remote = wordlists.get(domain, params) - results.update({"_meta": _meta}) - strftime = "%Y_%m_%d_%H_%M_%S" - date = time.strftime(strftime, time.gmtime(time_end)) - path = output_folder + os.sep + domain + "_" + date + ".json" - Output.write_json(path, results) - - # convert json to csv - def csv(report): - csv_data = "" - for item in report.keys(): - if len(report[item]) == 5: - """ - fix injection: - https://github.com/guelfoweb/knock/commit/156378d97f10871d30253eeefe15ec399aaa0b03 - https://www.exploit-db.com/exploits/49342 - """ - csv_injection = ("+", "-", "=", "@") - if report[item]["server"].startswith(csv_injection): - report[item]["server"] = "'" + report[item]["server"] - - csv_data += "%s;%s;%s;%s;%s" % (report[item]["ipaddr"][0], - report[item]["code"], - item, - report[item]["server"], - report[item]["domain"]) - if len(report[item]) == 3: - csv_data += "%s;%s;%s" % (report[item]["ipaddr"][0], - item, - report[item]["domain"]) - csv_data += "\n" - return csv_data - - # convert json to human text to show in terminal - def terminal(domain): - report_json = Report.load_json(domain) - - results = "" - for item in report_json.keys(): - report_json[item].update({"target": item}) - max_len = len(max(list(report_json.keys()), key=len)) - results += Output.linePrint(report_json[item], max_len) + "\n" - return results + # local wordlist not found + if local == None: return None - # plotting relationships - def plot(report): - # todo: - # get modules list from sys.modules.keys() - try: - import matplotlib.pyplot as plt - import networkx as nx - except: - print("Plot needs these libraries. Use 'pip' to install them:\n- matplotlib\n- networkx\n- PyQt5") - sys.exit(1) + # get wordlists + local, remote, wordlist = Start.wordlist(domain, params) - dataset = [] - for item in report.keys(): - dataset.append((report[item]["ipaddr"][0], item)) + # return a list ['sub1', 'sub2', 'sub3', ...] + if params["no_scan"]: + return wordlist - g = nx.Graph() - g.add_edges_from(dataset) + # max_len default value. + # it is not necessary to assign a correct value + # when working as a module + max_len = 1 - pos = nx.spring_layout(g) - nx.draw(g, pos, node_size=50, node_color="r", edge_color="c", with_labels=True, width=0.7, alpha=0.9) - plt.show() + # start scan + return Start.threads(domain, max_len, wordlist, params) class Start(): - __version__ = "6.0.0" - # print random message def msg_rnd(): return ["happy hacking ;)", "good luck!", "never give up!", "hacking is not a crime", "https://en.wikipedia.org/wiki/Bug_bounty_program"] - # check for valid domain - # it's used in Start.arguments() and Scanning.start() - def is_valid_domain(domain): - if not isinstance(domain, str) or not re.match("[a-z0-9\.-]", domain) or domain.startswith(("http", "www.")): - return False - return True - - # when domain not is a domain name but it's a command - def parse_and_exit(args): - if len(args) == 3 and args[1] in ["--report", "--plot", "--csv", "--set"]: - - # report - if args[1] == "--report": - if args[2].endswith(".json"): - if os.path.isfile(args[2]): - report = Report.terminal(args[2]) - if report: sys.exit(report) - sys.exit("report not found: %s" % args[2]) - sys.exit("try using: knockpy --report path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") - - # plot - if args[1] == "--plot": - if args[2].endswith(".json"): - if os.path.isfile(args[2]): - report = Report.load_json(args[2]) - if report: Report.plot(report) - sys.exit() - sys.exit("report not found: %s" % args[2]) - sys.exit("try using: knockpy --plot path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") - - # csv - if args[1] == "--csv": - if args[2].endswith(".json"): - if os.path.isfile(args[2]): - report = Report.load_json(args[2]) - if report: - csv_file = args[2].replace(".json", ".csv") - Output.write_csv(csv_file, Report.csv(report)) - sys.exit("csv report: %s" % csv_file) - sys.exit("report not found: %s" % args[2]) - sys.exit("try using: knockpy --csv path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") - def arguments(): - # check if domain string is a command - Start.parse_and_exit(sys.argv) + # check for extra arguments + extraargs.parse_and_exit(sys.argv) description = "-"*80+"\n" description += "* SCAN\n" @@ -475,8 +133,9 @@ def arguments(): parser = argparse.ArgumentParser(prog="knockpy", description=description, epilog=epilog, formatter_class=RawTextHelpFormatter) + # args parser.add_argument("domain", nargs='?', help="target to scan", default=sys.stdin, type=str) - parser.add_argument("-v", "--version", action="version", version="%(prog)s " + Start.__version__) + parser.add_argument("-v", "--version", action="version", version="%(prog)s " + __version__) parser.add_argument("--no-local", help="local wordlist ignore", action="store_true", required=False) parser.add_argument("--no-remote", help="remote wordlist ignore", action="store_true", required=False) parser.add_argument("--no-scan", help="scanning ignore, show wordlist and exit", action="store_true", required=False) @@ -485,6 +144,7 @@ def arguments(): parser.add_argument("--no-ip", help="ip address to ignore\n\n", nargs="+", type=str, required=False) parser.add_argument("--dns", help="use custom DNS ex. 8.8.8.8\n\n", dest="dns", required=False) parser.add_argument("--user-agent", help="use a custom user agent\n\n", dest="useragent", required=False) + parser.add_argument("--plugin-test", help="test plugins and exit\n\n", action="store_true", required=False) parser.add_argument("-w", help="wordlist file to import", dest="wordlist", required=False) parser.add_argument("-o", help="report folder to store json results", dest="folder", required=False) @@ -501,23 +161,23 @@ def arguments(): args = parser.parse_args() # --no-ip ignore ip addresses - global no_ip - no_ip = args.no_ip if args.no_ip else ["127.0.0.1"] + if args.no_ip: + _params["no_ip"] = args.no_ip # --no-scan ignore scanning - global no_scan - no_scan = args.no_scan - + if args.no_scan: + _params["no_scan"] = args.no_scan + # --silent set silent mode """ silent_mode is False by default. --silent without args -> the value is None, then I change it in "no-output" --silent with args -> it keep argument passed, ex: --silent csv """ - global silent_mode - silent_mode = args.silent - if silent_mode == None: - silent_mode = "no-output" + if args.silent: + _params["silent_mode"] = args.silent + elif args.silent == None: + _params["silent_mode"] = "no-output" # get domain name via positional argument or stdin if sys.stdin.isatty(): @@ -528,299 +188,163 @@ def arguments(): # stdin # echo "domain.com" | knockpy domain = args.domain.read() - + domain = domain.strip() + # check if the domain name is correct - if not Start.is_valid_domain(domain): + if not is_valid_domain(domain): parser.print_help(sys.stderr) sys.exit() - # choice wordlist - if args.no_local and args.no_remote: sys.exit("no wordlist") - # --no-local exclude local dictionary - global no_local - no_local = True if args.no_local else False - + if args.no_local: + _params["no_local"] = args.no_local + # --no-remote exclude remote dictionary - global no_remote - no_remote = True if args.no_remote else False + if args.no_remote: + _params["no_remote"] = args.no_remote + # choice wordlist + if _params["no_local"] and _params["no_remote"]: sys.exit("no wordlist") + # --no-http ignore requests - global no_http - no_http = True if args.no_http else False - + if args.no_http: + _params["no_http"] = args.no_http # --no-http-code ignore http code - global no_http_code - no_http_code = args.code if args.code else [] + if args.code: + _params["no_http"] = args.code # -o set report folder - global output_folder - output_folder = args.folder if args.folder else "knockpy_report" + if args.folder: + _params["output_folder"] = args.folder # check that the "-o false" parameter was not supplied - if output_folder != "false": + if _params["output_folder"] != "false": # create folder if not exists - if not os.path.exists(output_folder): os.makedirs(output_folder) + if not os.path.exists(_params["output_folder"]): os.makedirs(_params["output_folder"]) # check if the folder is accessible - if not os.access(output_folder, os.W_OK): sys.exit("folder not exists or not writable: " + output_folder) + if not os.access(_params["output_folder"], os.W_OK): sys.exit("folder not exists or not writable: " + _params["output_folder"]) # save output depends on the -o option # it's True by default except when "-o false" - global save_output - save_output = False if output_folder.lower() == "false" else True + if _params["output_folder"].lower() == "false": + _params["save_output"] = False # -t set timeout default is 3 - global timeout - timeout = args.sec if args.sec else 3 + if args.sec: + _params["timeout"] = args.sec # -th set threads default is 30 - global threads - threads = args.num if args.num else 30 + if args.num: + _params["threads"] = args.num # -w set path to local wordlist default is "wordlist.txt" - global local_wordlist - local_wordlist = args.wordlist if args.wordlist else _ROOT + "{sep}local{sep}wordlist.txt".format(sep=os.sep) + if args.wordlist: + _params["wordlist"] = args.wordlist # --dns set dns default is False - global dns - dns = args.dns if args.dns else False + if args.dns: + _params["dns"] = args.dns # --user-agent - global useragent - useragent = args.useragent if args.useragent else random.choice(user_agent) - - return domain - - - def scan(max_len, domain, subdomain, percentage, results): - ctrl_c = "(ctrl+c) | " - - #Output.progressPrint(ctrl_c + subdomain) - target = subdomain+"."+domain - if not silent_mode: Output.progressPrint(ctrl_c + str(percentage*100)[:4] + "% | " + target + " "*max_len) - req = Request.dns(target) - - if not req: return None - - req = list(req) - ip_req = req[2][0] + if args.useragent: + _params["useragent"] = args.useragent - if ip_req in no_ip: return None + # --plugin-test + if args.plugin_test: + _params["plugin_test"] = args.plugin_test - # dns only - if no_http: - # print line and update report - data = Output.jsonizeRequestData(req, target) - if not silent_mode: print (Output.linePrint(data, max_len)) - del data["target"] - return results.update({target: data}) + return domain, _params - # dns and http(s) - https = Request.https(target) - - if https: - for item in https: - req.append(item) - else: - http = Request.http(target) - - if http: - for item in http: - req.append(item) - else: - req.append("") - req.append("") - - # print line and update report - data = Output.jsonizeRequestData(req, target) - if data["code"] in no_http_code: return None - if not silent_mode: print (Output.linePrint(data, max_len)) - del data["target"] - return results.update({target: data}) - - - def logo(): - return """ - _ __ _ - | |/ / | | v%s - | ' / _ __ ___ ___| | ___ __ _ _ - | < | '_ \ / _ \ / __| |/ / '_ \| | | | - | . \| | | | (_) | (__| <| |_) | |_| | - |_|\_\_| |_|\___/ \___|_|\_\ .__/ \__, | - | | __/ | - |_| |___/ -""" % Start.__version__ - -class Scanning: - """ - # module to import in python script: - - from knockpy import knockpy - - # return json results - results = knockpy.Scanning.start("domain.com", params) - """ - - def start(domain, params=False): - # default params - params_default = { - "no_local": False, # [bool] local wordlist ignore - "no_remote": False, # [bool] remote wordlist ignore - "no_scan": False, # [bool] scanning ignore, show wordlist - "no_http": False, # [bool] http requests ignore - "no_http_code": [], # [list] http code list to ignore - "no_ip": [], # [list] ip address to ignore - "dns": "", # [str] use custom DNS ex. 8.8.8.8 - "timeout": 3, # [int] timeout in seconds - "threads": 30, # [int] threads num - "useragent": "", # [str] use a custom user agent - "wordlist": "" # [str] path to custom wordlist - } + # Start scan via "scan.start" module in lib/ + def threads(domain, max_len, wordlist, params): + len_wordlist = len(wordlist) + results = {} - # params validation - if not params: - params = params_default - else: - for param in params_default.keys(): - value = params_default[param] - if param not in params: - params.update({param: value}) + # start with threads + with concurrent.futures.ThreadPoolExecutor(max_workers=params["threads"]) as executor: + results_executor = {executor.submit(scan.start, max_len, domain, subdomain, wordlist.index(subdomain)/len_wordlist, results, params) for subdomain in wordlist} - # domain validation - if not Start.is_valid_domain(domain): - return False + if not params["silent_mode"]: + for item in concurrent.futures.as_completed(results_executor): + if item.result() != None: + print (item.result()) + # return a dict + return results - # global flags by params - global no_local - no_local = params["no_local"] - global no_remote - no_remote = params["no_remote"] + def wordlist(domain, params): + # get wordlist + local, remote = wordlists.get(domain, params) - if no_local and no_remote: # case no wordlist - return None + # case local wordlist not found + if local == None and remote == None: + return [], [], [] - global no_scan - no_scan = params["no_scan"] - global no_http - no_http = params["no_http"] - global no_http_code - no_http_code = params["no_http_code"] - global no_ip - no_ip = params["no_ip"] - global dns - dns = params["dns"] if params["dns"] else False - global timeout - timeout = params["timeout"] - global threads - threads = params["threads"] - global useragent - useragent = params["useragent"] if params["useragent"] else random.choice(user_agent) - global local_wordlist - local_wordlist = params["wordlist"] if params["wordlist"] else _ROOT + "{sep}local{sep}wordlist.txt".format(sep=os.sep) + local = wordlists.purge(local) + remote = wordlists.purge(remote) - # global flags by default - global silent_mode - silent_mode = "json" - global output_folder - output_folder = False - - # get wordlist - local, remote = Wordlist.get(domain) - local = Wordlist.purge(local) - remote = Wordlist.purge(remote) + # join wordlist local + remote wordlist = list(dict.fromkeys((local + remote))) wordlist = sorted(wordlist, key=str.lower) - wordlist = Wordlist.purge(wordlist) - - # return a list ['sub1', 'sub2', 'sub3', ...] - if no_scan: - return wordlist - - # constants - len_wordlist = len(wordlist) - max_len = 1 - - # start with threads - results = {} - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: - results_executor = {executor.submit(Start.scan, max_len, domain, subdomain, wordlist.index(subdomain)/len_wordlist, results) for subdomain in wordlist} - - # return a dict - return results + wordlist = wordlists.purge(wordlist) + return local, remote, wordlist def main(): - domain = Start.arguments() - - # action: scan - if not silent_mode: print (Start.logo()) + domain, params = Start.arguments() - # wordlist - if not silent_mode: Output.progressPrint("getting wordlist ...") - local, remote = Wordlist.get(domain) + # action: scan + if not params["silent_mode"]: + print (logo.show(__version__)) + output.progressPrint("getting wordlist ...") - # purge wordlist - local = Wordlist.purge(local) - remote = Wordlist.purge(remote) - - # mix wordlist local + remote - wordlist = list(dict.fromkeys((local + remote))) - wordlist = sorted(wordlist, key=str.lower) + # get wordlist + if params["plugin_test"]: + print (wordlists.get(domain, params)) + sys.exit() - # purge wordlist - wordlist = Wordlist.purge(wordlist) + # get wordlists + local, remote, wordlist = Start.wordlist(domain, params) # takes the longest word in wordlist max_len = len(max(wordlist, key=len) + "." + domain) if wordlist else sys.exit("\nno wordlist") - # no wordlist found - if not wordlist: sys.exit("no wordlist") - # if no-scan args show wordlist and exit - if no_scan: + if params["no_scan"]: print (wordlist) sys.exit() # print header - if not silent_mode: print (Output.headerPrint(local, remote, domain)) + if not params["silent_mode"]: + print (output.headerPrint(local, remote, domain)) # time start and print time_start = time.time() - if not silent_mode: print (Output.headerBarPrint(time_start, max_len)) + if not params["silent_mode"]: + print (output.headerBarPrint(time_start, max_len, params["no_http"])) - # init - len_wordlist = len(wordlist) - results = {} - - # start with threads - with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor: - results_executor = {executor.submit(Start.scan, max_len, domain, subdomain, wordlist.index(subdomain)/len_wordlist, results) for subdomain in wordlist} - - for item in concurrent.futures.as_completed(results_executor): - if item.result() != None: - # show line - if not silent_mode: print (item.result()) + # Start threads + results = Start.threads(domain, max_len, wordlist, params) # elapsed time time_end = time.time() # show output - if not silent_mode: + # when silent_mode is None (--silent without args) -> "no-output" -> quiet + if not params["silent_mode"]: # when silent_mode is False - print (Output.footerPrint(time_end, time_start, results)) - elif silent_mode == "json": + print (output.footerPrint(time_end, time_start, results)) + elif params["silent_mode"] == "json": # json without indent print (json.dumps(results)) - elif silent_mode == "json-pretty": + elif params["silent_mode"] == "json-pretty": # json with indent print (json.dumps(results, indent=4)) - elif silent_mode == "csv": - print (Report.csv(results)) - - # when silent_mode is None (--silent without args) -> "no-output" -> quiet + elif params["silent_mode"] == "csv": + print (report.csv(results)) # save report - if save_output: Report.save(results, domain, time_start, time_end, len_wordlist) + if params["save_output"]: + report.save(results, domain, time_start, time_end, len(wordlist), __version__, params["output_folder"]) if __name__ == "__main__": try: diff --git a/knockpy/lib/extraargs.py b/knockpy/lib/extraargs.py new file mode 100644 index 0000000..81bede8 --- /dev/null +++ b/knockpy/lib/extraargs.py @@ -0,0 +1,47 @@ +import os +import sys +from . import report +from . import output + +# extra arguments ["--report", "--plot", "--csv"] +# when domain not is a domain name but it's a command +def parse_and_exit(args): + if len(args) == 3 and args[1] in ["--report", "--plot", "--csv"]: + + # report + if args[1] == "--report": + if args[2].endswith(".json"): + if os.path.isfile(args[2]): + data = report.terminal(args[2]) + if not data: + sys.exit("report not found or invalid json") + if data: sys.exit(data) + sys.exit("report not found: %s" % args[2]) + sys.exit("try using: knockpy --report path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") + + # plot + elif args[1] == "--plot": + if args[2].endswith(".json"): + if os.path.isfile(args[2]): + data = report.load_json(args[2]) + if not data: + sys.exit("report not found or invalid json") + if data: + plotting = report.plot(data) + if not plotting: + print("Plot needs these libraries. Use 'pip' to install them:\n- matplotlib\n- networkx\n- PyQt5") + sys.exit() + sys.exit("report not found: %s" % args[2]) + sys.exit("try using: knockpy --plot path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") + + # csv + elif args[1] == "--csv": + if args[2].endswith(".json"): + if os.path.isfile(args[2]): + data = report.load_json(args[2]) + if data: + csv_file = args[2].replace(".json", ".csv") + output.write_csv(csv_file, report.csv(data)) + sys.exit("csv report: %s" % csv_file) + sys.exit("report not found: %s" % args[2]) + sys.exit("try using: knockpy --csv path/to/domain.com_yyyy_mm_dd_hh_mm_ss.json") \ No newline at end of file diff --git a/knockpy/lib/logo.py b/knockpy/lib/logo.py new file mode 100644 index 0000000..f0f931a --- /dev/null +++ b/knockpy/lib/logo.py @@ -0,0 +1,13 @@ +# print logo + +def show(version): + return """ + _ __ _ + | |/ / | | v%s + | ' / _ __ ___ ___| | ___ __ _ _ + | < | '_ \ / _ \ / __| |/ / '_ \| | | | + | . \| | | | (_) | (__| <| |_) | |_| | + |_|\_\_| |_|\___/ \___|_|\_\ .__/ \__, | + | | __/ | + |_| |___/ +""" % version \ No newline at end of file diff --git a/knockpy/lib/output.py b/knockpy/lib/output.py new file mode 100644 index 0000000..47b9d68 --- /dev/null +++ b/knockpy/lib/output.py @@ -0,0 +1,195 @@ +from colorama import Fore, Style +from . import request +import time +import json +import sys + +# print progressbar +def progressPrint(text): + if not text: text = " "*80 + text_dim = Style.DIM + text + Style.RESET_ALL + sys.stdout.write("%s\r" % text_dim) + sys.stdout.flush() + sys.stdout.write("\r") + +# colorize line +def colorizeHeader(text, count, sep): + newText = Style.BRIGHT + Fore.YELLOW + text + Style.RESET_ALL + _count = str(len(count)) if isinstance(count, list) else str(count) + + newCount = Style.BRIGHT + Fore.CYAN + _count + Style.RESET_ALL + + if len(count) == 0: + newText = Style.DIM + text + Style.RESET_ALL + newCount = Style.DIM + _count + Style.RESET_ALL + newSep = " " + Fore.MAGENTA + sep + Style.RESET_ALL + + return newText + newCount + newSep + +# print wordlist and target information +def headerPrint(local, remote, domain): + """ + local: 0 | remote: 270 + + Wordlist: 270 | Target: domain.com | Ip: 123.123.123.123 + """ + + line = colorizeHeader("local: ", local, "| ") + line += colorizeHeader("remote: ", remote, "\n") + line += "\n" + line += colorizeHeader("Wordlist: ", local + remote, "| ") + + req = request.dns(domain) + if req != []: + ip_req = req[2][0] + ip = ip_req if req else "" + else: + ip = "None" + + line += colorizeHeader("Target: ", domain, "| ") + line += colorizeHeader("Ip: ", ip, "\n") + + return line + +# print header before of match-line (linePrint) +def headerBarPrint(time_start, max_len, no_http): + """ + 21:57:55 + + Ip address Subdomain Real hostname + --------------- ----------------------- ---------------------------- + """ + + # time_start + line = Style.BRIGHT + line += time.strftime("%H:%M:%S", time.gmtime(time_start)) + "\n\n" + + # spaces + spaceIp = " " * (16 - len("Ip address")) + spaceSub = " " * ((max_len + 1) - len("Subdomain")) + + # dns only + if no_http: + line += "Ip address" +spaceIp+ "Subdomain" +spaceSub+ "Real hostname" + "\n" + line += Style.RESET_ALL + line += "-" * 15 + " " + "-" * max_len + " " + "-" * max_len + + # http + else: + spaceCode = " " * (5 - len("Code")) + spaceServ = " " * ((max_len + 1) - len("Server")) + line += "Ip address" +spaceIp+ "Code" +spaceCode+ "Subdomain" +spaceSub+ "Server" +spaceServ+ "Real hostname" + "\n" + line += Style.RESET_ALL + line += "-" * 15 + " " + "-" * 4 + " " + "-" * max_len + " " + "-" * max_len + " " + "-" * max_len + + return line + +# change json for different scan: dns or dns + http +def jsonizeRequestData(req, target): + if len(req) == 3: + subdomain, aliasList, ipList = req + domain = subdomain if subdomain != target else "" + + data = { + "target": target, + "domain": domain, + "alias": aliasList, + "ipaddr": ipList + } + elif len(req) == 5: + subdomain, aliasList, ipList, code, server = req + domain = subdomain if subdomain != target else "" + + data = { + "target": target, + "domain": domain, + "alias": aliasList, + "ipaddr": ipList, + "code": code, + "server": server + } + else: + data = {} + + return data + +# print match-line while it's working +def linePrint(data, max_len): + """ + 123.123.123.123 click.domain.com click.virt.s6.exactdomain.com + """ + + # just a fix, print space if not domain + _domain = " "*max_len if not data["domain"] else data["domain"] + + # case dns only + if len(data.keys()) == 4: + spaceIp = " " * (16 - len(data["ipaddr"][0])) + spaceSub = " " * ((max_len + 1) - len(data["target"])) + _target = Style.BRIGHT + Fore.CYAN + data["target"] + Style.RESET_ALL if data["alias"] else data["target"] + line = data["ipaddr"][0] +spaceIp+ _target +spaceSub+ _domain + + # case dns +http + elif len(data.keys()) == 6: + data["server"] = data["server"][:max_len] + + spaceIp = " " * (16 - len(data["ipaddr"][0])) + spaceSub = " " * ((max_len + 1) - len(data["target"])) + spaceCode = " " * (5 - len(str(data["code"]))) + spaceServer = " " * ((max_len + 1) - len(data["server"])) + + if data["code"] == 200: + _code = Style.BRIGHT + Fore.GREEN + str(data["code"]) + Style.RESET_ALL + _target = Style.BRIGHT + Fore.GREEN + data["target"] + Style.RESET_ALL + elif str(data["code"]).startswith("4"): + _code = Style.BRIGHT + Fore.MAGENTA + str(data["code"]) + Style.RESET_ALL + _target = Style.BRIGHT + Fore.MAGENTA + data["target"] + Style.RESET_ALL + elif str(data["code"]).startswith("5"): + _code = Style.BRIGHT + Fore.RED + str(data["code"]) + Style.RESET_ALL + _target = Style.BRIGHT + Fore.RED + data["target"] + Style.RESET_ALL + else: + _code = str(data["code"]) + _target = Style.BRIGHT + Fore.CYAN + data["target"] + Style.RESET_ALL if data["domain"] else data["target"] + + line = data["ipaddr"][0] +spaceIp+ _code +spaceCode+ _target +spaceSub+ data["server"] +spaceServer+ _domain + + return line + +# print footer at the end after match-line (linePrint) +def footerPrint(time_end, time_start, results): + """ + 21:58:06 + + Ip address: 122 | Subdomain: 93 | elapsed time: 00:00:11 + """ + + progressPrint("") + elapsed_time = time_end - time_start + line = Style.BRIGHT + line += "\n" + line += time.strftime("%H:%M:%S", time.gmtime(time_end)) + line += "\n\n" + line += Style.RESET_ALL + + ipList = [] + for i in results.keys(): + for ii in results[i]["ipaddr"]: + ipList.append(ii) + + line += colorizeHeader("Ip address: ", list(set(ipList)), "| ") + line += colorizeHeader("Subdomain: ", list(results.keys()), "| ") + line += colorizeHeader("elapsed time: ", time.strftime("%H:%M:%S", time.gmtime(elapsed_time)), "\n") + + return line + +# create json file +def write_json(path, json_data): + f = open(path, "w") + f.write(json.dumps(json_data, indent=4)) + f.close() + +# create csv file +def write_csv(path, csv_data): + f = open(path, "w") + f.write(csv_data) + f.close() \ No newline at end of file diff --git a/knockpy/lib/report.py b/knockpy/lib/report.py new file mode 100644 index 0000000..3e3381f --- /dev/null +++ b/knockpy/lib/report.py @@ -0,0 +1,100 @@ +import os +import time +import json +from . import output + +user_agent = [ + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:65.0) Gecko/20100101 Firefox/65.0", + "Mozilla/5.0 (MSIE 10.0; Windows NT 6.1; Trident/5.0)", + "Opera/9.80 (Windows NT 6.0) Presto/2.12.388 Version/12.14", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A" +] + +# import json file +def load_json(report): + try: + report_json = json.load(open(report)) + del report_json["_meta"] + return report_json + except: + return None + +# save output and add _meta to json file +def save(results, domain, time_start, time_end, len_wordlist, version, output_folder): + _meta = { + "name": "knockpy", + "version": version, + "time_start": time_start, + "time_end": time_end, + "domain": domain, + "wordlist": len_wordlist + } + + results.update({"_meta": _meta}) + strftime = "%Y_%m_%d_%H_%M_%S" + date = time.strftime(strftime, time.gmtime(time_end)) + path = output_folder + os.sep + domain + "_" + date + ".json" + output.write_json(path, results) + +# convert json to csv +def csv(report): + csv_data = "" + for item in report.keys(): + if len(report[item]) == 5: + """ + fix injection: + https://github.com/guelfoweb/knock/commit/156378d97f10871d30253eeefe15ec399aaa0b03 + https://www.exploit-db.com/exploits/49342 + """ + csv_injection = ("+", "-", "=", "@") + if report[item]["server"].startswith(csv_injection): + report[item]["server"] = "'" + report[item]["server"] + + csv_data += "%s;%s;%s;%s;%s" % (report[item]["ipaddr"][0], + report[item]["code"], + item, + report[item]["server"], + report[item]["domain"]) + if len(report[item]) == 3: + csv_data += "%s;%s;%s" % (report[item]["ipaddr"][0], + item, + report[item]["domain"]) + csv_data += "\n" + return csv_data + +# convert json to human text to show in terminal +def terminal(domain): + report_json = load_json(domain) + + # report not found or invalid json + if report_json == None: + return None + + results = "" + for item in report_json.keys(): + report_json[item].update({"target": item}) + max_len = len(max(list(report_json.keys()), key=len)) + results += output.linePrint(report_json[item], max_len) + "\n" + return results + +# plotting relationships +def plot(report): + # todo: + # get modules list from sys.modules.keys() + try: + import matplotlib.pyplot as plt + import networkx as nx + except: + return None + + + dataset = [] + for item in report.keys(): + dataset.append((report[item]["ipaddr"][0], item)) + + g = nx.Graph() + g.add_edges_from(dataset) + + pos = nx.spring_layout(g) + nx.draw(g, pos, node_size=50, node_color="r", edge_color="c", with_labels=True, width=0.7, alpha=0.9) + plt.show() \ No newline at end of file diff --git a/knockpy/lib/request.py b/knockpy/lib/request.py new file mode 100644 index 0000000..7f70e9b --- /dev/null +++ b/knockpy/lib/request.py @@ -0,0 +1,33 @@ +import requests +import socket +from . import dns_socket + +# socket timeout +timeout = 3 +if hasattr(socket, "setdefaulttimeout"): socket.setdefaulttimeout(timeout) + +# resolve requests: DNS, HTTP, HTTPS + +def dns(target, dns=False): + try: + if dns: # use the specified DNS + return dns_socket._gethostbyname_ex(target, dns) + return socket.gethostbyname_ex(target) + except: + return [] + +def https(url, useragent): + headers = {"user-agent": useragent} + try: + resp = requests.get("https://"+url, headers=headers, timeout=timeout) + return [resp.status_code, resp.headers["Server"] if "Server" in resp.headers.keys() else ""] + except: + return [] + +def http(url, useragent): + headers = {"user-agent": useragent} + try: + resp = requests.get("http://"+url, headers=headers, timeout=timeout) + return [resp.status_code, resp.headers["Server"] if "Server" in resp.headers.keys() else ""] + except: + return [] \ No newline at end of file diff --git a/knockpy/lib/scan.py b/knockpy/lib/scan.py new file mode 100644 index 0000000..14ea6dc --- /dev/null +++ b/knockpy/lib/scan.py @@ -0,0 +1,49 @@ +from . import output +from . import request + +def start(max_len, domain, subdomain, percentage, results, params): + ctrl_c = "(ctrl+c) | " + + #output.progressPrint(ctrl_c + subdomain) + target = subdomain+"."+domain + if not params["silent_mode"]: output.progressPrint(ctrl_c + str(percentage*100)[:4] + "% | " + target + " "*max_len) + req = request.dns(target, params["dns"]) + + if not req: return None + + req = list(req) + ip_req = req[2][0] + + if ip_req in params["no_ip"]: return None + + # dns only + if params["no_http"]: + # print line and update report + data = output.jsonizeRequestData(req, target) + if not params["silent_mode"]: print (output.linePrint(data, max_len)) + del data["target"] + return results.update({target: data}) + + # dns and http(s) + https = request.https(target, params["useragent"]) + + if https: + for item in https: + req.append(item) + else: + http = request.http(target, params["useragent"]) + + if http: + for item in http: + req.append(item) + else: + req.append("") + req.append("") + + # print line and update report + data = output.jsonizeRequestData(req, target) + if data["code"] in params["no_http_code"]: return None + if not params["silent_mode"]: print (output.linePrint(data, max_len)) + del data["target"] + + return results.update({target: data}) \ No newline at end of file diff --git a/knockpy/lib/wordlists.py b/knockpy/lib/wordlists.py new file mode 100644 index 0000000..af37991 --- /dev/null +++ b/knockpy/lib/wordlists.py @@ -0,0 +1,127 @@ +import os +import re +import time +from . import output +from importlib.machinery import SourceFileLoader + +# purge wordlist +def purge(wordlist): + return [word for word in wordlist if word and re.match("[a-z0-9\.-]", word)] + +# get local dictionary +def localscan(filename): + try: + wlist = open(filename,'r').read().split("\n") + except: + if not silent_mode: output.progressPrint("wordlist not found: {filename}".format(filename=filename)) + return [] + return filter(None, wlist) + +# get remote wordlist using plugin +def remotescan(domain): + result = [] + + # plugin_test is global variable + if plugin_test: + plugin_test_results = {} + plugin_test_timeinit = time.time() + + for plugin in os.listdir(plugin_folder): + + # filter for .py scripts and exclude __init__.py file + if plugin.endswith('.py') and plugin != '__init__.py': + plugin_path = os.path.join(plugin_folder, plugin) + + try: + # plugin_test is global variable + if plugin_test: + plugin_test_timestart = time.time() + + if not silent_mode: + output.progressPrint('') # print empty line + output.progressPrint(plugin) # print name of the module + + # load module + foo = SourceFileLoader(plugin, plugin_path).load_module() + + # get module's result + plugin_result = foo.get(domain) + + if plugin_test: + # create dictionary with plugin info + plugin_time_elapsed = time.time() - plugin_test_timestart + plugin_time_elapsed = time.strftime("%H:%M:%S", time.gmtime(plugin_time_elapsed)) + plugin_test_match = len(plugin_result) + plugin_test_results.update({plugin: {"time": plugin_time_elapsed, "match": plugin_test_match, "error": False}}) + + # add subdomains + result = result + plugin_result + except: + # print plugin error and sleep 3 secs. + if not silent_mode: + output.progressPrint("error plugin -> "+plugin) + time.sleep(3) + + if plugin_test: + plugin_time_elapsed = time.time() - plugin_test_timestart + plugin_time_elapsed = time.strftime("%H:%M:%S", time.gmtime(plugin_time_elapsed)) + plugin_test_results.update({plugin: {"time": plugin_time_elapsed, "match": 0, "error": True}}) + + continue + + result = list(set([r.lower() for r in result])) + subdomains = [item.replace('.'+domain, '') for item in result] + subdomains = purge(subdomains) + + # return test results + if plugin_test: + # add final results + plugin_test_timeend = time.time() - plugin_test_timeinit + plugin_test_timeend = time.strftime("%H:%M:%S", time.gmtime(plugin_test_timeend)) + plugin_test_error = [item for item in plugin_test_results.keys() if plugin_test_results[item]["error"]] + plugin_test_list = list(plugin_test_results.keys()) + plugin_test_results.update({ + "_results": + { + "time": plugin_test_timeend, + "plugins": { + "count": len(plugin_test_list), + "list": plugin_test_list, + "error": plugin_test_error, + }, + "subdomains": { + "count": len(subdomains), + "list": subdomains, + } + } + }) + return plugin_test_results + + return subdomains + +# get wordlist local and/or remote +def get(domain, params): + local, remote = [], [] + + global silent_mode + silent_mode = params["silent_mode"] + global plugin_test + plugin_test = params["plugin_test"] + global local_wordlist + local_wordlist = params["wordlist"] + global plugin_folder + plugin_folder = params["plugin_folder"] + + if not os.path.isfile(local_wordlist): + return None, None + + if plugin_test: + return remotescan(domain) + + if not params["no_local"]: + local = list(localscan(local_wordlist)) + + if not params["no_remote"]: + remote = list(remotescan(domain)) + + return local, remote \ No newline at end of file diff --git a/knockpy/remote/api_censys.py b/knockpy/remote/api_censys.py new file mode 100644 index 0000000..0025e85 --- /dev/null +++ b/knockpy/remote/api_censys.py @@ -0,0 +1,48 @@ +from censys.search import CensysCertificates + +""" +Let's start with simple function specifications: + +1. if apikey is required, use "api_" before the plugin name: + + api_service.py + +2. the function name must be "get" and take as parameter "domain": + + def get(domain): + foo + +3. the function must return a possibly unique list of subdomains: + + ['sub1.domain.com', 'sub2.domain.com'] + +4. to parse the results it is recommended to use the standard modules such as: + + requests, json, bs4, re +""" + +# author: Gianni Amato +# plugin: api_censys +# version: 1.0 + +def get(domain): + # censys -> object -> CensysSearchAPIv1.search: key -> parsed.names + + # https://search.censys.io/account/api + api_id = "" # <- here your API ID + api_secret = "" # <- here your Secret + + limit = 1000 + + certificates = CensysCertificates(api_id=api_id, api_secret=api_secret) + query = "parsed.names: {domain}".format(domain=domain) + data = certificates.search(query, fields=["parsed.names"], max_records=limit) + + result = [] + for item in data: + subdomains = item["parsed.names"] + for subdomain in subdomains: + if subdomain.endswith(domain) and subdomain not in result: + result.append(subdomain) + + return result diff --git a/knockpy/remote/crtsh.py b/knockpy/remote/crtsh.py new file mode 100644 index 0000000..331d02a --- /dev/null +++ b/knockpy/remote/crtsh.py @@ -0,0 +1,43 @@ +import requests +import json + +""" +Let's start with simple function specifications: + +1. if apikey is required, use "api_" before the plugin name: + + api_service.py + +2. the function name must be "get" and take as parameter "domain": + + def get(domain): + foo + +3. the function must return a possibly unique list of subdomains: + + ['sub1.domain.com', 'sub2.domain.com'] + +4. to parse the results it is recommended to use the standard modules such as: + + requests, json, bs4, re +""" + +# author: Gianni Amato +# plugin: crtsh +# version: 1.0 + +def get(domain): + # crtsh -> LIST -> JSON: key -> common_name + url = "https://crt.sh/?q={domain}&output=json".format(domain=domain) + resp = requests.get(url, timeout=30).text + + resp = json.loads(resp) + + result = [] + for item in resp: + subdomain = item['common_name'] + if domain in subdomain: + if subdomain not in result: + result.append(subdomain) + + return result diff --git a/requirements.txt b/requirements.txt index 1117d5e..f367694 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ requests beautifulsoup4 -colorama \ No newline at end of file +colorama +censys \ No newline at end of file diff --git a/setup.py b/setup.py index be2ecaf..e09c614 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="knockpy", - version="6.0.0", + version="6.1.0", description="Knock is a Knockpy is a portable and modular python3 tool designed to quickly enumerate subdomains on a target domain through passive reconnaissance and dictionary attack.", url="https://github.com/guelfoweb/knock", author="Gianni 'guelfoweb' Amato",