From 5d510c1d0fc3c0ab185951f3ae8f76287073f394 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 30 Jan 2024 15:48:27 +0000 Subject: [PATCH] binexport: implement ELF/aarch64 GOT/thunk analyzer --- .../extractors/binexport2/extractor.py | 141 +++++++++++------- 1 file changed, 83 insertions(+), 58 deletions(-) diff --git a/capa/features/extractors/binexport2/extractor.py b/capa/features/extractors/binexport2/extractor.py index 95e52aae5..3ddc5d2ec 100644 --- a/capa/features/extractors/binexport2/extractor.py +++ b/capa/features/extractors/binexport2/extractor.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import logging from typing import Dict, List, Tuple, Iterator import capa.features.extractors.elf @@ -15,7 +16,7 @@ import capa.features.extractors.binexport2.basicblock from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.binexport2 import FunctionContext, BasicBlockContext, InstructionContext +from capa.features.extractors.binexport2 import BinExport2Index, FunctionContext, BasicBlockContext, InstructionContext from capa.features.extractors.base_extractor import ( BBHandle, InsnHandle, @@ -25,30 +26,84 @@ ) from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2 +logger = logging.getLogger(__name__) + + +class BinExport2Analysis: + def __init__(self, be2: BinExport2, idx: BinExport2Index, buf: bytes): + self.be2 = be2 + self.idx = idx + self.buf = buf + + # from virtual address to import name + self.thunks: Dict[int, str] = {} + + def _find_got_thunks(self): + if self.be2.meta_information.architecture_name != "aarch64": + logger.debug("skipping GOT thunk analysis on non-aarch64") + return + + if not self.buf.startswith(capa.features.extractors.common.MATCH_ELF): + logger.debug("skipping GOT thunk analysis on non-ELF") + return + + for vertex_index, vertex in enumerate(self.be2.call_graph.vertex): + if not vertex.HasField("address"): + continue + + if not vertex.HasField("mangled_name"): + continue + + if BinExport2.CallGraph.Vertex.Type.IMPORTED != vertex.type: + continue + + if len(self.idx.callers_by_vertex_index[vertex_index]) != 1: + # find imports with a single caller, + # which should be the thunk + continue + + maybe_thunk_vertex_index = self.idx.callers_by_vertex_index[vertex_index][0] + maybe_thunk_vertex = self.be2.call_graph.vertex[maybe_thunk_vertex_index] + maybe_thunk_address = maybe_thunk_vertex.address + + maybe_thunk_flow_graph_index = self.idx.flow_graph_index_by_address[maybe_thunk_address] + maybe_thunk_flow_graph = self.be2.flow_graph[maybe_thunk_flow_graph_index] + + if len(maybe_thunk_flow_graph.basic_block_index) != 1: + # should have a single basic block + continue + + maybe_thunk_basic_block = self.be2.basic_block[maybe_thunk_flow_graph.entry_basic_block_index] + if len(list(self.idx.instruction_indices(maybe_thunk_basic_block))) != 4: + # fstat: + # 000008b0 adrp x16, 0x11000 + # 000008b4 ldr x17, [x16, #0xf88] {fstat} + # 000008b8 add x16, x16, #0xf88 {fstat} + # 000008bc br x17 + continue + + thunk_address = maybe_thunk_address + thunk_name = vertex.mangled_name + logger.debug("found GOT thunk: 0x%x -> %s", thunk_address, thunk_name) + + self.thunks[thunk_address] = thunk_name + class BinExport2FeatureExtractor(StaticFeatureExtractor): def __init__(self, be2: BinExport2, buf: bytes): super().__init__(hashes=SampleHashes.from_bytes(buf)) self.be2 = be2 self.buf = buf - - self.address_by_instruction_index: Dict[int, int] = {} - self.flow_graph_index_by_function_index: Dict[int, int] = {} - self.function_index_by_address: Dict[int, int] = {} + self.idx = BinExport2Index(self.be2) + self.analysis = BinExport2Analysis(self.be2, self.idx, self.buf) self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(list(capa.features.extractors.common.extract_format(self.buf))) self.global_features.extend(list(capa.features.extractors.common.extract_os(self.buf))) self.global_features.extend(list(capa.features.extractors.common.extract_arch(self.buf))) - self._index_instruction_addresses() - self._index_basic_blocks_by_function() - - print("base address", hex(self.get_base_address())) - ba = self.get_base_address() - for v in self.be2.call_graph.vertex: - if v.mangled_name: - print(hex(v.address - ba), v.mangled_name) + # TODO: assert supported file formats, arches + # and gradually relax restrictions as they're tested. def get_base_address(self): # TODO: assume the lowest address is the base address. @@ -64,10 +119,12 @@ def extract_file_features(self): yield from capa.features.extractors.binexport2.file.extract_features(self.be2, self.buf) def get_functions(self) -> Iterator[FunctionHandle]: - for function_index in self.flow_graph_index_by_function_index.keys(): - vertex = self.be2.call_graph.vertex[function_index] + for flow_graph_index, flow_graph in enumerate(self.be2.flow_graph): + entry_basic_block_index = flow_graph.entry_basic_block_index + flow_graph_address = self.idx.basic_block_address_by_index[entry_basic_block_index] yield FunctionHandle( - AbsoluteVirtualAddress(vertex.address), inner=FunctionContext(self.be2, function_index) + AbsoluteVirtualAddress(flow_graph_address), + inner=FunctionContext(self.be2, self.idx, self.analysis, flow_graph_index), ) def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Feature, Address]]: @@ -75,60 +132,28 @@ def extract_function_features(self, fh: FunctionHandle) -> Iterator[Tuple[Featur def get_basic_blocks(self, fh: FunctionHandle) -> Iterator[BBHandle]: fhi: FunctionContext = fh.inner - flow_graph_index = self.flow_graph_index_by_function_index[fhi.function_index] + flow_graph_index = fhi.flow_graph_index flow_graph = self.be2.flow_graph[flow_graph_index] for basic_block_index in flow_graph.basic_block_index: - bb = self.be2.basic_block[basic_block_index] + basic_block_address = self.idx.basic_block_address_by_index[basic_block_index] yield BBHandle( - address=AbsoluteVirtualAddress(self.address_by_instruction_index[bb.instruction_index[0].begin_index]), - inner=BasicBlockContext(self.be2, basic_block_index), + address=AbsoluteVirtualAddress(basic_block_address), + inner=BasicBlockContext(basic_block_index), ) def extract_basic_block_features(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[Tuple[Feature, Address]]: - # TODO(wb): 1755 - yield from () + yield from capa.features.extractors.binexport2.basicblock.extract_features(fh, bbh) def get_instructions(self, fh: FunctionHandle, bbh: BBHandle) -> Iterator[InsnHandle]: bbi: BasicBlockContext = bbh.inner - bb: BinExport2.BasicBlock = self.be2.basic_block[bbi.basic_block_index] - for instruction_index in range(bb.instruction_index[0].begin_index, bb.instruction_index[0].end_index): + basic_block: BinExport2.BasicBlock = self.be2.basic_block[bbi.basic_block_index] + for instruction_index in self.idx.instruction_indices(basic_block): + instruction_address = self.idx.instruction_address_by_index[instruction_index] yield InsnHandle( - address=AbsoluteVirtualAddress(self.address_by_instruction_index[instruction_index]), - inner=InstructionContext(self.be2, instruction_index), + address=AbsoluteVirtualAddress(instruction_address), + inner=InstructionContext(instruction_index), ) def extract_insn_features(self, fh: FunctionHandle, bbh: BBHandle, ih: InsnHandle): yield from capa.features.extractors.binexport2.insn.extract_features(fh, bbh, ih) - - def _index_instruction_addresses(self): - address = 0 - next_address = 0 - for instruction_index, instruction in enumerate(self.be2.instruction): - if instruction.HasField("address"): - address = instruction.address - next_address = address + len(instruction.raw_bytes) - else: - address = next_address - next_address += len(instruction.raw_bytes) - - self.address_by_instruction_index[instruction_index] = address - - def _index_basic_blocks_by_function(self): - function_index_from_address = {} - - for index, vertex in enumerate(self.be2.call_graph.vertex): - function_index_from_address[vertex.address] = index - - for flow_graph_index, flow_graph in enumerate(self.be2.flow_graph): - basic_block_entry_point = self.be2.basic_block[flow_graph.entry_basic_block_index] - basic_block_address = self.address_by_instruction_index[ - basic_block_entry_point.instruction_index[0].begin_index - ] - - if basic_block_address not in function_index_from_address: - continue - - function_index = function_index_from_address[basic_block_address] - - self.flow_graph_index_by_function_index[function_index] = flow_graph_index