import xml.etree.ElementTree as ET
import xml.dom.minidom
import xml.sax
from flask import Flask, request
@app.route('/parse_xml', methods=['POST'])
def parse_xml():
    """Parse the raw request body as XML and return a tag -> text mapping.

    This is the unhardened variant of the endpoint: the body is handed
    straight to the standard-library ElementTree parser with no size limit
    and no error handling.  Only direct children of the document root are
    reported; when several children share a tag, the last one wins.
    """
    # Vulnerable: Standard XML parsing without XXE protection
    payload = request.get_data()
    document_root = ET.fromstring(payload)  # XXE vulnerable
    return {node.tag: node.text for node in document_root}
@app.route('/parse_dom', methods=['POST'])
def parse_dom():
    """Return the first-child node value of every <item> element in the body.

    This is the unhardened variant of the endpoint: the decoded request body
    goes directly to ``xml.dom.minidom`` with no size limit and no error
    handling.
    """
    # Vulnerable: DOM parsing with XXE risk
    body_text = request.get_data(as_text=True)
    document = xml.dom.minidom.parseString(body_text)  # XXE vulnerable
    values = []
    for item_node in document.getElementsByTagName('item'):
        # No guard: an <item> without children has firstChild None, so this
        # attribute access raises AttributeError.
        values.append(item_node.firstChild.nodeValue)
    return {'items': values}
@app.route('/parse_file', methods=['POST'])
def parse_file():
    """Parse the uploaded ``xml_file`` and report the tag of its root element.

    This is the unhardened variant of the endpoint: the upload is parsed by
    the standard-library ElementTree parser without checking that the field
    exists, without a size limit and without error handling.
    """
    # Vulnerable: File-based XML parsing
    upload = request.files['xml_file']
    document_root = ET.parse(upload).getroot()  # XXE vulnerable
    return {'root_tag': document_root.tag}
from defusedxml import ElementTree as DefusedET
from defusedxml import minidom as DefusedMinidom
from defusedxml.common import DefusedXmlException
from flask import Flask, request
import xml.etree.ElementTree as ET
# Configure safe XML parsing
def create_safe_parser():
    """Create an ElementTree parser that refuses documents carrying a DTD.

    Entities can only be declared inside a DOCTYPE, so rejecting the DOCTYPE
    declaration also rules out custom and external entity definitions.

    Returns:
        An ``ET.XMLParser`` usable as ``ET.fromstring(data, parser=...)``.
        A parser instance is single-use; create a new one per document.

    Raises:
        ValueError: raised by the returned parser (from ``feed``/``close``)
            when the document contains a DOCTYPE declaration.
    """
    def _reject_doctype(*_details):
        raise ValueError('DOCTYPE declarations are not allowed')

    class _NoDoctypeTreeBuilder(ET.TreeBuilder):
        """Tree builder whose doctype callback aborts the parse."""

        def doctype(self, name, pubid, system):
            _reject_doctype(name, pubid, system)

    parser = ET.XMLParser(target=_NoDoctypeTreeBuilder())

    # The underlying expat parser is only reachable on the pure-Python
    # XMLParser; the C-accelerated one has no ``.parser`` attribute, which is
    # why the target-level ``doctype`` hook above is the primary mechanism.
    # Where expat is exposed, hook it as well so that a DOCTYPE consisting
    # only of an internal subset is rejected too.
    expat_parser = getattr(parser, 'parser', None)
    if expat_parser is not None:
        expat_parser.StartDoctypeDeclHandler = _reject_doctype
    return parser
