
Commit

Merge pull request #33 from CybercentreCanada/update/sig-fp
Adding type check for CAPE garbage files [dev]
cccs-kevin authored Jul 26, 2022
2 parents d81d21c + 2cf7ebf commit abdc01c
Showing 2 changed files with 79 additions and 49 deletions.
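
In plain terms, the change asks Assemblyline's identification helper for file details and skips extraction when the result looks like the junk CAPE routinely drops into `files/`. Below is a minimal standalone sketch of that heuristic, assuming only the `type`, `mime`, and `magic` fields the diff actually reads; the helper name is illustrative and not part of the service:

```python
from typing import Dict


def looks_like_cape_junk(file_info: Dict[str, str]) -> bool:
    """Heuristic mirrored from the diff below: a 'files/' artifact is treated
    as garbage when identification can only call it an unknown octet-stream
    whose magic output mentions a SQLite rollback journal."""
    return (
        file_info.get("type") == "unknown"
        and file_info.get("mime") == "application/octet-stream"
        and "SQLite Rollback Journal" in file_info.get("magic", "")
    )


# Illustrative usage, not the service's actual control flow:
if __name__ == "__main__":
    sample = {
        "type": "unknown",
        "mime": "application/octet-stream",
        "magic": "SQLite Rollback Journal",
    }
    print(looks_like_cape_junk(sample))  # True -> the service would skip the file
```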
19 changes: 17 additions & 2 deletions cape/cape_main.py
@@ -21,6 +21,7 @@
from assemblyline_v4_service.common.tag_helper import add_tag

from assemblyline.common.str_utils import safe_str
from assemblyline.common.forge import get_identify
from assemblyline.common.identify_defaults import type_to_extension, trusted_mimes, magic_patterns
from assemblyline.common.exceptions import RecoverableError, ChainException
# from assemblyline.odm.models.ontology.types.sandbox import Sandbox
@@ -207,6 +208,7 @@ def __init__(self, config: Optional[Dict] = None) -> None:
self.hosts: List[Dict[str, Any]] = []
self.routing = ""
self.safelist: Dict[str, Dict[str, List[str]]] = {}
self.identify = get_identify(use_cache=os.environ.get('PRIVILEGED', 'false').lower() == 'true')
# self.sandbox_ontologies: List[SandboxOntology] = None

def start(self) -> None:
@@ -1442,11 +1444,24 @@ def _extract_artifacts(self, zip_obj: ZipFile, task_id: int, cape_artifact_pids:
continue
destination_file_path = os.path.join(task_dir, f)
zip_obj.extract(f, path=task_dir)
file_name = None

if key in ["CAPE", "procdump"]:
pid = next((pid for sha256, pid in cape_artifact_pids.items() if sha256 in f), None)
file_name = f"{task_id}_{pid}_{f}" if pid else f"{task_id}_{f}"
else:
if pid:
file_name = f"{task_id}_{pid}_{f}"
# The majority of files extracted by CAPE are junk and follow a similar file type pattern
elif key in ["files/"]:
file_type_details = self.identify.fileinfo(destination_file_path)
if file_type_details["type"] == "unknown" and \
file_type_details["mime"] == "application/octet-stream" and \
"SQLite Rollback Journal" in file_type_details["magic"]:
self.log.debug(
f"We are not extracting {destination_file_path} for task {task_id} "
"because we suspect it is garbage.")
continue

if not file_name:
file_name = f"{task_id}_{f}"

if key in ["shots"]:
109 changes: 62 additions & 47 deletions tests/test_cape_main.py
@@ -138,7 +138,10 @@ def namelist(self):
"shots/0001_small.jpg",
"shots/0001.jpg",
"network/blahblah",
"CAPE/ohmy.exe"
"CAPE/ohmy.exe",
"files/yaba.exe",
"dump.pcap",
"sum.pcap",
]

def extract(self, output, path=None):
@@ -149,6 +152,19 @@ def getnames(self):

def close(self):
pass

def get_artifacts(self):
return [
"shots/0005.jpg",
"shots/0010.jpg",
"shots/0001_small.jpg",
"shots/0001.jpg",
"network/blahblah",
"CAPE/ohmy.exe",
"files/yaba.exe",
"dump.pcap",
"sum.pcap",
]
yield DummyZip


@@ -1606,66 +1622,65 @@ def test_extract_injected_exes(cape_class_instance, dummy_request_class, mocker)
assert cape_class_instance.artifact_list[0]["to_be_extracted"]

@staticmethod
def test_extract_artifacts(cape_class_instance, dummy_request_class, dummy_zip_class, dummy_zip_member_class):
def test_extract_artifacts(cape_class_instance, dummy_request_class, dummy_zip_class, dummy_zip_member_class, mocker):
from assemblyline_v4_service.common.dynamic_service_helper import SandboxOntology
from assemblyline_v4_service.common.result import ResultSection, ResultImageSection
default_so = SandboxOntology()
zip_file_map = {
"shots": "Screenshot captured during analysis",
"dump.pcap": "TCPDUMP captured during analysis",
"evtx/evtx.zip": "EVTX generated during analysis",
"files/": "File extracted during analysis",
"sum.pcap": "TCPDUMP captured during analysis",
"CAPE": "Memory Dump",
"procdump": "Memory Dump",
"macros": "Macros found during analysis",
}

parent_section = ResultSection("blah")
correct_artifact_list = []
zip_obj = dummy_zip_class()
[zip_obj.members.append(dummy_zip_member_class(f, 1)) for f in zip_obj.get_artifacts()]
mocker.patch.object(cape_class_instance.identify, "fileinfo", return_value={"type": "unknown", "mime": "application/octet-stream", "magic": "SQLite Rollback Journal"})
task_id = 1
cape_class_instance.artifact_list = []
cape_class_instance.request = dummy_request_class()
cape_class_instance.request.deep_scan = True
cape_class_instance.config["extract_cape_dumps"] = True
correct_image_section = ResultImageSection(
dummy_request_class,
f"Screenshots taken during Task {task_id}",
)
names = zip_obj.namelist()
names.sort()
for f in names:
key = next((k for k in zip_file_map if k in f), None)
if not key:
continue
val = zip_file_map[key]
correct_path = f"{cape_class_instance.working_directory}/{task_id}/{f}"
dummy_zip_member = dummy_zip_member_class(f, 1)
zip_obj.members.append(dummy_zip_member)
if key in ["shots"]:
if "_small" not in f:
correct_image_section.add_image(correct_path, f"{task_id}_{f}", val)
continue
if key in ["supplementary"]:
correct_artifact_list.append({"path": correct_path, "name": f"{task_id}_{f}",
"description": val, "to_be_extracted": False})
else:
correct_artifact_list.append({"path": correct_path, "name": f"{task_id}_{f}",
"description": val, "to_be_extracted": True})

cape_class_instance.request = dummy_request_class()
cape_class_instance._extract_artifacts(zip_obj, task_id, {}, parent_section, default_so)

all_extracted = True
for extracted in cape_class_instance.artifact_list:
if extracted not in correct_artifact_list:
all_extracted = False
break
assert all_extracted

all_supplementary = True
for supplementary in cape_class_instance.artifact_list:
if supplementary not in correct_artifact_list:
all_supplementary = False
correct_image_section.add_image(
f"{cape_class_instance.working_directory}/{task_id}/shots/0001.jpg", f"{task_id}_shots/0001.jpg", "Screenshot captured during analysis"
)
correct_image_section.add_image(
f"{cape_class_instance.working_directory}/{task_id}/shots/0005.jpg", f"{task_id}_shots/0005.jpg", "Screenshot captured during analysis"
)
correct_image_section.add_image(
f"{cape_class_instance.working_directory}/{task_id}/shots/0010.jpg", f"{task_id}_shots/0010.jpg", "Screenshot captured during analysis"
)
correct_artifact_list.append({
"path": f"{cape_class_instance.working_directory}/{task_id}/CAPE/ohmy.exe",
"name": f"{task_id}_3_CAPE/ohmy.exe",
"description": "Memory Dump",
"to_be_extracted": True
})
correct_artifact_list.append({
"path": f"{cape_class_instance.working_directory}/{task_id}/sum.pcap",
"name": f"{task_id}_sum.pcap",
"description": "TCPDUMP captured during analysis",
"to_be_extracted": True
})
correct_artifact_list.append({
"path": f"{cape_class_instance.working_directory}/{task_id}/dump.pcap",
"name": f"{task_id}_dump.pcap",
"description": "TCPDUMP captured during analysis",
"to_be_extracted": True
})

cape_artifact_pids = {"ohmy.exe": 3}
cape_class_instance._extract_artifacts(zip_obj, task_id, cape_artifact_pids, parent_section, default_so)

all_files = True
assert len(cape_class_instance.artifact_list) == len(correct_artifact_list)
for f in cape_class_instance.artifact_list:
if f not in correct_artifact_list:
print(f"Missing {f}")
all_files = False
break
assert all_supplementary
assert all_files

assert check_section_equality(parent_section.subsections[0], correct_image_section)

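
The updated test drives the new branch by stubbing out identification entirely with pytest-mock's `mocker.patch.object`, so every extracted file reports the garbage signature. A minimal standalone sketch of the same pattern, with simplified stand-ins for the real service class and fixtures:

```python
# Requires pytest and pytest-mock; Service/FakeIdentify are simplified stand-ins.
class FakeIdentify:
    def fileinfo(self, path: str) -> dict:
        raise NotImplementedError  # patched out in the test


class Service:
    def __init__(self) -> None:
        self.identify = FakeIdentify()

    def should_skip(self, path: str) -> bool:
        # Mirrors the check added to _extract_artifacts in this commit.
        info = self.identify.fileinfo(path)
        return (
            info["type"] == "unknown"
            and info["mime"] == "application/octet-stream"
            and "SQLite Rollback Journal" in info["magic"]
        )


def test_should_skip_garbage(mocker):
    service = Service()
    # Same pattern as the test diff: patch fileinfo so every file looks like junk.
    mocker.patch.object(
        service.identify,
        "fileinfo",
        return_value={
            "type": "unknown",
            "mime": "application/octet-stream",
            "magic": "SQLite Rollback Journal",
        },
    )
    assert service.should_skip("files/yaba.exe")
```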
