-
Notifications
You must be signed in to change notification settings - Fork 568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactoring '''get_extractor''' in capa/main.py #1842
Changes from 8 commits
d061e0c
d46fa26
222cd6c
d649897
0aab720
50b4b06
a9ead12
bc616d0
329ac2d
4162c90
3533550
c8a2003
3d78316
932b36e
b755227
98d15fd
d3aead9
4247a94
faa4c0c
f96a5ff
10d8a20
4acd8cd
88d725f
f537838
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,33 +126,46 @@ def new_print(*args, **kwargs): | |
inspect.builtins.print = old_print # type: ignore | ||
|
||
|
||
def log_unsupported_format_error(): | ||
logger.error("-" * 80) | ||
logger.error(" Input file does not appear to be a PE or ELF file.") | ||
logger.error(" ") | ||
logger.error( | ||
" capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64)." | ||
) | ||
logger.error(" If you don't know the input file type, you can try using the `file` utility to guess it.") | ||
logger.error("-" * 80) | ||
def exceptUnsupportedError(func): | ||
e_list, return_values = [(UnsupportedFormatError E_INVALID_FILE_TYPE), | ||
(UnsupportedArchError, E_INVALID_FILE_ARCH), | ||
(UnsupportedOSError, E_INVALID_FILE_OS)] | ||
|
||
messsage_list = [ # UnsupportedFormatError | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Review comment: Can these go into the same list of tuples? |
||
(" Input file does not appear to be a PE or ELF file.", | ||
" capa currently only supports analyzing PE and ELF files (or shellcode, when using --format sc32|sc64).", | ||
" If you don't know the input file type, you can try using the `file` utility to guess it."), | ||
|
||
# UnsupportedArchError | ||
(" Input file does not appear to target a supported architecture.", | ||
" capa currently only supports analyzing x86 (32- and 64-bit)."), | ||
|
||
# UnsupportedOSError | ||
(" Input file does not appear to target a supported OS.", | ||
" capa currently only supports analyzing executables for some operating systems (including Windows and Linux).") | ||
] | ||
|
||
def logging_wrapper(exception): | ||
assert(exception in e_list) | ||
e_messages = message_list[e_list.index(exception)] | ||
e_return_value = return_values[e_list.index(exception)] | ||
|
||
logger.error("-" * 80) | ||
logger.error(f"{e_messages[0]}") | ||
logger.error(" ") | ||
|
||
for i in e_messages[1:]: | ||
logger.error(i) | ||
|
||
logger.error("-" * 80) | ||
|
||
return e_return_value | ||
|
||
if type(func(*args, **kwargs)) = ValueError: | ||
return logging_wrapper(func(*args, **kwargs)) | ||
|
||
|
||
def log_unsupported_os_error(): | ||
logger.error("-" * 80) | ||
logger.error(" Input file does not appear to target a supported OS.") | ||
logger.error(" ") | ||
logger.error( | ||
" capa currently only supports analyzing executables for some operating systems (including Windows and Linux)." | ||
) | ||
logger.error("-" * 80) | ||
|
||
|
||
def log_unsupported_arch_error(): | ||
logger.error("-" * 80) | ||
logger.error(" Input file does not appear to target a supported architecture.") | ||
logger.error(" ") | ||
logger.error(" capa currently only supports analyzing x86 (32- and 64-bit).") | ||
logger.error("-" * 80) | ||
else: | ||
return func(*args, **kwargs) | ||
|
||
|
||
def log_unsupported_runtime_error(): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -517,6 +517,78 @@ def get_workspace(path: Path, format_: str, sigpaths: List[Path]): | |
return vw | ||
|
||
|
||
def check_unsupported_raise_exception(path: Path, os_: str): | ||
if not is_supported_format(path): | ||
raise UnsupportedFormatError() | ||
|
||
if not is_supported_arch(path): | ||
raise UnsupportedArchError() | ||
|
||
if os_ == OS_AUTO and not is_supported_os(path): | ||
raise UnsupportedOSError() | ||
|
||
|
||
def add_binja_to_path(): | ||
from capa.features.extractors.binja.find_binja_api import find_binja_path | ||
|
||
bn_api = find_binja_path() | ||
if bn_api.exists(): | ||
sys.path.append(str(bn_api)) | ||
|
||
|
||
def attempt_binja_import(): | ||
# When we are running as a standalone executable, we cannot directly import binaryninja | ||
# We need to fist find the binja API installation path and add it into sys.path | ||
if is_running_standalone(): | ||
add_binja_to_path() | ||
|
||
try: | ||
import binaryninja | ||
from binaryninja import BinaryView | ||
except ImportError: | ||
raise RuntimeError( | ||
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: " | ||
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)." | ||
) | ||
|
||
|
||
def handle_binja_backend(path: Path, disable_progress: bool) -> FeatureExtractor: | ||
import capa.features.extractors.binja.extractor | ||
|
||
attempt_binja_import() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Review comment: I think this can also stay in here, and it likely resolves the ruff error? |
||
|
||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): | ||
bv: BinaryView = binaryninja.load(str(path)) | ||
if bv is None: | ||
raise RuntimeError(f"Binary Ninja cannot open file {path}") | ||
|
||
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv) | ||
|
||
|
||
def attempt_save_workspace(vw): | ||
try: | ||
vw.saveWorkspace() | ||
except IOError: | ||
# see #168 for discussion around how to handle non-writable directories | ||
logger.info("source directory is not writable, won't save intermediate workspace") | ||
|
||
|
||
def handle_viv_backend(path: Path, format_: str, sigpaths: List[Path], should_save_workspace: bool, \ | ||
os_: str, disable_progress: bool) -> FeatureExtractor: | ||
import capa.features.extractors.viv.extractor | ||
|
||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): | ||
vw = get_workspace(path, format_, sigpaths) | ||
|
||
if should_save_workspace: | ||
logger.debug("saving workspace") | ||
attempt_save_workspace(vw) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Review comment: This is also simple enough to just leave here. |
||
else: | ||
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") | ||
|
||
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) | ||
|
||
|
||
def get_extractor( | ||
path: Path, | ||
format_: str, | ||
|
@@ -533,74 +605,25 @@ def get_extractor( | |
UnsupportedOSError | ||
""" | ||
if format_ not in (FORMAT_SC32, FORMAT_SC64): | ||
if not is_supported_format(path): | ||
raise UnsupportedFormatError() | ||
|
||
if not is_supported_arch(path): | ||
raise UnsupportedArchError() | ||
|
||
if os_ == OS_AUTO and not is_supported_os(path): | ||
raise UnsupportedOSError() | ||
check_unsupported_raise_exception(path, os_) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. — Review comment: I'd propose to just leave the code verbatim here instead of moving it into a new function. Or do you see much benefit added by the function? |
||
|
||
if format_ == FORMAT_DOTNET: | ||
import capa.features.extractors.dnfile.extractor | ||
|
||
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) | ||
|
||
elif backend == BACKEND_BINJA: | ||
from capa.features.extractors.binja.find_binja_api import find_binja_path | ||
|
||
# When we are running as a standalone executable, we cannot directly import binaryninja | ||
# We need to fist find the binja API installation path and add it into sys.path | ||
if is_running_standalone(): | ||
bn_api = find_binja_path() | ||
if bn_api.exists(): | ||
sys.path.append(str(bn_api)) | ||
|
||
try: | ||
import binaryninja | ||
from binaryninja import BinaryView | ||
except ImportError: | ||
raise RuntimeError( | ||
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: " | ||
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)." | ||
) | ||
|
||
import capa.features.extractors.binja.extractor | ||
|
||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): | ||
bv: BinaryView = binaryninja.load(str(path)) | ||
if bv is None: | ||
raise RuntimeError(f"Binary Ninja cannot open file {path}") | ||
|
||
return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv) | ||
return handle_binja_backend(path, disable_progress) | ||
|
||
elif backend == BACKEND_PEFILE: | ||
import capa.features.extractors.pefile | ||
|
||
return capa.features.extractors.pefile.PefileFeatureExtractor(path) | ||
|
||
elif backend == BACKEND_VIV: | ||
import capa.features.extractors.viv.extractor | ||
|
||
with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): | ||
vw = get_workspace(path, format_, sigpaths) | ||
|
||
if should_save_workspace: | ||
logger.debug("saving workspace") | ||
try: | ||
vw.saveWorkspace() | ||
except IOError: | ||
# see #168 for discussion around how to handle non-writable directories | ||
logger.info("source directory is not writable, won't save intermediate workspace") | ||
else: | ||
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") | ||
|
||
return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_) | ||
return handle_viv_backend(path, format, sigpaths, should_save_workspace, os_, disable_progress) | ||
|
||
else: | ||
raise ValueError("unexpected backend: " + backend) | ||
|
||
|
||
def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]: | ||
file_extractors: List[FeatureExtractor] = [] | ||
|
@@ -1257,8 +1280,11 @@ def main(argv: Optional[List[str]] = None): | |
|
||
should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) | ||
|
||
try: | ||
extractor = get_extractor( | ||
|
||
# Perform error checking | ||
# Return if unsupported hardware or software | ||
extractor = exceptUnsupportedError( | ||
get_extractor( | ||
args.sample, | ||
format_, | ||
args.os, | ||
|
@@ -1267,15 +1293,7 @@ def main(argv: Optional[List[str]] = None): | |
should_save_workspace, | ||
disable_progress=args.quiet or args.debug, | ||
) | ||
except UnsupportedFormatError: | ||
log_unsupported_format_error() | ||
return E_INVALID_FILE_TYPE | ||
except UnsupportedArchError: | ||
log_unsupported_arch_error() | ||
return E_INVALID_FILE_ARCH | ||
except UnsupportedOSError: | ||
log_unsupported_os_error() | ||
return E_INVALID_FILE_OS | ||
) | ||
|
||
meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's use snake_case names and maybe rename this to
`catch_log_return_errors`
or similar? Then let's apply it as a decorator via
`@<decorator_name>`
(see, e.g., https://rinaarts.com/declutter-python-code-with-error-handling-decorators/).