def validate_xml_structure(root, allowed_tags=None, allowed_attrs=None):
    """Validate an element tree against tag and attribute allowlists.

    The tree is walked in document order and the first violation found is
    reported.  The walk uses an explicit stack instead of recursion so that a
    deeply nested (possibly hostile) document cannot exhaust the interpreter's
    recursion limit and escape as a RecursionError.

    Args:
        root: Root element of the parsed document.
        allowed_tags: Container of permitted tag names.  Defaults to
            ``['root', 'item', 'name', 'value', 'description']``.
        allowed_attrs: Container of permitted attribute names.  Defaults to
            ``['id', 'type', 'name']``.

    Returns:
        None when every element and attribute is allowed.

    Raises:
        ValueError: if a tag or an attribute is not in its allowlist.
    """
    if allowed_tags is None:
        allowed_tags = ['root', 'item', 'name', 'value', 'description']
    if allowed_attrs is None:
        allowed_attrs = ['id', 'type', 'name']

    pending = [root]
    while pending:
        element = pending.pop()
        if element.tag not in allowed_tags:
            raise ValueError(f'Tag "{element.tag}" not allowed')
        for attr in element.attrib:
            if attr not in allowed_attrs:
                raise ValueError(f'Attribute "{attr}" not allowed')
        # Push children reversed so they are popped in document order.
        pending.extend(reversed(list(element)))
@app.route('/parse_xml', methods=['POST'])
def parse_xml():
    """Secure XML parsing using defusedxml.

    Parses the request body, validates it with ``validate_xml_structure`` and
    returns a mapping of each direct child's tag to its text.  Children whose
    text is empty or longer than 1000 characters are omitted.

    Returns:
        The tag -> text mapping on success, or an ``{'error': ...}`` payload
        with status 413 (body too large) or 400 (rejected or malformed XML,
        or a structure violation).
    """
    max_body_bytes = 100 * 1024  # 100KB limit
    max_text_chars = 1000

    # Check the declared length before get_data() buffers the whole body in
    # memory.  Requests without a Content-Length header are still bounded by
    # the check on the data actually read, below.
    declared_length = request.content_length
    if declared_length is not None and declared_length > max_body_bytes:
        return {'error': 'XML data too large'}, 413

    xml_data = request.get_data()
    if len(xml_data) > max_body_bytes:
        return {'error': 'XML data too large'}, 413

    try:
        # Safe: Use defusedxml to prevent XXE
        root = DefusedET.fromstring(xml_data)
        validate_xml_structure(root)
    # DefusedXmlException must be handled before the generic ValueError
    # clause so that security violations keep their own message.
    except DefusedXmlException as e:
        return {'error': f'XML security violation: {str(e)}'}, 400
    except ET.ParseError as e:
        return {'error': f'XML parsing error: {str(e)}'}, 400
    except ValueError as e:
        return {'error': str(e)}, 400

    result = {}
    for child in root:
        if child.text and len(child.text) <= max_text_chars:  # Limit text length
            result[child.tag] = child.text
    return result
@app.route('/parse_dom', methods=['POST'])
def parse_dom():
    """Secure DOM parsing with defusedxml.

    Collects the first-child node value of up to 100 ``<item>`` elements,
    each truncated to 500 characters.  Items without a first child, or whose
    first child has no node value, are skipped.

    Returns:
        ``{'items': [...]}`` on success, or an ``{'error': ...}`` payload with
        status 413 (body too large) or 400 (rejected or unparsable XML).
    """
    max_chars = 50 * 1024  # 50KB limit
    max_items = 100
    max_item_chars = 500

    # Reject clearly oversized requests before get_data() buffers the body.
    # The limit below counts decoded characters; assuming the default UTF-8
    # request decoding, one character comes from at most 4 bytes, so a
    # declared length above 4x the limit could never pass that check.
    declared_length = request.content_length
    if declared_length is not None and declared_length > 4 * max_chars:
        return {'error': 'XML data too large'}, 413

    xml_data = request.get_data(as_text=True)
    if len(xml_data) > max_chars:
        return {'error': 'XML data too large'}, 413

    try:
        # Safe: Use defused minidom
        dom = DefusedMinidom.parseString(xml_data)
        # Extract data safely
        elements = dom.getElementsByTagName('item')
        items = []
        for elem in elements[:max_items]:  # Limit number of items
            if elem.firstChild and elem.firstChild.nodeValue:
                items.append(elem.firstChild.nodeValue[:max_item_chars])
        return {'items': items}
    except DefusedXmlException as e:
        return {'error': f'XML security violation: {str(e)}'}, 400
    except Exception as e:
        # Deliberate catch-all at the request boundary: any other failure
        # while parsing or walking the DOM is reported as a client error.
        return {'error': f'DOM parsing error: {str(e)}'}, 400
