diff --git a/analyzer/windows/tests/test_analyzer.py b/analyzer/windows/tests/test_analyzer.py index 54ad5b55a3a..2102dae627c 100644 --- a/analyzer/windows/tests/test_analyzer.py +++ b/analyzer/windows/tests/test_analyzer.py @@ -24,6 +24,9 @@ def setUp(self): self.addCleanup(patch_call.stop) self.analyzer = Analyzer() self.cph = CommandPipeHandler(self.analyzer) + # Since the CommandPipeHandler.ignore_list is a class variable, + # we reset it between tests, so we get a fresh start. + self.cph.ignore_list = dict(pid=[]) def test_can_instantiate(self): self.assertIsInstance(self.analyzer, Analyzer) @@ -94,7 +97,6 @@ def test_handle_interop(self, mock_process, mock_pid_from_service_name): random_pid = random.randint(1, 99999999) mock_pid_from_service_name.return_value = random_pid ana = self.analyzer - # instead of mocking Process mock Process.get_filepath and Process.open maybe self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST)) self.assertFalse(ana.MONITORED_DCOM) self.assertIsNone(ana.LASTINJECT_TIME) @@ -119,6 +121,27 @@ def test_handle_interop_already(self, mock_process): mock_process.assert_not_called() self.call.assert_not_called() + @patch("analyzer.pid_from_service_name") + @patch("analyzer.Process") + def test_handle_interop_timed_out(self, mock_process, mock_pid_from_service_name): + """Even if ANALYSIS_TIMED_OUT, we still handle interop""" + # XXX Is this what we want? + with patch("analyzer.ANALYSIS_TIMED_OUT", True): + mock_process.return_value = MagicMock() + random_pid = random.randint(1, 99999999) + mock_pid_from_service_name.return_value = random_pid + ana = self.analyzer + self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST)) + self.assertFalse(ana.MONITORED_DCOM) + self.assertIsNone(ana.LASTINJECT_TIME) + self.cph._handle_interop(None) + self.assertEqual(1, len(ana.CRITICAL_PROCESS_LIST)) + self.assertTrue(ana.MONITORED_DCOM) + self.assertIsNotNone(ana.LASTINJECT_TIME) + self.assertIn(random_pid, ana.CRITICAL_PROCESS_LIST) + mock_pid_from_service_name.assert_called_once() + self.call.assert_not_called() + @patch("analyzer.pid_from_service_name") def test_handle_wmi(self, mock_pid_from_service_name): random_pid1 = random.randint(1, 99999999) @@ -259,6 +282,19 @@ def test_handle_service(self, mock_process): mock_process.assert_called_once() self.call.assert_called_once() + @patch("analyzer.Process") + def test_handle_service_already(self, mock_process): + ana = self.analyzer + self.assertIsNone(ana.LASTINJECT_TIME) + ana.MONITORED_SERVICES = True + self.cph._handle_service(servname=b"any-name-can-go-here") + # Should be no change to process list or last inject time + self.assertEqual(0, len(ana.CRITICAL_PROCESS_LIST)) + self.assertIsNone(ana.LASTINJECT_TIME) + mock_process.assert_not_called() + # It still wil call "sc config" + self.call.assert_called_once() + @patch("analyzer.Process") def test_handle_service_timed_out(self, mock_process): with patch("analyzer.ANALYSIS_TIMED_OUT", True): @@ -271,3 +307,69 @@ def test_handle_service_timed_out(self, mock_process): self.assertIsNone(ana.LASTINJECT_TIME) mock_process.assert_not_called() self.call.assert_not_called() + + @patch("analyzer.Process") + def test_inject_process(self, mock_process): + random_pid = random.randint(1, 99999999) + ana = self.analyzer + self.assertEqual(0, len(ana.process_list.pids)) + self.assertEqual(0, len(self.cph.ignore_list["pid"])) + self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None) + self.assertEqual(1, len(ana.process_list.pids)) + self.assertIn(random_pid, ana.process_list.pids) + # XXX Calling _inject_process does nothing to LASTINJECT_TIME ? + self.assertIsNone(ana.LASTINJECT_TIME) + mock_process.assert_called_once() + self.call.assert_not_called() + + @patch("analyzer.Process") + def test_inject_process_self(self, mock_process): + """If _inject_process is called with the pid of the analyzer, do nothing.""" + random_pid = random.randint(1, 99999999) + ana = self.analyzer + ana.pid = random_pid + self.assertEqual(0, len(self.cph.ignore_list["pid"])) + self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None) + self.assertEqual(1, len(self.cph.ignore_list["pid"])) + self.assertIn(random_pid, self.cph.ignore_list["pid"]) + # Should be no change to last inject time + self.assertIsNone(ana.LASTINJECT_TIME) + mock_process.assert_not_called() + self.call.assert_not_called() + + @patch("analyzer.Process") + def test_inject_process_already(self, mock_process): + """If _inject_process is called with a pid we are already monitoring, do nothing.""" + random_pid = random.randint(1, 99999999) + ana = self.analyzer + ana.process_list.pids.append(random_pid) + self.assertEqual(0, len(self.cph.ignore_list["pid"])) + self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None) + self.assertEqual(1, len(self.cph.ignore_list["pid"])) + self.assertIn(random_pid, self.cph.ignore_list["pid"]) + # Should be no change to last inject time + self.assertIsNone(ana.LASTINJECT_TIME) + mock_process.assert_not_called() + self.call.assert_not_called() + + @patch("analyzer.Process") + def test_inject_process_already_notrack(self, mock_process): + """If _inject_process is called with a pid on the notrack list, move it to the track list. + + Do nothing else. + """ + random_pid = random.randint(1, 99999999) + ana = self.analyzer + ana.process_list.pids_notrack.append(random_pid) + self.assertEqual(0, len(self.cph.ignore_list["pid"])) + self.assertEqual(0, len(ana.process_list.pids)) + self.cph._inject_process(process_id=random_pid, thread_id=None, mode=None) + self.assertEqual(1, len(self.cph.ignore_list["pid"])) + self.assertIn(random_pid, self.cph.ignore_list["pid"]) + self.assertIn(random_pid, ana.process_list.pids) + self.assertEqual(0, len(ana.process_list.pids_notrack)) + self.assertEqual(1, len(ana.process_list.pids)) + # Should be no change to last inject time + self.assertIsNone(ana.LASTINJECT_TIME) + mock_process.assert_not_called() + self.call.assert_not_called()