Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update oobtkube to test leaf keys in arrays #276

Merged
merged 3 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e-tests/manifests/task-controller-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
- -c
- |
while true; do
sleep 2
sleep 1
sh -c "$(oc get task/vulnerable -o=jsonpath='{.spec.description}')"
done
image: registry.redhat.io/openshift4/ose-cli:latest
Expand Down
119 changes: 76 additions & 43 deletions scanners/generic/tools/oobtkube.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#
######################################
import argparse
import copy
import json
import logging
import os
Expand All @@ -35,8 +36,14 @@
import socket
import subprocess
import sys
import tempfile
import threading
import time
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Union

import yaml

Expand Down Expand Up @@ -110,50 +117,82 @@ def get_sarif_output(shared_queue):
return sarif_conv.convert_to_sarif_json(result_message, artifact_url, snippet)


def count_total_leaf_keys(data):
count = 0
key_list = []
for key, value in data.items():
if isinstance(value, dict):
count += count_total_leaf_keys(value)
else:
count += 1
key_list.append(key)
def test_payload(filename: str):
redirect = "&> /dev/null"
if logging.getLogger().isEnabledFor(logging.DEBUG):
# don't supress output when debug logging
redirect = ""
# if using 'apply' and a resource already exists, the command won't run as it returns as 'unchanged'
# therefore 'create' and 'replace' are used
kube_cmd = f"kubectl create -f {filename} {redirect} || kubectl replace -f {filename} {redirect}"

return count
logging.debug(f"Command run: {kube_cmd}")
exit_code = os.system(kube_cmd)
if exit_code == 0:
# if object create/update succeeds add a small delay to allow
# for a possible command injection to occur, before replacing
# the object again with another command injection attempt
time.sleep(1)


# pylint: disable=R0913
def find_leaf_keys_and_test(data, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys=0):
def find_leaf_keys_and_test(data: Dict, ipaddr: str, port: int) -> int:
"""
Iterate the spec data and test each parameter by modifying the value with the attack payload.
Iterate the object data and test each leaf key by modifying the value with the attack payload.
Test cases: appending 'curl' command, TBD
"""
tmp_file = "/tmp/oobtkube-test.yaml"
for key, value in data.items():
if isinstance(value, dict):
processed_leaf_keys = find_leaf_keys_and_test(
value, original_file, ipaddr, port, total_leaf_keys, processed_leaf_keys
)

def get_leaf_keys(obj: Union[Dict, List], path: Optional[List] = None) -> Generator[List[str], None, None]:
"""Collect all possible leaves in the k8s object"""
if isinstance(obj, dict):
items = obj.items()
elif isinstance(obj, list):
items = enumerate(obj)
else:
processed_leaf_keys += 1
logging.info(f"Testing a leaf key: '{key}', ({processed_leaf_keys} / {total_leaf_keys})")
cmd = f"sed 's/{key}:.*/{key}: \"echo oobt; curl {ipaddr}:{port}\\/{key}\"/g' {original_file} > {tmp_file}"
logging.debug(f"Command run: {cmd}")
os.system(cmd)
return

if path is None: # avoids W0102: Dangerous default value [] as argument (dangerous-default-value)
path = []

for key, value in items:
# skip modifying these top-level keys, we mostly want to test 'spec' data of k8s API objects
if path == [] and key in ("apiVersion", "kind", "metadata"):
continue

current_path = path + [key]

if isinstance(value, (dict, list)):
yield from get_leaf_keys(value, current_path)
else:
yield current_path

redirect = "&> /dev/null"
if logging.getLogger().isEnabledFor(logging.DEBUG):
# don't supress output when debug logging
redirect = ""
# if using 'apply' and a resource already exists, the command won't run as it returns as 'unchanged'
# therefore 'create' and 'replace' are used
kube_cmd = f"kubectl create -f {tmp_file} {redirect} || kubectl replace -f {tmp_file} {redirect}"
def modify_leaf_key(obj: Union[Dict, List], path: List, value: str) -> Union[Dict, List]:
"""Create a new object with a single modified value at the given path"""
new_obj = copy.deepcopy(obj)
current = new_obj

logging.debug(f"Command run: {kube_cmd}")
os.system(kube_cmd)
# Navigate to the parent of the target node
for key in path[:-1]:
current = current[key]

