Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve analysis with sources and gdb #35

Open
wants to merge 6 commits into
base: add-selftest-heap
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions analysis/datastub/SymbolInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
# @version 0.3


import sys
import copy
import os.path
import shlex
import subprocess
import sys
from operator import itemgetter
from datastub.SortedCollection import SortedCollection
from datastub.utils import debug
Expand All @@ -35,6 +37,39 @@
*************************************************************************
"""

# Names already resolved through gdb, keyed by the absolute address that was
# looked up, so gdb is launched at most once per address.
DEBUG_SYMBOLS = dict()


def getdebugsymbol(sym, address):
    """Return the name gdb prints for the instruction at *address*.

    gdb is asked to disassemble one instruction (``x/i``) at the offset of
    *address* inside the image of *sym*. The name is the text between the
    first ``<`` and the last ``>`` on the output line whose first token is
    that offset. Results are memoised in ``DEBUG_SYMBOLS``.

    Args:
        sym: Symbol whose ``img.lower`` is subtracted from *address* and
            whose ``img.name`` is the file handed to gdb.
        address: Absolute address to resolve.

    Returns:
        The extracted name. When gdb prints no usable line for the offset,
        ``sym.name[0]`` is returned instead so the caller keeps the label it
        already had.

    Raises:
        subprocess.CalledProcessError: If gdb exits with a non-zero status.
    """
    if address in DEBUG_SYMBOLS:
        debug(3, f"found symbol {DEBUG_SYMBOLS[address]} at {hex(address)}")
        return DEBUG_SYMBOLS[address]

    offset_hex = hex(address - sym.img.lower)
    # An explicit argument vector (instead of re-splitting a formatted
    # string) keeps an image path with whitespace or quotes in one piece.
    argv = [
        "gdb",
        "-ex",
        "set print asm-demangle on",
        "-ex",
        f"x/i {offset_hex}",
        "-ex",
        "quit",
        str(sym.img.name),
    ]
    output = subprocess.check_output(argv).decode("utf-8")

    resolved = None
    # Scan bottom-up: the disassembly line follows gdb's start-up output.
    for line in reversed(output.splitlines()):
        if line.lstrip().split(" ", 1)[0] != offset_hex:
            continue
        _, opened, rest = line.partition("<")
        inner, closed, _ = rest.rpartition(">")
        if opened and closed:
            resolved = inner
        break

    if resolved is None:
        # NOTE(review): assumes sym.name is a sequence whose first entry is
        # the current label, as lookup() uses it - confirm for other callers.
        debug(0, f"gdb could not resolve a symbol at {offset_hex} in {sym.img.name}")
        resolved = sym.name[0]

    DEBUG_SYMBOLS[address] = resolved
    return resolved


def getdebugelf(fname):
    """Ask gdb which file it reads debug symbols from for *fname*.

    gdb is started on *fname* and quits immediately; only the last two
    lines of its output are inspected. The second-to-last line must be the
    ``Reading symbols from`` line naming *fname*, and the last line must
    name the file gdb went on to read.

    Args:
        fname: Path of the binary to inspect.

    Returns:
        The last whitespace-separated token of gdb's final output line, cut
        at the first ``...``. ``None`` when gdb reports
        ``No debugging symbols found`` or when the output does not have the
        expected shape.

    Raises:
        subprocess.CalledProcessError: If gdb exits with a non-zero status.
    """
    # Explicit argv: splitting a formatted string on spaces would tear
    # apart a path that itself contains a space.
    output = subprocess.check_output(["gdb", "-ex", "quit", fname]).decode("utf-8")
    lines = output.splitlines()
    if len(lines) < 2:
        debug(0, f"Unexpected gdb output for {fname}: {lines}")
        return None

    loading_line, last_line = lines[-2], lines[-1]
    if "No debugging symbols found" in last_line:
        return None
    # Explicit checks instead of assert: validation of external tool output
    # must survive `python -O` and should not abort the whole analysis.
    if fname not in loading_line or "Reading symbols from" not in loading_line:
        debug(0, f"Unexpected gdb output for {fname}: {lines[-2:]}")
        return None
    return last_line.split(" ")[-1].split("...")[0]


def readelfsyms(fname, image):
try:
Expand All @@ -53,7 +88,13 @@ def readelfsyms(fname, image):
return None

if lines is None or len(lines) == 0:
return None
debug(0, f"No symbols found in {fname}")
fname = getdebugelf(fname)
if fname is None:
debug(0, "GDB didnot found any debug file")
return None
debug(0, f"GDB found debug file: {fname}")
return readelfsyms(fname, image)

syms = []
for line in lines:
Expand Down Expand Up @@ -175,6 +216,11 @@ def lookup(cls, address):
assert cls.instance is not None
try:
(_, sym) = cls.instance.symbols.find_le(address)
if sym.name[0].find("_init") >= 0:
sym = copy.deepcopy(sym)
sym_name = getdebugsymbol(sym, address)
sym.name[0] = sym_name
return sym
return sym
except ValueError:
return None
Expand Down
152 changes: 136 additions & 16 deletions analysis/datastub/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import os
import gzip
import subprocess
import shlex
import pickle
from datastub.DataFS import DataFS
from datastub.IpInfoShort import IpInfoShort, IP_INFO_FILE
Expand Down Expand Up @@ -103,6 +104,38 @@ def loadpickle(pfile):
"""


