diff --git a/bbot/modules/generic_ssrf.py b/bbot/modules/generic_ssrf.py index f486c7d97..860b71968 100644 --- a/bbot/modules/generic_ssrf.py +++ b/bbot/modules/generic_ssrf.py @@ -37,10 +37,10 @@ class BaseSubmodule: technique_description = "base technique description" severity = "INFO" - paths = None + paths = [] - def __init__(self, parent_module): - self.parent_module = parent_module + def __init__(self, generic_ssrf): + self.generic_ssrf = generic_ssrf self.test_paths = self.create_paths() def set_base_url(self, event): @@ -51,30 +51,30 @@ def create_paths(self): async def test(self, event): base_url = self.set_base_url(event) - - for test_path in self.test_paths: - subdomain_tag = self.parent_module.helpers.rand_string(4) - test_path_prepared = test_path.replace( - "SSRF_CANARY", f"{subdomain_tag}.{self.parent_module.interactsh_domain}" - ) - test_url = f"{base_url}{test_path_prepared}" - self.parent_module.debug(f"Sending request to URL: {test_url}") - r = await self.parent_module.helpers.curl(url=test_url) - if r: - self.process(event, r, subdomain_tag) + for test_path_result in self.test_paths: + for lower in [True, False]: + test_path = test_path_result[0] + if lower: + test_path = test_path.lower() + subdomain_tag = test_path_result[1] + test_url = f"{base_url}{test_path}" + self.generic_ssrf.debug(f"Sending request to URL: {test_url}") + r = await self.generic_ssrf.helpers.curl(url=test_url) + if r: + self.process(event, r, subdomain_tag) def process(self, event, r, subdomain_tag): - response_token = self.parent_module.interactsh_domain.split(".")[0][::-1] + response_token = self.generic_ssrf.interactsh_domain.split(".")[0][::-1] if response_token in r: - read_response = True + echoed_response = True else: - read_response = False + echoed_response = False - self.parent_module.interactsh_subdomain_tags[subdomain_tag] = ( + self.generic_ssrf.interactsh_subdomain_tags[subdomain_tag] = ( event, self.technique_description, self.severity, - read_response, + echoed_response, ) @@ -86,15 +86,15 @@ def set_base_url(self, event): return event.data def create_paths(self): - query_string = "" - for param in ssrf_params: - query_string += f"{param}=http://SSRF_CANARY&" - - query_string_lower = "" + test_paths = [] for param in ssrf_params: - query_string_lower += f"{param.lower()}=http://SSRF_CANARY&" - - return [f"?{query_string.rstrip('&')}", f"?{query_string_lower.rstrip('&')}"] + query_string = "" + subdomain_tag = self.generic_ssrf.helpers.rand_string(4) + ssrf_canary = f"{subdomain_tag}.{self.generic_ssrf.interactsh_domain}" + self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param + query_string += f"{param}=http://{ssrf_canary}&" + test_paths.append((f"?{query_string.rstrip('&')}",subdomain_tag)) + return test_paths class Generic_SSRF_POST(BaseSubmodule): @@ -107,21 +107,23 @@ def set_base_url(self, event): async def test(self, event): test_url = f"{event.data}" - subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False) + post_data = {} for param in ssrf_params: - post_data[param] = f"http://{subdomain_tag}.{self.parent_module.interactsh_domain}" + subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False) + self.generic_ssrf.parameter_subdomain_tags_map[subdomain_tag] = param + post_data[param] = f"http://{subdomain_tag}.{self.generic_ssrf.interactsh_domain}" - subdomain_tag_lower = self.parent_module.helpers.rand_string(4, digits=False) + subdomain_tag_lower = self.generic_ssrf.helpers.rand_string(4, digits=False) post_data_lower = { - k.lower(): f"http://{subdomain_tag_lower}.{self.parent_module.interactsh_domain}" + k.lower(): f"http://{subdomain_tag_lower}.{self.generic_ssrf.interactsh_domain}" for k, v in post_data.items() } post_data_list = [(subdomain_tag, post_data), (subdomain_tag_lower, post_data_lower)] for tag, pd in post_data_list: - r = await self.parent_module.helpers.curl(url=test_url, method="POST", post_data=pd) + r = await self.generic_ssrf.helpers.curl(url=test_url, method="POST", post_data=pd) self.process(event, r, tag) @@ -131,17 +133,17 @@ class Generic_XXE(BaseSubmodule): paths = None async def test(self, event): - rand_entity = self.parent_module.helpers.rand_string(4, digits=False) - subdomain_tag = self.parent_module.helpers.rand_string(4, digits=False) + rand_entity = self.generic_ssrf.helpers.rand_string(4, digits=False) + subdomain_tag = self.generic_ssrf.helpers.rand_string(4, digits=False) post_body = f""" - + ]> &{rand_entity};""" test_url = event.parsed_url.geturl() - r = await self.parent_module.helpers.curl( + r = await self.generic_ssrf.helpers.curl( url=test_url, method="POST", raw_body=post_body, headers={"Content-type": "application/xml"} ) if r: @@ -160,6 +162,7 @@ class generic_ssrf(BaseModule): async def setup(self): self.submodules = {} self.interactsh_subdomain_tags = {} + self.parameter_subdomain_tags_map = {} self.severity = None self.generic_only = self.config.get("generic_only", False) @@ -190,22 +193,34 @@ async def handle_event(self, event): async def interactsh_callback(self, r): full_id = r.get("full-id", None) + subdomain_tag = full_id.split(".")[0] + if full_id: if "." in full_id: - match = self.interactsh_subdomain_tags.get(full_id.split(".")[0]) + match = self.interactsh_subdomain_tags.get(subdomain_tag) if not match: return matched_event = match[0] matched_technique = match[1] matched_severity = match[2] - matched_read_response = str(match[3]) + matched_echoed_response = str(match[3]) + + # Check if any SSRF parameter is in the DNS request + triggering_param = self.parameter_subdomain_tags_map.get(subdomain_tag, None) + description = f"Out-of-band interaction: [{matched_technique}]" + if triggering_param: + self.debug(f"Found triggering parameter: {triggering_param}") + description += f" [Triggering Parameter: {triggering_param}]" + description += f" [{r.get('protocol').upper()}] Echoed Response: {matched_echoed_response}" + + self.debug(f"Emitting event with description: {description}") # Debug the final description await self.emit_event( { "severity": matched_severity, "host": str(matched_event.host), "url": matched_event.data, - "description": f"Out-of-band interaction: [{matched_technique}] [{r.get('protocol').upper()}] Read Response: {matched_read_response}", + "description": description, }, "VULNERABILITY", matched_event, diff --git a/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py b/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py index e183ab7b5..d7a30dac1 100644 --- a/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py +++ b/bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py @@ -42,7 +42,7 @@ async def setup_after_prep(self, module_test): def check(self, module_test, events): assert any( - e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" in e.data["description"] + e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic SSRF (GET)]" and "[Triggering Parameter: Dest]" in e.data["description"] for e in events ), "Failed to detect Generic SSRF (GET)" assert any(