Skip to content

Commit

Permalink
Added tests for analyzer; bug fix (#2083)
Browse files Browse the repository at this point in the history
* Added more tests for the analyzer

- fixed a couple of bugs

* Added more tests and fixed a couple more bugs

- typing hints now compatible with python 3.8
- in `in_protected_path` we were not correctly checking for files in a protected directory
- in `handle_interop` we now check ANALYSIS_TIMED_OUT
- in `inject_process` we now update LASTINJECT_TIME
- Updated the tests to match the fixed behaviour

* Ran `poetry lock --no-update` as prompted by github

* Added tests; reduced the scope of patch()
  • Loading branch information
rkoumis authored Apr 25, 2024
1 parent cc4d767 commit 6e70d44
Show file tree
Hide file tree
Showing 5 changed files with 573 additions and 64 deletions.
79 changes: 51 additions & 28 deletions analyzer/windows/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from pathlib import Path
from shutil import copy
from threading import Lock, Thread
from typing import Union
from urllib.parse import urlencode
from urllib.request import urlopen

Expand Down Expand Up @@ -135,28 +136,29 @@ def pids_from_image_names(suffixlist):
return retpids


def _normalized_protected_path(path: str | bytes) -> bytes:
def _normalized_protected_path(path: Union[str, bytes]) -> bytes:
if isinstance(path, str):
path = path.encode()
if os.path.isdir(path) and path[-1] != b"\\":
path += b"\\"
return path.lower()


def in_protected_path(path: str | bytes) -> bool:
def in_protected_path(path: Union[str, bytes]) -> bool:
"""Checks if a path is protected."""
if not path:
return False
path = _normalized_protected_path(path)
for protected_path in PROTECTED_PATH_LIST:
if protected_path[-1] == "\\" and path.starswith(protected_path):
# protected_path is a list of bytes. The last one in the list is protected_path[-1:]
if protected_path[-1:] == b"\\" and path.startswith(protected_path):
return True
if protected_path == path:
return True
return False


def add_protected_path(name: str | bytes):
def add_protected_path(name: Union[str, bytes]):
"""Adds a pathname to the protected list"""
PROTECTED_PATH_LIST.append(_normalized_protected_path(name))

Expand Down Expand Up @@ -216,12 +218,15 @@ def __init__(self):
self.do_run = True
self.time_counter = 0

self.command_pipe = None
self.default_dll = None
self.process_lock = Lock()
self.files_list_lock = Lock()
self.pid = os.getpid()
self.pid_check = False
self.ppid = Process(pid=self.pid).get_parent_pid()
self.files = Files()
self.options = {}
self.process_list = ProcessList()

# analysis package class
Expand All @@ -242,7 +247,6 @@ def __init__(self):
def prepare(self):
"""Prepare env for analysis."""
global MONITOR_DLL, MONITOR_DLL_64, HIDE_PIDS
# global SERVICES_PID

# Get SeDebugPrivilege for the Python process. It will be needed in
# order to perform the injections.
Expand All @@ -263,10 +267,9 @@ def prepare(self):
self.options = self.config.get_options()

# Resolve the paths first in case some other part of the code needs those (fullpath) parameters.
if "curdir" in self.options:
self.options["curdir"] = os.path.expandvars(self.options["curdir"])
if "executiondir" in self.options:
self.options["executiondir"] = os.path.expandvars(self.options["executiondir"])
for option_name in ("curdir", "executiondir"):
if option_name in self.options:
self.options[option_name] = os.path.expandvars(self.options[option_name])

# Set the default DLL to be used for this analysis.
self.default_dll = self.options.get("dll")
Expand Down Expand Up @@ -791,9 +794,10 @@ def __init__(self):
self.files_orig = {}
self.dumped = []

def is_protected_filename(self, file_name):
@classmethod
def is_protected_filename(cls, file_name):
"""Return whether or not to inject into a process with this name."""
return file_name.lower() in self.PROTECTED_NAMES
return file_name.lower() in cls.PROTECTED_NAMES

def add_pid(self, filepath, pid, verbose=True):
"""Track a process identifier for this file."""
Expand Down Expand Up @@ -930,6 +934,7 @@ def remove_pid(self, pid):

class CommandPipeHandler:
"""Pipe Handler.
This class handles the notifications received through the Pipe Server and
decides what to do with them.
"""
Expand Down Expand Up @@ -992,16 +997,22 @@ def _handle_getpids(self, data):
hidepids.update([self.analyzer.pid, self.analyzer.ppid])
return struct.pack("%dI" % len(hidepids), *hidepids)

# remove pid from process list because we received a notification
# from kernel land
def _handle_kterminate(self, data):
"""Handle terminate notification.
Remove pid from process list because we received a notification
from kernel land
"""
process_id = int(data)
if process_id and process_id in self.analyzer.process_list.pids:
self.analyzer.process_list.remove_pid(process_id)

# same than below but we don't want to inject any DLLs because
# it's a kernel analysis
def _handle_kprocess(self, data):
"""Handle process notification.
Same than below but we don't want to inject any DLLs because
it's a kernel analysis
"""
self.analyzer.process_lock.acquire()
process_id = int(data)
thread_id = None
Expand All @@ -1017,10 +1028,14 @@ def _handle_kprocess(self, data):
def _handle_kerror(self, error_msg):
log.error("Error : %s", error_msg)

# if a new driver has been loaded, we stop the analysis
def _handle_ksubvert(self, data):
for pid in self.analyzer.process_list.pids:
log.info("Process with pid %s has terminated", pid)
"""Remove pids from the list.
If a new driver has been loaded, we stop the analysis.
"""
# Use a list slice ([:]) to make a copy of the list we are changing.
for pid in self.analyzer.process_list.pids[:]:
log.info("No longer tracking process with pid %s", pid)
self.analyzer.process_list.remove_pid(pid)

def _handle_shell(self, data):
Expand All @@ -1035,7 +1050,7 @@ def _handle_shell(self, data):
KERNEL32.Sleep(2000)

def _handle_interop(self, data):
if not self.analyzer.MONITORED_DCOM:
if not self.analyzer.MONITORED_DCOM and not ANALYSIS_TIMED_OUT:
self.analyzer.MONITORED_DCOM = True
dcom_pid = pid_from_service_name("DcomLaunch")
if dcom_pid:
Expand Down Expand Up @@ -1139,10 +1154,13 @@ def _handle_bits(self, data):
servproc.close()
KERNEL32.Sleep(2000)

# Handle case of a service being started by a monitored process
# Switch the service type to own process behind its back so we
# can monitor the service more easily with less noise
def _handle_service(self, servname):
"""Start a service and monitor it.
Handle case of a service being started by a monitored process.
Switch the service type to own process behind its back so we
can monitor the service more easily with less noise.
"""
if not ANALYSIS_TIMED_OUT:
si = subprocess.STARTUPINFO()
# STARTF_USESHOWWINDOW
Expand Down Expand Up @@ -1171,9 +1189,11 @@ def _handle_resume(self, data):
# RESUME:2560,3728'
self.analyzer.LASTINJECT_TIME = timeit.default_timer()

# Handle attempted shutdowns/restarts -- flush logs for all monitored processes
# additional handling can be added later
def _handle_shutdown(self, data):
"""Handle attempted shutdowns/restarts.
Flush logs for all monitored processes. Additional handling can be added later.
"""
log.info("Received shutdown request")
self.analyzer.process_lock.acquire()
for process_id in self.analyzer.process_list.pids:
Expand All @@ -1182,12 +1202,14 @@ def _handle_shutdown(self, data):
if event_handle:
KERNEL32.SetEvent(event_handle)
KERNEL32.CloseHandle(event_handle)
self.files.dump_files()
self.analyzer.files.dump_files()
self.analyzer.process_lock.release()

# Handle case of malware terminating a process -- notify the target
# ahead of time so that it can flush its log buffer
def _handle_kill(self, data):
"""Handle case of malware terminating a process.
Notify the target ahead of time so that it can flush its log buffer.
"""
self.analyzer.process_lock.acquire()

process_id = int(data)
Expand Down Expand Up @@ -1249,6 +1271,7 @@ def _inject_process(self, process_id, thread_id, mode):
self.analyzer.process_lock.release()

proc.inject(interest=filepath, nosleepskip=True)
self.analyzer.LASTINJECT_TIME = timeit.default_timer()
log.info("Injected into process with pid %s and name %s", proc.pid, filename)

def _handle_process(self, data):
Expand Down Expand Up @@ -1302,7 +1325,7 @@ def _handle_process(self, data):
# We want to prevent multiple injection attempts if one is already underway
if not in_protected_path(filename):
_ = proc.inject(interest)
self.LASTINJECT_TIME = timeit.default_timer()
self.analyzer.LASTINJECT_TIME = timeit.default_timer()
self.analyzer.NUM_INJECTED += 1
proc.close()
else:
Expand Down
Loading

0 comments on commit 6e70d44

Please sign in to comment.