# Vulnerable: SSRF via requests with user input in Flask
import requests
from flask import Flask, request, jsonify
# Flask application object that the deliberately vulnerable example routes
# below register themselves on.
app = Flask(__name__)
# Dangerous: Direct URL from user input
@app.route('/fetch', methods=['POST'])
def fetch_url():
    """Fetch a caller-supplied URL and echo back a summary of the response.

    Deliberately vulnerable example: the ``url`` field of the JSON body is
    passed to ``requests.get`` without any validation (SSRF).

    Returns a JSON object with the upstream status code, the first 1000
    characters of the body and the response headers. Any exception raised
    while fetching or building the reply is reported as ``{'error': ...}``.
    The JSON body itself is read before the ``try`` block, so errors while
    reading it are not converted into an ``error`` reply.
    """
    target = request.json.get('url', '')
    try:
        # CRITICAL: the caller fully controls where the server connects to.
        upstream = requests.get(target)
        summary = {
            'status_code': upstream.status_code,
            'content': upstream.text[:1000],  # Limit content
            'headers': dict(upstream.headers),
        }
        return jsonify(summary)
    except Exception as exc:
        return jsonify({'error': str(exc)})
# Another dangerous pattern
@app.route('/proxy')
def proxy_request():
    """Proxy a GET or POST request to a caller-supplied URL.

    Deliberately vulnerable example: both the target URL (``url`` query
    parameter) and the HTTP method (``method`` query parameter, default
    ``GET``) come straight from the caller and the URL is not validated
    (SSRF). For POST the incoming form data is forwarded as the body.

    Returns a JSON object with the upstream ``status`` and full body text
    (``data``). Unsupported methods and any exception raised while proxying
    are reported as ``{'error': ...}``.
    """
    target_url = request.args.get('url', '')
    method = request.args.get('method', 'GET').upper()
    # Dangerous: User controls both URL and method
    try:
        if method == 'GET':
            response = requests.get(target_url)
        elif method == 'POST':
            response = requests.post(target_url, data=request.form)
        else:
            # Previously this fell through with ``response`` unbound and the
            # caller received an interpreter-internal UnboundLocalError
            # message; report the real problem instead.
            return jsonify({'error': f'Unsupported HTTP method: {method}'})
        return jsonify({
            'status': response.status_code,
            'data': response.text
        })
    except Exception as e:
        return jsonify({'error': str(e)})
# Webhook processing
@app.route('/webhook', methods=['POST'])
def send_webhook():
    """POST a caller-supplied payload to a caller-supplied webhook URL.

    Deliberately vulnerable example: ``webhook_url`` from the JSON body is
    used as the request target without validation (SSRF). ``payload``
    (default ``{}``) is sent as the JSON body.

    Returns ``{'webhook_status': <upstream status code>}``, or
    ``{'error': ...}`` if the delivery raises.
    """
    body = request.json
    destination = body.get('webhook_url', '')
    outgoing = body.get('payload', {})
    # Dangerous: User-controlled webhook URL
    try:
        delivery = requests.post(destination, json=outgoing)
        return jsonify({'webhook_status': delivery.status_code})
    except Exception as exc:
        return jsonify({'error': str(exc)})
# API integration
@app.route('/api-call')
def api_call():
    """Call a caller-supplied API endpoint with a caller-supplied bearer token.

    Deliberately vulnerable example: the ``endpoint`` query parameter is used
    as the request URL without validation (SSRF); ``key`` is sent as the
    ``Authorization: Bearer`` credential.

    Returns the upstream response parsed as JSON, or ``{'error': ...}`` if
    the request or the JSON decoding raises.
    """
    target = request.args.get('endpoint', '')
    token = request.args.get('key', '')
    # Dangerous: User controls API endpoint
    auth_headers = {'Authorization': f'Bearer {token}'}
    try:
        upstream = requests.get(target, headers=auth_headers)
        decoded = upstream.json()
        return jsonify(decoded)
    except Exception as exc:
        return jsonify({'error': str(exc)})
