Skip to content

Commit

Permalink
inspect-binexport: factor out the index building
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin committed Jan 30, 2024
1 parent 2202dc7 commit 265ffe1
Showing 1 changed file with 144 additions and 122 deletions.
266 changes: 144 additions & 122 deletions scripts/inspect-binexport2.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,126 @@
logger = logging.getLogger("inspect-binexport2")


class BinExport2Index:
def __init__(self, be2: BinExport2):
self.be2 = be2

self.callers_by_vertex_index: Dict[int, List[int]] = defaultdict(list)
self.callees_by_vertex_index: Dict[int, List[int]] = defaultdict(list)

# note: flow graph != call graph (vertex)
self.flow_graph_index_by_address: Dict[int, int] = {}
self.basic_block_index_by_address: Dict[int, int] = {}
self.basic_block_address_by_index: Dict[int, int] = {}
self.instruction_index_by_address: Dict[int, int] = {}
self.instruction_address_by_index: Dict[int, int] = {}

# edges that come from the given basic block
self.source_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list)
# edges that end up at the given basic block
self.target_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list)

self.vertex_index_by_address: Dict[int, int] = {}

self.data_reference_index_by_source_instruction_index: Dict[int, List[int]] = defaultdict(list)
self.data_reference_index_by_target_address: Dict[int, List[int]] = defaultdict(list)

self._index_vertex_edges()
self._index_instruction_addresses()
self._index_flow_graph_nodes()
self._index_flow_graph_edges()
self._index_call_graph_vertices()
self._index_data_references()

def _index_vertex_edges(self):
for edge in self.be2.call_graph.edge:
if not edge.source_vertex_index:
continue
if not edge.target_vertex_index:
continue

self.callers_by_vertex_index[edge.target_vertex_index].append(edge.source_vertex_index)
self.callees_by_vertex_index[edge.source_vertex_index].append(edge.target_vertex_index)

def _index_instruction_addresses(self):
instruction_address = 0
for instruction_index, instruction in enumerate(self.be2.instruction):
if instruction.HasField("address"):
instruction_address = instruction.address

self.instruction_index_by_address[instruction_address] = instruction_index
self.instruction_address_by_index[instruction_index] = instruction_address

assert instruction.HasField("raw_bytes")
instruction_address += len(instruction.raw_bytes)

def _index_flow_graph_nodes(self):
for flow_graph_index, flow_graph in enumerate(self.be2.flow_graph):
for basic_block_index in flow_graph.basic_block_index:
basic_block = self.be2.basic_block[basic_block_index]
for instruction_index in self.instruction_indices(basic_block):
basic_block_address = self.instruction_address_by_index[instruction_index]
self.basic_block_index_by_address[basic_block_address] = basic_block_index
self.basic_block_address_by_index[basic_block_index] = basic_block_address

entry_basic_block = self.be2.basic_block[flow_graph.entry_basic_block_index]
entry_instruction_index = next(self.instruction_indices(entry_basic_block))
entry_instruction_address = self.instruction_address_by_index[entry_instruction_index]
function_address = entry_instruction_address
self.flow_graph_index_by_address[function_address] = flow_graph_index

def _index_flow_graph_edges(self):
for flow_graph in self.be2.flow_graph:
for edge in flow_graph.edge:
if not edge.HasField("source_basic_block_index") or not edge.HasField("target_basic_block_index"):
continue

self.source_edges_by_basic_block_index[edge.source_basic_block_index].append(edge)
self.target_edges_by_basic_block_index[edge.target_basic_block_index].append(edge)

def _index_call_graph_vertices(self):
for vertex_index, vertex in enumerate(self.be2.call_graph.vertex):
if not vertex.HasField("address"):
continue

vertex_address = vertex.address
self.vertex_index_by_address[vertex_address] = vertex_index

