Skip to content

Commit

Permalink
More tests with mongomock
Browse files Browse the repository at this point in the history
- Add a test for report doc insert calls
- Add tests for perform_search
  • Loading branch information
rkoumis committed Jan 14, 2025
1 parent 73147e9 commit 3c97376
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 0 deletions.
85 changes: 85 additions & 0 deletions tests/modules/reporting/test_report_doc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import pathlib
import random
import sys
from unittest import mock
from unittest.mock import patch

import bson
import mongomock
import pymongo
import pytest

from lib.cuckoo.common.config import ConfigMeta
from modules.reporting.mongodb_constants import CALLS_COLL

TEST_DB_NAME = "cuckoo_test"


@pytest.fixture
def mongodb_enabled(custom_conf_path: pathlib.Path):
"""Enable mongodb.
Use sys.modules.pop to ensure target gets imported
as if for the first time.
"""
with open(custom_conf_path / "reporting.conf", "wt") as fil:
print(f"[mongodb]\nenabled = yes\ndb = {TEST_DB_NAME}", file=fil)
ConfigMeta.refresh()
sys.modules.pop("modules.reporting.report_doc", None)
yield
# Pop the module again, to reset the state for other tests.
sys.modules.pop("modules.reporting.report_doc", None)


@pytest.fixture
def mongodb_mock_client(request):
with mongomock.patch(servers=(("127.0.0.1", 27017),)):
client = pymongo.MongoClient(host=f"mongodb://127.0.0.1/{TEST_DB_NAME}")
request.instance.mongo_client = client
with mock.patch("dev_utils.mongodb.conn", new=client):
yield


@pytest.mark.usefixtures("mongodb_enabled", "db", "mongodb_mock_client")
class TestReportDoc:
def test_insert_calls_mongodb(self):
"""Test the insert_calls function."""
from modules.reporting.report_doc import insert_calls

pid0, pid1 = random.randint(1, 1000), random.randint(1, 1000)
report = {
"behavior": {
"processes": [
{
"process_id": pid0,
"calls": [
{
"timestamp": "2025-01-01 01:01:01",
},
],
},
{
"process_id": pid1,
"calls": [
{
"timestamp": "2025-02-02 02:02:02",
},
],
},
]
}
}
with patch("modules.reporting.report_doc.CHUNK_CALL_SIZE", new=1):
result = insert_calls(report, mongodb=True)
assert isinstance(result, list)
assert result[0]["process_id"] == pid0
assert result[1]["process_id"] == pid1
expected_calls = [
result[0]["calls"][0],
result[1]["calls"][0],
]
collection = self.mongo_client[TEST_DB_NAME][CALLS_COLL]
for item in expected_calls:
assert isinstance(item, bson.objectid.ObjectId)
obj = collection.find_one({"_id": item})
assert obj is not None
79 changes: 79 additions & 0 deletions tests/test_web_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,39 @@
# This file is part of Cuckoo Sandbox - http://www.cuckoosandbox.org
# See the file 'docs/LICENSE' for copying permission.

import pathlib
import random
import sys
import tempfile
from unittest import mock

import httpretty
import mongomock
import pymongo
import pytest

from lib.cuckoo.common.config import ConfigMeta
from lib.cuckoo.common.path_utils import path_delete, path_write_file
from lib.cuckoo.common.web_utils import _download_file, force_int, get_file_content, parse_request_arguments
from modules.reporting.mongodb_constants import ANALYSIS_COLL

TEST_DB_NAME = "cuckoo_test_db"


@pytest.fixture
def mongodb_enabled(custom_conf_path: pathlib.Path):
with open(custom_conf_path / "reporting.conf", "wt") as fil:
print(f"[mongodb]\nenabled = yes\ndb = {TEST_DB_NAME}", file=fil)
ConfigMeta.refresh()
yield


@pytest.fixture
def mongodb_mock_client():
with mongomock.patch(servers=(("127.0.0.1", 27017),)):
client = pymongo.MongoClient(host=f"mongodb://127.0.0.1/{TEST_DB_NAME}")
with mock.patch("dev_utils.mongodb.conn", new=client):
yield client


@pytest.fixture
Expand Down Expand Up @@ -90,3 +116,56 @@ def test_parse_request_arguments(mock_request):
def test_force_int():
assert force_int(value="1") == 1
assert force_int(value="$") == 0


def test_perform_search_invalid_ttp():
sys.modules.pop("lib.cuckoo.common.web_utils", None)
from lib.cuckoo.common.web_utils import perform_search

with pytest.raises(ValueError) as exc:
_ = perform_search(term="ttp", value="SPOONS")
assert "Invalid TTP" in str(exc)


def test_perform_search_not_in_search_term_map():
sys.modules.pop("lib.cuckoo.common.web_utils", None)
from lib.cuckoo.common.web_utils import perform_search, search_term_map

term = "Unexpected"
assert term not in search_term_map
actual_result = perform_search(term=term, value="not in search term map")
assert actual_result is None


def test_perform_search_invalid_int_value():
sys.modules.pop("lib.cuckoo.common.web_utils", None)
from lib.cuckoo.common.web_utils import normalized_int_terms, perform_search

term = random.choice(normalized_int_terms)
non_integer_value = "not an integer"
with pytest.raises(ValueError) as exc:
_ = perform_search(term=term, value=non_integer_value)
assert non_integer_value in str(exc)


@pytest.mark.usefixtures("mongodb_enabled")
def test_perform_search_mongo(mongodb_mock_client):
sys.modules.pop("lib.cuckoo.common.web_utils", None)
from lib.cuckoo.common.web_utils import perform_search, search_term_map

term = "tlp"
value = "red"
assert term in search_term_map
assert search_term_map[term] == "info.tlp"
id = random.randint(1, 1000)
analysis = {
"info": {
"id": id,
term: value,
}
}
mongodb_mock_client[TEST_DB_NAME][ANALYSIS_COLL].insert_one(analysis)
result = perform_search(term=term, value=value)
assert len(result) == 1
assert result[0]["info"][term] == value
assert result[0]["info"]["id"] == id

0 comments on commit 3c97376

Please sign in to comment.