# Image processing from URL
@app.route('/process-image', methods=['POST'])
def process_image_from_url():
    """Download an image from a caller-supplied URL and report its size.

    Deliberately vulnerable example: ``image_url`` from the JSON body is
    fetched without validation (SSRF).

    Returns a JSON object with a status message, the byte length of the
    downloaded body and the upstream ``content-type`` header when the
    upstream status is 200; otherwise ``{'error': 'Failed to fetch image'}``.
    Exceptions during the fetch are reported as ``{'error': ...}``.
    """
    source = request.json.get('image_url', '')
    # Dangerous: Fetch image from user URL
    try:
        download = requests.get(source)
        if download.status_code != 200:
            return jsonify({'error': 'Failed to fetch image'})
        # Process image content
        report = {
            'status': 'Image processed',
            'size': len(download.content),
            'content_type': download.headers.get('content-type'),
        }
        return jsonify(report)
    except Exception as exc:
        return jsonify({'error': str(exc)})
# RSS feed fetcher
@app.route('/rss')
def fetch_rss():
    """Fetch a caller-supplied feed URL and return its body if it looks like XML.

    Deliberately vulnerable example: the ``feed_url`` query parameter is
    fetched without validation (SSRF).

    Returns ``{'feed_content': <body text>}`` when the upstream
    ``content-type`` header contains ``xml``, ``{'error': 'Invalid RSS
    feed'}`` otherwise, and ``{'error': ...}`` if the fetch raises.
    """
    source = request.args.get('feed_url', '')
    # Dangerous: User-controlled RSS URL
    try:
        upstream = requests.get(source)
        declared_type = upstream.headers.get('content-type', '')
        if 'xml' not in declared_type:
            return jsonify({'error': 'Invalid RSS feed'})
        return jsonify({'feed_content': upstream.text})
    except Exception as exc:
        return jsonify({'error': str(exc)})
# URL validation check
@app.route('/check-url')
def check_url():
    """Issue a HEAD request to a caller-supplied URL and report reachability.

    Deliberately vulnerable example: the ``url`` query parameter is probed
    without validation (SSRF). The request uses a 5 second timeout.

    Returns the URL, upstream status, ``accessible: True`` and the response
    headers on success. If ``requests`` raises a ``RequestException`` the
    reply is ``{'url': ..., 'accessible': False}``; other exception types are
    not handled here.
    """
    candidate = request.args.get('url', '')
    # Dangerous: No validation of target URL
    try:
        probe = requests.head(candidate, timeout=5)
    except requests.RequestException:
        return jsonify({
            'url': candidate,
            'accessible': False
        })
    outcome = {
        'url': candidate,
        'status': probe.status_code,
        'accessible': True,
        'headers': dict(probe.headers),
    }
    return jsonify(outcome)
# Metadata fetcher
@app.route('/metadata')
def fetch_metadata():
    """Fetch a caller-supplied URL and return title/description metadata.

    Deliberately vulnerable example: the ``url`` query parameter is fetched
    without validation, so it could be pointed at cloud metadata services
    (SSRF). The request uses a 10 second timeout.

    Returns a JSON object with ``title`` and ``description`` (produced by
    ``extract_title`` / ``extract_description`` from the body text) and the
    upstream ``status_code``, or ``{'error': ...}`` on any exception.
    """
    target = request.args.get('url', '')
    # Dangerous: Could access cloud metadata services
    try:
        page = requests.get(target, timeout=10)
        # Extract metadata
        title = extract_title(page.text)
        description = extract_description(page.text)
        return jsonify({
            'title': title,
            'description': description,
            'status_code': page.status_code,
        })
    except Exception as exc:
        return jsonify({'error': str(exc)})
# Content aggregator
@app.route('/aggregate')
def aggregate_content():
    """Fetch every caller-supplied URL and report status and size for each.

    Deliberately vulnerable example: each value of the repeated ``urls``
    query parameter is fetched without validation (SSRF), each with a
    5 second timeout.

    Returns ``{'results': [...]}`` with one entry per URL, in request order:
    either ``url``/``status``/``content_length`` or, when that fetch raised,
    ``url``/``error``.
    """
    # Dangerous: Multiple user-controlled URLs
    collected = []
    for link in request.args.getlist('urls'):
        try:
            reply = requests.get(link, timeout=5)
            entry = {
                'url': link,
                'status': reply.status_code,
                'content_length': len(reply.content),
            }
        except Exception as exc:
            # A failure for one URL must not abort the remaining fetches.
            entry = {
                'url': link,
                'error': str(exc),
            }
        collected.append(entry)
    return jsonify({'results': collected})