@app.route('/parse_file', methods=['POST'])
def parse_file():
    """Secure file-based XML parsing.

    Expects a multipart upload in the ``xml_file`` field whose filename ends
    in ``.xml`` and whose content is at most 1MB.  The document is parsed with
    defusedxml and validated with ``validate_xml_structure``.

    Returns:
        A summary of the root element (tag, number of children, attributes)
        on success, or an ``{'error': ...}`` payload with status 400 (missing
        file, wrong extension, rejected or malformed XML) or 413 (too large).
    """
    max_file_bytes = 1024 * 1024  # 1MB limit

    if 'xml_file' not in request.files:
        return {'error': 'No XML file provided'}, 400
    upload = request.files['xml_file']

    # Early rejection when the part declares its own length; the check on the
    # bytes actually read (below) is what enforces the limit.
    if upload.content_length and upload.content_length > max_file_bytes:
        return {'error': 'File too large'}, 413

    # Check file extension (a missing filename counts as invalid).
    if not (upload.filename or '').lower().endswith('.xml'):
        return {'error': 'Invalid file type'}, 400

    # Read one byte past the limit: an oversized upload is then detected and
    # answered with 413 instead of being silently truncated and reported as
    # a parsing error.
    content = upload.read(max_file_bytes + 1)
    if len(content) > max_file_bytes:
        return {'error': 'File too large'}, 413

    try:
        # Safe: Use defusedxml
        root = DefusedET.fromstring(content)
        validate_xml_structure(root)
    # DefusedXmlException must be handled before the generic ValueError
    # clause so that security violations keep their own message.
    except DefusedXmlException as e:
        return {'error': f'XML security violation: {str(e)}'}, 400
    except ET.ParseError as e:
        return {'error': f'XML parsing error: {str(e)}'}, 400
    except ValueError as e:
        return {'error': str(e)}, 400

    # Extract safe information
    return {
        'root_tag': root.tag,
        'children_count': len(root),
        'attributes': dict(root.attrib),
    }
# Manual safe XML parsing (if defusedxml not available)
def manual_safe_xml_parse(xml_string):
    """Parse XML with the standard library only, refusing any DTD.

    Entities can only be declared inside a DOCTYPE, so a document that passes
    the DOCTYPE check cannot define custom or external entities.

    Args:
        xml_string: The XML document as ``str`` or ``bytes``.

    Returns:
        The root ``Element`` of the parsed document.

    Raises:
        ValueError: if the document contains a DOCTYPE declaration or is not
            well-formed XML.
    """
    from xml.parsers import expat

    def _reject_doctype(*_details):
        raise ValueError('XML parsing failed: DOCTYPE declarations are not allowed')

    # ET.XMLParser does not reliably expose its expat parser (the
    # C-accelerated implementation has no ``.parser`` attribute), so the DTD
    # check runs as a separate guard pass on a plain expat parser before the
    # tree is built.
    guard = expat.ParserCreate()
    guard.StartDoctypeDeclHandler = _reject_doctype
    try:
        guard.Parse(xml_string, True)
    except expat.ExpatError as e:
        raise ValueError(f'XML parsing failed: {str(e)}') from e

    try:
        return ET.fromstring(xml_string)
    except ET.ParseError as e:
        raise ValueError(f'XML parsing failed: {str(e)}') from e
