diff --git a/tests/modules/reporting/test_report_doc.py b/tests/modules/reporting/test_report_doc.py new file mode 100644 index 00000000000..176b6b56637 --- /dev/null +++ b/tests/modules/reporting/test_report_doc.py @@ -0,0 +1,85 @@ +import pathlib +import random +import sys +from unittest import mock +from unittest.mock import patch + +import bson +import mongomock +import pymongo +import pytest + +from lib.cuckoo.common.config import ConfigMeta +from modules.reporting.mongodb_constants import CALLS_COLL + +TEST_DB_NAME = "cuckoo_test" + + +@pytest.fixture +def mongodb_enabled(custom_conf_path: pathlib.Path): + """Enable mongodb. + + Use sys.modules.pop to ensure target gets imported + as if for the first time. + """ + with open(custom_conf_path / "reporting.conf", "wt") as fil: + print(f"[mongodb]\nenabled = yes\ndb = {TEST_DB_NAME}", file=fil) + ConfigMeta.refresh() + sys.modules.pop("modules.reporting.report_doc", None) + yield + # Pop the module again, to reset the state for other tests. + sys.modules.pop("modules.reporting.report_doc", None) + + +@pytest.fixture +def mongodb_mock_client(request): + with mongomock.patch(servers=(("127.0.0.1", 27017),)): + client = pymongo.MongoClient(host=f"mongodb://127.0.0.1/{TEST_DB_NAME}") + request.instance.mongo_client = client + with mock.patch("dev_utils.mongodb.conn", new=client): + yield + + +@pytest.mark.usefixtures("mongodb_enabled", "db", "mongodb_mock_client") +class TestReportDoc: + def test_insert_calls_mongodb(self): + """Test the insert_calls function.""" + from modules.reporting.report_doc import insert_calls + + pid0, pid1 = random.randint(1, 1000), random.randint(1, 1000) + report = { + "behavior": { + "processes": [ + { + "process_id": pid0, + "calls": [ + { + "timestamp": "2025-01-01 01:01:01", + }, + ], + }, + { + "process_id": pid1, + "calls": [ + { + "timestamp": "2025-02-02 02:02:02", + }, + ], + }, + ] + } + } + with patch("modules.reporting.report_doc.CHUNK_CALL_SIZE", new=1): + result = insert_calls(report, mongodb=True) + assert isinstance(result, list) + assert result[0]["process_id"] == pid0 + assert result[1]["process_id"] == pid1 + expected_calls = [ + result[0]["calls"][0], + result[1]["calls"][0], + ] + collection = self.mongo_client[TEST_DB_NAME][CALLS_COLL] + for item in expected_calls: + assert isinstance(item, bson.objectid.ObjectId) + obj = collection.find_one({"_id": item}) + assert obj is not None diff --git a/tests/test_web_utils.py b/tests/test_web_utils.py index b52c5fd5d56..ce8f18eaa5d 100644 --- a/tests/test_web_utils.py +++ b/tests/test_web_utils.py @@ -2,13 +2,39 @@ # This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org # See the file 'docs/LICENSE' for copying permission. +import pathlib +import random +import sys import tempfile +from unittest import mock import httpretty +import mongomock +import pymongo import pytest +from lib.cuckoo.common.config import ConfigMeta from lib.cuckoo.common.path_utils import path_delete, path_write_file from lib.cuckoo.common.web_utils import _download_file, force_int, get_file_content, parse_request_arguments +from modules.reporting.mongodb_constants import ANALYSIS_COLL + +TEST_DB_NAME = "cuckoo_test_db" + + +@pytest.fixture +def mongodb_enabled(custom_conf_path: pathlib.Path): + with open(custom_conf_path / "reporting.conf", "wt") as fil: + print(f"[mongodb]\nenabled = yes\ndb = {TEST_DB_NAME}", file=fil) + ConfigMeta.refresh() + yield + + +@pytest.fixture +def mongodb_mock_client(): + with mongomock.patch(servers=(("127.0.0.1", 27017),)): + client = pymongo.MongoClient(host=f"mongodb://127.0.0.1/{TEST_DB_NAME}") + with mock.patch("dev_utils.mongodb.conn", new=client): + yield client @pytest.fixture @@ -90,3 +116,56 @@ def test_parse_request_arguments(mock_request): def test_force_int(): assert force_int(value="1") == 1 assert force_int(value="$") == 0 + + +def test_perform_search_invalid_ttp(): + sys.modules.pop("lib.cuckoo.common.web_utils", None) + from lib.cuckoo.common.web_utils import perform_search + + with pytest.raises(ValueError) as exc: + _ = perform_search(term="ttp", value="SPOONS") + assert "Invalid TTP" in str(exc) + + +def test_perform_search_not_in_search_term_map(): + sys.modules.pop("lib.cuckoo.common.web_utils", None) + from lib.cuckoo.common.web_utils import perform_search, search_term_map + + term = "Unexpected" + assert term not in search_term_map + actual_result = perform_search(term=term, value="not in search term map") + assert actual_result is None + + +def test_perform_search_invalid_int_value(): + sys.modules.pop("lib.cuckoo.common.web_utils", None) + from lib.cuckoo.common.web_utils import normalized_int_terms, perform_search + + term = random.choice(normalized_int_terms) + non_integer_value = "not an integer" + with pytest.raises(ValueError) as exc: + _ = perform_search(term=term, value=non_integer_value) + assert non_integer_value in str(exc) + + +@pytest.mark.usefixtures("mongodb_enabled") +def test_perform_search_mongo(mongodb_mock_client): + sys.modules.pop("lib.cuckoo.common.web_utils", None) + from lib.cuckoo.common.web_utils import perform_search, search_term_map + + term = "tlp" + value = "red" + assert term in search_term_map + assert search_term_map[term] == "info.tlp" + id = random.randint(1, 1000) + analysis = { + "info": { + "id": id, + term: value, + } + } + mongodb_mock_client[TEST_DB_NAME][ANALYSIS_COLL].insert_one(analysis) + result = perform_search(term=term, value=value) + assert len(result) == 1 + assert result[0]["info"][term] == value + assert result[0]["info"]["id"] == id