# Cache mapping an absolute address to the symbol text resolved for it, so
# that gdb is launched at most once per address (failed lookups are cached
# as well).
DEBUG_SYMBOLS = dict()


def _gdb_stdout(arguments):
    """Run ``gdb`` with *arguments* and return its decoded stdout.

    Returns None when gdb is not installed or exits with a non-zero status,
    so that callers can degrade gracefully instead of crashing.
    """
    try:
        raw = subprocess.check_output(
            ["gdb"] + list(arguments), stdin=subprocess.DEVNULL
        )
    except (OSError, subprocess.CalledProcessError):
        return None
    return raw.decode("utf-8", errors="replace")


def _symbol_from_disasm_line(line):
    """Extract ``sym+off`` from an ``x/i`` line such as ``0x10 <sym+off>:\\tinsn``.

    Returns None when the line carries no ``<...>`` annotation.
    """
    start = line.find("<")
    if start == -1:
        return None
    # The annotation ends at the first ">:" that is followed by whitespace.
    # Demangled names may contain ">::" (templates), and the instruction
    # text may itself reference other "<symbols>", so neither the first nor
    # the last ">" on the line is reliable.
    # NOTE(review): assumes gdb separates "<sym>:" from the instruction with
    # whitespace - confirm against the gdb version in use.
    end = line.find(">:", start)
    while end != -1 and not line[end + 2:end + 3].isspace():
        end = line.find(">:", end + 1)
    if end == -1:
        end = line.rfind(">")
    if end <= start:
        return None
    return line[start + 1:end]


def getdebugsymbol(sym, address):
    """Resolve the symbol text gdb reports for *address*.

    Args:
        sym: symbol object covering *address*; ``sym.img.lower`` is
            subtracted from *address* to obtain the offset handed to gdb,
            ``sym.img.name`` is the file handed to gdb and ``sym.name[0]``
            is used as fallback name.
        address: absolute address to resolve.

    Returns:
        The text between ``<`` and ``>:`` of gdb's ``x/i`` output for the
        offset (e.g. ``puts@plt``), or ``sym.name[0]`` if gdb is unavailable
        or prints no matching line. Results are cached in DEBUG_SYMBOLS.
    """
    if address in DEBUG_SYMBOLS:
        debug(3, f"found symbol {DEBUG_SYMBOLS[address]} at {hex(address)}")
        return DEBUG_SYMBOLS[address]

    wanted = hex(address - sym.img.lower)
    # Arguments are passed as a list so that image paths containing spaces
    # or quotes reach gdb unmodified.
    output = _gdb_stdout(
        [
            "-ex",
            "set print asm-demangle on",
            "-ex",
            f"x/i {wanted}",
            "-ex",
            "quit",
            sym.img.name,
        ]
    )

    resolved = None
    # Scan from the end: the disassembly line follows gdb's start-up banner.
    for line in reversed((output or "").splitlines()):
        fields = line.split(None, 1)
        if fields and fields[0] == wanted:
            resolved = _symbol_from_disasm_line(line)
            break
    if resolved is None:
        # Keep the name we already have rather than parsing an unrelated line.
        resolved = sym.name[0]

    DEBUG_SYMBOLS[address] = resolved
    return resolved


def getdebugelf(fname):
    """Ask gdb which separate debug file it loads for *fname*.

    Returns:
        The path printed in gdb's trailing ``Reading symbols from <path>...``
        line, or None if gdb is unavailable, reports
        ``No debugging symbols found``, or its output does not end with the
        expected two ``Reading symbols from`` lines.
    """
    output = _gdb_stdout(["-ex", "quit", fname])
    lines = (output or "").splitlines()
    if len(lines) < 2:
        return None

    marker = "Reading symbols from "
    loaded, detail = lines[-2], lines[-1]
    # The second to last line must announce fname itself ...
    if marker not in loaded or fname not in loaded:
        return None
    # ... and the last line must name the companion debug file.
    if "No debugging symbols found" in detail or marker not in detail:
        return None

    debug_file = detail.split(marker, 1)[1].rsplit("...", 1)[0]
    # Callers retry symbol extraction on the returned file; handing back the
    # same file would make them retry forever.
    if not debug_file or debug_file == fname:
        return None
    return debug_file
