Skip to content

Commit

Permalink
Merge pull request #2131 from random-robbie/patch-1
Browse files Browse the repository at this point in the history
Update generic_ssrf.py
  • Loading branch information
liquidsec authored Jan 9, 2025
2 parents b3e443b + d16f7d2 commit 4a42f93
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 40 deletions.
93 changes: 54 additions & 39 deletions bbot/modules/generic_ssrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
class BaseSubmodule:
technique_description = "base technique description"
severity = "INFO"
paths = None
paths = []

def __init__(self, parent_module):
self.parent_module = parent_module
def __init__(self, generic_ssrf):
self.generic_ssrf = generic_ssrf
self.test_paths = self.create_paths()

def set_base_url(self, event):
Expand All @@ -51,30 +51,30 @@ def create_paths(self):

async def test(self, event):
base_url = self.set_base_url(event)

for test_path in self.test_paths:
subdomain_tag = self.parent_module.helpers.rand_string(4)
test_path_prepared = test_path.replace(
"SSRF_CANARY", f"{subdomain_tag}.{self.parent_module.interactsh_domain}"
)
test_url = f"{base_url}{test_path_prepared}"
self.parent_module.debug(f"Sending request to URL: {test_url}")
r = await self.parent_module.helpers.curl(url=test_url)
if r:
self.process(event, r, subdomain_tag)
for test_path_result in self.test_paths:
for lower in [True, False]:
test_path = test_path_result[0]
if lower:
test_path = test_path.lower()
subdomain_tag = test_path_result[1]
test_url = f"{base_url}{test_path}"
self.generic_ssrf.debug(f"Sending request to URL: {test_url}")
r = await self.generic_ssrf.helpers.curl(url=test_url)
if r:
self.process(event, r, subdomain_tag)

def process(self, event, r, subdomain_tag):
response_token = self.parent_module.interactsh_domain.split(".")[0][::-1]
response_token = self.generic_ssrf.interactsh_domain.split(".")[0][::-1]
if response_token in r:
read_response = True
echoed_response = True
else:
read_response = False
echoed_response = False

self.parent_module.interactsh_subdomain_tags[subdomain_tag] = (
self.generic_ssrf.interactsh_subdomain_tags[subdomain_tag] = (
event,
self.technique_description,
self.severity,
read_response,
echoed_response,
)


Expand All @@ -86,15 +86,15 @@ def set_base_url(self, event):
return event.data

def create_paths(self):
query_string = ""
for param in ssrf_params:
query_string += f"{param}=http://SSRF_CANARY&"

query_string_lower = ""
test_paths = []
for param in ssrf_params:
query_string_lower += f"{param.lower()}=http://SSRF_CANARY&"

return [f"?{query_string.rstrip('&')}", f"?{query_string_lower.rstrip('&')}"]
query_string = ""
subdomain_tag = self.generic_ssrf.helpers.rand_string(4)
ssrf_canary = f"{subdomain_tag}.{self.generic_ssrf.interactsh_domain}"
self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param
query_string += f"{param}=http://{ssrf_canary}&"
test_paths.append((f"?{query_string.rstrip('&')}",subdomain_tag))
return test_paths


class Generic_SSRF_POST(BaseSubmodule):
Expand All @@ -107,21 +107,23 @@ def set_base_url(self, event):
async def test(self, event):
test_url = f"{event.data}"

subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False)

post_data = {}
for param in ssrf_params:
post_data[param] = f"http://{subdomain_tag}.{self.parent_module.interactsh_domain}"
subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False)
self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param
post_data[param] = f"http://{subdomain_tag}.{self.generic_ssrf.interactsh_domain}"

subdomain_tag_lower = self.parent_module.helpers.rand_string(4, digits=False)
subdomain_tag_lower = self.generic_ssrf.helpers.rand_string(4, digits=False)
post_data_lower = {
k.lower(): f"http://{subdomain_tag_lower}.{self.parent_module.interactsh_domain}"
k.lower(): f"http://{subdomain_tag_lower}.{self.generic_ssrf.interactsh_domain}"
for k, v in post_data.items()
}

post_data_list = [(subdomain_tag, post_data), (subdomain_tag_lower, post_data_lower)]

for tag, pd in post_data_list:
r = await self.parent_module.helpers.curl(url=test_url, method="POST", post_data=pd)
r = await self.generic_ssrf.helpers.curl(url=test_url, method="POST", post_data=pd)
self.process(event, r, tag)


Expand All @@ -131,17 +133,17 @@ class Generic_XXE(BaseSubmodule):
paths = None

async def test(self, event):
rand_entity = self.parent_module.helpers.rand_string(4, digits=False)
subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False)
rand_entity = self.generic_ssrf.helpers.rand_string(4, digits=False)
subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False)

post_body = f"""<?xml version="1.0" encoding="ISO-8859-1"?>
<!DOCTYPE foo [
<!ELEMENT foo ANY >
<!ENTITY {rand_entity} SYSTEM "http://{subdomain_tag}.{self.parent_module.interactsh_domain}" >
<!ENTITY {rand_entity} SYSTEM "http://{subdomain_tag}.{self.generic_ssrf.interactsh_domain}" >
]>
<foo>&{rand_entity};</foo>"""
test_url = event.parsed_url.geturl()
r = await self.parent_module.helpers.curl(
r = await self.generic_ssrf.helpers.curl(
url=test_url, method="POST", raw_body=post_body, headers={"Content-type": "application/xml"}
)
if r:
Expand All @@ -160,6 +162,7 @@ class generic_ssrf(BaseModule):
async def setup(self):
self.submodules = {}
self.interactsh_subdomain_tags = {}
self.parameter_subdomain_tags_map = {}
self.severity = None
self.generic_only = self.config.get("generic_only", False)

Expand Down Expand Up @@ -190,22 +193,34 @@ async def handle_event(self, event):

async def interactsh_callback(self, r):
full_id = r.get("full-id", None)
subdomain_tag = full_id.split(".")[0]

if full_id:
if "." in full_id:
match = self.interactsh_subdomain_tags.get(full_id.split(".")[0])
match = self.interactsh_subdomain_tags.get(subdomain_tag)
if not match:
return
matched_event = match[0]
matched_technique = match[1]
matched_severity = match[2]
matched_read_response = str(match[3])
matched_echoed_response = str(match[3])

# Check if any SSRF parameter is in the DNS request
triggering_param = self.parameter_subdomain_tags_map.get(subdomain_tag, None)
description = f"Out-of-band interaction: [{matched_technique}]"
if triggering_param:
self.debug(f"Found triggering parameter: {triggering_param}")
description += f" [Triggering Parameter: {triggering_param}]"
description += f" [{r.get('protocol').upper()}] Echoed Response: {matched_echoed_response}"

self.debug(f"Emitting event with description: {description}") # Debug the final description

await self.emit_event(
{
"severity": matched_severity,
"host": str(matched_event.host),
"url": matched_event.data,
"description": f"Out-of-band interaction: [{matched_technique}] [{r.get('protocol').upper()}] Read Response: {matched_read_response}",
"description": description,
},
"VULNERABILITY",
matched_event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def setup_after_prep(self, module_test):

def check(self, module_test, events):
assert any(
e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" in e.data["description"]
e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" and "[Triggering Parameter: Dest]" in e.data["description"]
for e in events
), "Failed to detect Generic SSRF (GET)"
assert any(
Expand Down

0 comments on commit 4a42f93

Please sign in to comment.