def _index_data_references(self):
for data_reference_index, data_reference in enumerate(self.be2.data_reference):
self.data_reference_index_by_source_instruction_index[data_reference.instruction_index].append(
data_reference_index
)
self.data_reference_index_by_target_address[data_reference.address].append(data_reference_index)

@staticmethod
def instruction_indices(basic_block: BinExport2.BasicBlock) -> Iterator[int]:
for index_range in basic_block.instruction_index:
if not index_range.HasField("end_index"):
yield index_range.begin_index
continue
else:
yield from range(index_range.begin_index, index_range.end_index)

def get_function_name_by_vertex(self, vertex_index: int) -> str:
vertex = self.be2.call_graph.vertex[vertex_index]
name = f"sub_{vertex.address:x}"
if vertex.HasField("mangled_name"):
name = vertex.mangled_name

if vertex.HasField("demangled_name"):
name = vertex.demangled_name

return name

def get_function_name_by_address(self, address: int) -> str:
if address not in self.vertex_index_by_address:
return ""

vertex_index = self.vertex_index_by_address[address]
return self.get_function_name_by_vertex(vertex_index)


class Renderer:
def __init__(self, o: io.StringIO):
self.o = o
Expand Down Expand Up @@ -71,6 +191,7 @@ def main(argv=None):

o = Renderer(io.StringIO())
be2: BinExport2 = capa.features.extractors.binexport2.get_binexport2(args.input_file)
idx = BinExport2Index(be2)

with o.section("meta"):
o.writeln(f"name: {be2.meta_information.executable_name}")
Expand Down Expand Up @@ -98,111 +219,12 @@ def main(argv=None):
if not be2.library:
o.writeln("(none)")

callers_by_vertex_index: Dict[int, List[int]] = defaultdict(list)
callees_by_vertex_index: Dict[int, List[int]] = defaultdict(list)

for edge in be2.call_graph.edge:
if not edge.source_vertex_index:
continue
if not edge.target_vertex_index:
continue

callers_by_vertex_index[edge.target_vertex_index].append(edge.source_vertex_index)
callees_by_vertex_index[edge.source_vertex_index].append(edge.target_vertex_index)

# note: flow graph != call graph (vertex)
flow_graph_index_by_address: Dict[int, int] = {}
basic_block_index_by_address: Dict[int, int] = {}
basic_block_address_by_index: Dict[int, int] = {}
instruction_index_by_address: Dict[int, int] = {}
instruction_address_by_index: Dict[int, int] = {}

instruction_address = 0
for instruction_index, instruction in enumerate(be2.instruction):
if instruction.HasField("address"):
instruction_address = instruction.address

instruction_index_by_address[instruction_address] = instruction_index
instruction_address_by_index[instruction_index] = instruction_address

assert instruction.HasField("raw_bytes")
instruction_address += len(instruction.raw_bytes)

def instruction_indices(basic_block: BinExport2.BasicBlock) -> Iterator[int]:
for index_range in basic_block.instruction_index:
if not index_range.HasField("end_index"):
yield index_range.begin_index
continue
else:
yield from range(index_range.begin_index, index_range.end_index)

for flow_graph_index, flow_graph in enumerate(be2.flow_graph):
for basic_block_index in flow_graph.basic_block_index:
basic_block = be2.basic_block[basic_block_index]
for instruction_index in instruction_indices(basic_block):
basic_block_address = instruction_address_by_index[instruction_index]
basic_block_index_by_address[basic_block_address] = basic_block_index
basic_block_address_by_index[basic_block_index] = basic_block_address

entry_basic_block = be2.basic_block[flow_graph.entry_basic_block_index]
entry_instruction_index = next(instruction_indices(entry_basic_block))
entry_instruction_address = instruction_address_by_index[entry_instruction_index]
function_address = entry_instruction_address
flow_graph_index_by_address[function_address] = flow_graph_index

