Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring '''get_extractor''' in capa/main.py #1842

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 39 additions & 26 deletions capa/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,33 +126,46 @@ def new_print(*args, **kwargs):
inspect.builtins.print = old_print # type: ignore


def log_unsupported_format_error():
    """Log the "unsupported file format" banner at ERROR level.

    Writes six records to the module-level ``logger``: an 80-dash rule, the
    headline, a spacer line, two lines of guidance, and a closing rule.
    """
    rule = "-" * 80
    banner = (
        rule,
        " Input file does not appear to be a PE or ELF file.",
        " ",
        " capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64).",
        " If you don't know the input file type, you can try using the `file` utility to guess it.",
        rule,
    )
    # One logger call per line, in order, so every line gets its own log record.
    for line in banner:
        logger.error(line)
def exceptUnsupportedError(func):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's use snake case names and maybe rename this to catch_log_return_errors or similar?

then let's use it via @<decorator_name> (see, e.g., https://rinaarts.com/declutter-python-code-with-error-handling-decorators/)

e_list, return_values = [(UnsupportedFormatError E_INVALID_FILE_TYPE),
(UnsupportedArchError, E_INVALID_FILE_ARCH),
(UnsupportedOSError, E_INVALID_FILE_OS)]

messsage_list = [ # UnsupportedFormatError
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these go into the same list of tuples?

(" Input file does not appear to be a PE or ELF file.",
" capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64).",
" If you don't know the input file type, you can try using the `file` utility to guess it."),

# UnsupportedArchError
(" Input file does not appear to target a supported architecture.",
" capa currently only supports analyzing x86 (32- and 64-bit)."),

# UnsupportedOSError
(" Input file does not appear to target a supported OS.",
" capa currently only supports analyzing executables for some operating systems (including Windows and Linux).")
]

def logging_wrapper(exception):
assert(exception in e_list)
e_messages = message_list[e_list.index(exception)]
e_return_value = return_values[e_list.index(exception)]

logger.error("-" * 80)
logger.error(f"{e_messages[0]}")
logger.error(" ")

for i in e_messages[1:]:
logger.error(i)

logger.error("-" * 80)

return e_return_value

if type(func(*args, **kwargs)) = ValueError:
return logging_wrapper(func(*args, **kwargs))


def log_unsupported_os_error():
    """Log the "unsupported operating system" banner at ERROR level.

    Writes five records to the module-level ``logger``: an 80-dash rule, the
    headline, a spacer line, one line of guidance, and a closing rule.
    """
    rule = "-" * 80
    banner = (
        rule,
        " Input file does not appear to target a supported OS.",
        " ",
        " capa currently only supports analyzing executables for some operating systems (including Windows and Linux).",
        rule,
    )
    # One logger call per line, in order, so every line gets its own log record.
    for line in banner:
        logger.error(line)


def log_unsupported_arch_error():
    """Log the "unsupported architecture" banner at ERROR level.

    Writes five records to the module-level ``logger``: an 80-dash rule, the
    headline, a spacer line, one line of guidance, and a closing rule.
    """
    rule = "-" * 80
    banner = (
        rule,
        " Input file does not appear to target a supported architecture.",
        " ",
        " capa currently only supports analyzing x86 (32- and 64-bit).",
        rule,
    )
    # One logger call per line, in order, so every line gets its own log record.
    for line in banner:
        logger.error(line)
else:
return func(*args, **kwargs)


def log_unsupported_runtime_error():
Expand Down
146 changes: 82 additions & 64 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,78 @@ def get_workspace(path: Path, format_: str, sigpaths: List[Path]):
return vw


def check_unsupported_raise_exception(path: Path, os_: str):
    """Raise if ``path`` fails the format, architecture, or OS support check.

    The checks run in that order and the first one that fails raises, so the
    later checks are not evaluated. The OS check is made only when ``os_``
    equals ``OS_AUTO``.

    Raises:
        UnsupportedFormatError: ``is_supported_format(path)`` is falsy.
        UnsupportedArchError: ``is_supported_arch(path)`` is falsy.
        UnsupportedOSError: ``os_`` is ``OS_AUTO`` and
            ``is_supported_os(path)`` is falsy.
    """
    format_ok = is_supported_format(path)
    if not format_ok:
        raise UnsupportedFormatError()

    arch_ok = is_supported_arch(path)
    if not arch_ok:
        raise UnsupportedArchError()

    # Any value other than OS_AUTO skips the OS check entirely.
    if os_ != OS_AUTO:
        return

    if not is_supported_os(path):
        raise UnsupportedOSError()


def add_binja_to_path():
    """Append the Binary Ninja API directory to ``sys.path`` if it exists.

    The directory comes from ``find_binja_path()``; when that path does not
    exist, ``sys.path`` is left untouched.
    """
    # Imported here rather than at module level, as in the original code.
    from capa.features.extractors.binja.find_binja_api import find_binja_path

    api_dir = find_binja_path()
    if not api_dir.exists():
        return

    sys.path.append(str(api_dir))


