Skip to content

Commit

Permalink
Merge pull request #2420 from josh-feather/improve-trend-unquarantine…
Browse files Browse the repository at this point in the history
…-performance

Improve trend unquarantine performance
  • Loading branch information
doomedraven authored Dec 3, 2024
2 parents 9b2fce3 + 1c4b4ba commit 5035457
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 6 deletions.
17 changes: 12 additions & 5 deletions lib/cuckoo/common/quarantine.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,32 +558,39 @@ def kav_unquarantine(file):


def trend_unquarantine(f):
qdata = Path(f).read_bytes()
data = bytearray_xor(bytearray(qdata), 0xFF)
# Read first 10 bytes
with open(f, "rb") as fil:
qheader = fil.read(10)
header = bytearray_xor(bytearray(qheader), 0xFF)

magic, dataoffset, numtags = struct.unpack("<IIH", data[:10])
magic, dataoffset, numtags = struct.unpack("<IIH", header[:10])
if magic != 0x58425356: # VSBX
return None

origname = "UnknownTrendFile.bin"
basekey = 0x00000000
encmethod = 0

if numtags > 15:
return None

# If file looks like a quarantine file, then read it all
qdata = Path(f).read_bytes()
data = bytearray_xor(bytearray(qdata), 0xFF)

dataoffset += 10
offset = 10
for _ in range(numtags):
code, tagdata = read_trend_tag(data, offset)
if code == 2: # original filename
origname = str(tagdata).encode("utf16").decode(error="ignore").rstrip("\0")
origname = str(tagdata).encode("utf16").decode(errors="ignore").rstrip("\0")
elif code == 6: # base key
basekey = struct.unpack("<I", tagdata)[0]
elif code == 7: # encryption method: 1 == xor FF, 2 = CRC method
encmethod = struct.unpack("<I", tagdata)[0]
"""
elif code == 1: # original pathname
origpath = str(tagdata).encode("utf16").decode(error="ignore").rstrip("\0")
origpath = str(tagdata).encode("utf16").decode(errors="ignore").rstrip("\0")
elif code == 3: # platform
platform = str(tagdata)
elif code == 4: # file attributes
Expand Down
107 changes: 106 additions & 1 deletion tests/test_quarantine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.

import os
import pathlib
import struct
import tempfile
from unittest import mock

import pytest

from lib.cuckoo.common.quarantine import mbam_unquarantine, mse_unquarantine, unquarantine
from lib.cuckoo.common.quarantine import (
bytearray_xor,
mbam_unquarantine,
mse_unquarantine,
trend_unquarantine,
unquarantine,
)

# from tcr_misc import get_sample

Expand All @@ -23,6 +32,8 @@ def _grab_sample(sample_hash):
return _grab_sample
"""

QUARANTINED_DATA = b"\xff\xee\xdd\xcc\xbb\xaa"


@pytest.fixture
def empty_file():
Expand All @@ -31,6 +42,40 @@ def empty_file():
empty_file.close()


@pytest.fixture
def temp_trend_qarantined_pe(tmp_path):
def trend_tag(code: int, tag_data: bytes) -> bytes:
return struct.pack("<BH", code, len(tag_data)) + tag_data

def write_quarantine_file(dir):
tags = b"".join([
trend_tag(1, ('C:\\' + '\0').encode('utf-16le')),
trend_tag(2, ('dangerous.exe' + '\0').encode('utf-16le')),
trend_tag(3, b"win32"),
trend_tag(4, b"\x00"),
trend_tag(5, b"\x01"),
trend_tag(6, b"\x00\x00\x00\x00"),
trend_tag(7, b"\x01\x00\x00\x00"),
])
magic = 0x58425356
offset = len(tags) + 10
numtags = 7
header = struct.pack("<IIH", magic, offset, numtags)
file_data = bytearray(header)
file_data.extend(tags)
file_data.extend(b"\x00" * 10)
file_data.extend(QUARANTINED_DATA)
file_data = bytearray_xor(file_data, 0xFF)
qfilepath = os.path.join(dir, "quarantined_file")
with open(qfilepath, "wb") as qfil:
qfil.write(file_data)
return qfilepath

qpath = write_quarantine_file(tmp_path)
yield qpath
os.unlink(qpath)


class TestUnquarantine:
# disabled for test
"""
Expand Down Expand Up @@ -100,3 +145,63 @@ def test_sep(self, grab_sample):

def test_ext_err(self, empty_file):
assert unquarantine(empty_file.name) is None


def test_trend_unquarantine_normal_file(self, temp_pe32):
"""Test only the file header (first 10 bytes) is XOR'd for non-quarantined files."""
# The expected output is None
expected = None

def bytearray_xor_wrapper(data, key):
expected_header_length = 10
actual_header_length = len(data)
# We only want to see the 10 byte header here if the file is not
# quarantined.
assert expected_header_length == actual_header_length
return data


def store_temp_file_(filedata, filename, path=None):
return expected

# Mock `store_temp_file` to give us visibility into the call.
with mock.patch("lib.cuckoo.common.quarantine.store_temp_file") as mock_store_temp_file:
mock_store_temp_file.side_effect = store_temp_file_
# Mock `bytearray_xor` to give us visibility into any calls.
with mock.patch("lib.cuckoo.common.quarantine.bytearray_xor") as bytearray_xor_mock:
bytearray_xor_mock.side_effect = bytearray_xor_wrapper
actual = trend_unquarantine(temp_pe32)
# Check there is only a single call to `bytearray_xor`. Two calls means
# it XOR'd the whole file, which is not what we want.
bytearray_xor_mock.assert_called_once()
# Check that it didn't try to save any new files.
mock_store_temp_file.assert_not_called()
# Ensure `None` response when no action was performed.
assert actual == expected


def test_trend_unquarantine_quarantined_file(self, temp_trend_qarantined_pe, tmp_path):
"""Test the whole file is XOR'd for quarantined files."""
# We expect the output to be None
expected = os.path.join(tmp_path, "unqarantined_file")

def store_temp_file_(filedata, filename, path=None):
return expected

# Mock `store_temp_file` to give us visibility into the call.
with mock.patch("lib.cuckoo.common.quarantine.store_temp_file") as mock_store_temp_file:
mock_store_temp_file.side_effect = store_temp_file_
# Mock `bytearray_xor` to give us visibility into the calls.
with mock.patch("lib.cuckoo.common.quarantine.bytearray_xor") as mock_bytearray_xor:
mock_bytearray_xor.side_effect = bytearray_xor
actual = trend_unquarantine(temp_trend_qarantined_pe)
# Check there are two calls to `bytearray_xor`. One for the header and
# one for the full file.
mock_bytearray_xor.assert_has_calls([
mock.call(mock.ANY, mock.ANY),
mock.call(mock.ANY, mock.ANY)
])
# Assert that it attempts to create a new file with unquarantined data.
#mock_store_temp_file.assert_called_once_with(QUARANTINED_DATA, mock.ANY)
# Check that `trend_unquarantine` returns the filepath of the new file.
assert actual == expected

0 comments on commit 5035457

Please sign in to comment.