# Vulnerable: URL host injection in Django
from django.http import JsonResponse, HttpResponseRedirect
from django.views import View
from django.shortcuts import redirect
from django.conf import settings
import requests
# Dangerous: User-controlled host in URL construction
class APIProxyView(View):
def get(self, request):
host = request.GET.get('host', '')
endpoint = request.GET.get('endpoint', '')
# CRITICAL: User controls host in URL
api_url = f"https://{host}/api/{endpoint}"
try:
response = requests.get(api_url)
return JsonResponse({'data': response.json()})
except Exception as e:
return JsonResponse({'error': str(e)})
# Another dangerous pattern
def generate_callback_url(request):
callback_host = request.POST.get('callback_host', '')
callback_path = request.POST.get('callback_path', '')
# Dangerous: User controls callback host
callback_url = f"https://{callback_host}{callback_path}"
# Store callback URL for later use
request.session['callback_url'] = callback_url
return JsonResponse({'callback_url': callback_url})
# OAuth redirect with user host
def oauth_redirect(request):
return_host = request.GET.get('return_host', '')
return_path = request.GET.get('return_path', '/dashboard')
# Dangerous: User controls return host
return_url = f"https://{return_host}{return_path}"
# Redirect to OAuth provider with return URL
oauth_url = f"https://oauth.provider.com/auth?redirect_uri={return_url}"
return HttpResponseRedirect(oauth_url)
# Image proxy with host injection
def proxy_image(request):
image_host = request.GET.get('host', '')
image_path = request.GET.get('path', '')
# Dangerous: User-controlled image host
image_url = f"https://{image_host}{image_path}"
try:
response = requests.get(image_url)
return JsonResponse({
'image_url': image_url,
'content_type': response.headers.get('content-type'),
'size': len(response.content)
})
except Exception as e:
return JsonResponse({'error': str(e)})
# Webhook URL generation
def create_webhook(request):
webhook_host = request.POST.get('webhook_host', '')
webhook_path = request.POST.get('webhook_path', '')
webhook_secret = request.POST.get('secret', '')
# Dangerous: User controls webhook host
webhook_url = f"https://{webhook_host}{webhook_path}"
# Save webhook configuration
webhook_config = {
'url': webhook_url,
'secret': webhook_secret
}
return JsonResponse({'webhook': webhook_config})
# CDN URL construction
def get_asset_url(request):
cdn_host = request.GET.get('cdn_host', '')
asset_path = request.GET.get('asset_path', '')
# Dangerous: User controls CDN host
asset_url = f"https://{cdn_host}/assets/{asset_path}"
return JsonResponse({'asset_url': asset_url})
# API endpoint discovery
def discover_api_endpoints(request):
api_host = request.GET.get('api_host', '')
# Dangerous: User controls API host
endpoints = [
f"https://{api_host}/users",
f"https://{api_host}/posts",
f"https://{api_host}/comments"
]
return JsonResponse({'endpoints': endpoints})
# Redirect with custom host
def custom_redirect(request):
redirect_host = request.GET.get('host', '')
redirect_path = request.GET.get('path', '/')
# Dangerous: User controls redirect host
redirect_url = f"https://{redirect_host}{redirect_path}"
return HttpResponseRedirect(redirect_url)
# Secure: Safe URL and host handling in Django
from django.http import JsonResponse, HttpResponseRedirect
from django.views import View
from django.shortcuts import redirect
from django.conf import settings
from django.core.exceptions import ValidationError
from urllib.parse import urlparse, urljoin, quote
import requests
import re
# Safe: Validated API proxy
class SafeAPIProxyView(View):
def get(self, request):
api_name = request.GET.get('api', '')
endpoint = request.GET.get('endpoint', '')
try:
# Validate inputs
validated_request = self.validate_api_request(api_name, endpoint)
# Fetch data safely
data = self.fetch_api_data_safely(validated_request)
return JsonResponse({'data': data})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_api_request(self, api_name, endpoint):
# Use allowlist for API hosts
allowed_apis = {
'users': 'https://api.users.example.com',
'content': 'https://api.content.example.com',
'analytics': 'https://api.analytics.example.com'
}
if api_name not in allowed_apis:
raise ValidationError('API not allowed')
# Validate endpoint
if not endpoint or len(endpoint) > 100:
raise ValidationError('Invalid endpoint')
# Only allow safe characters
if not re.match(r'^[a-zA-Z0-9/_.-]+$', endpoint):
raise ValidationError('Endpoint contains invalid characters')
# Prevent path traversal
if '..' in endpoint or endpoint.startswith('/'):
raise ValidationError('Invalid endpoint format')
return {
'base_url': allowed_apis[api_name],
'endpoint': endpoint
}
def fetch_api_data_safely(self, request_data):
# Construct URL safely
url = urljoin(request_data['base_url'], request_data['endpoint'])
try:
response = requests.get(
url,
timeout=10,
headers={'User-Agent': 'SafeApp/1.0'}
)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
raise ValidationError(f'API request failed: {str(e)}')
# Safe: Callback URL generation with validation
def safe_generate_callback_url(request):
callback_type = request.POST.get('callback_type', '')
callback_id = request.POST.get('callback_id', '')
try:
# Validate inputs
validated_callback = validate_callback_request(callback_type, callback_id)
# Generate safe callback URL
callback_url = generate_safe_callback_url(validated_callback)
# Store in session
request.session['callback_url'] = callback_url
return JsonResponse({'callback_url': callback_url})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_callback_request(callback_type, callback_id):
# Validate callback type
allowed_types = ['payment', 'oauth', 'webhook', 'notification']
if callback_type not in allowed_types:
raise ValidationError('Callback type not allowed')
# Validate callback ID
if not callback_id or not callback_id.isalnum():
raise ValidationError('Invalid callback ID')
return {'type': callback_type, 'id': callback_id}
def generate_safe_callback_url(callback_data):
# Use application's own domain for callbacks
base_url = settings.BASE_URL # e.g., 'https://myapp.com'
# Generate safe callback path
callback_path = f"/callbacks/{callback_data['type']}/{callback_data['id']}"
return urljoin(base_url, callback_path)
# Safe: OAuth redirect with validation
def safe_oauth_redirect(request):
provider = request.GET.get('provider', '')
return_path = request.GET.get('return_path', '/dashboard')
try:
# Validate inputs
validated_oauth = validate_oauth_request(provider, return_path)
# Generate safe OAuth URL
oauth_url = generate_safe_oauth_url(validated_oauth)
return HttpResponseRedirect(oauth_url)
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_oauth_request(provider, return_path):
# Validate OAuth provider
allowed_providers = {
'google': 'https://accounts.google.com/oauth/authorize',
'github': 'https://github.com/login/oauth/authorize',
'facebook': 'https://www.facebook.com/v12.0/dialog/oauth'
}
if provider not in allowed_providers:
raise ValidationError('OAuth provider not allowed')
# Validate return path
if not return_path.startswith('/'):
raise ValidationError('Return path must be relative')
# Prevent path traversal
if '..' in return_path:
raise ValidationError('Invalid return path')
# Limit path length
if len(return_path) > 200:
raise ValidationError('Return path too long')
return {
'provider': provider,
'provider_url': allowed_providers[provider],
'return_path': return_path
}
def generate_safe_oauth_url(oauth_data):
# Generate safe return URL using application domain
return_url = urljoin(settings.BASE_URL, oauth_data['return_path'])
# Construct OAuth URL with safe parameters
oauth_params = {
'client_id': settings.OAUTH_CLIENT_ID,
'redirect_uri': return_url,
'response_type': 'code',
'scope': 'read'
}
# Build query string safely
from urllib.parse import urlencode
query_string = urlencode(oauth_params)
return f"{oauth_data['provider_url']}?{query_string}"
# Safe: Asset URL generation
def safe_get_asset_url(request):
asset_type = request.GET.get('type', '')
asset_name = request.GET.get('name', '')
try:
# Validate inputs
validated_asset = validate_asset_request(asset_type, asset_name)
# Generate safe asset URL
asset_url = generate_safe_asset_url(validated_asset)
return JsonResponse({'asset_url': asset_url})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_asset_request(asset_type, asset_name):
# Validate asset type
allowed_types = ['images', 'css', 'js', 'fonts']
if asset_type not in allowed_types:
raise ValidationError('Asset type not allowed')
# Validate asset name
if not asset_name or len(asset_name) > 100:
raise ValidationError('Invalid asset name')
# Only allow safe characters
if not re.match(r'^[a-zA-Z0-9._-]+$', asset_name):
raise ValidationError('Asset name contains invalid characters')
# Validate file extension based on type
valid_extensions = {
'images': ['.jpg', '.jpeg', '.png', '.gif', '.svg'],
'css': ['.css'],
'js': ['.js'],
'fonts': ['.woff', '.woff2', '.ttf', '.eot']
}
valid_exts = valid_extensions[asset_type]
if not any(asset_name.lower().endswith(ext) for ext in valid_exts):
raise ValidationError('Invalid file extension for asset type')
return {'type': asset_type, 'name': asset_name}
def generate_safe_asset_url(asset_data):
# Use configured CDN URL or local static URL
cdn_base = getattr(settings, 'CDN_BASE_URL', settings.STATIC_URL)
# Construct safe asset path
asset_path = f"{asset_data['type']}/{asset_data['name']}"
return urljoin(cdn_base, asset_path)
# Safe: Webhook creation with validation
def safe_create_webhook(request):
webhook_config = request.POST.get('webhook_config', '')
try:
# Parse and validate webhook configuration
import json
config = json.loads(webhook_config)
validated_webhook = validate_webhook_config(config)
# Save webhook configuration
webhook_id = save_webhook_config(request.user, validated_webhook)
return JsonResponse({
'webhook_id': webhook_id,
'webhook_url': validated_webhook['url']
})
except (json.JSONDecodeError, ValidationError) as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_webhook_config(config):
# Validate required fields
if 'url' not in config:
raise ValidationError('Webhook URL is required')
url = config['url']
# Parse URL
try:
parsed_url = urlparse(url)
except Exception:
raise ValidationError('Invalid URL format')
# Validate scheme
if parsed_url.scheme != 'https':
raise ValidationError('Webhook URL must use HTTPS')
# Validate hostname
if not parsed_url.hostname:
raise ValidationError('Invalid hostname')
# Block internal networks
import socket
import ipaddress
try:
ip = socket.gethostbyname(parsed_url.hostname)
ip_obj = ipaddress.ip_address(ip)
if ip_obj.is_private or ip_obj.is_loopback:
raise ValidationError('Webhooks to internal networks not allowed')
except (socket.gaierror, ValueError):
raise ValidationError('Unable to resolve webhook hostname')
# Validate other fields
secret = config.get('secret', '')
if secret and (len(secret) < 16 or len(secret) > 64):
raise ValidationError('Webhook secret must be 16-64 characters')
return {
'url': url,
'secret': secret,
'events': config.get('events', [])
}
def save_webhook_config(user, webhook_config):
# Save to database
from .models import WebhookConfiguration
webhook = WebhookConfiguration.objects.create(
user=user,
url=webhook_config['url'],
secret=webhook_config['secret'],
events=webhook_config['events']
)
return webhook.id
# Safe: Redirect with validation
def safe_custom_redirect(request):
redirect_target = request.GET.get('target', '')
try:
# Validate redirect target
validated_url = validate_redirect_target(redirect_target)
return HttpResponseRedirect(validated_url)
except ValidationError as e:
# Redirect to safe default on validation error
return redirect('home')
def validate_redirect_target(target):
if not target:
raise ValidationError('No redirect target specified')
# Allow relative URLs
if target.startswith('/'):
# Validate relative path
if '..' in target:
raise ValidationError('Path traversal not allowed')
return target
# Parse absolute URLs
try:
parsed = urlparse(target)
except Exception:
raise ValidationError('Invalid URL format')
# Only allow HTTPS
if parsed.scheme != 'https':
raise ValidationError('Only HTTPS URLs allowed')
# Check against allowlist
allowed_domains = getattr(settings, 'ALLOWED_REDIRECT_DOMAINS', [])
if not any(parsed.hostname.endswith(domain) for domain in allowed_domains):
raise ValidationError('Domain not allowed for redirect')
return target