def extract_title(html):
    """Mock title extractor: ignores ``html`` and returns a fixed string."""
    placeholder_title = 'Sample Title'
    return placeholder_title
def extract_description(html):
    """Mock description extractor: ignores ``html`` and returns a fixed string."""
    placeholder_description = 'Sample Description'
    return placeholder_description
# Secure: Safe URL handling with SSRF protection in Flask
import requests
from flask import Flask, request, jsonify
from urllib.parse import urlparse
import socket
import ipaddress
import re
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError
# Application object that the hardened routes below register themselves on.
app = Flask(__name__)
# Safe: Configuration with allowed domains
# is_allowed_domain() accepts a hostname only if it equals one of these
# entries or is a subdomain of one.
app.config['ALLOWED_DOMAINS'] = [
    'api.example.com',
    'feeds.example.com',
    'images.example.com',
    'webhook.example.com'
]
# Upper bound, in bytes, on the response size make_safe_request() will accept.
app.config['MAX_REQUEST_SIZE'] = 10 * 1024 * 1024 # 10MB
# Timeout passed to every outbound request made by make_safe_request().
app.config['REQUEST_TIMEOUT'] = 10 # seconds
# Input validation schemas
class URLFetchSchema(Schema):
    """Marshmallow schema for the ``/fetch`` request body: one required ``url`` field."""
    url = fields.Url(required=True)
class WebhookSchema(Schema):
    """Marshmallow schema for the ``/webhook`` request body.

    The caller names a stored webhook by ``webhook_id`` (the target URL is
    looked up server-side) and supplies the ``payload`` dict to deliver.
    """
    webhook_id = fields.Str(required=True)
    payload = fields.Dict(required=True)
# Safe: URL validation and SSRF protection
def validate_url_for_ssrf(url):
    """Check that ``url`` is safe to request from the server side.

    The checks run in this order and the first failure wins:
    parseable URL, ``http``/``https`` scheme, non-empty hostname, hostname on
    the allowlist (``is_allowed_domain``), hostname not resolving to an
    internal address (``check_internal_network_access``).

    Returns:
        The original ``url``, unchanged.

    Raises:
        ValueError: if any check fails.
    """
    try:
        parts = urlparse(url)
    except Exception:
        raise ValueError('Invalid URL format')

    # Guard clauses, cheapest first.
    if parts.scheme not in ('http', 'https'):
        raise ValueError('Only HTTP and HTTPS protocols are allowed')

    host = parts.hostname
    if not host:
        raise ValueError('URL must have a valid hostname')

    if not is_allowed_domain(host):
        raise ValueError('Domain not allowed')

    # Raises ValueError itself when the host maps to an internal network.
    check_internal_network_access(host)
    return url
def is_allowed_domain(hostname):
    """Return True if ``hostname`` is an allowlisted domain or a subdomain of one.

    The allowlist is read from ``app.config['ALLOWED_DOMAINS']``. The
    comparison is an exact string match, or a suffix match on
    ``'.' + domain`` so that ``evilexample.com`` does not match
    ``example.com``.
    """
    return any(
        hostname == allowed or hostname.endswith('.' + allowed)
        for allowed in app.config['ALLOWED_DOMAINS']
    )