def attempt_binja_import():
    """Import the Binary Ninja Python API and return the module.

    When capa runs as a standalone executable, ``add_binja_to_path()`` is
    called first so the API directory can be added to ``sys.path`` before the
    import is attempted.

    Returns:
        The imported ``binaryninja`` module. The import statements below bind
        their names only inside this function, so a caller that needs the API
        should use this return value. Callers that ignore it are unaffected.

    Raises:
        RuntimeError: the ``binaryninja`` module (or its ``BinaryView`` name)
            cannot be imported. The original ``ImportError`` is attached as
            ``__cause__``.
    """
    # When we are running as a standalone executable, we cannot directly import binaryninja.
    # We need to first find the binja API installation path and add it into sys.path.
    if is_running_standalone():
        add_binja_to_path()

    try:
        import binaryninja

        # Kept so the function fails under the same conditions as before.
        from binaryninja import BinaryView  # noqa: F401
    except ImportError as e:
        # Chain explicitly so the traceback shows the ImportError as the cause.
        raise RuntimeError(
            "Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
            + "https://docs.binary.ninja/dev/batch.html#install-the-api)."
        ) from e

    return binaryninja


# Build a Binary Ninja-backed feature extractor for `path`.
# Calls attempt_binja_import(), loads the file with binaryninja.load() inside a
# halo spinner (written to stderr, disabled when `disable_progress` is true),
# and wraps the resulting view in BinjaFeatureExtractor.
# Raises RuntimeError when binaryninja.load() returns None.
# NOTE(review): `binaryninja` and `BinaryView` are imported inside
# attempt_binja_import(), i.e. in that function's scope; no binding for them is
# visible in this function. Confirm they are in scope here.
def handle_binja_backend(path: Path, disable_progress: bool) -> FeatureExtractor:
import capa.features.extractors.binja.extractor

attempt_binja_import()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can also stay in here and likely resolves the ruff error?


with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = binaryninja.load(str(path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}")

return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)


def attempt_save_workspace(vw):
    """Call ``vw.saveWorkspace()``; an ``IOError`` is logged, not raised.

    Any other exception raised by ``saveWorkspace()`` still propagates.
    """
    try:
        vw.saveWorkspace()
    except OSError:  # IOError is an alias of OSError on Python 3
        # see #168 for discussion around how to handle non-writable directories
        logger.info("source directory is not writable, won't save intermediate workspace")
        return


# Build a vivisect-backed feature extractor for `path`.
# Obtains a workspace from get_workspace(path, format_, sigpaths) inside a halo
# spinner (written to stderr, disabled when `disable_progress` is true).
# When `should_save_workspace` is true the workspace is handed to
# attempt_save_workspace(); otherwise a debug message is logged instead.
# Returns VivisectFeatureExtractor(vw, path, os_).
def handle_viv_backend(path: Path, format_: str, sigpaths: List[Path], should_save_workspace: bool, \
os_: str, disable_progress: bool) -> FeatureExtractor:
import capa.features.extractors.viv.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format_, sigpaths)

if should_save_workspace:
logger.debug("saving workspace")
attempt_save_workspace(vw)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is also simple enough to just leave here

else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)


def get_extractor(
path: Path,
format_: str,
Expand All @@ -533,74 +605,25 @@ def get_extractor(
UnsupportedOSError
"""
if format_ not in (FORMAT_SC32, FORMAT_SC64):
if not is_supported_format(path):
raise UnsupportedFormatError()

if not is_supported_arch(path):
raise UnsupportedArchError()

if os_ == OS_AUTO and not is_supported_os(path):
raise UnsupportedOSError()
check_unsupported_raise_exception(path, os_)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose to just leave the code verbatim here instead of in a new function. Or do you see much benefit added by the function?


if format_ == FORMAT_DOTNET:
import capa.features.extractors.dnfile.extractor

return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)

elif backend == BACKEND_BINJA:
from capa.features.extractors.binja.find_binja_api import find_binja_path

# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to first find the binja API installation path and add it into sys.path
if is_running_standalone():
bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))

try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)

import capa.features.extractors.binja.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = binaryninja.load(str(path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}")

return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
return handle_binja_backend(path, disable_progress)

elif backend == BACKEND_PEFILE:
import capa.features.extractors.pefile

return capa.features.extractors.pefile.PefileFeatureExtractor(path)

elif backend == BACKEND_VIV:
import capa.features.extractors.viv.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format_, sigpaths)

if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)
return handle_viv_backend(path, format, sigpaths, should_save_workspace, os_, disable_progress)

else:
raise ValueError("unexpected backend: " + backend)


def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]:
file_extractors: List[FeatureExtractor] = []
Expand Down Expand Up @@ -1257,8 +1280,11 @@ def main(argv: Optional[List[str]] = None):

should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None)

try:
extractor = get_extractor(

# Perform error checking
# Return if unsupported hardware or software
extractor = exceptUnsupportedError(
get_extractor(
args.sample,
format_,
args.os,
Expand All @@ -1267,15 +1293,7 @@ def main(argv: Optional[List[str]] = None):
should_save_workspace,
disable_progress=args.quiet or args.debug,
)
except UnsupportedFormatError:
log_unsupported_format_error()
return E_INVALID_FILE_TYPE
except UnsupportedArchError:
log_unsupported_arch_error()
return E_INVALID_FILE_ARCH
except UnsupportedOSError:
log_unsupported_os_error()
return E_INVALID_FILE_OS
)

meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)

Expand Down
Loading