# edges that come from the given basic block
source_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list)
# edges that end up at the given basic block
target_edges_by_basic_block_index: Dict[int, List[BinExport2.FlowGraph.Edge]] = defaultdict(list)

for flow_graph in be2.flow_graph:
for edge in flow_graph.edge:
if not edge.HasField("source_basic_block_index") or not edge.HasField("target_basic_block_index"):
continue

source_edges_by_basic_block_index[edge.source_basic_block_index].append(edge)
target_edges_by_basic_block_index[edge.target_basic_block_index].append(edge)

vertex_index_by_address: Dict[int, int] = {}

for vertex_index, vertex in enumerate(be2.call_graph.vertex):
if not vertex.HasField("address"):
continue

vertex_address = vertex.address
vertex_index_by_address[vertex_address] = vertex_index

def get_function_name_by_vertex(be2: BinExport2, vertex_index: int) -> str:
vertex = be2.call_graph.vertex[vertex_index]
name = f"sub_{vertex.address:x}"
if vertex.HasField("mangled_name"):
name = vertex.mangled_name

if vertex.HasField("demangled_name"):
name = vertex.demangled_name

return name

def get_function_name_by_address(be2: BinExport2, address: int) -> str:
if address not in vertex_index_by_address:
return ""

vertex_index = vertex_index_by_address[address]
return get_function_name_by_vertex(be2, vertex_index)

data_reference_index_by_source_instruction_index: Dict[int, List[int]] = defaultdict(list)
data_reference_index_by_target_address: Dict[int, List[int]] = defaultdict(list)

for data_reference_index, data_reference in enumerate(be2.data_reference):
data_reference_index_by_source_instruction_index[data_reference.instruction_index].append(data_reference_index)
data_reference_index_by_target_address[data_reference.address].append(data_reference_index)

with o.section("functions"):
for vertex_index, vertex in enumerate(be2.call_graph.vertex):
if not vertex.HasField("address"):
continue

with o.section(f"function {get_function_name_by_vertex(be2, vertex_index)} @ {hex(vertex.address)}"):
with o.section(f"function {idx.get_function_name_by_vertex(vertex_index)} @ {hex(vertex.address)}"):
o.writeln(f"type: {vertex.Type.Name(vertex.type)}")

if vertex.HasField("mangled_name"):
Expand All @@ -217,58 +239,58 @@ def get_function_name_by_address(be2: BinExport2, address: int) -> str:
o.writeln(f"library: [{vertex.library_index}] {library.name}")

if vertex.HasField("module_index"):
module = be2.library[vertex.module_index]
module = be2.module[vertex.module_index]
o.writeln(f"module: [{vertex.module_index}] {module.name}")

if callees_by_vertex_index[vertex_index] or callers_by_vertex_index[vertex_index]:
if idx.callees_by_vertex_index[vertex_index] or idx.callers_by_vertex_index[vertex_index]:
o.writeln("xrefs:")

for caller_index in callers_by_vertex_index[vertex_index]:
o.writeln(f" ← {get_function_name_by_vertex(be2, caller_index)}")
for caller_index in idx.callers_by_vertex_index[vertex_index]:
o.writeln(f" ← {idx.get_function_name_by_vertex(caller_index)}")

for callee_index in callees_by_vertex_index[vertex_index]:
o.writeln(f" → {get_function_name_by_vertex(be2, callee_index)}")
for callee_index in idx.callees_by_vertex_index[vertex_index]:
o.writeln(f" → {idx.get_function_name_by_vertex(callee_index)}")

if vertex.address not in flow_graph_index_by_address:
if vertex.address not in idx.flow_graph_index_by_address:
o.writeln("(no flow graph)")
else:
flow_graph_index = flow_graph_index_by_address[vertex.address]
flow_graph_index = idx.flow_graph_index_by_address[vertex.address]
flow_graph = be2.flow_graph[flow_graph_index]