def getGdbSourceFileInfo(addr, binary_path):
    """Locate the source line of the function behind a PLT address via gdb.

    gdb is first asked for ``info line *addr``. Only when it answers with
    "No line number information available" and annotates the address with a
    ``<name@plt...>`` symbol is a second query ``info line name`` issued,
    whose ``Line N of "file"`` answer is returned.

    Args:
        addr: address as a string that gdb accepts after ``*``.
        binary_path: path of the binary handed to gdb.

    Returns:
        ``(filepath, linenr)`` with the reported file joined onto the
        directory of *binary_path*, or ``(None, 0)`` if gdb is unavailable,
        fails, or does not produce the expected output.
    """
    # Local import: the regular expression module is only needed here.
    import re

    def run_gdb(query):
        # List form keeps paths with spaces or quotes intact.
        argv = [
            "gdb",
            "-ex",
            "set print asm-demangle on",
            "-ex",
            query,
            "-ex",
            "quit",
            binary_path,
        ]
        try:
            raw = subprocess.check_output(argv, stdin=subprocess.DEVNULL)
        except (OSError, subprocess.CalledProcessError):
            # This is a fallback lookup; a missing or failing gdb must not
            # abort the caller.
            return ""
        return raw.decode("utf-8", errors="replace")

    marker = "No line number information available"
    _, found, tail = run_gdb(f"info line *{addr}").partition(marker)
    if not found:
        return None, 0

    # Only the remainder of the message line can carry the "<symbol>".
    tail_lines = tail.splitlines()
    message = tail_lines[0] if tail_lines else ""
    start, end = message.find("<"), message.rfind(">")
    if start == -1 or end <= start:
        return None, 0
    symbol = message[start + 1:end]

    plt_tag = "@plt"
    if plt_tag not in symbol:
        return None, 0
    fn_name = symbol.split(plt_tag)[0]
    if not fn_name:
        return None, 0

    # Expected answer: Line <n> of "<file>" starts at address ...
    match = re.search(r'Line (\d+) of "([^"]+)"', run_gdb(f"info line {fn_name}"))
    if match is None:
        return None, 0
    linenr = int(match.group(1))
    # os.path.join copes with absolute source paths and with a binary path
    # that has no directory component.
    filepath = os.path.join(os.path.dirname(binary_path), match.group(2))

    debug(2, "[SRC] available via gdb for %s in %s", (addr, binary_path))
    debug(2, f"[SRC] in {filepath}:{linenr}")
    return filepath, linenr
# Binaries whose source package has already been downloaded in this run.
DOWNLOADED_PACKAGE_SOURCES = list()


def _run_tool(argv):
    """Run *argv* and return its decoded stdout, or None if it is missing or fails."""
    debug(4, f"exec: {shlex.join(argv)}")
    try:
        raw = subprocess.check_output(argv, stdin=subprocess.DEVNULL)
    except (OSError, subprocess.CalledProcessError):
        return None
    return raw.decode("utf-8", errors="replace")


def _find_source_file(filename, filepath):
    """Find *filename* below the current directory, disambiguated by *filepath*.

    When ``find`` reports several matches, progressively longer tails of
    *filepath* are required to occur in the match. Returns the single
    remaining match or None.
    """
    listing = _run_tool(["find", ".", "-iname", filename])
    if listing is None:
        return None
    candidates = listing.splitlines()
    components = filepath.split("/")
    depth = 1
    # Stop once the whole path has been used; otherwise indistinguishable
    # duplicates would keep this loop spinning forever.
    while len(candidates) > 1 and depth < len(components):
        depth += 1
        suffix = "/".join(components[-depth:])
        candidates = [entry for entry in candidates if suffix in entry]
    return candidates[0] if len(candidates) == 1 else None


