Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ajinabraham committed Dec 3, 2023
1 parent 287b18a commit ac97ae7
Show file tree
Hide file tree
Showing 15 changed files with 198 additions and 200 deletions.
47 changes: 0 additions & 47 deletions mobsf/DynamicAnalyzer/views/android/tests_frida.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: utf_8 -*-
"""Frida tests."""
import base64
import glob
import os
import re
import json
Expand All @@ -25,7 +24,6 @@
from mobsf.MobSF.utils import (
is_file_exists,
is_md5,
is_safe_path,
print_n_send_error_response,
)

Expand All @@ -34,24 +32,6 @@
# AJAX


@require_http_methods(['GET'])
def list_frida_scripts(request, api=False):
"""Get frida scripts from others."""
scripts = []
others = os.path.join(settings.TOOLS_DIR,
'frida_scripts',
'android'
'others')
files = glob.glob(others + '**/*.js', recursive=True)
for item in files:
scripts.append(Path(item).stem)
scripts.sort()
return send_response({'status': 'ok',
'files': scripts},
api)
# AJAX


@require_http_methods(['POST'])
def get_runtime_dependencies(request, api=False):
"""Get App runtime dependencies."""
Expand All @@ -75,33 +55,6 @@ def get_runtime_dependencies(request, api=False):
# AJAX


@require_http_methods(['POST'])
def get_script(request, api=False):
"""Get frida scripts from others."""
data = {'status': 'ok', 'content': ''}
try:
scripts = request.POST.getlist('scripts[]')
others = os.path.join(settings.TOOLS_DIR,
'frida_scripts',
'android'
'others')
script_ct = []
for script in scripts:
script_file = os.path.join(others, script + '.js')
if not is_safe_path(others, script_file):
data = {
'status': 'failed',
'message': 'Path traversal detected.'}
return send_response(data, api)
if is_file_exists(script_file):
script_ct.append(Path(script_file).read_text())
data['content'] = '\n'.join(script_ct)
except Exception:
pass
return send_response(data, api)
# AJAX


