Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring '''get_extractor''' in capa/main.py #1842

Closed
wants to merge 24 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 84 additions & 57 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,84 @@ def get_workspace(path: Path, format_: str, sigpaths: List[Path]):
return vw


def check_supported_format(path: Path, os_: str):
if not is_supported_format(path):
raise UnsupportedFormatError()

if not is_supported_arch(path):
raise UnsupportedArchError()

if os_ == OS_AUTO and not is_supported_os(path):
raise UnsupportedOSError()


def add_binja_to_path():
from capa.features.extractors.binja.find_binja_api import find_binja_path

bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))


def import_binja():
# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to fist find the binja API installation path and add it into sys.path
if is_running_standalone():
add_binja_to_path()

try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)


def handle_binja_backend(path: Path, disable_progress: bool) -> FeatureExtractor:
import capa.features.extractors.binja.extractor

import_binja()

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = binaryninja.load(str(path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}")

return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)


def handle_viv_backend(path: Path, format_: str, sigpaths: List[Path], should_save_workspace: bool, \
os_: str, disable_progress: bool) -> FeatureExtractor:
import capa.features.extractors.viv.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format_, sigpaths)

if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)


def handle_pefile_backend(path: Path) -> FeatureExtractor:
import capa.features.extractors.pefile
return capa.features.extractors.pefile.PefileFeatureExtractor(path)


def handle_dotnet_format(path: Path) -> FeatureExtractor:
import capa.features.extractors.dnfile.extractor
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)


def get_extractor(
path: Path,
format_: str,
Expand All @@ -533,74 +611,23 @@ def get_extractor(
UnsupportedOSError
"""
if format_ not in (FORMAT_SC32, FORMAT_SC64):
if not is_supported_format(path):
raise UnsupportedFormatError()

if not is_supported_arch(path):
raise UnsupportedArchError()

if os_ == OS_AUTO and not is_supported_os(path):
raise UnsupportedOSError()
check_supported_format(path, os_)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reconsidering, I don't feel like these additional functions necessarily make the code easier to read

especially here where it's not obvious that this may raise an exception...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is not something I thought about before, but this is helpful to consider. Thank you!


if format_ == FORMAT_DOTNET:
import capa.features.extractors.dnfile.extractor

return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path)
return handle_dotnet_format(format_)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... or here where only a few lines are wrapped

So, I'd vote to leave this function as is.


elif backend == BACKEND_BINJA:
from capa.features.extractors.binja.find_binja_api import find_binja_path

# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to fist find the binja API installation path and add it into sys.path
if is_running_standalone():
bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))

try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)

import capa.features.extractors.binja.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
bv: BinaryView = binaryninja.load(str(path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {path}")

return capa.features.extractors.binja.extractor.BinjaFeatureExtractor(bv)
return handle_binja_backend(path, disable_progress)

elif backend == BACKEND_PEFILE:
import capa.features.extractors.pefile

return capa.features.extractors.pefile.PefileFeatureExtractor(path)
return handle_pefile_backend(path)

elif backend == BACKEND_VIV:
import capa.features.extractors.viv.extractor

with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress):
vw = get_workspace(path, format_, sigpaths)

if should_save_workspace:
logger.debug("saving workspace")
try:
vw.saveWorkspace()
except IOError:
# see #168 for discussion around how to handle non-writable directories
logger.info("source directory is not writable, won't save intermediate workspace")
else:
logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace")

return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os_)
return handle_viv_backend(path, format, sigpaths, should_save_workspace, os_, disable_progress)

else:
raise ValueError("unexpected backend: " + backend)


def get_file_extractors(sample: Path, format_: str) -> List[FeatureExtractor]:
file_extractors: List[FeatureExtractor] = []
Expand Down
Loading