def searchSourceInPackages(bin_file_path, ip):
    """Try to obtain the source file for *ip* from the Debian source package.

    Steps: identify the owning package with ``dpkg -S``, download it with
    ``apt-get source`` (once per binary), ask gdb for the line number and
    function at *ip* and for the file of that function, then look the file
    up below the current directory. If it is not found, a ``.tar.xz`` in a
    direct subdirectory that lists the file is extracted and removed, and
    the lookup is repeated.

    Args:
        bin_file_path: path of the binary containing *ip*.
        ip: address as an integer.

    Returns:
        ``(source_path, line_number)`` on success, ``(None, 0)`` whenever a
        tool is unavailable, fails, or yields unexpected output.
    """
    # Local imports: only needed by this function.
    import glob
    import re

    if bin_file_path not in DOWNLOADED_PACKAGE_SOURCES:
        # Identify source
        owner = _run_tool(["dpkg", "-S", bin_file_path])
        owner_lines = owner.splitlines() if owner else []
        if not owner_lines:
            debug(0, f"dpkg failed for {bin_file_path} @ {hex(ip)}")
            return None, 0
        # "pkg[:arch][, pkg2]: /path" -> first package name
        package = owner_lines[0].split(":")[0].split(",")[0].strip()
        if not package:
            debug(0, f"dpkg failed for {bin_file_path} @ {hex(ip)}")
            return None, 0

        # Download source package
        if _run_tool(["apt-get", "source", package]) is None:
            debug(0, f"apt-get source failed for {package}")
            return None, 0

        DOWNLOADED_PACKAGE_SOURCES.append(bin_file_path)

    gdb_prefix = ["gdb", "-batch", "-ex", "set print asm-demangle on", "-ex"]

    # Use gdb to get line number and enclosing function for the address.
    # Expected: Line <n> of "<file>" starts at address <a> <fn+off> and ends at ...
    output = _run_tool(gdb_prefix + [f"info line *{hex(ip)}", bin_file_path])
    located = re.search(
        r'^Line (\d+) of "[^"]*"\s+starts at address \S+ <(.+)>\s+and ends at',
        output or "",
        re.MULTILINE,
    )
    if located is None:
        # Covers "No line number information available" as well as any
        # other answer that does not name a line.
        return None, 0
    filelinenumber = int(located.group(1))
    # Drop a trailing "+<offset>" but keep a "+" that is part of the name.
    functionname = re.sub(r"\+\d+$", "", located.group(2))

    # Ask gdb in which file the function is defined.
    output = _run_tool(gdb_prefix + [f"info line {functionname}", bin_file_path])
    answers = (output or "").splitlines()
    if len(answers) > 1:
        debug(0, f"Warning several lines received from gdb: {answers}")
    quoted = re.search(r'"([^"]+)"', answers[0]) if answers else None
    if quoted is None:
        return None, 0
    filepath = quoted.group(1)
    filename = filepath.split("/")[-1]

    src_file_path = _find_source_file(filename, filepath)
    if src_file_path is not None:
        return src_file_path, filelinenumber

    # Check if there are any tar.xz with the source code. Only direct
    # subdirectories are inspected.
    holders = []
    for tarball in sorted(glob.glob("*/*.tar.xz")):
        members = _run_tool(["tar", "-tf", tarball])
        if members and any(filename in entry for entry in members.splitlines()):
            holders.append(tarball)
    if len(holders) != 1:
        debug(0, f"No unique tarball containing {filename}: {holders}")
        return None, 0
    tarball = holders[0]

    # Extract files and remove tarball
    if _run_tool(["tar", "-xvf", tarball]) is None:
        return None, 0
    try:
        os.remove(tarball)
    except OSError:
        debug(0, f"Could not remove {tarball}")

    src_file_path = _find_source_file(filename, filepath)
    if src_file_path is None:
        return None, 0
    return src_file_path, filelinenumber
" ") + + sym = None if SymbolInfo.isopen(): sym = SymbolInfo.lookup(ip) - if sym is not None: - self.outstream.write(escape(sym.strat(ip))) - else: - self.outstream.write(hex(ip)) - else: + if sym is None: self.outstream.write(hex(ip)) + else: + self.outstream.write(escape(sym.strat(ip))) + self.outstream.write("\n") if leak is not None: leak.doprint(self) @@ -209,7 +210,7 @@ def doprint_generic(self, obj, param1=False): else: node = f"{node_plain} origin='fixed' {str(evidence[0].key)}" self.startNode(node) - for key, value in entries.items(): + for key, value in sorted(entries.items()): self.doprint_line(f"{format(key, 'x')}: {value}") self.endNode(node_plain)