Skip to content

Commit

Permalink
Merge pull request #2140 from blacklanternsecurity/ajaxpro-vuln
Browse files Browse the repository at this point in the history
Add Ajaxpro Vulnerability Detection
  • Loading branch information
liquidsec authored Jan 9, 2025
2 parents cda250b + 98967f5 commit b3e443b
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 62 deletions.
101 changes: 64 additions & 37 deletions bbot/modules/ajaxpro.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import regex as re
from urllib.parse import urlparse
from bbot.modules.base import BaseModule


Expand All @@ -18,41 +19,67 @@ class ajaxpro(BaseModule):
}

async def handle_event(self, event):
if event.type == "URL":
if "dir" not in event.tags:
return False
for stem in ["ajax", "ajaxpro"]:
probe_url = f"{event.data}{stem}/whatever.ashx"
probe = await self.helpers.request(probe_url)
if probe:
if probe.status_code == 200:
probe_confirm = await self.helpers.request(f"{event.data}a/whatever.ashx")
if probe_confirm:
if probe_confirm.status_code != 200:
await self.emit_event(
{
"host": str(event.host),
"url": event.data,
"description": f"Ajaxpro Detected (Version Unconfirmed) Trigger: [{probe_url}]",
},
"FINDING",
event,
context="{module} discovered Ajaxpro instance ({event.type}) at {event.data}",
)

if event.type == "URL" and "dir" in event.tags:
await self.check_url_event(event)
elif event.type == "HTTP_RESPONSE":
resp_body = event.data.get("body", None)
if resp_body:
ajaxpro_regex_result = await self.helpers.re.search(self.ajaxpro_regex, resp_body)
if ajaxpro_regex_result:
ajax_pro_path = ajaxpro_regex_result.group(0)
await self.emit_event(
{
"host": str(event.host),
"url": event.data["url"],
"description": f"Ajaxpro Detected (Version Unconfirmed) Trigger: [{ajax_pro_path}]",
},
"FINDING",
event,
context="{module} discovered Ajaxpro instance ({event.type}) at {event.data}",
)
await self.check_http_response_event(event)

async def check_url_event(self, event):
for stem in ["ajax", "ajaxpro"]:
probe_url = f"{event.data}{stem}/whatever.ashx"
probe = await self.helpers.request(probe_url)
if probe and probe.status_code == 200:
confirm_url = f"{event.data}a/whatever.ashx"
confirm_probe = await self.helpers.request(confirm_url)
if confirm_probe and confirm_probe.status_code != 200:
await self.emit_technology(event, probe_url)
await self.confirm_exploitability(probe_url, event)

async def check_http_response_event(self, event):
resp_body = event.data.get("body")
if resp_body:
match = await self.helpers.re.search(self.ajaxpro_regex, resp_body)
if match:
ajaxpro_path = match.group(0)
await self.emit_technology(event, ajaxpro_path)
await self.confirm_exploitability(ajaxpro_path, event)

async def emit_technology(self, event, detection_url):
url = event.data if event.type == "URL" else event.data["url"]
await self.emit_event(
{
"host": str(event.host),
"url": url,
"technology": "ajaxpro",
},
"TECHNOLOGY",
event,
context=f"{self.meta['description']} discovered Ajaxpro instance ({event.type}) at {url} with trigger {detection_url}",
)