@app.route('/manual_safe_parse', methods=['POST'])
def manual_safe_parse():
    """Example of manual safe XML parsing.

    Parses the request body with ``manual_safe_xml_parse``, validates it with
    ``validate_xml_structure`` and returns a short summary of the root
    element: its tag, the first 100 characters of its text (or None) and the
    number of direct children.

    Returns:
        The summary on success, or an ``{'error': ...}`` payload with status
        413 (body too large) or 400 (ValueError from parsing or validation).
    """
    max_chars = 10 * 1024  # 10KB limit

    # Reject clearly oversized requests before get_data() buffers the body.
    # The limit below counts decoded characters; assuming the default UTF-8
    # request decoding, one character comes from at most 4 bytes, so a
    # declared length above 4x the limit could never pass that check.
    declared_length = request.content_length
    if declared_length is not None and declared_length > 4 * max_chars:
        return {'error': 'XML too large'}, 413

    xml_data = request.get_data(as_text=True)
    if len(xml_data) > max_chars:
        return {'error': 'XML too large'}, 413

    try:
        root = manual_safe_xml_parse(xml_data)
        validate_xml_structure(root)
    except ValueError as e:
        return {'error': str(e)}, 400

    return {
        'tag': root.tag,
        'text': root.text[:100] if root.text else None,
        'children': len(root),
    }
# Alternative: Use JSON instead of XML
@app.route('/json_alternative', methods=['POST'])
def json_alternative():
    """Safer alternative using JSON instead of XML.

    Accepts a JSON object and echoes back only the allowlisted fields whose
    values are of a simple type (str, int, float, bool or list).

    Returns:
        ``{'processed': {...}}`` on success, or an ``{'error': ...}`` payload
        with status 400 when the body is not usable JSON, is empty, or is not
        a JSON object.
    """
    # get_json() reports a malformed body by raising an HTTP error rather
    # than json.JSONDecodeError, so failures are detected via silent=True,
    # which yields None instead (also for a non-JSON content type and for a
    # literal JSON null).
    data = request.get_json(silent=True)
    if data is None:
        return {'error': 'Invalid JSON'}, 400
    if not data:
        return {'error': 'No JSON data provided'}, 400

    # Validate JSON structure
    if not isinstance(data, dict):
        return {'error': 'Expected JSON object'}, 400

    # Process JSON safely (no code execution risk)
    allowed_fields = ('name', 'value', 'items', 'description')
    allowed_types = (str, int, float, bool, list)
    result = {
        key: value
        for key, value in data.items()
        if key in allowed_fields and isinstance(value, allowed_types)
    }
    return {'processed': result}
# XML Schema validation example
@app.route('/validate_xml_schema', methods=['POST'])
def validate_xml_schema():
    """Validate XML against schema for additional security.

    Parses the request body with an lxml parser that validates against the
    embedded schema while parsing.

    Returns:
        ``{'status': 'valid', 'root_tag': ...}`` when the document parses and
        validates, an error payload with status 500 when lxml is not
        installed, or an error payload with status 400 when lxml raises
        ``XMLSyntaxError``.
    """
    try:
        from lxml import etree
    except ImportError:
        return {'error': 'lxml library not available'}, 500

    xml_data = request.get_data()
    try:
        # Define simple schema
        # NOTE(review): the schema literal below is empty in this file, so
        # building the schema presumably fails on every request and ends up
        # in the XMLSyntaxError branch -- restore the intended XSD here.
        schema_doc = etree.fromstring('''
''')
        schema = etree.XMLSchema(schema_doc)
        # Parse and validate.  no_network only blocks network lookups;
        # resolve_entities=False additionally stops entity references
        # (including ones pointing at local files) from being expanded.
        parser = etree.XMLParser(
            schema=schema,
            no_network=True,
            resolve_entities=False,
        )
        doc = etree.fromstring(xml_data, parser)
    except etree.XMLSyntaxError as e:
        return {'error': f'XML validation failed: {str(e)}'}, 400
    return {'status': 'valid', 'root_tag': doc.tag}