def check_internal_network_access(hostname):
    """Reject hostnames that resolve to any non-public IP address.

    Every address returned by the resolver (IPv4 and IPv6) is inspected, not
    just the first IPv4 record, so a name with one public and one internal
    record is still refused. An address is refused when it is not globally
    routable: private, loopback, link-local (which covers the
    169.254.169.254 metadata address), shared/CGNAT, multicast, reserved or
    unspecified. IPv4-mapped IPv6 addresses are judged by the IPv4 address
    they embed.

    NOTE(review): the HTTP client resolves the name again when the request is
    actually made, so this check alone does not defend against DNS answers
    that change between the two lookups.

    Args:
        hostname: Host name or IP literal taken from the URL.

    Returns:
        None when every resolved address is public.

    Raises:
        ValueError: 'Unable to resolve hostname' if resolution fails,
            'Invalid IP address' if the resolver returns something that is
            not an IP address, or 'Requests to internal networks are not
            allowed' if any resolved address is non-public.
    """
    if not hostname:
        raise ValueError('Unable to resolve hostname')

    try:
        # SOCK_STREAM avoids one duplicate entry per socket type.
        addr_infos = socket.getaddrinfo(hostname, None, type=socket.SOCK_STREAM)
    except (socket.gaierror, UnicodeError):
        # UnicodeError: the name could not be IDNA-encoded.
        raise ValueError('Unable to resolve hostname') from None

    if not addr_infos:
        raise ValueError('Unable to resolve hostname')

    for info in addr_infos:
        # sockaddr is (host, port[, flowinfo, scope_id]); drop any "%zone"
        # suffix from scoped IPv6 literals before parsing.
        raw_address = str(info[4][0]).split('%', 1)[0]
        try:
            ip_obj = ipaddress.ip_address(raw_address)
        except ValueError:
            raise ValueError('Invalid IP address') from None

        # ::ffff:a.b.c.d must be classified as the IPv4 address it wraps.
        embedded_v4 = getattr(ip_obj, 'ipv4_mapped', None)
        if embedded_v4 is not None:
            ip_obj = embedded_v4

        is_internal = (
            not ip_obj.is_global
            or ip_obj.is_private
            or ip_obj.is_loopback
            or ip_obj.is_link_local
            or ip_obj.is_multicast
            or ip_obj.is_reserved
            or ip_obj.is_unspecified
        )
        if is_internal:
            raise ValueError('Requests to internal networks are not allowed')
def make_safe_request(url, method='GET', **kwargs):
    """Perform an outbound GET, POST or HEAD request with defensive defaults.

    The request is made with the configured timeout, without following
    redirects (so a redirect cannot steer the server to an unvalidated
    host), and in streaming mode so the body can be size-limited while it is
    read. A fixed ``User-Agent`` header is always set.

    Args:
        url: Target URL; callers are expected to have validated it already.
        method: 'GET', 'POST' or 'HEAD' (case-insensitive).
        **kwargs: Optional ``headers`` (dict), and for POST ``json`` and/or
            ``data``. Other keyword arguments are ignored.

    Returns:
        The ``requests`` response, with its body fully read and capped at
        ``app.config['MAX_REQUEST_SIZE']`` bytes.

    Raises:
        ValueError: for an unsupported method, or 'Response too large' when
            the declared or actual body size exceeds the configured limit.
        requests.RequestException: on transport errors.
    """
    verb = method.upper()
    if verb not in ('GET', 'POST', 'HEAD'):
        raise ValueError('HTTP method not allowed')

    # Copy so the caller's dict is never mutated; tolerate headers=None.
    headers = dict(kwargs.get('headers') or {})
    headers['User-Agent'] = 'SafeFlaskApp/1.0'

    safe_kwargs = {
        'timeout': app.config['REQUEST_TIMEOUT'],
        'allow_redirects': False,  # Prevent redirect SSRF
        'stream': True,  # Stream response for size checking
        'headers': headers,
    }

    if verb == 'GET':
        response = requests.get(url, **safe_kwargs)
    elif verb == 'POST':
        safe_kwargs['json'] = kwargs.get('json')
        safe_kwargs['data'] = kwargs.get('data')
        response = requests.post(url, **safe_kwargs)
    else:
        response = requests.head(url, **safe_kwargs)

    max_size = app.config['MAX_REQUEST_SIZE']
    try:
        # A HEAD response carries no body; its Content-Length describes the
        # resource, not what we are about to read, so it is not enforced.
        declared_length = response.headers.get('content-length')
        if verb != 'HEAD' and declared_length and int(declared_length) > max_size:
            raise ValueError('Response too large')

        # Read incrementally; Content-Length may be absent or untruthful.
        chunks = []
        received = 0
        for chunk in response.iter_content(chunk_size=8192):
            received += len(chunk)
            if received > max_size:
                raise ValueError('Response too large')
            chunks.append(chunk)
    except Exception:
        # With stream=True the connection stays checked out until the body
        # is consumed or the response is closed; release it on every failure.
        response.close()
        raise

    # Store the size-limited body so .content/.text/.json() work as usual.
    response._content = b''.join(chunks)
    return response
