Skip to content

Commit

Permalink
Added more tests, and a couple of XXX comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkoumis committed Apr 23, 2024
1 parent 373c978 commit b62f431
Showing 1 changed file with 103 additions and 1 deletion.
104 changes: 103 additions & 1 deletion analyzer/windows/tests/test_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def setUp(self):
self.addCleanup(patch_call.stop)
self.analyzer = Analyzer()
self.cph = CommandPipeHandler(self.analyzer)
# Since the CommandPipeHandler.ignore_list is a class variable,
# we reset it between tests, so we get a fresh start.
self.cph.ignore_list = dict(pid=[])

def test_can_instantiate(self):
self.assertIsInstance(self.analyzer, Analyzer)
Expand Down Expand Up @@ -94,7 +97,6 @@ def test_handle_interop(self, mock_process, mock_pid_from_service_name):
random_pid = random.randint(1, 99999999)
mock_pid_from_service_name.return_value = random_pid
ana = self.analyzer
# instead of mocking Process mock Process.get_filepath and Process.open maybe
self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST))
self.assertFalse(ana.MONITORED_DCOM)
self.assertIsNone(ana.LASTINJECT_TIME)
Expand All @@ -119,6 +121,27 @@ def test_handle_interop_already(self, mock_process):
mock_process.assert_not_called()
self.call.assert_not_called()

@patch("analyzer.pid_from_service_name")
@patch("analyzer.Process")
def test_handle_interop_timed_out(self, mock_process, mock_pid_from_service_name):
"""Even if ANALYSIS_TIMED_OUT, we still handle interop"""
# XXX Is this what we want?
with patch("analyzer.ANALYSIS_TIMED_OUT", True):
mock_process.return_value = MagicMock()
random_pid = random.randint(1, 99999999)
mock_pid_from_service_name.return_value = random_pid
ana = self.analyzer
self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST))
self.assertFalse(ana.MONITORED_DCOM)
self.assertIsNone(ana.LASTINJECT_TIME)
self.cph._handle_interop(None)
self.assertEqual(1, len(ana.CRITICAL_PROCESS_LIST))
self.assertTrue(ana.MONITORED_DCOM)
self.assertIsNotNone(ana.LASTINJECT_TIME)
self.assertIn(random_pid, ana.CRITICAL_PROCESS_LIST)
mock_pid_from_service_name.assert_called_once()
self.call.assert_not_called()

@patch("analyzer.pid_from_service_name")
def test_handle_wmi(self, mock_pid_from_service_name):
random_pid1 = random.randint(1, 99999999)
Expand Down Expand Up @@ -259,6 +282,19 @@ def test_handle_service(self, mock_process):
mock_process.assert_called_once()
self.call.assert_called_once()

@patch("analyzer.Process")
def test_handle_service_already(self, mock_process):
ana = self.analyzer
self.assertIsNone(ana.LASTINJECT_TIME)
ana.MONITORED_SERVICES = True
self.cph._handle_service(servname=b"any-name-can-go-here")
# Should be no change to process list or last inject time
self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST))
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_not_called()
# It still wil call "sc config"
self.call.assert_called_once()

@patch("analyzer.Process")
def test_handle_service_timed_out(self, mock_process):
with patch("analyzer.ANALYSIS_TIMED_OUT", True):
Expand All @@ -271,3 +307,69 @@ def test_handle_service_timed_out(self, mock_process):
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_not_called()
self.call.assert_not_called()

@patch("analyzer.Process")
def test_inject_process(self, mock_process):
random_pid = random.randint(1, 99999999)
ana = self.analyzer
self.assertEqual(0, len(ana.process_list.pids))
self.assertEqual(0, len(self.cph.ignore_list["pid"]))
self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None)
self.assertEqual(1, len(ana.process_list.pids))
self.assertIn(random_pid, ana.process_list.pids)
# XXX Calling _inject_process does nothing to LASTINJECT_TIME ?
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_called_once()
self.call.assert_not_called()

@patch("analyzer.Process")
def test_inject_process_self(self, mock_process):
"""If _inject_process is called with the pid of the analyzer, do nothing."""
random_pid = random.randint(1, 99999999)
ana = self.analyzer
ana.pid = random_pid
self.assertEqual(0, len(self.cph.ignore_list["pid"]))
self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None)
self.assertEqual(1, len(self.cph.ignore_list["pid"]))
self.assertIn(random_pid, self.cph.ignore_list["pid"])
# Should be no change to last inject time
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_not_called()
self.call.assert_not_called()

@patch("analyzer.Process")
def test_inject_process_already(self, mock_process):
"""If _inject_process is called with a pid we are already monitoring, do nothing."""
random_pid = random.randint(1, 99999999)
ana = self.analyzer
ana.process_list.pids.append(random_pid)
self.assertEqual(0, len(self.cph.ignore_list["pid"]))
self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None)
self.assertEqual(1, len(self.cph.ignore_list["pid"]))
self.assertIn(random_pid, self.cph.ignore_list["pid"])
# Should be no change to last inject time
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_not_called()
self.call.assert_not_called()

@patch("analyzer.Process")
def test_inject_process_already_notrack(self, mock_process):
"""If _inject_process is called with a pid on the notrack list, move it to the track list.
Do nothing else.
"""
random_pid = random.randint(1, 99999999)
ana = self.analyzer
ana.process_list.pids_notrack.append(random_pid)
self.assertEqual(0, len(self.cph.ignore_list["pid"]))
self.assertEqual(0, len(ana.process_list.pids))
self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None)
self.assertEqual(1, len(self.cph.ignore_list["pid"]))
self.assertIn(random_pid, self.cph.ignore_list["pid"])
self.assertIn(random_pid, ana.process_list.pids)
self.assertEqual(0, len(ana.process_list.pids_notrack))
self.assertEqual(1, len(ana.process_list.pids))
# Should be no change to last inject time
self.assertIsNone(ana.LASTINJECT_TIME)
mock_process.assert_not_called()
self.call.assert_not_called()

0 comments on commit b62f431

Please sign in to comment.