Skip to content
This repository has been archived by the owner on Apr 23, 2023. It is now read-only.

Commit

Permalink
Reject invalid XML files, refactored validation functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jorritfolmer committed Jan 7, 2018
1 parent 7256a9e commit ae07bce
Showing 1 changed file with 107 additions and 84 deletions.
191 changes: 107 additions & 84 deletions bin/dmarc/dir2splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,61 +183,33 @@ def rua2kv(self, xmldata, valid=False):
% xmldata.findtext("report_metadata/report_id", default=""))
return result

def rua2json(self, xmldata, validation_result=False):
""" Returns a string in JSON format based on RUA XML input and its validation status,
def rua2json(self, xmldata, validation_result=[]):
""" Returns a string in JSON format based on RUA XML input and its validation results
with optionally resolved IP addresses. Resolved checks are validated somewhat.
"""
# Setup result dict structures
result = []
result_dict = OrderedDict()
feedback_list = []
feedback_list=[]
feedback_dict = {}
validation_dict = {}
required_elements = ["report_metadata", "policy_published", "record"]
missing_elements = []
feedback_dict["feedback"] = feedback_list

validation_dict= {}
if self.do_validate_xml:
minimal_xsd = validation_result.pop("rua_ta_dmarc_minimal_v01.xsd")
if minimal_xsd["result"] != "pass":
self.helper.log_warning("rua2json: report did not contain a feedback root element")
validation_dict["vendor_rua_xsd_validations"] = minimal_xsd
result.append(dumps(validation_dict))
return result
else:
print validation_result
validation_dict["vendor_rua_xsd_validations"] = validation_result
validation_dict["vendor_rua_xsd_validations"] = validation_result
else:
validation_dict["vendor_rua_xsd_validations"] = "None"
pass

# Add all elements, requiring certain elements are present
try:
version = xmldata.find('version')
feedback_list.append(yahoo.data(version))
except Exception:
self.helper.log_debug("rua2json: report did not contain a version XML element")
for required_element in required_elements:
# Get metadata elements from aggregate report
meta_elements = ["report_metadata", "policy_published", "version"]
for meta_element in meta_elements:
try:
element = xmldata.find(required_element)
element = xmldata.find(meta_element)
feedback_list.append(yahoo.data(element))
except Exception:
self.helper.log_warning("rua2json: report did not contain a required XML element, %s" % required_element)
missing_elements.append(required_element)

if len(missing_elements): # Return element error if any required elements are missing
# TODO validate missing element output
feedback_dict.clear() # this line could be omitted, and the result could include additional information
validation_dict["vendor_rua_missing_elements"] = missing_elements
result.append(dumps(validation_dict))
return result
else: # Otherwise, remove the 'record' before the loop through all records
feedback_list.pop()

# Validation checks complete, time to add all the records

self.helper.log_debug("rua2json: report did not contain metadata element, %s" % meta_element)
records = xmldata.findall("record")
self.helper.log_debug("rua2json: report_id %s has %d records"
% (xmldata.findtext("report_metadata/report_id", default=""), len(records)))
# Get individual records from aggregate report
for record in records:
data_ip = record.findtext('row/source_ip')
row_tag = record.find("row")
Expand All @@ -247,6 +219,7 @@ def rua2json(self, xmldata, validation_result=False):
resolve = socket.gethostbyaddr(data_ip)
backresolve = socket.gethostbyname_ex(resolve[0])
if data_ip == backresolve[2][0]:
# Add resolved ip to row
ip_resolution = etree.SubElement(row_tag, "ip_resolution")
ip_resolution.text = resolve[0]
except Exception:
Expand Down Expand Up @@ -381,52 +354,92 @@ def process_xmlfile(self, file):
return lines


def validate_xml(self, file):
def validate_xml_xsd(self, file, xsdfile):
""" Validate DMARC XML files against an XML schema definition file (xsd)
Returns a dict containing the XSD filename, result, and an optional informational string
Returns a dict containing the result (bool), and an optional informational string
"""
dmarc_path = os.path.dirname(__file__)
res = {}
info = {}
xsdfilelist = ["rua_ta_dmarc_minimal_v01.xsd",
"rua_ta_dmarc_relaxed_v01.xsd",
"rua_draft-dmarc-base-00-02.xsd",
"rua_rfc7489.xsd"]

for xsdfile in xsdfilelist:
xsdfile_long = os.path.join(dmarc_path, xsdfile)
xsdfile_long = os.path.join(dmarc_path, xsdfile)

# Read XML and XSD files
try:
xmldata = open(file, 'r').read()
xsddata = open(xsdfile_long, 'r').read()
except Exception as e:
self.helper.log_warning("validate_xml: xsd validation opening with %s" % str(e))
info["result"] = "fail"
info["info"] = "%s" % str(e)
return info
# Parse the XML and XSD
try:
xml = etree.XML(xmldata)
xsd = etree.XML(xsddata)
xmlschema = etree.XMLSchema(xsd)
except Exception as e:
self.helper.log_warning("validate_xml: xml parse error for file %s with %s" % (file, str(e)))
info["result"] = "fail"
info["info"] = "%s" % str(e)
return info
# Validate XML against XSD
try:
xmlschema.assertValid(xml)
except Exception as e:
self.helper.log_warning("validate_xml: xsd validation failed against %s for file %s with %s" % (xsdfile, file, str(e)))
info["result"] = "fail"
info["info"] = "%s" % str(e)
return info
else:
self.helper.log_debug("validate_xml: xsd validation successful against %s for file %s" % (xsdfile, file))
info["result"] = "pass";
return info


def is_valid_rua_xmlfile(self, file):
""" Perform some sanity checks on a RUA XML file:
- is it free from XML threats?
- does it look like an aggregate report?
Returns True if valid, False if invalid """
# To protect against various XML threats we use the parse function from defusedxml.ElementTree
with open(file, 'r') as f:
self.helper.log_debug("is_valid_rua_xmlfile: start parsing xml file %s" % file)
try:
xmldata = open(file, 'r').read()
xsddata = open(xsdfile_long, 'r').read()
except Exception as e:
self.helper.log_warning("validate_xml: xsd validation opening with %s" % str(e))
info["result"] = "file error"
info["info"] = "%s" % str(e)
res[xsdfile] = info.copy()
continue
try:
xml = etree.XML(xmldata)
xsd = etree.XML(xsddata)
xmlschema = etree.XMLSchema(xsd)
except Exception as e:
self.helper.log_warning("validate_xml: xml parse error for file %s with %s" % (file, str(e)))
info["result"] = "parse error"
info["info"] = "%s" % str(e)
res[xsdfile] = info.copy()
continue
try:
xmlschema.assertValid(xml)
xmldata = defuse_parse(f)
except Exception as e:
self.helper.log_warning("validate_xml: xsd validation failed against %s for file %s with %s" % (xsdfile, file, str(e)))
info["result"] = "fail"
info["info"] = "%s" % str(e)
res[xsdfile] = info.copy()
continue
else:
self.helper.log_debug("validate_xml: xsd validation successful against %s for file %s" % (xsdfile, file))
info["result"] = "pass"
res[xsdfile] = info.copy()
self.helper.log_warning("is_valid_rua_xmlfile: XML parse error in file %s with exception %s"
% (file, e))
f.close()
return False
# Does it look like an aggregate report?
res = self.validate_xml_xsd(file, "rua_ta_dmarc_minimal_v01.xsd")
if res["result"] != "pass":
return False
# Does it have the necessary elements?
required_elements = ["report_metadata", "policy_published", "record"]
for required_element in required_elements:
try:
element = xmldata.find(required_element)
except Exception:
self.helper.log_warning("is_valid_rua_xmlfile: report did not contain a required XML element, %s" % required_element)
return False
return True
return False


def validate_xml(self, file):
""" Mail XML validation function for DMARC XML files
Returns a dict containing the validations result (bool), and an optional informational string
"""
# Validate XML against various RUA XSDs
xsdfilelist = [ "rua_ta_dmarc_relaxed_v01.xsd",
"rua_draft-dmarc-base-00-02.xsd",
"rua_rfc7489.xsd"]
res = {}
for xsdfile in xsdfilelist:
tmp = self.validate_xml_xsd(file, xsdfile)
res[xsdfile] = tmp.copy()
# Placeholder for future string checks
return res


Expand All @@ -439,6 +452,7 @@ def check_dir(self):
else:
return True


def write_event(self, lines):
try:
for line in lines:
Expand All @@ -464,17 +478,26 @@ def process_incoming(self):
if ext == ".zip":
self.helper.log_info("Start processing zip file %s" % file)
for xmlfile in self.process_zipfile(file):
lines = self.process_xmlfile(xmlfile)
self.write_event(lines)
if self.is_valid_rua_xmlfile(xmlfile):
lines = self.process_xmlfile(xmlfile)
self.write_event(lines)
else:
self.helper.log_debug("process_incoming: ignoring invalid xml file %s from %s" % (xmlfile, file))
elif ext == ".gz":
self.helper.log_info("Start processing gz file %s" % file)
for xmlfile in self.process_gzfile(file):
lines = self.process_xmlfile(xmlfile)
self.write_event(lines)
if self.is_valid_rua_xmlfile(xmlfile):
lines = self.process_xmlfile(xmlfile)
self.write_event(lines)
else:
self.helper.log_debug("process_incoming: ignoring invalid xml file %s from %s" % (xmlfile, file))
elif ext == ".xml":
self.helper.log_info("Start processing xml file %s" % file)
lines = self.process_xmlfile(file)
self.write_event(lines)
if self.is_valid_rua_xmlfile(file):
lines = self.process_xmlfile(file)
self.write_event(lines)
else:
self.helper.log_debug("process_incoming: ignoring invalid xml file %s" % file)
else:
self.helper.log_debug("process_incoming: Ignoring file %s" % file)
if self.do_checkpoint:
Expand Down

0 comments on commit ae07bce

Please sign in to comment.