# Safe: URL fetching with validation
@app.route('/fetch', methods=['POST'])
def safe_fetch_url():
    """Fetch an allowlisted URL and return a short summary of the response.

    The JSON body is validated with ``URLFetchSchema``, the URL is checked by
    ``validate_url_for_ssrf`` and the request is made through
    ``make_safe_request``.

    Returns a JSON object with the upstream status code, the first 500
    characters of the body, the body length in bytes and the content type
    (``'unknown'`` if absent). Validation and request errors produce
    ``{'error': ...}`` with HTTP 400.
    """
    try:
        # Validate input
        payload = URLFetchSchema().load(request.json or {})
        # Validate URL for SSRF
        target = validate_url_for_ssrf(payload['url'])
        # Make safe request
        upstream = make_safe_request(target)
        summary = {
            'status_code': upstream.status_code,
            'content_preview': upstream.text[:500],  # Limit preview
            'content_length': len(upstream.content),
            'content_type': upstream.headers.get('content-type', 'unknown'),
        }
        return jsonify(summary)
    except (ValueError, MarshmallowValidationError, requests.RequestException) as exc:
        return jsonify({'error': str(exc)}), 400
# Safe: Proxy with validation
@app.route('/proxy')
def safe_proxy_request():
    """Proxy a read-only (GET or HEAD) request to an allowlisted URL.

    Reads ``url`` and ``method`` (default ``GET``) from the query string.
    Only GET and HEAD are accepted; the URL must pass
    ``validate_url_for_ssrf``.

    Returns a JSON object with the upstream status, headers and body length.
    The body itself is not returned. Validation and request errors produce
    ``{'error': ...}`` with HTTP 400.
    """
    try:
        destination = request.args.get('url', '')
        verb = request.args.get('method', 'GET')
        # Validate method
        if verb.upper() not in ('GET', 'HEAD'):
            raise ValueError('HTTP method not allowed')
        # Validate URL
        safe_destination = validate_url_for_ssrf(destination)
        # Make safe request
        upstream = make_safe_request(safe_destination, verb)
        reply = {
            'status': upstream.status_code,
            'headers': dict(upstream.headers),
            'content_length': len(upstream.content),
        }
        return jsonify(reply)
    except (ValueError, requests.RequestException) as exc:
        return jsonify({'error': str(exc)}), 400
