# Vulnerable: open() with user input in Flask
from flask import Flask, request, jsonify, send_file
import os
import json
app = Flask(__name__)
# Dangerous: Direct file reading with user input
@app.route('/read-file')
def read_file():
filename = request.args.get('filename', '')
# CRITICAL: User controls filename, can use ../
file_path = os.path.join('/app/data', filename)
try:
with open(file_path, 'r') as f:
content = f.read()
return jsonify({'content': content})
except FileNotFoundError:
return jsonify({'error': 'File not found'}), 404
except Exception as e:
return jsonify({'error': str(e)}), 500
# Another dangerous pattern
@app.route('/user-files/')
def get_user_file(user_id):
filename = request.args.get('file', '')
# Dangerous: Multiple path components from user input
file_path = f'/app/users/{user_id}/{filename}'
try:
with open(file_path, 'r') as f:
data = f.read()
return jsonify({'data': data})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Configuration file access
@app.route('/config')
def get_config():
config_file = request.args.get('config', 'app.conf')
# Dangerous: User-controlled config path
config_path = os.path.join('/etc/myapp', config_file)
try:
with open(config_path, 'r') as f:
config_data = f.read()
return jsonify({'config': config_data})
except Exception as e:
return jsonify({'error': str(e)})
# Template file reading
@app.route('/template')
def get_template():
template_name = request.args.get('template', '')
template_dir = request.args.get('dir', 'default')
# Dangerous: User controls template path
template_path = os.path.join('/app/templates', template_dir, template_name)
try:
with open(template_path, 'r') as f:
template_content = f.read()
return jsonify({'template': template_content})
except Exception as e:
return jsonify({'error': str(e)})
# Log file access
@app.route('/logs')
def get_logs():
log_file = request.args.get('log', '')
date = request.args.get('date', '')
# Dangerous: Path traversal in log access
if date:
log_path = f'/var/log/myapp/{date}/{log_file}'
else:
log_path = f'/var/log/myapp/{log_file}'
try:
with open(log_path, 'r') as f:
log_content = f.read()
return jsonify({'logs': log_content})
except Exception as e:
return jsonify({'error': str(e)})
# Data file processing
@app.route('/process-data', methods=['POST'])
def process_data_file():
data_file = request.json.get('data_file', '')
# Dangerous: User-controlled data file path
data_path = os.path.join('/app/data', data_file)
try:
with open(data_path, 'r') as f:
data = json.load(f)
# Process data
processed_data = process_json_data(data)
return jsonify(processed_data)
except Exception as e:
return jsonify({'error': str(e)})
# Document download
@app.route('/download/')
def download_document(doc_id):
doc_path = request.args.get('path', '')
# Dangerous: User controls document path
full_path = os.path.join('/app/documents', doc_path, f'{doc_id}.pdf')
try:
# Check if file exists before sending
with open(full_path, 'rb') as f:
# File exists, send it
pass
return send_file(full_path, as_attachment=True)
except FileNotFoundError:
return jsonify({'error': 'Document not found'}), 404
# Backup file reading
@app.route('/backup/restore')
def read_backup():
backup_name = request.args.get('backup', '')
backup_type = request.args.get('type', 'daily')
# Dangerous: User controls backup path
backup_path = f'/backups/{backup_type}/{backup_name}'
try:
with open(backup_path, 'r') as f:
backup_data = f.read()
return jsonify({'backup_data': backup_data})
except Exception as e:
return jsonify({'error': str(e)})
# Include file functionality
@app.route('/include')
def include_file():
include_file = request.args.get('file', '')
# Dangerous: User controls include path
include_path = os.path.join('/app/includes', include_file)
try:
with open(include_path, 'r') as f:
include_content = f.read()
# Execute or process the included content
return jsonify({'content': include_content})
except Exception as e:
return jsonify({'error': str(e)})
def process_json_data(data):
# Mock processing function
return {'processed': True, 'count': len(data) if isinstance(data, (list, dict)) else 0}
# Secure: Safe file handling in Flask
from flask import Flask, request, jsonify, send_file
from pathlib import Path
import os
import json
import re
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError
app = Flask(__name__)
# Safe: Configuration with allowed paths
app.config['SAFE_DATA_DIR'] = '/app/data'
app.config['SAFE_CONFIG_DIR'] = '/app/config'
app.config['SAFE_TEMPLATE_DIR'] = '/app/templates'
app.config['SAFE_LOG_DIR'] = '/app/logs'
# Input validation schemas
class FileRequestSchema(Schema):
filename = fields.Str(required=True, validate=lambda x: len(x) <= 100)
class UserFileSchema(Schema):
file = fields.Str(required=True, validate=lambda x: len(x) <= 50)
# Safe: File reading with validation
@app.route('/read-file')
def safe_read_file():
filename = request.args.get('filename', '')
try:
# Validate filename
validated_filename = validate_filename(filename)
# Get safe file path
safe_path = get_safe_file_path(validated_filename, app.config['SAFE_DATA_DIR'])
# Read file securely
content = read_file_safely(safe_path)
return jsonify({'content': content})
except ValueError as e:
return jsonify({'error': str(e)}), 400
except FileNotFoundError:
return jsonify({'error': 'File not found'}), 404
def validate_filename(filename):
if not filename:
raise ValueError('Filename is required')
if len(filename) > 100:
raise ValueError('Filename too long')
# Only allow safe characters
if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
raise ValueError('Filename contains invalid characters')
# Prevent hidden files and traversal
if filename.startswith('.') or '..' in filename:
raise ValueError('Invalid filename format')
# Check file extension
allowed_extensions = ['.txt', '.json', '.csv', '.md']
if not any(filename.lower().endswith(ext) for ext in allowed_extensions):
raise ValueError('File extension not allowed')
return filename
def get_safe_file_path(filename, base_dir):
# Use pathlib for safe path handling
base_path = Path(base_dir)
file_path = base_path / filename
# Resolve path and validate it's within base directory
try:
resolved_path = file_path.resolve()
base_path_resolved = base_path.resolve()
# Ensure file is within the base directory
resolved_path.relative_to(base_path_resolved)
return resolved_path
except ValueError:
raise ValueError('File path outside allowed directory')
def read_file_safely(file_path):
try:
if not file_path.exists():
raise FileNotFoundError('File not found')
# Check file size
max_size = 1024 * 1024 # 1MB
if file_path.stat().st_size > max_size:
raise ValueError('File too large')
# Read file
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
except UnicodeDecodeError:
raise ValueError('File encoding error')
except PermissionError:
raise ValueError('Access denied')
except Exception as e:
raise ValueError(f'File read error: {str(e)}')
# Safe: User file access with validation
@app.route('/user-files/')
def safe_get_user_file(user_id):
try:
# Validate user ID
validated_user_id = validate_user_id(user_id)
# Validate file parameter
schema = UserFileSchema()
data = schema.load(request.args)
filename = data['file']
# Check user access
if not check_user_access(validated_user_id, request):
return jsonify({'error': 'Access denied'}), 403
# Get safe user file path
user_file_path = get_safe_user_file_path(validated_user_id, filename)
# Read file
content = read_file_safely(user_file_path)
return jsonify({'data': content})
except (ValueError, MarshmallowValidationError) as e:
return jsonify({'error': str(e)}), 400
except FileNotFoundError:
return jsonify({'error': 'File not found'}), 404
def validate_user_id(user_id):
if not user_id or not user_id.isdigit():
raise ValueError('Invalid user ID format')
user_id_int = int(user_id)
if user_id_int <= 0 or user_id_int > 999999:
raise ValueError('User ID out of range')
return user_id_int
def check_user_access(user_id, request):
# Mock access check - would typically check authentication/authorization
# For now, just check if user is accessing their own files
current_user_id = getattr(request, 'user_id', None)
return current_user_id == user_id or getattr(request, 'is_admin', False)
def get_safe_user_file_path(user_id, filename):
# Validate filename again
validated_filename = validate_filename(filename)
# Define safe user directory
user_dir = Path(app.config['SAFE_DATA_DIR']) / 'users' / str(user_id)
return get_safe_file_path(validated_filename, str(user_dir))
# Safe: Configuration access with allowlist
@app.route('/config')
def safe_get_config():
config_name = request.args.get('config', '')
try:
# Validate config name with allowlist
validated_config = validate_config_name(config_name)
# Get config data safely
config_data = get_safe_config_data(validated_config)
return jsonify({'config': config_data})
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_config_name(config_name):
# Use allowlist for config files
allowed_configs = ['app', 'database', 'cache', 'logging']
if not config_name:
raise ValueError('Config name is required')
if config_name not in allowed_configs:
raise ValueError('Configuration not allowed')
return config_name
def get_safe_config_data(config_name):
# Construct safe config path
config_path = Path(app.config['SAFE_CONFIG_DIR']) / f'{config_name}.json'
try:
# Ensure path is within config directory
config_path.resolve().relative_to(Path(app.config['SAFE_CONFIG_DIR']).resolve())
if not config_path.exists():
raise ValueError('Configuration file not found')
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
except (ValueError, json.JSONDecodeError, PermissionError) as e:
raise ValueError(f'Config read error: {str(e)}')
# Safe: Template access with validation
@app.route('/template')
def safe_get_template():
template_name = request.args.get('template', '')
template_category = request.args.get('category', 'default')
try:
# Validate inputs
validated_template = validate_template_request(template_name, template_category)
# Get template content safely
template_content = get_safe_template_content(validated_template)
return jsonify({'template': template_content})
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_template_request(template_name, category):
# Validate template name
if not template_name or not re.match(r'^[a-zA-Z0-9_-]+$', template_name):
raise ValueError('Invalid template name')
# Validate category
allowed_categories = ['email', 'web', 'report', 'notification']
if category not in allowed_categories:
raise ValueError('Template category not allowed')
# Use allowlist for template names per category
allowed_templates = {
'email': ['welcome', 'password_reset', 'notification'],
'web': ['header', 'footer', 'sidebar'],
'report': ['summary', 'detailed', 'chart'],
'notification': ['alert', 'reminder', 'update']
}
if template_name not in allowed_templates.get(category, []):
raise ValueError('Template not allowed in this category')
return {'name': template_name, 'category': category}
def get_safe_template_content(template_data):
template_name = template_data['name']
category = template_data['category']
# Construct safe template path
template_dir = Path(app.config['SAFE_TEMPLATE_DIR']) / category
template_path = template_dir / f'{template_name}.html'
try:
# Validate path is within template directory
template_path.resolve().relative_to(Path(app.config['SAFE_TEMPLATE_DIR']).resolve())
if not template_path.exists():
raise ValueError('Template not found')
return read_file_safely(template_path)
except ValueError as e:
raise ValueError(f'Template read error: {str(e)}')
# Safe: Log access with restrictions
@app.route('/logs')
def safe_get_logs():
log_name = request.args.get('log', '')
try:
# Check access permissions
if not check_log_access(request):
return jsonify({'error': 'Access denied'}), 403
# Validate log name
validated_log = validate_log_name(log_name)
# Get log content safely
log_content = get_safe_log_content(validated_log)
return jsonify({'logs': log_content})
except ValueError as e:
return jsonify({'error': str(e)}), 400
def check_log_access(request):
# Only allow staff/admin access to logs
return getattr(request, 'is_staff', False)
def validate_log_name(log_name):
# Only allow specific log files
allowed_logs = ['application', 'error', 'access', 'security']
if log_name not in allowed_logs:
raise ValueError('Log file not allowed')
return log_name
def get_safe_log_content(log_name):
# Construct safe log path
log_path = Path(app.config['SAFE_LOG_DIR']) / f'{log_name}.log'
try:
# Validate path is within log directory
log_path.resolve().relative_to(Path(app.config['SAFE_LOG_DIR']).resolve())
if not log_path.exists():
raise ValueError('Log file not found')
# Read log with size limit
max_size = 1024 * 1024 # 1MB
if log_path.stat().st_size > max_size:
# Read last 1MB
with open(log_path, 'rb') as f:
f.seek(-max_size, 2)
content = f.read().decode('utf-8', errors='ignore')
return '... (truncated)\n' + content
return read_file_safely(log_path)
except ValueError as e:
raise ValueError(f'Log read error: {str(e)}')
# Safe: Data processing with validation
@app.route('/process-data', methods=['POST'])
def safe_process_data_file():
try:
data_file = request.json.get('data_file', '') if request.json else ''
# Validate data file
validated_file = validate_data_file(data_file)
# Process data safely
result = process_data_file_safely(validated_file)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_data_file(data_file):
if not data_file:
raise ValueError('Data file name is required')
# Only allow JSON data files
if not data_file.endswith('.json'):
raise ValueError('Only JSON data files are allowed')
# Validate filename format
filename_part = data_file[:-5] # Remove .json
if not re.match(r'^[a-zA-Z0-9_-]+$', filename_part):
raise ValueError('Invalid data file name')
return data_file
def process_data_file_safely(data_file):
# Get safe data file path
data_path = get_safe_file_path(data_file, app.config['SAFE_DATA_DIR'])
try:
with open(data_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# Process data safely
processed_data = process_json_data_safely(data)
return processed_data
except json.JSONDecodeError:
raise ValueError('Invalid JSON format')
except Exception as e:
raise ValueError(f'Data processing error: {str(e)}')
def process_json_data_safely(data):
# Safe data processing
if isinstance(data, dict):
return {
'processed': True,
'type': 'object',
'key_count': len(data.keys())
}
elif isinstance(data, list):
return {
'processed': True,
'type': 'array',
'item_count': len(data)
}
else:
return {
'processed': True,
'type': type(data).__name__,
'value': str(data)[:100] # Limit output
}
if __name__ == '__main__':
app.run(debug=False)