# Vulnerable: Insecure deserialization in Django
import pickle
import yaml
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
# Dangerous: Pickle deserialization
class SessionDataView(View):
def post(self, request):
session_data = request.POST.get('session_data', '')
try:
# Extremely dangerous: pickle.loads with user data
decoded_data = pickle.loads(session_data.encode('latin1'))
# Store in session
request.session['user_data'] = decoded_data
return JsonResponse({'status': 'success'})
except Exception as e:
return JsonResponse({'error': str(e)})
# Dangerous: YAML deserialization
def import_config(request):
if request.method == 'POST' and 'config_file' in request.FILES:
config_file = request.FILES['config_file']
config_content = config_file.read().decode('utf-8')
try:
# Dangerous: yaml.load can execute arbitrary code
config_data = yaml.load(config_content)
# Process configuration
update_app_config(config_data)
return JsonResponse({'status': 'config_imported'})
except Exception as e:
return JsonResponse({'error': str(e)})
# Dangerous: eval() with user input
def calculate_expression(request):
expression = request.GET.get('expr', '')
try:
# Extremely dangerous: eval with user input
result = eval(expression)
return JsonResponse({'result': result})
except Exception as e:
return JsonResponse({'error': str(e)})
# Dangerous: Custom deserialization
class UserProfileView(View):
def post(self, request):
profile_data = request.POST.get('profile', '')
# Dangerous: exec with user-controlled code
user_code = f"profile = {profile_data}"
local_vars = {}
try:
exec(user_code, {}, local_vars)
profile = local_vars.get('profile', {})
# Save profile
request.user.profile.data = profile
request.user.profile.save()
return JsonResponse({'status': 'profile_updated'})
except Exception as e:
return JsonResponse({'error': str(e)})
# Secure: Safe deserialization in Django
import json
import yaml
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.views import View
from django.core import serializers
from django.core.exceptions import ValidationError
import ast
# Safe: JSON deserialization
class SessionDataView(View):
def post(self, request):
session_data = request.POST.get('session_data', '')
try:
# Safe: JSON deserialization
decoded_data = json.loads(session_data)
# Validate data structure
validated_data = self.validate_session_data(decoded_data)
# Store in session
request.session['user_data'] = validated_data
return JsonResponse({'status': 'success'})
except (json.JSONDecodeError, ValidationError) as e:
return JsonResponse({'error': 'Invalid data format'}, status=400)
def validate_session_data(self, data):
# Validate data structure and types
if not isinstance(data, dict):
raise ValidationError('Data must be a dictionary')
allowed_keys = {'preferences', 'settings', 'theme'}
validated = {}
for key, value in data.items():
if key in allowed_keys:
if key == 'preferences' and isinstance(value, dict):
validated[key] = {k: v for k, v in value.items()
if isinstance(k, str) and isinstance(v, (str, int, bool))}
elif key == 'settings' and isinstance(value, dict):
validated[key] = value
elif key == 'theme' and isinstance(value, str):
if value in ['light', 'dark', 'auto']:
validated[key] = value
return validated
# Safe: YAML deserialization
def import_config(request):
if request.method == 'POST' and 'config_file' in request.FILES:
config_file = request.FILES['config_file']
# Validate file size
if config_file.size > 1024 * 1024: # 1MB limit
return JsonResponse({'error': 'File too large'}, status=400)
config_content = config_file.read().decode('utf-8')
try:
# Safe: yaml.safe_load prevents code execution
config_data = yaml.safe_load(config_content)
# Validate configuration structure
validated_config = validate_config_structure(config_data)
# Process configuration
update_app_config(validated_config)
return JsonResponse({'status': 'config_imported'})
except yaml.YAMLError as e:
return JsonResponse({'error': 'Invalid YAML format'}, status=400)
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_config_structure(config):
if not isinstance(config, dict):
raise ValidationError('Configuration must be a dictionary')
required_keys = {'database', 'cache', 'debug'}
allowed_keys = required_keys.union({'logging', 'email', 'static'})
# Check required keys
for key in required_keys:
if key not in config:
raise ValidationError(f'Missing required key: {key}')
# Validate each section
validated = {}
for key, value in config.items():
if key not in allowed_keys:
continue # Skip unknown keys
if key == 'database' and isinstance(value, dict):
validated[key] = validate_database_config(value)
elif key == 'cache' and isinstance(value, dict):
validated[key] = validate_cache_config(value)
elif key == 'debug' and isinstance(value, bool):
validated[key] = value
# Add other validations as needed
return validated
# Safe: Mathematical expression evaluation
def calculate_expression(request):
expression = request.GET.get('expr', '')
# Validate expression format
if not expression or len(expression) > 100:
return JsonResponse({'error': 'Invalid expression'}, status=400)
# Only allow safe mathematical expressions
if not re.match(r'^[0-9+\-*/().\s]+$', expression):
return JsonResponse({'error': 'Expression contains invalid characters'}, status=400)
try:
# Safe: Use ast.literal_eval for literals only
# For more complex math, use a safe expression parser
result = safe_math_eval(expression)
return JsonResponse({'result': result})
except (ValueError, SyntaxError) as e:
return JsonResponse({'error': 'Invalid expression'}, status=400)
def safe_math_eval(expression):
# Simple safe math evaluation (extend as needed)
allowed_nodes = (ast.Expression, ast.BinOp, ast.UnaryOp, ast.Num, ast.Constant,
ast.Add, ast.Sub, ast.Mult, ast.Div, ast.USub, ast.UAdd)
try:
# Parse expression
tree = ast.parse(expression, mode='eval')
# Validate all nodes are safe
for node in ast.walk(tree):
if not isinstance(node, allowed_nodes):
raise ValueError('Unsafe operation in expression')
# Evaluate safely
return eval(compile(tree, '', 'eval'))
except Exception:
raise ValueError('Invalid mathematical expression')
# Safe: User profile handling
class UserProfileView(View):
def post(self, request):
try:
# Safe: JSON deserialization
profile_data = json.loads(request.body)
# Validate profile data
validated_profile = self.validate_profile_data(profile_data)
# Update profile using Django ORM
profile, created = UserProfile.objects.get_or_create(user=request.user)
# Safe attribute setting
for key, value in validated_profile.items():
if hasattr(profile, key):
setattr(profile, key, value)
profile.save()
return JsonResponse({'status': 'profile_updated'})
except (json.JSONDecodeError, ValidationError) as e:
return JsonResponse({'error': 'Invalid profile data'}, status=400)
def validate_profile_data(self, data):
if not isinstance(data, dict):
raise ValidationError('Profile data must be a dictionary')
allowed_fields = {
'first_name': str,
'last_name': str,
'email': str,
'phone': str,
'birth_date': str,
'bio': str
}
validated = {}
for field, expected_type in allowed_fields.items():
if field in data:
value = data[field]
if isinstance(value, expected_type):
# Additional validation per field
if field == 'email':
from django.core.validators import validate_email
validate_email(value)
elif field in ['first_name', 'last_name', 'bio']:
value = value[:100] # Limit length
elif field == 'birth_date':
from datetime import datetime
datetime.strptime(value, '%Y-%m-%d') # Validate date format
validated[field] = value
return validated
# Safe: Django serialization framework
def export_model_data(request):
from myapp.models import MyModel
# Safe: Django's built-in serialization
queryset = MyModel.objects.filter(user=request.user)
serialized_data = serializers.serialize('json', queryset)
return JsonResponse({'data': serialized_data})
def import_model_data(request):
if request.method == 'POST':
try:
# Safe: Django deserialization with validation
data = request.POST.get('data', '')
for obj in serializers.deserialize('json', data):
# Validate before saving
if obj.object.user == request.user:
obj.save()
return JsonResponse({'status': 'imported'})
except Exception as e:
return JsonResponse({'error': 'Import failed'}, status=400)