# Confirm exploitability of the detected Ajaxpro instance
async def confirm_exploitability(self, detection_url, event):
self.debug("Ajaxpro detected, attempting to confirm exploitability")
parsed_url = urlparse(detection_url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
path = parsed_url.path.rsplit('/', 1)[0]
full_url = f"{base_url}{path}/AjaxPro.Services.ICartService,AjaxPro.2.ashx"

# Payload and headers defined inline
payload = {}
headers = {"X-Ajaxpro-Method": "AddItem"}

probe_response = await self.helpers.request(full_url, method="POST", headers=headers, json=payload)
if probe_response:
if "AjaxPro.Services.ICartService" and "MissingMethodException" in probe_response.text:
await self.emit_event(
{
"host": str(event.host),
"severity": "CRITICAL",
"url": event.data if event.type == "URL" else event.data["url"],
"description": f"Ajaxpro Deserialization RCE (CVE-2021-23758) Trigger: [{full_url}]",
},
"VULNERABILITY",
event,
context=f"{self.meta['description']} discovered Ajaxpro instance ({event.type}) at {detection_url}",
)
70 changes: 45 additions & 25 deletions bbot/test/test_step_2/module_tests/test_module_ajaxpro.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
from .base import ModuleTestBase


class TestAjaxpro(ModuleTestBase):
targets = ["http://127.0.0.1:8888"]
modules_overrides = ["httpx", "ajaxpro"]

http_response_data = """
<script src="ajax/AMBusinessFacades.AjaxUtils,AMBusinessFacades.ashx" type="text/javascript"></script><script type='text/javascript'>$(document).ready(function(){if (!(top.hasTouchScreen || (top.home && top.home.hasTouchScreen))){$('#ctl01_userid').trigger('focus').trigger('select');}});</script>
<script type="text/javascript">
if(typeof AjaxPro != "undefined") AjaxPro.noUtcTime = true;
</script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.NotificationsAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.ReportingAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.UsersAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/FAServerControls.FAPage,FAServerControls.ashx"></script>
exploit_headers = {
"X-Ajaxpro-Method": "AddItem", "Content-Type": "text/json; charset=UTF-8"
}
exploit_response = """
null; r.error = {"Message":"Constructor on type 'AjaxPro.Services.ICartService' not found.","Type":"System.MissingMethodException"};/*
"""

async def setup_before_prep(self, module_test):
Expand All @@ -28,29 +21,56 @@ async def setup_before_prep(self, module_test):
respond_args = {"status": 404}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

# Simulate HTTP_RESPONSE detection
expect_args = {"method": "GET", "uri": "/"}
respond_args = {"response_data": self.http_response_data}
respond_args = {"response_data": "alive"}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

# Simulate Vulnerability
expect_args = {"method": "POST", "uri": "/ajaxpro/AjaxPro.Services.ICartService,AjaxPro.2.ashx"}
respond_args = {"response_data": self.exploit_response}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

def check(self, module_test, events):
ajaxpro_url_detection = False
ajaxpro_httpresponse_detection = False
ajaxpro_exploit_detection = False

for e in events:
if (e.type == "VULNERABILITY" and "Ajaxpro Deserialization RCE (CVE-2021-23758)" in e.data["description"] and "http://127.0.0.1:8888/ajaxpro/AjaxPro.Services.ICartService,AjaxPro.2.ashx" in e.data["description"]):
ajaxpro_exploit_detection = True

if (
e.type == "FINDING"
and "Ajaxpro Detected (Version Unconfirmed) Trigger: [http://127.0.0.1:8888/ajaxpro/whatever.ashx]"
in e.data["description"]
e.type == "TECHNOLOGY" and e.data["technology"] == "ajaxpro"
):
ajaxpro_url_detection = True
continue

assert ajaxpro_url_detection, "Ajaxpro URL probe detection failed"
assert ajaxpro_exploit_detection, "Ajaxpro Exploit detection failed"


class TestAjaxpro_httpdetect(TestAjaxpro):
http_response_data = """
<script src="ajax/AMBusinessFacades.AjaxUtils,AMBusinessFacades.ashx" type="text/javascript"></script><script type='text/javascript'>$(document).ready(function(){if (!(top.hasTouchScreen || (top.home && top.home.hasTouchScreen))){$('#ctl01_userid').trigger('focus').trigger('select');}});</script>
<script type="text/javascript">
if(typeof AjaxPro != "undefined") AjaxPro.noUtcTime = true;
</script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.NotificationsAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.ReportingAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/AMBusinessFacades.UsersAjax,AMBusinessFacades.ashx"></script>
<script type="text/javascript" src="/AcmeTest/ajax/FAServerControls.FAPage,FAServerControls.ashx"></script>
"""

async def setup_before_prep(self, module_test):
# Simulate HTTP_RESPONSE detection
expect_args = {"method": "GET", "uri": "/"}
respond_args = {"response_data": self.http_response_data}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

def check(self, module_test, events):
ajaxpro_httpresponse_detection = False
for e in events:
if (
e.type == "FINDING"
and 'Ajaxpro Detected (Version Unconfirmed) Trigger: [<script src="ajax/AMBusinessFacades.AjaxUtils,AMBusinessFacades.ashx"]'
e.type == "TECHNOLOGY" and e.data["technology"] == "ajaxpro"
):
ajaxpro_httpresponse_detection = True
continue

assert ajaxpro_url_detection, "Ajaxpro URL probe detection failed"
assert ajaxpro_httpresponse_detection, "Ajaxpro HTTP_RESPONSE detection failed"
assert ajaxpro_httpresponse_detection, "Ajaxpro HTTP_RESPONSE detection failed"

0 comments on commit b3e443b

Please sign in to comment.