return processed_leaf_keys
current[path[-1]] = value

return new_obj

leaf_keys = list(get_leaf_keys(data))

# For each leaf key, create a new modified object with an injected payload
for i, path in enumerate(leaf_keys):
path_str = ".".join(str(p) for p in path)
logging.info(f"Testing leaf key ({i+1} / {len(leaf_keys)}): {path_str}")
# TODO test more kinds of payload variations
payload = f"echo oobt; curl {ipaddr}:{port}/{path_str}"
modified_data = modify_leaf_key(data, path, payload)

with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as tmp:
yaml.dump(modified_data, tmp)
sfowl marked this conversation as resolved.
Show resolved Hide resolved
test_payload(tmp.name)

return len(leaf_keys)


def parse_obj_data(filename: str) -> dict:
Expand All @@ -165,13 +204,6 @@ def parse_obj_data(filename: str) -> dict:
return {}


def scan_with_k8s_config(cfg_file_path: str, obj_data: dict, ipaddr: str, port: int):
spec_data = obj_data.get("spec", {})
total_leaf_keys = count_total_leaf_keys(spec_data)
# Apply Kubernetes config (e.g. CR for Operator, or Pod/resource for webhook)
find_leaf_keys_and_test(spec_data, cfg_file_path, ipaddr, port, total_leaf_keys)


def start_socket_listener(port, shared_queue, data_received, stop_event, duration):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
Expand All @@ -186,8 +218,8 @@ def start_socket_listener(port, shared_queue, data_received, stop_event, duratio

logging.info(f"Listening on port {port}")

client_socket = None
try:
client_socket = None
client_socket, client_address = server_socket.accept()
logging.info(f"Accepted connection from {client_address}")

Expand Down Expand Up @@ -373,9 +405,10 @@ def main():

# Record the start time for the main function
start_time_main = time.time()
elapsed_time_main = 0

# Run kubectl apply command
scan_with_k8s_config(args.filename, obj_data, args.ip_addr, args.port)
find_leaf_keys_and_test(obj_data, args.ip_addr, args.port)

# Check the overall duration periodically
vulnerability_count = 0
Expand Down
33 changes: 17 additions & 16 deletions tests/scanners/generic/tools/test_oobtkube.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,38 @@
def test_data():
# Sample nested dictionary data for testing
return {
"apiVersion": "v1",
"kind": "Foo",
"metadata": {"name": "foo"},
"root": {
"branch": {"leaf1": "value1", "spec": {"leaf2": "value2"}},
"leaf3": "value3",
}
"branch2": [{"leaf4": "value4"}, {"leaf5": "value5"}],
},
}


def test_count_total_leaf_keys(test_data):
# Test if the count_leaf_keys function returns the correct count of leaf keys
assert oobtkube.count_total_leaf_keys(test_data) == 3


@patch("scanners.generic.tools.oobtkube.os.system")
def test_find_leaf_keys_and_test(mock_system, test_data, caplog):
"""
Ensure all the leaves are navigated through
Ensure all the leaves are tested with the payload
"""

caplog.set_level(logging.INFO)

total_leaf_keys = oobtkube.count_total_leaf_keys(test_data)

oobtkube.find_leaf_keys_and_test(test_data, "cr_test_file", "10.10.10.10", "12345", total_leaf_keys)
oobtkube.find_leaf_keys_and_test(test_data, "10.10.10.10", 12345)

processed_count = 0
leaves = ["leaf1", "leaf2", "leaf3"]
for leaf_key in leaves:
processed_count += 1
assert f"Testing a leaf key: '{leaf_key}', ({processed_count} / {total_leaf_keys})" in caplog.text
leaves = [
"root.branch.leaf1",
"root.branch.spec.leaf2",
"root.leaf3",
"root.branch2.0.leaf4",
"root.branch2.1.leaf5",
]
for i, leaf_key in enumerate(leaves):
assert f"Testing leaf key ({i+1} / {len(leaves)}): {leaf_key}" in caplog.text

assert mock_system.call_count == 6 # Each leaf key runs `sed` and `kubectl` commands (2 calls per key)
assert mock_system.call_count == len(leaves) # Each leaf key runs a `kubectl` command


def test_parse_resource_yaml():
Expand Down
Loading