@require_http_methods(['POST'])
def instrument(request, api=False):
"""Instrument app with frida."""
Expand Down
67 changes: 67 additions & 0 deletions mobsf/DynamicAnalyzer/views/common/frida.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Shared Frida Views."""
import glob
import os
from pathlib import Path

from django.conf import settings
from django.views.decorators.http import require_http_methods

from mobsf.DynamicAnalyzer.views.common.shared import (
send_response,
)
from mobsf.MobSF.utils import (
is_file_exists,
is_safe_path,
)
# AJAX


@require_http_methods(['POST'])
def list_frida_scripts(request, api=False):
"""List frida scripts from others."""
scripts = []
device = request.POST.get('device', 'android')
if device != 'android':
device = 'ios'
others = os.path.join(settings.TOOLS_DIR,
'frida_scripts',
device,
'others')
files = glob.glob(others + '**/*.js', recursive=True)
for item in files:
scripts.append(Path(item).stem)
scripts.sort()
return send_response(
{'status': 'ok',
'files': scripts},
api)
# AJAX


@require_http_methods(['POST'])
def get_script(request, api=False):
"""Get frida scripts from others."""
data = {'status': 'ok', 'content': ''}
try:
device = request.POST.get('device', 'android')
if device != 'android':
device = 'ios'
scripts = request.POST.getlist('scripts[]')
others = os.path.join(settings.TOOLS_DIR,
'frida_scripts',
device,
'others')
script_ct = []
for script in scripts:
script_file = os.path.join(others, script + '.js')
if not is_safe_path(others, script_file):
data = {
'status': 'failed',
'message': 'Path traversal detected.'}
return send_response(data, api)
if is_file_exists(script_file):
script_ct.append(Path(script_file).read_text())

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.
This path depends on a
user-provided value
.
data['content'] = '\n'.join(script_ct)
except Exception:
pass
return send_response(data, api)
3 changes: 3 additions & 0 deletions mobsf/DynamicAnalyzer/views/ios/corellium_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ def api_ready(self):

def api_auth(self):
"""Check Corellium API Auth."""
if not self.api_key:
logger.error('Corellium API key is not set')
return False
r = requests.get(
f'{self.api}/projects',
headers=self.headers)
Expand Down
1 change: 0 additions & 1 deletion mobsf/DynamicAnalyzer/views/ios/corellium_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,6 @@ def ssh_execute(request, api=False):
except Exception as exp:
data['message'] = str(exp)
logger.exception('Executing Commands')
return send_response(data, api)
return send_response(data, api)
# Helper Download app data tarfile

Expand Down
80 changes: 39 additions & 41 deletions mobsf/DynamicAnalyzer/views/ios/corellium_ssh.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
# -*- coding: utf_8 -*-
"""Corellium SSH.
Corellium SSH over Jump Host withLocal Port Forwarding for Frida Connection.
Corellium SSH Utilities , modified for MobSF.
Supports SSH over Jump Host
Local Port Forward
Remote Port Forward
SSH Shell Exec
SFTP File Upload
SFTP File Download
"""
# Copyright (C) 2003-2007 Robey Pointer <[email protected]>
#
Expand Down Expand Up @@ -80,6 +86,30 @@ def parse_ssh_string(ssh):
return ssh_dict


def sock_chan_handler(sock, chan):
"""Socket and Channel Handler."""
try:
while True:
r, w, x = select.select([sock, chan], [], [])
if sock in r:
data = sock.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
sock.send(data)
except ConnectionResetError:
pass
finally:
if chan:
chan.close()
if sock:
sock.close()


# Local Port Forward
class ForwardServer(socketserver.ThreadingTCPServer):
daemon_threads = True
Expand All @@ -89,11 +119,12 @@ class ForwardServer(socketserver.ThreadingTCPServer):
class Handler(socketserver.BaseRequestHandler):
def handle(self):
chan = None
sock = self.request
try:
chan = self.ssh_transport.open_channel(
'direct-tcpip',
(self.chain_host, self.chain_port),
self.request.getpeername(),
sock.getpeername(),
)
except paramiko.SSHException:
# SSH tunnel closed, try opening again
Expand All @@ -111,27 +142,11 @@ def handle(self):

logger.info(
'Connected! Tunnel open %r -> %r -> %r',
self.request.getpeername(),
sock.getpeername(),
chan.getpeername(),
(self.chain_host, self.chain_port))
while True:
r, w, x = select.select([self.request, chan], [], [])
if self.request in r:
data = self.request.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
self.request.send(data)

peername = self.request.getpeername()
if chan:
chan.close()
if self.request:
self.request.close()
peername = sock.getpeername()
sock_chan_handler(sock, chan)
logger.info('Tunnel closed from %r', peername)


Expand Down Expand Up @@ -162,24 +177,7 @@ def handler(chan, host, port):
except Exception:
logger.info('Forwarding request to %s:%d failed', host, port)
return
try:
while True:
r, w, x = select.select([sock, chan], [], [])
if sock in r:
data = sock.recv(1024)
if len(data) == 0:
break
chan.send(data)
if chan in r:
data = chan.recv(1024)
if len(data) == 0:
break
sock.send(data)
except ConnectionResetError:
pass
finally:
chan.close()
sock.close()
sock_chan_handler(sock, chan)


def reverse_forward_tunnel(server_port, remote_host, remote_port, transport):
Expand Down Expand Up @@ -275,7 +273,7 @@ def ssh_execute_cmd(target, cmd):


def ssh_file_upload(ssh_conn_string, fobject, fname):
"""File Upload over SSH."""
"""File Upload over SFTP."""
target, jumpbox = ssh_jump_host(ssh_conn_string)
with target.open_sftp() as sftp:
rfile = Path(fname.replace('..', '')).name
Expand All @@ -285,7 +283,7 @@ def ssh_file_upload(ssh_conn_string, fobject, fname):


def ssh_file_download(ssh_conn_string, remote_path, local_path):
"""File Download over SSH."""
"""File Download over SFTP."""
target, jumpbox = ssh_jump_host(ssh_conn_string)
with target.open_sftp() as sftp:
sftp.get(remote_path, local_path)
Expand Down
16 changes: 9 additions & 7 deletions mobsf/DynamicAnalyzer/views/ios/dynamic_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,18 @@ def dynamic_analysis(request, api=False):
"""The iOS Dynamic Analysis Entry point."""
try:
scan_apps = []
ios_dynamic = False
ipas = StaticAnalyzerIOS.objects.filter(
FILE_NAME__endswith='.ipa')
for ipa in reversed(ipas):
bundle_hash = get_md5(ipa.BUNDLE_ID.encode('utf-8'))
frida_dump = Path(
settings.UPLD_DIR) / bundle_hash / 'mobsf_dump_file.txt'
encrypted = python_dict(
ipa.MACHO_ANALYSIS)['encrypted']['is_encrypted']
macho = python_dict(ipa.MACHO_ANALYSIS)
encrypted = False
if (macho
and macho.get('encrypted')
and macho.get('encrypted').get('is_encrypted')):
encrypted = macho['encrypted']['is_encrypted']
temp_dict = {
'MD5': ipa.MD5,
'APP_NAME': ipa.APP_NAME,
Expand All @@ -63,16 +66,15 @@ def dynamic_analysis(request, api=False):
# Corellium
instances = []
project_id = None
ios_dynamic = bool(getattr(settings, 'CORELLIUM_API_KEY', ''))
c = CorelliumAPI(getattr(settings, 'CORELLIUM_PROJECT_ID', ''))
if c.api_ready() and c.api_auth() and c.get_projects():
corellium_auth = c.api_ready() and c.api_auth()
if corellium_auth and c.get_projects():
instances = c.get_instances()
project_id = c.project_id
setup_ssh_keys(c)
context = {'apps': scan_apps,
'dynamic_analyzer': ios_dynamic,
'dynamic_analyzer': corellium_auth,
'project_id': project_id,
'corellium_auth': c.api_auth(),
'instances': instances,
'title': 'MobSF Dynamic Analysis',
'version': settings.MOBSF_VER}
Expand Down
21 changes: 8 additions & 13 deletions mobsf/DynamicAnalyzer/views/ios/frida_auxiliary_scripts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
from pathlib import Path

from django.conf import settings

Expand All @@ -8,18 +8,13 @@


def get_content(file_name):
content = ''
script = os.path.join(settings.TOOLS_DIR,
'frida_scripts'
'ios',
'auxiliary',
file_name)

with open(script, 'r',
encoding='utf8',
errors='ignore') as scp:
content = scp.read()
return content
tools_dir = Path(settings.TOOLS_DIR)
aux_dir = tools_dir / 'frida_scripts' / 'ios' / 'auxiliary'
script = aux_dir / file_name

if script.exists():
return script.read_text('utf-8', 'ignore')
return ''


def get_loaded_classes():
Expand Down
Loading

0 comments on commit ac97ae7

Please sign in to comment.