# Safe: Webhook with validation
@app.route('/webhook', methods=['POST'])
def safe_send_webhook():
    """Deliver a payload to a pre-registered webhook, signed if it has a secret.

    The caller identifies the webhook by ``webhook_id``; the destination URL
    comes from ``get_webhook_config`` rather than from the request, and is
    still passed through ``validate_url_for_ssrf`` before use.

    The payload is serialized once with sorted keys and those exact bytes
    are sent as the body. ``generate_webhook_signature`` signs the same
    sorted-keys serialization, so the ``X-Webhook-Signature`` header matches
    the bytes the receiver actually gets.

    Returns ``{'webhook_status': ..., 'delivery_id': ...}``. Validation and
    request errors produce ``{'error': ...}`` with HTTP 400.
    """
    import json
    try:
        # Validate input
        schema = WebhookSchema()
        data = schema.load(request.json or {})
        # Get webhook configuration from database
        webhook_config = get_webhook_config(data['webhook_id'])
        # Validate webhook URL (should be done when webhook is created)
        validated_url = validate_url_for_ssrf(webhook_config['url'])
        # Prepare payload: serialize exactly once so the signed bytes and the
        # transmitted bytes cannot drift apart (key order, separators).
        payload = data['payload']
        body = json.dumps(payload, sort_keys=True).encode()
        # Add signature if secret is provided
        headers = {'Content-Type': 'application/json'}
        if webhook_config.get('secret'):
            signature = generate_webhook_signature(payload, webhook_config['secret'])
            headers['X-Webhook-Signature'] = signature
        # Make safe request
        response = make_safe_request(
            validated_url,
            method='POST',
            data=body,
            headers=headers
        )
        return jsonify({
            'webhook_status': response.status_code,
            'delivery_id': generate_delivery_id()
        })
    except (ValueError, MarshmallowValidationError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400
def get_webhook_config(webhook_id):
    """Look up the stored configuration for ``webhook_id``.

    Mock implementation backed by an in-function table; a real one would
    query the database.

    Returns:
        A dict with the webhook's ``url`` and ``secret``.

    Raises:
        ValueError: 'Webhook not found' if the id is unknown.
    """
    registry = {
        'webhook_1': {
            'url': 'https://webhook.example.com/endpoint',
            'secret': 'webhook_secret_123',
        },
    }
    config = registry.get(webhook_id)
    if config is None:
        raise ValueError('Webhook not found')
    return config
def generate_webhook_signature(payload, secret):
    """Return an HMAC-SHA256 signature of ``payload`` keyed with ``secret``.

    The payload is serialized with ``json.dumps(..., sort_keys=True)`` so the
    signature does not depend on dict key order. The result has the form
    ``'sha256=<hex digest>'``.
    """
    import hashlib
    import hmac
    import json

    key = secret.encode()
    message = json.dumps(payload, sort_keys=True).encode()
    mac = hmac.new(key, message, hashlib.sha256)
    return f'sha256={mac.hexdigest()}'
def generate_delivery_id():
    """Return a fresh random (version 4) UUID as a string."""
    from uuid import uuid4

    delivery = uuid4()
    return str(delivery)
# Safe: API integration with validation
@app.route('/api-call')
def safe_api_call():
    """Call a pre-configured API by name with a validated endpoint path.

    The caller chooses an API by name (``api``) and a relative ``endpoint``;
    the base URL and API key come from ``get_api_config``, never from the
    request. The endpoint is checked by ``validate_api_endpoint`` before
    being appended to the base URL.

    Returns a JSON object with ``api_response`` (parsed JSON when the
    upstream content type starts with ``application/json``, otherwise the
    first 1000 characters of text) and ``status_code``. Validation and
    request errors produce ``{'error': ...}`` with HTTP 400.
    """
    try:
        api_name = request.args.get('api', '')
        endpoint = request.args.get('endpoint', '')
        # Validate API name
        config = get_api_config(api_name)
        # Validate endpoint
        safe_path = validate_api_endpoint(endpoint, config)
        # Construct URL
        full_url = f"{config['base_url']}/{safe_path}"
        # Make safe request
        auth_headers = {'Authorization': f"Bearer {config['api_key']}"}
        upstream = make_safe_request(full_url, headers=auth_headers)

        declared_type = upstream.headers.get('content-type', '')
        if declared_type.startswith('application/json'):
            api_response = upstream.json()
        else:
            api_response = upstream.text[:1000]

        return jsonify({
            'api_response': api_response,
            'status_code': upstream.status_code,
        })
    except (ValueError, requests.RequestException) as exc:
        return jsonify({'error': str(exc)}), 400
def get_api_config(api_name):
    """Return the stored configuration for the API called ``api_name``.

    Backed by an in-function table with a single ``weather`` entry.

    Returns:
        A dict with the API's ``base_url`` and ``api_key``.

    Raises:
        ValueError: 'API not configured' if the name is unknown.
    """
    configured = {
        'weather': {
            'base_url': 'https://api.example.com',
            'api_key': 'weather_api_key',
        },
    }
    entry = configured.get(api_name)
    if entry is None:
        raise ValueError('API not configured')
    return entry
def validate_api_endpoint(endpoint, api_config):
    """Validate a relative API endpoint path supplied by the caller.

    Args:
        endpoint: Relative path to append to the API's base URL.
        api_config: Configuration of the selected API (currently unused;
            kept so callers' signature stays the same).

    Returns:
        ``endpoint`` unchanged when it is acceptable.

    Raises:
        ValueError: if the endpoint is empty, contains characters other than
            ASCII letters, digits, ``/``, ``_``, ``.`` and ``-``, or contains
            ``..``.
    """
    if not endpoint:
        raise ValueError('Endpoint is required')
    # Only allow safe characters. fullmatch is required here: with
    # re.match(r'^...$') the '$' also matches just before a trailing newline,
    # so a value such as 'users\n' would slip through.
    if not re.fullmatch(r'[a-zA-Z0-9/_.-]+', endpoint):
        raise ValueError('Endpoint contains invalid characters')
    # Prevent traversal
    if '..' in endpoint:
        raise ValueError('Path traversal not allowed')
    return endpoint
# Safe: Image processing with validation
@app.route('/process-image', methods=['POST'])
def safe_process_image_from_url():
    """Download an image from an allowlisted URL and return its properties.

    ``image_url`` is read from the JSON body, checked with
    ``validate_url_for_ssrf`` and fetched through ``make_safe_request``. The
    response must declare an ``image/*`` content type before the bytes are
    handed to ``process_image_safely``.

    Returns the dict produced by ``process_image_safely`` as JSON.
    Validation, request and image errors produce ``{'error': ...}`` with
    HTTP 400.
    """
    try:
        body = request.json
        # A JSON array, string or number is truthy but has no .get(); treat
        # anything that is not an object as "no URL supplied" so it is
        # rejected by validation (HTTP 400) instead of crashing the handler.
        image_url = body.get('image_url', '') if isinstance(body, dict) else ''
        # Validate URL
        validated_url = validate_url_for_ssrf(image_url)
        # Make safe request
        response = make_safe_request(validated_url)
        # Validate content type
        content_type = response.headers.get('content-type', '')
        if not content_type.startswith('image/'):
            raise ValueError('URL does not point to an image')
        # Process image safely
        image_info = process_image_safely(response.content, content_type)
        return jsonify(image_info)
    except (ValueError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400
def process_image_safely(image_data, content_type):
    """Verify that ``image_data`` is a valid image and describe it.

    The bytes are opened with Pillow and checked with ``Image.verify()``.

    Args:
        image_data: Raw image bytes.
        content_type: Content type reported by the server; echoed back.

    Returns:
        A dict with a status message and the image's ``format``, ``size``,
        ``mode`` and the given ``content_type``.

    Raises:
        ValueError: if Pillow cannot open or verify the data.
    """
    import io
    from PIL import Image

    try:
        # Validate image
        buffer = io.BytesIO(image_data)
        picture = Image.open(buffer)
        picture.verify()  # Verify it's a valid image
        details = {
            'status': 'Image processed successfully',
            'format': picture.format,
            'size': picture.size,
            'mode': picture.mode,
            'content_type': content_type,
        }
        return details
    except Exception as e:
        raise ValueError(f'Image processing failed: {str(e)}')
# Safe: URL checker with validation
@app.route('/check-url')
def safe_check_url():
    """Report whether an allowlisted URL answers a HEAD request.

    The ``url`` query parameter is checked with ``validate_url_for_ssrf`` and
    probed through ``make_safe_request`` using HEAD.

    Returns a JSON object with the URL, status code, ``accessible: True``,
    the elapsed response time in seconds and the content type. Validation
    and request errors produce ``url``/``accessible: False``/``error`` with
    HTTP 400.
    """
    try:
        candidate = request.args.get('url', '')
        # Validate URL
        safe_url = validate_url_for_ssrf(candidate)
        # Make safe HEAD request
        probe = make_safe_request(safe_url, method='HEAD')
        outcome = {
            'url': safe_url,
            'status_code': probe.status_code,
            'accessible': True,
            'response_time': probe.elapsed.total_seconds(),
            'content_type': probe.headers.get('content-type'),
        }
        return jsonify(outcome)
    except (ValueError, requests.RequestException) as exc:
        failure = {
            'url': request.args.get('url', ''),
            'accessible': False,
            'error': str(exc),
        }
        return jsonify(failure), 400
# Start Flask's built-in server only when this file is executed directly,
# with debug mode explicitly switched off.
if __name__ == '__main__':
    app.run(debug=False)