o.writeln("")
for basic_block_index in flow_graph.basic_block_index:
basic_block = be2.basic_block[basic_block_index]
basic_block_address = basic_block_address_by_index[basic_block_index]
basic_block_address = idx.basic_block_address_by_index[basic_block_index]

with o.section(f"basic block {hex(basic_block_address)}"):
for edge in target_edges_by_basic_block_index[basic_block_index]:
for edge in idx.target_edges_by_basic_block_index[basic_block_index]:
if edge.type == BinExport2.FlowGraph.Edge.Type.CONDITION_FALSE:
continue

source_basic_block_index = edge.source_basic_block_index
source_basic_block_address = basic_block_address_by_index[source_basic_block_index]
source_basic_block_address = idx.basic_block_address_by_index[source_basic_block_index]

o.writeln(
f"↓ {BinExport2.FlowGraph.Edge.Type.Name(edge.type)} basic block {hex(source_basic_block_address)}"
)

for instruction_index in instruction_indices(basic_block):
for instruction_index in idx.instruction_indices(basic_block):
instruction = be2.instruction[instruction_index]
instruction_address = instruction_address_by_index[instruction_index]
instruction_address = idx.instruction_address_by_index[instruction_index]

mnemonic = be2.mnemonic[instruction.mnemonic_index]

call_targets = ""
if instruction.call_target:
call_targets = " "
for call_target_address in instruction.call_target:
call_target_name = get_function_name_by_address(be2, call_target_address)
call_target_name = idx.get_function_name_by_address(call_target_address)
call_targets += f"→ function {call_target_name} @ {hex(call_target_address)} "

data_references = ""
if instruction_index in data_reference_index_by_source_instruction_index:
if instruction_index in idx.data_reference_index_by_source_instruction_index:
data_references = " "
for data_reference_index in data_reference_index_by_source_instruction_index[
for data_reference_index in idx.data_reference_index_by_source_instruction_index[
instruction_index
]:
data_reference = be2.data_reference[data_reference_index]
Expand All @@ -288,7 +310,7 @@ def get_function_name_by_address(be2: BinExport2, address: int) -> str:
)

does_fallthrough = False
for edge in source_edges_by_basic_block_index[basic_block_index]:
for edge in idx.source_edges_by_basic_block_index[basic_block_index]:
if edge.type == BinExport2.FlowGraph.Edge.Type.CONDITION_FALSE:
does_fallthrough = True
continue
Expand All @@ -298,7 +320,7 @@ def get_function_name_by_address(be2: BinExport2, address: int) -> str:
back_edge = "↑"

target_basic_block_index = edge.target_basic_block_index
target_basic_block_address = basic_block_address_by_index[target_basic_block_index]
target_basic_block_address = idx.basic_block_address_by_index[target_basic_block_index]
o.writeln(
f"→ {BinExport2.FlowGraph.Edge.Type.Name(edge.type)} basic block {hex(target_basic_block_address)} {back_edge}"
)
Expand All @@ -307,16 +329,16 @@ def get_function_name_by_address(be2: BinExport2, address: int) -> str:
o.writeln("↓ CONDITION_FALSE")

with o.section("data"):
for data_address in sorted(data_reference_index_by_target_address.keys()):
if data_address in instruction_index_by_address:
for data_address in sorted(idx.data_reference_index_by_target_address.keys()):
if data_address in idx.instruction_index_by_address:
# appears to be code
continue

data_references = ""
for data_reference_index in data_reference_index_by_target_address[data_address]:
for data_reference_index in idx.data_reference_index_by_target_address[data_address]:
data_reference = be2.data_reference[data_reference_index]
instruction_index = data_reference.instruction_index
instruction_address = instruction_address_by_index[instruction_index]
instruction_address = idx.instruction_address_by_index[instruction_index]
data_references += f"⇤ {hex(instruction_address)} "
o.writeln(f"{hex(data_address)} {data_references}")

Expand Down

0 comments on commit 265ffe1

Please sign in to comment.