def getGdbSourceFileInfo(addr, binary_path):
    """Map an address inside a ``@plt`` stub to the called function's source.

    Two gdb queries are issued. ``info line *addr`` must answer
    ``No line number information available`` and name a ``<symbol@plt>``;
    otherwise there is nothing to resolve here. ``info line symbol`` is then
    expected to answer ``Line N of "file" ...``, from which the line number
    and the quoted file name are taken.

    Args:
        addr: Address as text, interpolated into ``info line *addr``.
        binary_path: Path of the binary handed to gdb. The reported source
            file is resolved relative to this binary's directory.

    Returns:
        ``(source_file_path, line_number)`` on success, ``(None, 0)`` when
        any step yields no usable answer.

    Raises:
        subprocess.CalledProcessError: If gdb exits with a non-zero status.
    """

    def info_line(location):
        # Explicit argv so a binary path with spaces or quotes stays intact.
        argv = [
            "gdb",
            "-ex",
            "set print asm-demangle on",
            "-ex",
            f"info line {location}",
            "-ex",
            "quit",
            binary_path,
        ]
        return subprocess.check_output(argv).decode("utf-8")

    output = info_line(f"*{addr}")
    marker = "No line number information available"
    if marker not in output:
        return None, 0
    # The symbol is the text between the first "<" after the marker and the
    # last ">" that follows it.
    _, opened, rest = output.split(marker)[1].partition("<")
    symbol, closed, _ = rest.rpartition(">")
    if not (opened and closed):
        return None, 0

    plt_suffix = "@plt"
    if plt_suffix not in symbol:
        return None, 0
    fn_name = symbol.split(plt_suffix)[0]

    output = info_line(fn_name)
    keyword = "Line"
    if keyword not in output:
        return None, 0
    answer_lines = output.split(keyword)[-1].splitlines()
    if not answer_lines:
        return None, 0
    # The answer continues after the file name ("... starts at address ..."),
    # so take the leading number and the first quoted string rather than
    # unpacking a fixed number of space-separated tokens.
    linenr_text, _, remainder = answer_lines[0].lstrip().partition(" ")
    quoted = remainder.split('"')
    if len(quoted) < 3:
        return None, 0
    try:
        linenr = int(linenr_text)
    except ValueError:
        return None, 0
    rel_filepath = quoted[1]
    # os.path.join copes with a binary path that has no directory part and
    # with a file name that gdb reports as an absolute path.
    filepath = os.path.join(os.path.dirname(binary_path), rel_filepath)

    debug(2, "[SRC] available via gdb for %s in %s", (addr, binary_path))
    debug(2, f"[SRC] in {filepath}:{linenr}")
    return filepath, linenr


def getSourceFileInfo(addr, binary_path):
# e.g., addr2line 0x42d4b9 -e openssl
# -> file_name:line_nr
Expand All @@ -116,14 +149,10 @@ def getSourceFileInfo(addr, binary_path):
infos = output.split(":")
source_file_path, source_line_number = infos[0], infos[1]
if "??" == source_file_path:
raise subprocess.CalledProcessError
raise subprocess.CalledProcessError(1, "addr2line")
except subprocess.CalledProcessError:
debug(2, "[SRC] unavailable for %s in %s", (addr, binary_path))
return None, 0
except Exception as error:
debug(0, f"lookup: {error} not catched!")
debug(2, "[SRC] unavailable for %s in %s", (addr, binary_path))
return None, 0
return getGdbSourceFileInfo(addr, binary_path)

