Flask Server-Side Request Forgery (SSRF) via requests

High Risk Server-Side Request Forgery (SSRF)
flask, python, ssrf, requests, url-injection, internal-access

What it is

The Flask application uses the requests library with user-controlled URLs without proper validation, creating Server-Side Request Forgery (SSRF) vulnerabilities. Attackers can manipulate URLs to make the server perform HTTP requests to internal resources, cloud metadata services, or arbitrary external systems, potentially accessing sensitive information, performing internal network reconnaissance, or bypassing security controls.

# Vulnerable: SSRF via requests with user input in Flask
import requests
from flask import Flask, request, jsonify

app = Flask(__name__)


# Dangerous: Direct URL from user input
@app.route('/fetch', methods=['POST'])
def fetch_url():
    url = request.json.get('url', '')
    try:
        # CRITICAL: User controls URL for server request
        response = requests.get(url)
        return jsonify({
            'status_code': response.status_code,
            'content': response.text[:1000],  # Limit content
            'headers': dict(response.headers)
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Another dangerous pattern
@app.route('/proxy')
def proxy_request():
    target_url = request.args.get('url', '')
    method = request.args.get('method', 'GET')
    # Dangerous: User controls both URL and method
    try:
        if method.upper() == 'GET':
            response = requests.get(target_url)
        elif method.upper() == 'POST':
            response = requests.post(target_url, data=request.form)
        return jsonify({
            'status': response.status_code,
            'data': response.text
        })
    except Exception as e:
        return jsonify({'error': str(e)})


# Webhook processing
@app.route('/webhook', methods=['POST'])
def send_webhook():
    webhook_url = request.json.get('webhook_url', '')
    payload = request.json.get('payload', {})
    # Dangerous: User-controlled webhook URL
    try:
        response = requests.post(webhook_url, json=payload)
        return jsonify({'webhook_status': response.status_code})
    except Exception as e:
        return jsonify({'error': str(e)})


# API integration
@app.route('/api-call')
def api_call():
    api_endpoint = request.args.get('endpoint', '')
    api_key = request.args.get('key', '')
    # Dangerous: User controls API endpoint
    headers = {'Authorization': f'Bearer {api_key}'}
    try:
        response = requests.get(api_endpoint, headers=headers)
        return jsonify(response.json())
    except Exception as e:
        return jsonify({'error': str(e)})


# Image processing from URL
@app.route('/process-image', methods=['POST'])
def process_image_from_url():
    image_url = request.json.get('image_url', '')
    # Dangerous: Fetch image from user URL
    try:
        response = requests.get(image_url)
        if response.status_code == 200:
            # Process image content
            return jsonify({
                'status': 'Image processed',
                'size': len(response.content),
                'content_type': response.headers.get('content-type')
            })
        else:
            return jsonify({'error': 'Failed to fetch image'})
    except Exception as e:
        return jsonify({'error': str(e)})


# RSS feed fetcher
@app.route('/rss')
def fetch_rss():
    feed_url = request.args.get('feed_url', '')
    # Dangerous: User-controlled RSS URL
    try:
        response = requests.get(feed_url)
        if 'xml' in response.headers.get('content-type', ''):
            return jsonify({'feed_content': response.text})
        else:
            return jsonify({'error': 'Invalid RSS feed'})
    except Exception as e:
        return jsonify({'error': str(e)})


# URL validation check
@app.route('/check-url')
def check_url():
    check_url = request.args.get('url', '')
    # Dangerous: No validation of target URL
    try:
        response = requests.head(check_url, timeout=5)
        return jsonify({
            'url': check_url,
            'status': response.status_code,
            'accessible': True,
            'headers': dict(response.headers)
        })
    except requests.RequestException:
        return jsonify({
            'url': check_url,
            'accessible': False
        })


# Metadata fetcher
@app.route('/metadata')
def fetch_metadata():
    url = request.args.get('url', '')
    # Dangerous: Could access cloud metadata services
    try:
        response = requests.get(url, timeout=10)
        # Extract metadata
        metadata = {
            'title': extract_title(response.text),
            'description': extract_description(response.text),
            'status_code': response.status_code
        }
        return jsonify(metadata)
    except Exception as e:
        return jsonify({'error': str(e)})


# Content aggregator
@app.route('/aggregate')
def aggregate_content():
    urls = request.args.getlist('urls')
    # Dangerous: Multiple user-controlled URLs
    results = []
    for url in urls:
        try:
            response = requests.get(url, timeout=5)
            results.append({
                'url': url,
                'status': response.status_code,
                'content_length': len(response.content)
            })
        except Exception as e:
            results.append({
                'url': url,
                'error': str(e)
            })
    return jsonify({'results': results})


def extract_title(html):
    # Mock function
    return 'Sample Title'


def extract_description(html):
    # Mock function
    return 'Sample Description'
# Secure: Safe URL handling with SSRF protection in Flask
import requests
from flask import Flask, request, jsonify
from urllib.parse import urlparse
import socket
import ipaddress
import re
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError

app = Flask(__name__)

# Safe: Configuration with allowed domains
app.config['ALLOWED_DOMAINS'] = [
    'api.example.com',
    'feeds.example.com',
    'images.example.com',
    'webhook.example.com'
]
app.config['MAX_REQUEST_SIZE'] = 10 * 1024 * 1024  # 10MB
app.config['REQUEST_TIMEOUT'] = 10  # seconds


# Input validation schemas
class URLFetchSchema(Schema):
    url = fields.Url(required=True)


class WebhookSchema(Schema):
    webhook_id = fields.Str(required=True)
    payload = fields.Dict(required=True)


# Safe: URL validation and SSRF protection
def validate_url_for_ssrf(url):
    """Validate URL against SSRF attacks"""
    try:
        parsed = urlparse(url)
    except Exception:
        raise ValueError('Invalid URL format')
    # Validate scheme
    if parsed.scheme not in ['http', 'https']:
        raise ValueError('Only HTTP and HTTPS protocols are allowed')
    # Validate hostname
    if not parsed.hostname:
        raise ValueError('URL must have a valid hostname')
    # Check against allowlist
    if not is_allowed_domain(parsed.hostname):
        raise ValueError('Domain not allowed')
    # Prevent SSRF to internal networks
    check_internal_network_access(parsed.hostname)
    return url


def is_allowed_domain(hostname):
    """Check if domain is in allowlist"""
    allowed_domains = app.config['ALLOWED_DOMAINS']
    # Check exact match or subdomain
    for domain in allowed_domains:
        if hostname == domain or hostname.endswith('.' + domain):
            return True
    return False


def check_internal_network_access(hostname):
    """Prevent access to internal networks"""
    try:
        # Resolve hostname to IP
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)
        # Block private/internal networks
        if ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local:
            raise ValueError('Requests to internal networks are not allowed')
        # Block specific dangerous IP ranges
        dangerous_ranges = [
            ipaddress.ip_network('169.254.0.0/16'),  # AWS metadata
            ipaddress.ip_network('10.0.0.0/8'),      # Private class A
            ipaddress.ip_network('172.16.0.0/12'),   # Private class B
            ipaddress.ip_network('192.168.0.0/16'),  # Private class C
        ]
        for network in dangerous_ranges:
            if ip_obj in network:
                raise ValueError('IP address not allowed')
    except socket.gaierror:
        raise ValueError('Unable to resolve hostname')
    except ValueError as e:
        if 'not allowed' in str(e) or 'internal networks' in str(e):
            raise e
        raise ValueError('Invalid IP address')


def make_safe_request(url, method='GET', **kwargs):
    """Make a safe HTTP request with proper configurations"""
    # Set safe defaults
    safe_kwargs = {
        'timeout': app.config['REQUEST_TIMEOUT'],
        'allow_redirects': False,  # Prevent redirect SSRF
        'stream': True,  # Stream response for size checking
        'headers': kwargs.get('headers', {})
    }
    # Add User-Agent
    safe_kwargs['headers']['User-Agent'] = 'SafeFlaskApp/1.0'
    # Make request
    if method.upper() == 'GET':
        response = requests.get(url, **safe_kwargs)
    elif method.upper() == 'POST':
        safe_kwargs['json'] = kwargs.get('json')
        safe_kwargs['data'] = kwargs.get('data')
        response = requests.post(url, **safe_kwargs)
    elif method.upper() == 'HEAD':
        response = requests.head(url, **safe_kwargs)
    else:
        raise ValueError('HTTP method not allowed')
    # Check response size
    content_length = response.headers.get('content-length')
    if content_length and int(content_length) > app.config['MAX_REQUEST_SIZE']:
        raise ValueError('Response too large')
    # Read content with size limit
    content = b''
    for chunk in response.iter_content(chunk_size=8192):
        content += chunk
        if len(content) > app.config['MAX_REQUEST_SIZE']:
            raise ValueError('Response too large')
    # Replace response content with size-limited content
    response._content = content
    return response


# Safe: URL fetching with validation
@app.route('/fetch', methods=['POST'])
def safe_fetch_url():
    try:
        # Validate input
        schema = URLFetchSchema()
        data = schema.load(request.json or {})
        # Validate URL for SSRF
        validated_url = validate_url_for_ssrf(data['url'])
        # Make safe request
        response = make_safe_request(validated_url)
        return jsonify({
            'status_code': response.status_code,
            'content_preview': response.text[:500],  # Limit preview
            'content_length': len(response.content),
            'content_type': response.headers.get('content-type', 'unknown')
        })
    except (ValueError, MarshmallowValidationError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400


# Safe: Proxy with validation
@app.route('/proxy')
def safe_proxy_request():
    try:
        target_url = request.args.get('url', '')
        method = request.args.get('method', 'GET')
        # Validate method
        if method.upper() not in ['GET', 'HEAD']:
            raise ValueError('HTTP method not allowed')
        # Validate URL
        validated_url = validate_url_for_ssrf(target_url)
        # Make safe request
        response = make_safe_request(validated_url, method)
        return jsonify({
            'status': response.status_code,
            'headers': dict(response.headers),
            'content_length': len(response.content)
        })
    except (ValueError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400


# Safe: Webhook with validation
@app.route('/webhook', methods=['POST'])
def safe_send_webhook():
    try:
        # Validate input
        schema = WebhookSchema()
        data = schema.load(request.json or {})
        # Get webhook configuration from database
        webhook_config = get_webhook_config(data['webhook_id'])
        # Validate webhook URL (should be done when webhook is created)
        validated_url = validate_url_for_ssrf(webhook_config['url'])
        # Prepare payload
        payload = data['payload']
        # Add signature if secret is provided
        headers = {'Content-Type': 'application/json'}
        if webhook_config.get('secret'):
            signature = generate_webhook_signature(payload, webhook_config['secret'])
            headers['X-Webhook-Signature'] = signature
        # Make safe request
        response = make_safe_request(
            validated_url,
            method='POST',
            json=payload,
            headers=headers
        )
        return jsonify({
            'webhook_status': response.status_code,
            'delivery_id': generate_delivery_id()
        })
    except (ValueError, MarshmallowValidationError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400


def get_webhook_config(webhook_id):
    """Get webhook configuration from database"""
    # Mock function - would query database
    webhooks = {
        'webhook_1': {
            'url': 'https://webhook.example.com/endpoint',
            'secret': 'webhook_secret_123'
        }
    }
    if webhook_id not in webhooks:
        raise ValueError('Webhook not found')
    return webhooks[webhook_id]


def generate_webhook_signature(payload, secret):
    """Generate webhook signature"""
    import hmac
    import hashlib
    import json
    payload_str = json.dumps(payload, sort_keys=True)
    signature = hmac.new(
        secret.encode(),
        payload_str.encode(),
        hashlib.sha256
    ).hexdigest()
    return f'sha256={signature}'


def generate_delivery_id():
    """Generate unique delivery ID"""
    import uuid
    return str(uuid.uuid4())


# Safe: API integration with validation
@app.route('/api-call')
def safe_api_call():
    try:
        api_name = request.args.get('api', '')
        endpoint = request.args.get('endpoint', '')
        # Validate API name
        api_config = get_api_config(api_name)
        # Validate endpoint
        validated_endpoint = validate_api_endpoint(endpoint, api_config)
        # Construct URL
        full_url = f"{api_config['base_url']}/{validated_endpoint}"
        # Make safe request
        headers = {'Authorization': f"Bearer {api_config['api_key']}"}
        response = make_safe_request(full_url, headers=headers)
        return jsonify({
            'api_response': (
                response.json()
                if response.headers.get('content-type', '').startswith('application/json')
                else response.text[:1000]
            ),
            'status_code': response.status_code
        })
    except (ValueError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400


def get_api_config(api_name):
    """Get API configuration"""
    apis = {
        'weather': {
            'base_url': 'https://api.example.com',
            'api_key': 'weather_api_key'
        }
    }
    if api_name not in apis:
        raise ValueError('API not configured')
    return apis[api_name]


def validate_api_endpoint(endpoint, api_config):
    """Validate API endpoint"""
    if not endpoint:
        raise ValueError('Endpoint is required')
    # Only allow safe characters
    if not re.match(r'^[a-zA-Z0-9/_.-]+$', endpoint):
        raise ValueError('Endpoint contains invalid characters')
    # Prevent traversal
    if '..' in endpoint:
        raise ValueError('Path traversal not allowed')
    return endpoint


# Safe: Image processing with validation
@app.route('/process-image', methods=['POST'])
def safe_process_image_from_url():
    try:
        image_url = request.json.get('image_url', '') if request.json else ''
        # Validate URL
        validated_url = validate_url_for_ssrf(image_url)
        # Make safe request
        response = make_safe_request(validated_url)
        # Validate content type
        content_type = response.headers.get('content-type', '')
        if not content_type.startswith('image/'):
            raise ValueError('URL does not point to an image')
        # Process image safely
        image_info = process_image_safely(response.content, content_type)
        return jsonify(image_info)
    except (ValueError, requests.RequestException) as e:
        return jsonify({'error': str(e)}), 400


def process_image_safely(image_data, content_type):
    """Process image data safely"""
    from PIL import Image
    import io
    try:
        # Validate image
        image = Image.open(io.BytesIO(image_data))
        image.verify()  # Verify it's a valid image
        return {
            'status': 'Image processed successfully',
            'format': image.format,
            'size': image.size,
            'mode': image.mode,
            'content_type': content_type
        }
    except Exception as e:
        raise ValueError(f'Image processing failed: {str(e)}')


# Safe: URL checker with validation
@app.route('/check-url')
def safe_check_url():
    try:
        url_to_check = request.args.get('url', '')
        # Validate URL
        validated_url = validate_url_for_ssrf(url_to_check)
        # Make safe HEAD request
        response = make_safe_request(validated_url, method='HEAD')
        return jsonify({
            'url': validated_url,
            'status_code': response.status_code,
            'accessible': True,
            'response_time': response.elapsed.total_seconds(),
            'content_type': response.headers.get('content-type')
        })
    except (ValueError, requests.RequestException) as e:
        return jsonify({
            'url': request.args.get('url', ''),
            'accessible': False,
            'error': str(e)
        }), 400


if __name__ == '__main__':
    app.run(debug=False)

💡 Why This Fix Works

See fix suggestions for detailed explanation.

Why it happens

SSRF arises when a Flask view passes request data straight to the requests library, for example url = request.args['url']; requests.get(url). Because the attacker chooses the destination, the server can be made to contact internal services, cloud metadata endpoints (169.254.169.254), or localhost, and the request originates from inside the firewall. The root causes below describe the specific ways this happens.

Root causes

Using User Input Directly in requests.get() URL Parameter

The view reads a URL from request.args or request.json and hands it to requests.get() unchanged. Nothing constrains the scheme, host, or port, so the application makes arbitrary HTTP requests on the attacker's behalf.

Not Validating URL Schemes, Allowing file:// and Other Protocols

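Code passes user_url to the HTTP client without checking its scheme. With clients that support additional schemes (pycurl, for example, handles file://, gopher://, and dict://), attackers use file:///etc/passwd for local file access, gopher:// for protocol smuggling, or dict:// for service probing. The requests library itself only ships adapters for http:// and https:// and raises InvalidSchema for anything else, unless a custom transport adapter has been mounted on the session. An explicit scheme check still matters as defense in depth, because it survives a later change of HTTP client or session configuration.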

Insufficient Hostname or IP Address Validation

Validation misses private IP ranges or relies on an incomplete blocklist such as if 'localhost' not in url. Attackers switch to an alternative representation of the same target: 127.0.0.1, 0.0.0.0, [::1], the decimal form 2130706433, or a hostname they control that resolves (or is rebound via DNS) to an internal address. If 10.0.0.0/8, 172.16.0.0/12, and 192.168.0.0/16 are not blocked, the internal network is reachable as well.
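To make the gap concrete, the sketch below contrasts a substring blocklist with parsing the host as an address. The helper names naive_is_safe and is_loopback_literal are hypothetical, and the second function only handles IP literals; hostnames still need a DNS-based check.

```python
import ipaddress


def naive_is_safe(url: str) -> bool:
    # Substring blocklist of the kind described above; easy to bypass.
    return "localhost" not in url and "127.0.0.1" not in url


def is_loopback_literal(host: str) -> bool:
    """Return True if host is an IP literal (dotted, IPv6, or decimal) for loopback."""
    try:
        if host.isdigit():
            # A bare integer such as "2130706433" is the decimal form of an IPv4 address.
            addr = ipaddress.ip_address(int(host))
        else:
            addr = ipaddress.ip_address(host)
    except ValueError:
        return False  # not an IP literal; a hostname would need DNS resolution
    return addr.is_loopback
```

The blocklist accepts http://2130706433/ because the string contains neither forbidden substring, while the address-based check recognizes the same value as 127.0.0.1.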

Not Handling HTTP Redirects, Enabling Redirect-Based SSRF

The application leaves redirect following enabled, which is the requests default for every method except HEAD (allow_redirects=True). An attacker supplies a public URL that passes validation and then answers with a redirect whose Location header points to http://localhost:6379 or a cloud metadata endpoint. Validation ran only on the first URL, so the redirect target is never checked.

Building URLs with User Input Instead of Using URL Parameters

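Code assembles URLs with string operations, for example requests.get(f'https://api.example.com{path}'), instead of keeping the host fixed and passing user data through the params argument. With path=@internal.local/admin the result is https://api.example.com@internal.local/admin: everything before the at-sign is parsed as userinfo, so the effective hostname becomes internal.local. Interpolating into the query string, as in f'https://api.example.com?user={user}', does not change the host but still lets characters such as & and # inject or truncate parameters. Building URLs by concatenation therefore enables host manipulation, credential smuggling, and parameter injection.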
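The effect of the at-sign can be checked with the standard library alone. The following is a minimal sketch, assuming a hypothetical build_user_url helper and a fixed BASE_URL; with requests, the query part would normally be handled by passing a dict to the params argument instead of calling urlencode directly.

```python
from urllib.parse import quote, urlencode, urlparse

BASE_URL = "https://api.example.com"  # fixed by the application, never by the user


def build_user_url(username: str) -> str:
    """Build a URL with a fixed host; user data is percent-encoded into path and query."""
    path = "/users/" + quote(username, safe="")
    query = urlencode({"source": "profile", "user": username})
    return f"{BASE_URL}{path}?{query}"


# Naive concatenation: the at-sign turns the intended host into userinfo.
naive = "https://api.example.com" + "@internal.local/admin"
print(urlparse(naive).hostname)  # internal.local

# Encoded construction: the same input stays inside the path and query.
safe = build_user_url("@internal.local/admin")
print(urlparse(safe).hostname)  # api.example.com
```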

Fixes

1. Validate URLs Against Allowlist of Permitted Domains

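Use an explicit allowlist: ALLOWED_DOMAINS = ['api.example.com', 'cdn.example.com']; parsed = urllib.parse.urlparse(url); if parsed.hostname not in ALLOWED_DOMAINS: abort(400). Compare parsed.hostname rather than parsed.netloc, because netloc also contains any port and userinfo. Check both scheme and host, and reject non-matching URLs rather than attempting to sanitize them. An allowlist restricts destinations to hosts you trust, which removes most SSRF avenues; it still has to be combined with redirect handling, since an allowlisted host can redirect elsewhere.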
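A minimal sketch of the host comparison, using only urllib.parse. The function name is_allowed_host is hypothetical, and the scheme is deliberately left to a separate check.

```python
from urllib.parse import urlparse

ALLOWED_DOMAINS = {"api.example.com", "cdn.example.com"}


def is_allowed_host(url: str) -> bool:
    """Return True only if the URL's hostname is an exact allowlist entry."""
    try:
        # .hostname is lowercased and excludes port and userinfo.
        host = urlparse(url).hostname
    except ValueError:
        return False  # malformed authority, e.g. unbalanced IPv6 brackets
    return host is not None and host in ALLOWED_DOMAINS
```

Exact matching rejects look-alikes such as api.example.com.evil.test and userinfo tricks such as api.example.com@evil.test, because in both cases the parsed hostname is the attacker's domain.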

2. Block Private IP Ranges and Localhost Addresses

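Resolve the hostname and validate the resulting address: import ipaddress; ip = ipaddress.ip_address(socket.gethostbyname(hostname)); if ip.is_private or ip.is_loopback or ip.is_link_local: abort(400). Run the check after DNS resolution, not on the hostname string. Block 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16, 127.0.0.0/8, 169.254.0.0/16, and ::1. Note that socket.gethostbyname() returns IPv4 addresses only, so use socket.getaddrinfo() if IPv6 targets matter, and that requests resolves the name again when it connects, so this check alone does not stop DNS rebinding. It does prevent straightforward access to the internal network and metadata endpoints.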
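The check can be sketched with socket.getaddrinfo so that every resolved address, IPv4 or IPv6, is inspected. The function names below are hypothetical, and the sketch does not address DNS rebinding, since the HTTP client performs its own lookup afterwards.

```python
import ipaddress
import socket


def is_public_address(ip_text: str) -> bool:
    """Return True if the address is not private, loopback, link-local, or otherwise special."""
    ip = ipaddress.ip_address(ip_text.split("%")[0])  # drop any IPv6 scope id
    return not (
        ip.is_private or ip.is_loopback or ip.is_link_local
        or ip.is_multicast or ip.is_reserved or ip.is_unspecified
    )


def resolves_to_public_only(hostname: str) -> bool:
    """Resolve hostname and require that every returned address is public."""
    try:
        infos = socket.getaddrinfo(hostname, None, proto=socket.IPPROTO_TCP)
    except socket.gaierror:
        return False  # unresolvable names are rejected
    addresses = {info[4][0] for info in infos}
    return bool(addresses) and all(is_public_address(a) for a in addresses)
```

Requiring all addresses to be public, rather than any, avoids accepting a name that returns one public and one internal record.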

3. Restrict URL Schemes to HTTP/HTTPS Only

Validate the scheme before making the request: parsed = urllib.parse.urlparse(url); if parsed.scheme not in ['http', 'https']: abort(400). Explicitly allow only web protocols and reject file://, gopher://, dict://, ftp://, and everything else. requests already refuses schemes it has no adapter for, but an explicit check keeps local file access and protocol smuggling out of reach if the HTTP client or its adapters change.
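As a small illustration, the scheme check fits in one helper. The name has_allowed_scheme is hypothetical; urlparse lowercases the scheme, so mixed-case input is handled without extra work.

```python
from urllib.parse import urlparse

ALLOWED_SCHEMES = frozenset({"http", "https"})


def has_allowed_scheme(url: str) -> bool:
    """Return True only for http and https URLs; empty or missing schemes are rejected."""
    try:
        return urlparse(url).scheme in ALLOWED_SCHEMES
    except ValueError:
        return False  # unparseable input is rejected outright
```

Scheme-relative input such as //example.com/path has an empty scheme and is rejected, which avoids relying on whatever default a downstream client might assume.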

4. Disable Redirects or Validate Redirect Destinations

Disable automatic redirects: requests.get(url, allow_redirects=False, timeout=5). Alternatively, follow redirects manually and validate each hop: response = requests.head(url, allow_redirects=False); if response.is_redirect: validate(response.headers['Location']). The Location value may be relative, so resolve it against the current URL before applying the same allowlist validation, and cap the number of hops. This prevents redirect-based SSRF bypasses.
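Manual redirect following can be sketched as a small loop. Everything named here (ALLOWED_HOSTS, validate, fetch_with_validated_redirects, and the send callable) is hypothetical; send is assumed to perform exactly one request without following redirects and to return an object with status_code and headers, as a requests response does.

```python
from urllib.parse import urljoin, urlparse

ALLOWED_HOSTS = {"api.example.com", "cdn.example.com"}  # hypothetical allowlist
REDIRECT_CODES = {301, 302, 303, 307, 308}
MAX_REDIRECTS = 3


def validate(url: str) -> str:
    """Raise ValueError unless the URL uses http(s) and an allowlisted host."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or parsed.hostname not in ALLOWED_HOSTS:
        raise ValueError(f"Destination not allowed: {url}")
    return url


def fetch_with_validated_redirects(url: str, send):
    """Follow at most MAX_REDIRECTS hops, validating every URL before it is requested."""
    for _ in range(MAX_REDIRECTS + 1):
        response = send(validate(url))
        location = response.headers.get("Location")
        if response.status_code not in REDIRECT_CODES or not location:
            return response
        # Location may be relative, so resolve it against the current URL.
        url = urljoin(url, location)
    raise ValueError("Too many redirects")
```

In a Flask view, send could be something like lambda u: requests.get(u, allow_redirects=False, timeout=5), so that requests never follows a hop the application has not validated.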

5. Use Indirect References Instead of Direct URL Input

Map user input to predefined URLs: URL_MAPPING = {'api': 'https://api.example.com', 'cdn': 'https://cdn.example.com'}; url = URL_MAPPING.get(request.args['service']). Users select a service by key, not by URL, and unknown keys (for which .get() returns None) must be rejected. The application controls every destination URL, which eliminates SSRF for that code path by removing user control over the URL.
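A minimal sketch of the lookup with explicit rejection of unknown keys. The helper name resolve_service_url is hypothetical; the mapping mirrors the one in the text.

```python
URL_MAPPING = {
    "api": "https://api.example.com",
    "cdn": "https://cdn.example.com",
}


def resolve_service_url(service_key: str) -> str:
    """Return the predefined URL for a service key, or raise ValueError if unknown."""
    try:
        return URL_MAPPING[service_key]
    except KeyError:
        # Never fall back to treating the key as a URL.
        raise ValueError(f"Unknown service: {service_key!r}") from None
```

A Flask view would typically catch the ValueError and respond with abort(400), so a request carrying a raw URL in the service parameter never reaches requests.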

6. Configure Network Segmentation and Egress Filtering

Deploy the application in an isolated network segment. Configure firewall rules that block outbound access to private IP ranges. Route outbound traffic through an egress proxy with an allowlist, and apply network policies that permit connections only to the specific external APIs the application needs. This defense-in-depth layer limits SSRF impact even if application-level validation is bypassed.
