import requests
@app.route('/fetch_api_data')
def fetch_api_data():
# Vulnerable: HTTP session in context manager
with requests.Session() as session:
# Insecure: HTTP connection exposes data
response = session.get('http://api.example.com/data')
return response.json()
@app.route('/submit_form')
def submit_form():
# Vulnerable: HTTP POST with sensitive data
form_data = request.get_json()
with requests.Session() as session:
# Dangerous: Sending credentials over HTTP
response = session.post(
'http://api.backend.com/auth',
json={'username': form_data['user'], 'password': form_data['pass']}
)
return {'status': response.status_code}
@app.route('/proxy_request')
def proxy_request():
# Vulnerable: Multiple HTTP calls in session
target_url = request.args.get('url')
with requests.Session() as session:
# No validation - allows HTTP connections
response = session.get(target_url)
return response.text
import requests
from urllib.parse import urlparse
# Secure session configuration
def create_secure_session():
session = requests.Session()
# Enforce SSL/TLS verification
session.verify = True
# Set secure headers
session.headers.update({
'User-Agent': 'SecureApp/1.0',
'Accept': 'application/json'
})
return session
def validate_https_url(url):
"""Validate that URL uses HTTPS scheme."""
parsed = urlparse(url)
if parsed.scheme != 'https':
raise ValueError('Only HTTPS URLs are allowed')
return url
@app.route('/fetch_api_data')
def fetch_api_data():
# Secure: HTTPS session with proper configuration
with create_secure_session() as session:
try:
# Secure: HTTPS connection
api_url = 'https://api.example.com/data'
validate_https_url(api_url)
response = session.get(
api_url,
timeout=10,
verify=True # Ensure certificate validation
)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
return {'error': f'API request failed: {str(e)}'}, 500
@app.route('/submit_form')
def submit_form():
# Secure: HTTPS POST with proper validation
form_data = request.get_json()
# Validate required fields
if not form_data or 'user' not in form_data or 'pass' not in form_data:
return {'error': 'Missing credentials'}, 400
with create_secure_session() as session:
try:
# Secure: HTTPS endpoint
auth_url = 'https://api.backend.com/auth'
validate_https_url(auth_url)
response = session.post(
auth_url,
json={
'username': form_data['user'][:50], # Limit length
'password': form_data['pass'][:100]
},
timeout=15,
verify=True
)
response.raise_for_status()
return {'status': 'success', 'code': response.status_code}
except requests.exceptions.RequestException as e:
return {'error': f'Authentication failed: {str(e)}'}, 500
@app.route('/proxy_request')
def proxy_request():
# Secure: URL validation and HTTPS enforcement
target_url = request.args.get('url', '')
# Validate URL format and scheme
try:
validate_https_url(target_url)
except ValueError as e:
return {'error': str(e)}, 400
# Allowlist of permitted domains
allowed_domains = ['api.trusted.com', 'secure.partner.com']
parsed_url = urlparse(target_url)
if parsed_url.netloc not in allowed_domains:
return {'error': 'Domain not allowed'}, 400
with create_secure_session() as session:
try:
response = session.get(
target_url,
timeout=10,
verify=True,
allow_redirects=False # Prevent redirect attacks
)
response.raise_for_status()
# Return sanitized response
return {
'status_code': response.status_code,
'content_type': response.headers.get('content-type', ''),
'data': response.text[:1000] # Limit response size
}
except requests.exceptions.RequestException as e:
return {'error': f'Request failed: {str(e)}'}, 500
# Additional secure configuration
class SecureHTTPAdapter(requests.adapters.HTTPAdapter):
"""Custom adapter that enforces HTTPS."""
def send(self, request, **kwargs):
if request.url.startswith('http://'):
raise requests.exceptions.InvalidURL('HTTP connections not allowed')
return super().send(request, **kwargs)
# Example of using the secure adapter
@app.route('/ultra_secure_request')
def ultra_secure_request():
with requests.Session() as session:
# Mount secure adapter that blocks HTTP
session.mount('http://', SecureHTTPAdapter())
session.mount('https://', SecureHTTPAdapter())
try:
response = session.get('https://api.example.com/secure-data')
return response.json()
except requests.exceptions.InvalidURL as e:
return {'error': 'Insecure connection blocked'}, 400