From a3373e19fbfea954dedfd621bbe2e26461dc3b21 Mon Sep 17 00:00:00 2001 From: Sam Date: Tue, 7 Jan 2025 14:03:01 +1000 Subject: [PATCH] fixup --- scanners/generic/tools/oobtkube.py | 19 +++++++++++++------ scanners/zap/zap.py | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/scanners/generic/tools/oobtkube.py b/scanners/generic/tools/oobtkube.py index 140709ca..e2835afd 100644 --- a/scanners/generic/tools/oobtkube.py +++ b/scanners/generic/tools/oobtkube.py @@ -39,7 +39,11 @@ import tempfile import threading import time -import typing +from typing import Dict +from typing import Generator +from typing import List +from typing import Optional +from typing import Union import yaml @@ -123,21 +127,21 @@ def test_payload(filename: str): kube_cmd = f"kubectl create -f {filename} {redirect} || kubectl replace -f {filename} {redirect}" logging.debug(f"Command run: {kube_cmd}") - rv = os.system(kube_cmd) - if rv == 0: + exit_code = os.system(kube_cmd) + if exit_code == 0: # if object create/update succeeds add a small delay to allow # for a possible command injection to occur, before replacing # the object again with another command injection attempt time.sleep(1) -def find_leaf_keys_and_test(data: typing.Dict, ipaddr: str, port: int) -> int: +def find_leaf_keys_and_test(data: Dict, ipaddr: str, port: int) -> int: """ Iterate the object data and test each leaf key by modifying the value with the attack payload. Test cases: appending 'curl' command, TBD """ - def get_leaf_keys(obj: typing.Any, path: typing.List = []): + def get_leaf_keys(obj: Union[Dict, List], path: Optional[List] = None) -> Generator[List[str]]: """Collect all possible leaves in the k8s object""" if isinstance(obj, dict): items = obj.items() @@ -146,6 +150,9 @@ def get_leaf_keys(obj: typing.Any, path: typing.List = []): else: return + if path is None: # avoids W0102: Dangerous default value [] as argument (dangerous-default-value) + path = [] + for key, value in items: # skip modifying these top-level keys, we mostly want to test 'spec' data of k8s API objects if path == [] and key in ("apiVersion", "kind", "metadata"): @@ -158,7 +165,7 @@ def get_leaf_keys(obj: typing.Any, path: typing.List = []): else: yield current_path - def modify_leaf_key(obj: typing.Any, path: typing.List, value: str): + def modify_leaf_key(obj: Union[Dict, List], path: List, value: str) -> Union[Dict, List]: """Create a new object with a single modified value at the given path""" new_obj = copy.deepcopy(obj) current = new_obj diff --git a/scanners/zap/zap.py b/scanners/zap/zap.py index 507c55ed..1a7a099c 100644 --- a/scanners/zap/zap.py +++ b/scanners/zap/zap.py @@ -789,7 +789,7 @@ def _enforce_job_parameters(self, job): @generic_authentication_factory() def authentication_factory(self): """This is the default function, attached to error reporting""" - raise NotImplementedError(f"No valid authenticator found for ZAP. ZAP current config is: {self.config}") + raise RuntimeError(f"No valid authenticator found for ZAP. ZAP current config is: {self.config}") @authentication_factory.register(None) def authentication_set_anonymous(self):