From 93102db7b19c3e985b3502bcbfc2c18d29854624 Mon Sep 17 00:00:00 2001 From: Robin Koumis Date: Wed, 6 Mar 2024 17:03:53 -0500 Subject: [PATCH 1/3] Unit tests for agent - Added file test_agent.py - Tests can be run in Windows or Linux - Tests will be run in github actions - Test most existing functionality of the agent - In send_file open the file in binary mode (bug fix) - Updates to the agent to make it testable, including: - Pass a multiprocessing event to the run() method when under test, so the test knows when the agent process is ready - Tweaks to the shutdown method enabling testing - Let jsonify not crash if values cannot be serialized - Add a new command-line parameter, -v, useful when testing interactively - When -v is given, stdout and stderr will simply go to the console - Allow the 'date' command to be executed from localhost; for testing ran ruff. --- .github/workflows/python-package-windows.yml | 13 +- agent/agent.py | 75 ++- agent/pytest.ini | 3 + agent/test_agent.py | 479 +++++++++++++++++++ 4 files changed, 549 insertions(+), 21 deletions(-) create mode 100644 agent/pytest.ini create mode 100644 agent/test_agent.py diff --git a/.github/workflows/python-package-windows.yml b/.github/workflows/python-package-windows.yml index 3a1e353db06..061dd2476ae 100644 --- a/.github/workflows/python-package-windows.yml +++ b/.github/workflows/python-package-windows.yml @@ -23,14 +23,21 @@ jobs: - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v5 + # Use x86 python because of https://github.com/kevoreilly/CAPEv2/issues/168 with: python-version: ${{ matrix.python-version }} cache: 'pip' + architecture: 'x86' - - name: Install pytest - run: pip install pytest + - name: Install dependencies + run: pip install --upgrade pytest requests - - name: Run unit tests + - name: Run analyzer unit tests run: | cd analyzer/windows pytest -v . + + - name: Run agent unit tests + run: | + cd agent + pytest -v . diff --git a/agent/agent.py b/agent/agent.py index ca4c2a17c9b..496841b7fb3 100644 --- a/agent/agent.py +++ b/agent/agent.py @@ -7,6 +7,7 @@ import http.server import ipaddress import json +import multiprocessing import os import platform import shutil @@ -36,7 +37,7 @@ if sys.maxsize > 2**32 and sys.platform == "win32": sys.exit("You should install python3 x86! not x64") -AGENT_VERSION = "0.12" +AGENT_VERSION = "0.13" AGENT_FEATURES = [ "execpy", "execute", @@ -54,10 +55,6 @@ ANALYZER_FOLDER = "" state = {"status": STATUS_INIT} -# To send output to stdin comment out this 2 lines -sys.stdout = StringIO() -sys.stderr = StringIO() - class MiniHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): server_version = "CAPE Agent" @@ -105,9 +102,19 @@ def __init__(self): "POST": [], } - def run(self, host: ipaddress.IPv4Address = "0.0.0.0", port: int = 8000): + def run( + self, + host: ipaddress.IPv4Address = "0.0.0.0", + port: int = 8000, + event: multiprocessing.Event = None, + ): + socketserver.TCPServer.allow_reuse_address = True self.s = socketserver.TCPServer((host, port), self.handler) - self.s.allow_reuse_address = True + + # tell anyone waiting that they're good to go + if event: + event.set() + self.s.serve_forever() def route(self, path: str, methods: Iterable[str] = ["GET"]): @@ -142,24 +149,41 @@ def handle(self, obj): elif isinstance(ret, send_file): ret.write(obj.wfile) + if hasattr(self, "s") and self.s._BaseServer__shutdown_request: + self.close_connection = True + def shutdown(self): # BaseServer also features a .shutdown() method, but you can't use # that from the same thread as that will deadlock the whole thing. - self.s._BaseServer__shutdown_request = True + if hasattr(self, "s"): + self.s._BaseServer__shutdown_request = True + else: + # When running unit tests in Windows, the system would hang here, + # until this `exit(1)` was added. + print(f"{self} has no 's' attribute") + exit(1) class jsonify: """Wrapper that represents Flask.jsonify functionality.""" - def __init__(self, **kwargs): - self.status_code = 200 + def __init__(self, status_code=200, **kwargs): + self.status_code = status_code self.values = kwargs def init(self): pass def json(self): - return json.dumps(self.values) + for valkey in self.values: + if isinstance(self.values[valkey], bytes): + self.values[valkey] = self.values[valkey].decode("utf8", "replace") + try: + retdata = json.dumps(self.values) + except Exception as ex: + retdata = json.dumps({"error": f"Error serializing json data: {ex.args[0]}"}) + + return retdata def headers(self, obj): pass @@ -183,7 +207,7 @@ def write(self, sock): if not self.length: return - with open(self.path, "r") as f: + with open(self.path, "rb") as f: buf = f.read(1024 * 1024) while buf: sock.write(buf) @@ -261,7 +285,13 @@ def put_status(): @app.route("/logs") def get_logs(): - return json_success("Agent logs", stdout=sys.stdout.getvalue(), stderr=sys.stderr.getvalue()) + if isinstance(sys.stdout, StringIO): + stdoutbuf = sys.stdout.getvalue() + stderrbuf = sys.stderr.getvalue() + else: + stdoutbuf = "verbose mode, stdout not saved" + stderrbuf = "verbose mode, stderr not saved" + return json_success("Agent logs", stdout=stdoutbuf, stderr=stderrbuf) @app.route("/system") @@ -394,14 +424,17 @@ def do_remove(): @app.route("/execute", methods=["POST"]) def do_execute(): - hostname = socket.gethostname() - local_ip = socket.gethostbyname(hostname) + local_ip = socket.gethostbyname(socket.gethostname()) - if request.client_ip in ("127.0.0.1", local_ip): - return json_error(500, "Not allowed to execute commands") if "command" not in request.form: return json_error(400, "No command has been provided") + # only allow date command from localhost. Even this is just to + # let it be tested + allowed_commands = ["date", "cmd /c date /t"] + if request.client_ip in ("127.0.0.1", local_ip) and request.form["command"] not in allowed_commands: + return json_error(500, "Not allowed to execute commands") + # Execute the command asynchronously? As a shell command? async_exec = "async" in request.form shell = "shell" in request.form @@ -475,9 +508,15 @@ def do_kill(): if __name__ == "__main__": + multiprocessing.set_start_method("spawn") parser = argparse.ArgumentParser() parser.add_argument("host", nargs="?", default="0.0.0.0") parser.add_argument("port", type=int, nargs="?", default=8000) - # ToDo redir to stdout + parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args() + + if not args.verbose: + sys.stdout = StringIO() + sys.stderr = StringIO() + app.run(host=args.host, port=args.port) diff --git a/agent/pytest.ini b/agent/pytest.ini new file mode 100644 index 00000000000..bdfc0ebb117 --- /dev/null +++ b/agent/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +pythonpath = . +asyncio_mode = auto diff --git a/agent/test_agent.py b/agent/test_agent.py new file mode 100644 index 00000000000..bc38c28f654 --- /dev/null +++ b/agent/test_agent.py @@ -0,0 +1,479 @@ +"""Tests for the agent.""" + +import datetime +import io +import multiprocessing +import os +import pathlib +import random +import shutil +import sys +import tempfile +import uuid +import zipfile +from urllib.parse import urljoin + +import pytest +import requests + +import agent + +HOST = "127.0.0.1" +PORT = 8000 +BASE_URL = f"http://{HOST}:{PORT}" + +DIRPATH = os.path.join(tempfile.gettempdir(), str(uuid.uuid4())) + + +def make_temp_name(): + return str(uuid.uuid4()) + + +class TestAgent: + """Test the agent API.""" + + agent_process: multiprocessing.Process = None + + def setup_method(self): + agent.state = {"status": agent.STATUS_INIT, "description": "", "async_subprocess": None} + ev = multiprocessing.Event() + self.agent_process = multiprocessing.Process( + target=agent.app.run, + kwargs={"host": HOST, "port": PORT, "event": ev}, + ) + self.agent_process.start() + + # Wait for http server to start. + if not ev.wait(5.0): + raise Exception("Failed to start agent HTTP server") + + # Create temp directory for tests, as makes tidying up easier + os.mkdir(DIRPATH, 0o777) + assert os.path.isdir(DIRPATH) + + def teardown_method(self): + # Remove the temporary directory and files. + try: + # Test the kill endpoint, which shuts down the agent service. + r = requests.get(f"{BASE_URL}/kill") + assert r.status_code == 200 + assert r.json()["message"] == "Quit the CAPE Agent" + except requests.exceptions.ConnectionError: + pass + shutil.rmtree(DIRPATH, ignore_errors=True) + assert not os.path.isdir(DIRPATH) + + # Ensure agent process completes; release resources. + self.agent_process.join() + self.agent_process.close() + + @staticmethod + def non_existent_directory(): + root = pathlib.Path(tempfile.gettempdir()).root + current_pid = os.getpid() + non_existent = pathlib.Path(root, str(current_pid), str(random.randint(10000, 99999))) + assert not os.path.isdir(non_existent) + assert not os.path.exists(non_existent) + return non_existent + + @staticmethod + def confirm_status(expected_status): + """Do a get and check the status.""" + status_url = urljoin(BASE_URL, "status") + r = requests.get(status_url) + js = r.json() + assert js["message"] == "Analysis status" + assert js["status"] == expected_status + assert r.status_code == 200 + return js + + @staticmethod + def create_file(path, contents): + """Create the named file with the supplied contents.""" + with open(path, "w") as file: + file.write(contents) + assert os.path.exists(path) + assert os.path.isfile(path) + + @staticmethod + def file_contains(path, expected_contents): + """Examine the contents of a file.""" + with open(path) as file: + actual_contents = file.read() + return bool(expected_contents in actual_contents) + + @classmethod + def store_file(cls, file_contents): + """Store a file via the API, with the given contents. Return the filepath.""" + contents = os.linesep.join(file_contents) + upload_file = {"file": ("name-here-matters-not", contents)} + filepath = os.path.join(DIRPATH, make_temp_name() + ".py") + form = {"filepath": filepath} + js = cls.post_form("store", form, files=upload_file) + assert js["message"] == "Successfully stored file" + assert os.path.isfile(filepath) + assert cls.file_contains(filepath, file_contents[0]) + assert cls.file_contains(filepath, file_contents[-1]) + return filepath + + @staticmethod + def post_form(url_part, form_data, expected_status=200, files=None): + """Post to the URL and return the json.""" + url = urljoin(BASE_URL, url_part) + r = requests.post(url, data=form_data, files=files) + assert r.status_code == expected_status + js = r.json() + return js + + def test_root(self): + r = requests.get(f"{BASE_URL}/") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "CAPE Agent!" + assert "version" in js + assert "features" in js + assert "execute" in js["features"] + assert "execpy" in js["features"] + assert "pinning" in js["features"] + + def test_status_write_valid_text(self): + """Write a status of 'exception'.""" + # First, confirm the status is NOT 'exception'. + _ = self.confirm_status(agent.STATUS_INIT) + form = {"status": "exception"} + url_part = "status" + _ = self.post_form(url_part, form) + _ = self.confirm_status("exception") + + def test_status_write_invalid(self): + """Fail to provide a valid status.""" + form = {"description": "Test Status"} + js = self.post_form("status", form, 400) + assert js["message"] == "No status has been provided" + + form = {"status": "unexpected value"} + js = self.post_form("status", form, 200) + assert js["message"] == "Analysis status updated" + _ = self.confirm_status("unexpected value") + + # Write an unexpected random number. + form = {"status": random.randint(50, 99)} + js = self.post_form("status", form, 200) + assert js["message"] == "Analysis status updated" + _ = self.confirm_status(str(form["status"])) + + def test_logs(self): + """Test that the agent responds to a request for the logs.""" + r = requests.get(f"{BASE_URL}/logs") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "Agent logs" + assert "stdout" in js + assert "stderr" in js + + def test_system(self): + """Test that the agent responds to a request for the system/platform.""" + r = requests.get(f"{BASE_URL}/system") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "System" + assert "system" in js + if sys.platform == "win32": + assert js["system"] == "Windows" + else: + assert js["system"] == "Linux" + + def test_environ(self): + """Test that the agent responds to a request for the environment.""" + r = requests.get(f"{BASE_URL}/environ") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "Environment variables" + assert "environ" in js + + def test_path(self): + """Test that the agent responds to a request for its path.""" + r = requests.get(f"{BASE_URL}/path") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "Agent path" + assert "filepath" in js + assert os.path.isfile(js["filepath"]) + + def test_mkdir_valid(self): + """Test that the agent creates a directory.""" + new_dir = os.path.join(DIRPATH, make_temp_name()) + form = { + "dirpath": new_dir, + "mode": 0o777, + } + js = self.post_form("mkdir", form) + assert js["message"] == "Successfully created directory" + assert os.path.exists(new_dir) + assert os.path.isdir(new_dir) + + def test_mkdir_invalid(self): + """Ensure we get an error returned when the mkdir request fails.""" + form = {} + js = self.post_form("mkdir", form, 400) + assert js["message"] == "No dirpath has been provided" + + root = pathlib.Path(tempfile.gettempdir()).root + form = {"dirpath": root, "mode": 0o777} + js = self.post_form("mkdir", form, 500) + assert js["message"] == "Error creating directory" + + def test_mktemp_valid(self): + form = { + "dirpath": DIRPATH, + "prefix": make_temp_name(), + "suffix": "tmp", + } + js = self.post_form("mktemp", form) + assert js["message"] == "Successfully created temporary file" + # tempfile.mkstemp adds random characters to suffix, so returned name + # will be different + assert "filepath" in js and js["filepath"].startswith(os.path.join(form["dirpath"], form["prefix"])) + assert os.path.exists(js["filepath"]) + assert os.path.isfile(js["filepath"]) + + def test_mktemp_invalid(self): + """Ensure we get an error returned when the mktemp request fails.""" + dirpath = self.non_existent_directory() + form = { + "dirpath": dirpath, + "prefix": "", + "suffix": "", + } + js = self.post_form("mktemp", form, 500) + assert js["message"] == "Error creating temporary file" + + def test_mkdtemp_valid(self): + """Ensure we can use the mkdtemp endpoint.""" + form = { + "dirpath": DIRPATH, + "prefix": make_temp_name(), + "suffix": "tmp", + } + js = self.post_form("mkdtemp", form) + assert js["message"] == "Successfully created temporary directory" + # tempfile.mkdtemp adds random characters to suffix, so returned name + # will be different + assert "dirpath" in js and js["dirpath"].startswith(os.path.join(form["dirpath"], form["prefix"])) + assert os.path.exists(js["dirpath"]) + assert os.path.isdir(js["dirpath"]) + + def test_mkdtemp_invalid(self): + """Ensure we get an error returned when the mkdtemp request fails.""" + dirpath = self.non_existent_directory() + assert not dirpath.exists() + form = { + "dirpath": dirpath, + "prefix": "", + "suffix": "", + } + js = self.post_form("mkdtemp", form, 500) + assert js["message"] == "Error creating temporary directory" + + def test_store(self): + sample_text = make_temp_name() + upload_file = {"file": ("ignored", os.linesep.join(("test data", sample_text, "test data")))} + form = {"filepath": os.path.join(DIRPATH, make_temp_name() + ".tmp")} + + js = self.post_form("store", form, files=upload_file) + assert js["message"] == "Successfully stored file" + assert os.path.exists(form["filepath"]) + assert os.path.isfile(form["filepath"]) + assert self.file_contains(form["filepath"], sample_text) + + def test_store_invalid(self): + # missing file + form = {"filepath": os.path.join(DIRPATH, make_temp_name() + ".tmp")} + js = self.post_form("store", form, 400) + assert js["message"] == "No file has been provided" + + # missing filepath + upload_file = {"file": ("test_data.txt", "test data\ntest data\n")} + js = self.post_form("store", {}, 400, files=upload_file) + assert js["message"] == "No filepath has been provided" + + # destination file path is invalid + upload_file = {"file": ("test_data.txt", "test data\ntest data\n")} + form = {"filepath": os.path.join(DIRPATH, make_temp_name(), "tmp")} + js = self.post_form("store", form, 500, files=upload_file) + assert js["message"] == "Error storing file" + + def test_retrieve(self): + """Create a file, then try to retrieve it.""" + first_line = make_temp_name() + last_line = make_temp_name() + file_contents = os.linesep.join((first_line, "test data", last_line)) + file_path = os.path.join(DIRPATH, make_temp_name() + ".tmp") + self.create_file(file_path, file_contents) + + form = {"filepath": file_path} + # Can't use self.post_form here as no json will be returned. + r = requests.post(f"{BASE_URL}/retrieve", data=form) + assert r.status_code == 200 + assert first_line in r.text + assert last_line in r.text + + def test_retrieve_invalid(self): + js = self.post_form("retrieve", {}, 400) + assert js["message"].startswith("No filepath has been provided") + + # request to retrieve non existent file + form = {"filepath": os.path.join(DIRPATH, make_temp_name() + ".tmp")} + # Can't use self.post_form here as no json will be returned. + r = requests.post(f"{BASE_URL}/retrieve", data=form) + assert r.status_code == 404 + + def test_extract(self): + """Create a file zip file, then upload and extract the contents.""" + file_dir = make_temp_name() + file_name = make_temp_name() + file_contents = make_temp_name() + zfile = io.BytesIO() + zf = zipfile.ZipFile(zfile, "w", zipfile.ZIP_DEFLATED, False) + zf.writestr(os.path.join(file_dir, file_name), file_contents) + zf.close() + zfile.seek(0) + + upload_file = {"zipfile": ("test_file.zip", zfile.read())} + form = {"dirpath": DIRPATH} + + js = self.post_form("extract", form, files=upload_file) + assert js["message"] == "Successfully extracted zip file" + expected_path = os.path.join(DIRPATH, file_dir, file_name) + assert os.path.exists(expected_path) + assert self.file_contains(expected_path, file_contents) + + # todo should I check the filesytem for the file? + + def test_extract_invalid(self): + form = {"dirpath": DIRPATH} + js = self.post_form("extract", form, 400) + assert js["message"] == "No zip file has been provided" + + upload_file = {"zipfile": ("test_file.zip", "dummy data")} + js = self.post_form("extract", {}, 400, files=upload_file) + assert js["message"] == "No dirpath has been provided" + + def test_remove(self): + tempdir = os.path.join(DIRPATH, make_temp_name()) + tempfile = os.path.join(tempdir, make_temp_name()) + os.mkdir(tempdir, 0o777) + self.create_file(tempfile, "test data\ntest data\n") + + # delete temp file + form = {"path": tempfile} + js = self.post_form("remove", form) + assert js["message"] == "Successfully deleted file" + + # delete temp directory + form = {"path": tempdir} + js = self.post_form("remove", form) + assert js["message"] == "Successfully deleted directory" + + def test_remove_invalid(self): + tempdir = os.path.join(DIRPATH, make_temp_name()) + + # missing parameter + form = {} + js = self.post_form("remove", form, 400) + assert js["message"] == "No path has been provided" + + # path doesn't exist + form = {"path": tempdir} + js = self.post_form("remove", form, 404) + assert js["message"] == "Path provided does not exist" + + @pytest.mark.skipif(agent.isAdmin(), reason="Test fails if privileges are elevated.") + def test_remove_system_temp_dir(self): + # error removing file or dir (permission) + form = {"path": tempfile.gettempdir()} + js = self.post_form("remove", form, 500) + assert js["message"] == "Error removing file or directory" + + def test_execute(self): + """Test executing the 'date' command.""" + if sys.platform == "win32": + form = {"command": "cmd /c date /t"} + else: + form = {"command": "date"} + js = self.post_form("execute", form) + assert js["message"] == "Successfully executed command" + assert "stdout" in js + assert "stderr" in js + current_year = datetime.date.today().isoformat() + assert current_year[:4] in js["stdout"] + + def test_execute_error(self): + """Expect an error on invalid command to execute.""" + js = self.post_form("execute", {}, 400) + assert js["message"] == "No command has been provided" + + form = {"command": "ls"} + js = self.post_form("execute", form, 500) + assert js["message"] == "Not allowed to execute commands" + + def test_execute_py(self): + """Test we can execute a simple python script.""" + # The output line endings are different between linux and Windows. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + ) + filepath = self.store_file(file_contents) + + form = {"filepath": filepath} + js = self.post_form("execpy", form) + assert js["message"] == "Successfully executed command" + assert "stdout" in js and "hello world" in js["stdout"] + assert "stderr" in js and "goodbye world" in js["stderr"] + + def test_execute_py_error_no_file(self): + """Ensure we get a 400 back when there's no file provided.""" + # The agent used to return 200 even in various failure scenarios. + js = self.post_form("execpy", {}, expected_status=400) + assert js["message"] == "No Python file has been provided" + + def test_execute_py_error_nonexistent_file(self): + """Ensure we get a 400 back when a nonexistent filename is provided.""" + filepath = os.path.join(DIRPATH, make_temp_name() + ".py") + form = {"filepath": filepath} + js = self.post_form("execpy", form, expected_status=200) + assert js["message"] == "Successfully executed command" + assert "stderr" in js and "No such file or directory" in js["stderr"] + _ = self.confirm_status(agent.STATUS_RUNNING) + + def test_execute_py_error_non_zero_exit_code(self): + """Ensure we get a 400 back when there's a non-zero exit code.""" + # Run a python script that exits non-zero. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "sys.exit(3)", + ) + filepath = self.store_file(file_contents) + form = {"filepath": filepath} + js = self.post_form("execpy", form, expected_status=200) + assert js["message"] == "Successfully executed command" + assert "hello world" in js["stdout"] + _ = self.confirm_status(agent.STATUS_RUNNING) + + def test_pinning(self): + r = requests.get(f"{BASE_URL}/pinning") + assert r.status_code == 200 + js = r.json() + assert js["message"] == "Successfully pinned Agent" + assert "client_ip" in js + + # Pinning again causes an error. + r = requests.get(f"{BASE_URL}/pinning") + assert r.status_code == 500 + js = r.json() + assert js["message"] == "Agent has already been pinned to an IP!" From 9ca21defe56f6624dd3c944b03b9e435cd073d2f Mon Sep 17 00:00:00 2001 From: Robin Koumis Date: Wed, 6 Mar 2024 19:13:01 -0500 Subject: [PATCH 2/3] Stricter setting of agent status - Use an enum for the status. Only accept expected values. - No longer require x86 python for the agent. - It's only the analyzer that still requires x86 python. - Check that we can read a file before trying to send it. --- .github/workflows/python-package-windows.yml | 1 - agent/agent.py | 67 +++++++++++++------- agent/test_agent.py | 33 ++++++---- 3 files changed, 64 insertions(+), 37 deletions(-) diff --git a/.github/workflows/python-package-windows.yml b/.github/workflows/python-package-windows.yml index 061dd2476ae..8bf2437b467 100644 --- a/.github/workflows/python-package-windows.yml +++ b/.github/workflows/python-package-windows.yml @@ -27,7 +27,6 @@ jobs: with: python-version: ${{ matrix.python-version }} cache: 'pip' - architecture: 'x86' - name: Install dependencies run: pip install --upgrade pytest requests diff --git a/agent/agent.py b/agent/agent.py index 496841b7fb3..9dd036924fd 100644 --- a/agent/agent.py +++ b/agent/agent.py @@ -4,6 +4,7 @@ import argparse import cgi +import enum import http.server import ipaddress import json @@ -30,14 +31,7 @@ if sys.version_info[:2] < (3, 6): sys.exit("You are running an incompatible version of Python, please use >= 3.6") -# You must run x86 version not x64 -# The analysis process interacts with low-level Windows libraries that need a -# x86 Python to be running. -# (see https://github.com/kevoreilly/CAPEv2/issues/1680) -if sys.maxsize > 2**32 and sys.platform == "win32": - sys.exit("You should install python3 x86! not x64") - -AGENT_VERSION = "0.13" +AGENT_VERSION = "0.14" AGENT_FEATURES = [ "execpy", "execute", @@ -47,13 +41,35 @@ "unicodepath", ] -STATUS_INIT = 0x0001 -STATUS_RUNNING = 0x0002 -STATUS_COMPLETED = 0x0003 -STATUS_FAILED = 0x0004 + +class Status(enum.IntEnum): + INIT = 1 + RUNNING = 2 + COMPLETE = 3 + FAILED = 4 + EXCEPTION = 5 + + def __str__(self): + return f"{self.name.lower()}" + + @classmethod + def _missing_(cls, value): + if not isinstance(value, str): + return None + value = value.lower() + for member in cls: + if str(member) == value: + return member + if value.isnumeric() and int(value) == member.value: + return member + return None + ANALYZER_FOLDER = "" -state = {"status": STATUS_INIT} +state = { + "status": Status.INIT, + "description": "", +} class MiniHTTPRequestHandler(http.server.SimpleHTTPRequestHandler): @@ -197,11 +213,11 @@ def __init__(self, path): self.status_code = 200 def init(self): - if not os.path.isfile(self.path): + if os.path.isfile(self.path) and os.access(self.path, os.R_OK): + self.length = os.path.getsize(self.path) + else: self.status_code = 404 self.length = 0 - else: - self.length = os.path.getsize(self.path) def write(self, sock): if not self.length: @@ -270,15 +286,17 @@ def get_index(): @app.route("/status") def get_status(): - return json_success("Analysis status", status=state.get("status"), description=state.get("description")) + return json_success("Analysis status", status=str(state.get("status")), description=state.get("description")) @app.route("/status", methods=["POST"]) def put_status(): - if "status" not in request.form: - return json_error(400, "No status has been provided") + try: + status = Status(request.form.get("status")) + except ValueError: + return json_error(400, "No valid status has been provided") - state["status"] = request.form["status"] + state["status"] = status state["description"] = request.form.get("description") return json_success("Analysis status updated") @@ -449,11 +467,12 @@ def do_execute(): p = subprocess.Popen(request.form["command"], shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() except Exception: - state["status"] = STATUS_FAILED + state["status"] = Status.FAILED state["description"] = "Error execute command" return json_exception("Error executing command") - state["status"] = STATUS_RUNNING + state["status"] = Status.RUNNING + state["description"] = "" return json_success("Successfully executed command", stdout=stdout, stderr=stderr) @@ -480,11 +499,11 @@ def do_execpy(): p = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() except Exception: - state["status"] = STATUS_FAILED + state["status"] = Status.FAILED state["description"] = "Error executing command" return json_exception("Error executing command") - state["status"] = STATUS_RUNNING + state["status"] = Status.RUNNING return json_success("Successfully executed command", stdout=stdout, stderr=stderr) diff --git a/agent/test_agent.py b/agent/test_agent.py index bc38c28f654..cff1711eeab 100644 --- a/agent/test_agent.py +++ b/agent/test_agent.py @@ -35,7 +35,7 @@ class TestAgent: agent_process: multiprocessing.Process = None def setup_method(self): - agent.state = {"status": agent.STATUS_INIT, "description": "", "async_subprocess": None} + agent.state = {"status": agent.Status.INIT, "description": "", "async_subprocess": None} ev = multiprocessing.Event() self.agent_process = multiprocessing.Process( target=agent.app.run, @@ -139,28 +139,37 @@ def test_root(self): def test_status_write_valid_text(self): """Write a status of 'exception'.""" # First, confirm the status is NOT 'exception'. - _ = self.confirm_status(agent.STATUS_INIT) + _ = self.confirm_status(str(agent.Status.INIT)) form = {"status": "exception"} url_part = "status" _ = self.post_form(url_part, form) - _ = self.confirm_status("exception") + _ = self.confirm_status(str(agent.Status.EXCEPTION)) + + def test_status_write_valid_number(self): + """Write a status of '5'.""" + # First, confirm the status is NOT 'exception'. + _ = self.confirm_status(str(agent.Status.INIT)) + form = {"status": 5} + url_part = "status" + _ = self.post_form(url_part, form) + _ = self.confirm_status(str(agent.Status.EXCEPTION)) def test_status_write_invalid(self): """Fail to provide a valid status.""" form = {"description": "Test Status"} js = self.post_form("status", form, 400) - assert js["message"] == "No status has been provided" + assert js["message"] == "No valid status has been provided" form = {"status": "unexpected value"} - js = self.post_form("status", form, 200) - assert js["message"] == "Analysis status updated" - _ = self.confirm_status("unexpected value") + js = self.post_form("status", form, 400) + assert js["message"] == "No valid status has been provided" + _ = self.confirm_status(str(agent.Status.INIT)) # Write an unexpected random number. form = {"status": random.randint(50, 99)} - js = self.post_form("status", form, 200) - assert js["message"] == "Analysis status updated" - _ = self.confirm_status(str(form["status"])) + js = self.post_form("status", form, 400) + assert js["message"] == "No valid status has been provided" + _ = self.confirm_status(str(agent.Status.INIT)) def test_logs(self): """Test that the agent responds to a request for the logs.""" @@ -447,7 +456,7 @@ def test_execute_py_error_nonexistent_file(self): js = self.post_form("execpy", form, expected_status=200) assert js["message"] == "Successfully executed command" assert "stderr" in js and "No such file or directory" in js["stderr"] - _ = self.confirm_status(agent.STATUS_RUNNING) + _ = self.confirm_status(str(agent.Status.RUNNING)) def test_execute_py_error_non_zero_exit_code(self): """Ensure we get a 400 back when there's a non-zero exit code.""" @@ -463,7 +472,7 @@ def test_execute_py_error_non_zero_exit_code(self): js = self.post_form("execpy", form, expected_status=200) assert js["message"] == "Successfully executed command" assert "hello world" in js["stdout"] - _ = self.confirm_status(agent.STATUS_RUNNING) + _ = self.confirm_status(str(agent.Status.RUNNING)) def test_pinning(self): r = requests.get(f"{BASE_URL}/pinning") From 326ad8d9eaa10250b8b9b9655a86f0ad1a455489 Mon Sep 17 00:00:00 2001 From: Robin Koumis Date: Wed, 6 Mar 2024 20:29:20 -0500 Subject: [PATCH 3/3] Monitor a background process - Monitor async python process spawned in background - Be able to detect if background process completed ok or errored - Accurately report failure status on execution failures - The agent used to report RUNNING when the process actually FAILED - Add base64 encoding capability to send_file - Detect and log errors that occur during send_file - Add DELETE capability to the HTTPRequestHandler - This gives us the option of adding DELETE methods in future - Allow json_success to have status codes - Allow json_error to accept kwargs, like json_success already does - More detailed error messages for certain kinds of failure - creating directory; storing file; extracting zip file --- agent/agent.py | 186 +++++++++++++++++++++++++++++++++++--------- agent/test_agent.py | 84 ++++++++++++++++++-- 2 files changed, 228 insertions(+), 42 deletions(-) diff --git a/agent/agent.py b/agent/agent.py index 9dd036924fd..2452bcf6cbe 100644 --- a/agent/agent.py +++ b/agent/agent.py @@ -3,6 +3,7 @@ # See the file 'docs/LICENSE' for copying permission. import argparse +import base64 import cgi import enum import http.server @@ -11,6 +12,7 @@ import multiprocessing import os import platform +import shlex import shutil import socket import socketserver @@ -31,7 +33,7 @@ if sys.version_info[:2] < (3, 6): sys.exit("You are running an incompatible version of Python, please use >= 3.6") -AGENT_VERSION = "0.14" +AGENT_VERSION = "0.15" AGENT_FEATURES = [ "execpy", "execute", @@ -69,6 +71,7 @@ def _missing_(cls, value): state = { "status": Status.INIT, "description": "", + "async_subprocess": None, } @@ -105,6 +108,28 @@ def do_POST(self): request.form[key] = value.value self.httpd.handle(self) + def do_DELETE(self): + environ = { + "REQUEST_METHOD": "DELETE", + "CONTENT_TYPE": self.headers.get("Content-Type"), + } + + form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ=environ) + + request.client_ip, request.client_port = self.client_address + request.form = {} + request.files = {} + request.method = "DELETE" + + if form.list: + for key in form.keys(): + value = form[key] + if value.filename: + request.files[key] = value.file + else: + request.form[key] = value.value + self.httpd.handle(self) + class MiniHTTPServer: def __init__(self): @@ -116,6 +141,7 @@ def __init__(self): self.routes = { "GET": [], "POST": [], + "DELETE": [], } def run( @@ -163,7 +189,7 @@ def handle(self, obj): if isinstance(ret, jsonify): obj.wfile.write(ret.json().encode()) elif isinstance(ret, send_file): - ret.write(obj.wfile) + ret.write(obj, obj.wfile) if hasattr(self, "s") and self.s._BaseServer__shutdown_request: self.close_connection = True @@ -208,9 +234,10 @@ def headers(self, obj): class send_file: """Wrapper that represents Flask.send_file functionality.""" - def __init__(self, path): + def __init__(self, path, encoding): self.path = path self.status_code = 200 + self.encoding = encoding def init(self): if os.path.isfile(self.path) and os.access(self.path, os.R_OK): @@ -219,15 +246,20 @@ def init(self): self.status_code = 404 self.length = 0 - def write(self, sock): + def write(self, httplog, sock): if not self.length: return - with open(self.path, "rb") as f: - buf = f.read(1024 * 1024) - while buf: - sock.write(buf) + try: + with open(self.path, "rb") as f: buf = f.read(1024 * 1024) + while buf: + if self.encoding == "base64": + buf = base64.b64encode(buf) + sock.write(buf) + buf = f.read(1024 * 1024) + except Exception as ex: + httplog.log_error(f"Error reading file {self.path}: {ex}") def headers(self, obj): obj.send_header("Content-Length", self.length) @@ -262,8 +294,8 @@ def isAdmin(): return is_admin -def json_error(error_code: int, message: str) -> jsonify: - r = jsonify(message=message, error_code=error_code) +def json_error(error_code: int, message: str, **kwargs) -> jsonify: + r = jsonify(message=message, error_code=error_code, **kwargs) r.status_code = error_code return r @@ -274,8 +306,8 @@ def json_exception(message: str) -> jsonify: return r -def json_success(message: str, **kwargs) -> jsonify: - return jsonify(message=message, **kwargs) +def json_success(message: str, status_code=200, **kwargs) -> jsonify: + return jsonify(message=message, status_code=status_code, **kwargs) @app.route("/") @@ -284,8 +316,40 @@ def get_index(): return json_success("CAPE Agent!", version=AGENT_VERSION, features=AGENT_FEATURES, is_user_admin=bool(is_admin)) +def get_subprocess_status(): + """Return the subprocess status.""" + async_subprocess = state.get("async_subprocess") + message = "Analysis status" + exitcode = async_subprocess.exitcode + if exitcode is None or (sys.platform == "win32" and exitcode == 259): + # Process is still running. + state["status"] = Status.RUNNING + return json_success( + message=message, + status=str(state.get("status")), + description=state.get("description"), + process_id=async_subprocess.pid, + ) + # Process completed; reset async subprocess state. + state["async_subprocess"] = None + if exitcode == 0: + state["status"] = Status.COMPLETE + state["description"] = "" + else: + state["status"] = Status.FAILED + state["description"] = f"Exited with exit code {exitcode}" + return json_success( + message=message, + status=str(state.get("status")), + description=state.get("description"), + exitcode=exitcode, + ) + + @app.route("/status") def get_status(): + if state.get("async_subprocess") is not None: + return get_subprocess_status() return json_success("Analysis status", status=str(state.get("status")), description=state.get("description")) @@ -332,11 +396,12 @@ def do_mkdir(): if "dirpath" not in request.form: return json_error(400, "No dirpath has been provided") - mode = int(request.form.get("mode", 0o777)) - try: + mode = int(request.form.get("mode", 0o777)) + os.makedirs(request.form["dirpath"], mode=mode) - except Exception: + except Exception as ex: + print(f"error creating dir {ex}") return json_exception("Error creating directory") return json_success("Successfully created directory") @@ -383,8 +448,8 @@ def do_store(): try: with open(request.form["filepath"], "wb") as f: shutil.copyfileobj(request.files["file"], f, 10 * 1024 * 1024) - except Exception: - return json_exception("Error storing file") + except Exception as ex: + return json_exception(f"Error storing file: {ex}") return json_success("Successfully stored file") @@ -394,7 +459,7 @@ def do_retrieve(): if "filepath" not in request.form: return json_error(400, "No filepath has been provided") - return send_file(request.form["filepath"]) + return send_file(request.form["filepath"], request.form.get("encoding", "")) @app.route("/extract", methods=["POST"]) @@ -408,8 +473,8 @@ def do_extract(): try: with ZipFile(request.files["zipfile"], "r") as archive: archive.extractall(request.form["dirpath"]) - except Exception: - return json_exception("Error extracting zip file") + except Exception as ex: + return json_exception(f"Error extracting zip file {ex}") return json_success("Successfully extracted zip file") @@ -446,6 +511,7 @@ def do_execute(): if "command" not in request.form: return json_error(400, "No command has been provided") + command_to_execute = shlex.split(request.form["command"]) # only allow date command from localhost. Even this is just to # let it be tested @@ -462,20 +528,64 @@ def do_execute(): try: if async_exec: - subprocess.Popen(request.form["command"], shell=shell, cwd=cwd) + subprocess.Popen(command_to_execute, shell=shell, cwd=cwd) else: - p = subprocess.Popen(request.form["command"], shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen(command_to_execute, shell=shell, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout, stderr = p.communicate() - except Exception: + if request.form.get("encoding", "") == "base64": + stdout = base64.b64encode(stdout) + stderr = base64.b64encode(stderr) + except Exception as ex: state["status"] = Status.FAILED state["description"] = "Error execute command" - return json_exception("Error executing command") + return json_exception(f"Error executing command: {ex}") state["status"] = Status.RUNNING state["description"] = "" return json_success("Successfully executed command", stdout=stdout, stderr=stderr) +def run_subprocess(command_args, cwd, base64_encode, shell=False): + """Execute the subprocess, wait for completion. + + Return the exitcode (returncode), the stdout, and the stderr. + """ + p = subprocess.Popen( + args=command_args, + cwd=cwd, + shell=shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p.communicate() + if base64_encode: + stdout = base64.b64encode(stdout) + stderr = base64.b64encode(stderr) + return p.returncode, stdout, stderr + + +def background_subprocess(command_args, cwd, base64_encode, shell=False): + """Run subprocess, wait for completion, then exit. + + This process must exit, so the parent process (agent) can find the exit status.""" + # TODO: return the stdout/stderr to the parent process. + returncode, stdout, stderr = run_subprocess(command_args, cwd, base64_encode, shell) + sys.stdout.write(stdout.decode("ascii")) + sys.stderr.write(stderr.decode("ascii")) + sys.exit(returncode) + + +def spawn(args, cwd, base64_encode, shell=False): + """Kick off a subprocess in the background.""" + run_subprocess_args = [args, cwd, base64_encode, shell] + proc = multiprocessing.Process(target=background_subprocess, name=f"child process {args[1]}", args=run_subprocess_args) + proc.start() + state["status"] = Status.RUNNING + state["description"] = "" + state["async_subprocess"] = proc + return json_success("Successfully spawned command", process_id=proc.pid) + + @app.route("/execpy", methods=["POST"]) def do_execpy(): if "filepath" not in request.form: @@ -483,28 +593,34 @@ def do_execpy(): # Execute the command asynchronously? As a shell command? async_exec = "async" in request.form + base64_encode = request.form.get("encoding", "") == "base64" cwd = request.form.get("cwd") - stdout = stderr = None args = ( sys.executable, request.form["filepath"], ) + if async_exec and state["status"] == Status.RUNNING and state["async_subprocess"]: + return json_error(400, "Async process already running.") try: if async_exec: - subprocess.Popen(args, cwd=cwd) - else: - p = subprocess.Popen(args, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - except Exception: + return spawn(args, cwd, base64_encode) + exitcode, stdout, stderr = run_subprocess(args, cwd, base64_encode) + if exitcode == 0: + state["status"] = Status.COMPLETE + state["description"] = "" + return json_success("Successfully executed command", stdout=stdout, stderr=stderr) + # Process exited with non-zero result. state["status"] = Status.FAILED - state["description"] = "Error executing command" - return json_exception("Error executing command") - - state["status"] = Status.RUNNING - return json_success("Successfully executed command", stdout=stdout, stderr=stderr) + message = "Error executing python command." + state["description"] = message + return json_error(400, message, stdout=stdout, stderr=stderr, exitcode=exitcode) + except Exception as ex: + state["status"] = Status.FAILED + state["description"] = "Error executing Python command" + return json_exception(f"Error executing Python command: {ex}") @app.route("/pinning") diff --git a/agent/test_agent.py b/agent/test_agent.py index cff1711eeab..4179d767969 100644 --- a/agent/test_agent.py +++ b/agent/test_agent.py @@ -9,6 +9,7 @@ import shutil import sys import tempfile +import time import uuid import zipfile from urllib.parse import urljoin @@ -310,7 +311,7 @@ def test_store_invalid(self): upload_file = {"file": ("test_data.txt", "test data\ntest data\n")} form = {"filepath": os.path.join(DIRPATH, make_temp_name(), "tmp")} js = self.post_form("store", form, 500, files=upload_file) - assert js["message"] == "Error storing file" + assert js["message"].startswith("Error storing file") def test_retrieve(self): """Create a file, then try to retrieve it.""" @@ -404,6 +405,75 @@ def test_remove_system_temp_dir(self): js = self.post_form("remove", form, 500) assert js["message"] == "Error removing file or directory" + def test_async_running(self): + """Test async execution shows as running after starting.""" + # upload test python file + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "import time", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + "time.sleep(1)", + "sys.exit(0)", + ) + filepath = self.store_file(file_contents) + form = {"filepath": filepath, "async": 1} + + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + assert "stdout" not in js + assert "stderr" not in js + assert "process_id" in js + _ = self.confirm_status(str(agent.Status.RUNNING)) + + def test_async_complete(self): + """Test async execution shows as complete after exiting.""" + # upload test python file + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "print('hello world')", + "sys.exit(0)", + ) + filepath = self.store_file(file_contents) + form = {"filepath": filepath, "async": 1} + + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + # sleep a moment to let it finish + time.sleep(1) + _ = self.confirm_status(str(agent.Status.COMPLETE)) + + def test_async_failure(self): + """Test that an unsuccessful script gets a status of 'failed'.""" + # upload test python file. It will sleep, then try to import a nonexistent module. + file_contents = ( + f"# Comment a random number {random.randint(1000, 9999)}'", + "import sys", + "import time", + "time.sleep(1)", + "import nonexistent", + "print('hello world')", + "print('goodbye world', file=sys.stderr)", + "sys.exit(0)", + ) + + filepath = self.store_file(file_contents) + form = {"filepath": filepath, "async": 1} + + js = self.post_form("execpy", form) + assert js["message"] == "Successfully spawned command" + assert "stdout" not in js + assert "stderr" not in js + assert "process_id" in js + js = self.confirm_status(str(agent.Status.RUNNING)) + assert "process_id" in js + time.sleep(2) + + js = self.confirm_status(str(agent.Status.FAILED)) + assert "process_id" not in js + def test_execute(self): """Test executing the 'date' command.""" if sys.platform == "win32": @@ -453,10 +523,10 @@ def test_execute_py_error_nonexistent_file(self): """Ensure we get a 400 back when a nonexistent filename is provided.""" filepath = os.path.join(DIRPATH, make_temp_name() + ".py") form = {"filepath": filepath} - js = self.post_form("execpy", form, expected_status=200) - assert js["message"] == "Successfully executed command" + js = self.post_form("execpy", form, expected_status=400) + assert js["message"] == "Error executing python command." assert "stderr" in js and "No such file or directory" in js["stderr"] - _ = self.confirm_status(str(agent.Status.RUNNING)) + _ = self.confirm_status(str(agent.Status.FAILED)) def test_execute_py_error_non_zero_exit_code(self): """Ensure we get a 400 back when there's a non-zero exit code.""" @@ -469,10 +539,10 @@ def test_execute_py_error_non_zero_exit_code(self): ) filepath = self.store_file(file_contents) form = {"filepath": filepath} - js = self.post_form("execpy", form, expected_status=200) - assert js["message"] == "Successfully executed command" + js = self.post_form("execpy", form, expected_status=400) + assert js["message"] == "Error executing python command." assert "hello world" in js["stdout"] - _ = self.confirm_status(str(agent.Status.RUNNING)) + _ = self.confirm_status(str(agent.Status.FAILED)) def test_pinning(self): r = requests.get(f"{BASE_URL}/pinning")