if "discriminator" in source_line_number:
source_line_number = source_line_number.split()[0]
Expand All @@ -132,9 +161,6 @@ def getSourceFileInfo(addr, binary_path):
source_line_number = int(source_line_number)
except ValueError:
source_line_number = 0
except Exception as error:
debug(0, f"lookup: {error} not catched!")
source_line_number = 0

return source_file_path, source_line_number

Expand All @@ -158,6 +184,97 @@ def getAsmFileInfo(addr, asm_dump):
*************************************************************************
"""

# Binaries whose source package has already been fetched by
# searchSourceInPackages(), so dpkg/apt-get run at most once per binary.
DOWNLOADED_PACKAGE_SOURCES = list()


def searchSourceInPackages(bin_file_path, ip):
    """Locate the source file for *ip* in the binary's distribution package.

    Steps:
      1. ``dpkg -S`` names the package owning *bin_file_path* and
         ``apt-get source`` fetches it (once per binary).
      2. gdb ``info line *ip`` gives the line number and enclosing function;
         gdb ``info line function`` gives the quoted source file path.
      3. The file is searched with ``find``. If it is not found, the single
         ``*.tar.xz`` one directory level down that lists the file name is
         extracted and removed, and the search is repeated.

    ``find`` and the tarball glob use relative paths, so the search happens
    below the current working directory.

    Args:
        bin_file_path: Path of the binary the address belongs to.
        ip: Address as an integer (formatted with ``hex`` for gdb).

    Returns:
        ``(source_file_path, line_number)`` on success, ``(None, 0)`` when
        any step yields no unambiguous answer.

    Raises:
        subprocess.CalledProcessError: If gdb, find, tar, rm or the tarball
            listing pipeline exits with a non-zero status.
    """

    def run(argv):
        # Log and execute argv without a shell; returns the raw stdout bytes.
        debug(4, f"exec: {shlex.join(argv)}")
        return subprocess.check_output(argv)

    def search_in_directory(filename, filepath):
        # Return the one `find` hit for filename, using ever longer suffixes
        # of filepath to break ties; None when there is no unique hit.
        hits = run(["find", "-iname", filename]).decode("utf-8").splitlines()
        components = filepath.split("/")
        depth = 1
        # Stop once the whole path has been used: narrowing further is
        # impossible and the loop would otherwise never terminate.
        while len(hits) > 1 and depth < len(components):
            depth += 1
            suffix = "/".join(components[-depth:])
            hits = [hit for hit in hits if suffix in hit]
        return hits[0] if len(hits) == 1 else None

    if bin_file_path not in DOWNLOADED_PACKAGE_SOURCES:
        # Identify the owning package.
        try:
            owners = run(["dpkg", "-S", bin_file_path]).decode("utf-8").splitlines()
        except subprocess.CalledProcessError:
            debug(0, f"dpkg failed for {bin_file_path} @ {hex(ip)}")
            return None, 0
        if len(owners) != 1:
            debug(0, f"dpkg reported {len(owners)} owners for {bin_file_path}")
            return None, 0
        package = owners[0].split(":")[0]

        # Download the source package.
        try:
            run(["apt-get", "source", package])
        except subprocess.CalledProcessError:
            debug(0, f"apt-get source failed for {package}")
            return None, 0

        DOWNLOADED_PACKAGE_SOURCES.append(bin_file_path)

    gdb_prefix = ["gdb", "-batch", "-ex", "set print asm-demangle on"]

    # Ask gdb for the line number and enclosing function of the address.
    argv = gdb_prefix + ["-ex", f"info line *{hex(ip)}", bin_file_path]
    lines = run(argv).decode("utf-8").splitlines()
    if len(lines) != 1:
        debug(0, f"Unexpected gdb output for {hex(ip)} in {bin_file_path}: {lines}")
        return None, 0
    answer = lines[0]
    if "No line number information available" in answer:
        return None, 0
    if "starts at address " not in answer:
        debug(0, f"Unexpected gdb output for {hex(ip)} in {bin_file_path}: {lines}")
        return None, 0
    start = answer.split("starts at address ")[1].split(" and ends at")[0]
    _, opened, rest = start.partition("<")
    symbol, closed, _ = rest.rpartition(">")
    if not (opened and closed):
        return None, 0
    # Drop a trailing "+offset" so only the function name remains.
    functionname = symbol.rsplit("+", 1)[0] if "+" in symbol else symbol
    try:
        filelinenumber = int(answer.split(" ")[1])
    except (IndexError, ValueError):
        debug(0, f"Unexpected gdb output for {hex(ip)} in {bin_file_path}: {lines}")
        return None, 0

    # Ask gdb for the source file that defines the function.
    argv = gdb_prefix + ["-ex", f"info line {functionname}", bin_file_path]
    lines = run(argv).decode("utf-8").splitlines()
    if not lines:
        return None, 0
    if len(lines) > 1:
        debug(0, f"Warning several lines received from {shlex.join(argv)}: {lines}")
    quoted = lines[0].split('"')
    if len(quoted) < 2:
        return None, 0
    filepath = quoted[1]
    filename = filepath.split("/")[-1]

    src_file_path = search_in_directory(filename, filepath)
    if src_file_path is not None:
        return src_file_path, filelinenumber

    # Check if there are any tar.xz with the source code.
    # NOTE(review): filename is interpolated into a shell pipeline; it comes
    # from the binary's debug information, so treat it as untrusted input.
    command = (
        "ls **/*.tar.xz | xargs -n 1 -i bash -c "
        f"'tar -tf {{}} | grep {filename} | wc -l | xargs echo {{}}'"
    )
    debug(4, f"exec: {command}")
    output = subprocess.check_output(command, shell=True).decode("utf-8")
    tarballs = []
    for line in output.splitlines():
        # Each line is "<tarball> <number of matching members>".
        tarball_name, _, count = line.rpartition(" ")
        if count.isdigit() and int(count):
            tarballs.append(tarball_name)
    if len(tarballs) != 1:
        debug(0, f"Expected one tarball containing {filename}, found {tarballs}")
        return None, 0
    tarball = tarballs[0]

    # Extract files and remove tarball.
    run(["tar", "-xvf", tarball])
    run(["rm", tarball])

    src_file_path = search_in_directory(filename, filepath)
    if src_file_path is None:
        debug(0, f"{filename} not found after extracting {tarball}")
        return None, 0
    return src_file_path, filelinenumber


def export_ip(ip, datafs, imgmap, info_map):
if ip is None or ip == 0:
Expand Down Expand Up @@ -185,7 +302,6 @@ def export_ip(ip, datafs, imgmap, info_map):
asm_dump = ""
try:
debug(1, "[ASM] objdump %s", (str(bin_file_path)))
# asm_dump = subprocess.check_output(["objdump", "-Dj", ".text", bin_file_path], universal_newlines=True)
with datafs.create_file(asm_file_path) as f:
subprocess.call(
[
Expand Down Expand Up @@ -217,15 +333,19 @@ def export_ip(ip, datafs, imgmap, info_map):
debug(1, "[ASM] unavailable for %s in %s", (hex(addr), bin_file_path))
# Search for leak in source code
src_file_path, src_line_nr = getSourceFileInfo(hex(addr), bin_file_path)
if src_file_path is None:
src_file_path, src_line_nr = searchSourceInPackages(bin_file_path, addr)
debug(
1,
"[SRC] available in package sources for %s in %s",
(hex(addr), bin_file_path),
)
if src_file_path is not None and os.path.exists(src_file_path):
datafs.add_file(src_file_path)
elif src_file_path is None:
debug(1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path))
else:
if src_file_path is None:
debug(
1, "[SRC] unavailable for %s in %s", (hex(addr), bin_file_path)
)
else:
debug(1, "[SRC] source file %s missing", (src_file_path))
debug(1, "[SRC] source file %s missing", (src_file_path))
ip_info = IpInfoShort(
asm_file_path, asm_line_nr, src_file_path, src_line_nr
)
Expand Down
13 changes: 7 additions & 6 deletions analysis/datastub/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,15 @@ def doprint(self, text, ip, leak):
self.outstream.write(" " * self.depth)
if len(text) > 0:
self.outstream.write(text + " ")

sym = None
if SymbolInfo.isopen():
sym = SymbolInfo.lookup(ip)
if sym is not None:
self.outstream.write(escape(sym.strat(ip)))
else:
self.outstream.write(hex(ip))
else:
if sym is None:
self.outstream.write(hex(ip))
else:
self.outstream.write(escape(sym.strat(ip)))

self.outstream.write("\n")
if leak is not None:
leak.doprint(self)
Expand Down Expand Up @@ -209,7 +210,7 @@ def doprint_generic(self, obj, param1=False):
else:
node = f"{node_plain} origin='fixed' {str(evidence[0].key)}"
self.startNode(node)
for key, value in entries.items():
for key, value in sorted(entries.items()):
self.doprint_line(f"{format(key, 'x')}: {value}")
self.endNode(node_plain)

Expand Down