import requests
from flask import Flask, request
import urllib3
# Application object that the @app.route decorators in this module register
# their view functions on; it has to exist before the first decorator runs.
app = Flask(__name__)

# Vulnerable: silences urllib3's InsecureRequestWarning, so the unverified
# HTTPS requests made in this sample (verify=False) emit no warning at all.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@app.route('/api_call')
def api_call():
    """Return the JSON document served by the internal data API.

    Marked as a vulnerable example by the original comments: the request is
    sent with ``verify=False``, so the server's TLS certificate is not
    validated. No timeout is passed to the request.

    Returns:
        The decoded JSON body of the upstream response.
    """
    # Vulnerable: certificate verification disabled (DANGEROUS).
    upstream = requests.get('https://api.internal.com/data', verify=False)
    return upstream.json()
@app.route('/webhook', methods=['POST'])
def webhook():
    """Forward the JSON body of the incoming POST to the external webhook.

    Marked as a vulnerable example by the original comments: the outgoing
    POST uses ``verify=False``, so the webhook server's certificate is not
    validated and a man-in-the-middle can intercept the payload.

    Returns:
        A dict whose ``status`` key holds the HTTP status code returned by
        the webhook endpoint.
    """
    payload = request.get_json()
    # Vulnerable: POST without certificate validation.
    reply = requests.post(
        'https://external-webhook.com/notify',
        json=payload,
        verify=False,
    )
    return {'status': reply.status_code}
# Vulnerable: module-level requests session with certificate verification
# switched off. Setting verify to False on the session makes it the default
# for every request sent through this object.
session = requests.Session()
session.verify = False # All requests vulnerable
import requests
from flask import Flask, request
import ssl
import certifi
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
# Secure SSL context configuration
def create_secure_context():
    """Build an ``ssl.SSLContext`` that validates server certificates.

    The context trusts the CA bundle shipped with certifi, requires a valid
    certificate, checks that the certificate matches the host name, and
    refuses protocol versions older than TLS 1.2.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    ctx = ssl.create_default_context(cafile=certifi.where())
    # Floor for protocol negotiation: nothing below TLS 1.2.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # Stated explicitly so the intent survives even if defaults change.
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True
    return ctx
# Secure session configuration
def create_secure_session():
    """Create a requests session that verifies TLS certificates.

    Verification is enabled by pointing ``session.verify`` at certifi's CA
    bundle; a path value both turns verification on and selects the bundle,
    so no separate ``verify = True`` assignment is needed.

    Returns:
        A ``requests.Session`` with certificate verification against the
        certifi bundle and default ``User-Agent`` / ``Accept`` headers set.
    """
    session = requests.Session()

    # Verify every request against the CA bundle provided by certifi.
    session.verify = certifi.where()

    # Default headers sent with every request made through this session.
    session.headers.update({
        'User-Agent': 'SecureApp/1.0',
        'Accept': 'application/json',
    })
    return session
@app.route('/api_call')
def api_call():
    """Fetch JSON from the internal data API with certificate verification.

    The upstream request verifies the server certificate, asks for JSON and
    gives up after 10 seconds. Non-2xx upstream responses are turned into
    errors by ``raise_for_status()``.

    Returns:
        The decoded upstream JSON on success. On failure, an
        ``(error_dict, status)`` tuple: 400 when the failure is a
        ``requests`` SSL error, 500 for any other ``requests`` exception.
    """
    endpoint = 'https://api.internal.com/data'
    try:
        # Secure: certificate verification explicitly enabled.
        upstream = requests.get(
            endpoint,
            headers={'Accept': 'application/json'},
            timeout=10,
            verify=True,
        )
        upstream.raise_for_status()
        return upstream.json()
    except requests.exceptions.SSLError as exc:
        # NOTE(review): the exception text is echoed to the caller and an
        # upstream TLS failure is reported as 400 - confirm both are intended.
        return {'error': f'SSL verification failed: {exc}'}, 400
    except requests.exceptions.RequestException as exc:
        return {'error': f'Request failed: {exc}'}, 500
@app.route('/webhook', methods=['POST'])
def webhook():
    """Relay the posted JSON to the external webhook over verified TLS.

    A falsy payload (for example ``None`` or ``{}``) is rejected before any
    outgoing request is made. The outgoing POST is verified against the
    certifi CA bundle and times out after 15 seconds.

    Returns:
        ``{'status': 'success', 'response_code': <code>}`` when the webhook
        answers with a success status. Otherwise an ``(error_dict, status)``
        tuple: 400 for a missing payload or a ``requests`` SSL error, 500
        for any other ``requests`` exception.
    """
    try:
        payload = request.get_json()
        if not payload:
            return {'error': 'No data provided'}, 400

        outbound_headers = {
            'Content-Type': 'application/json',
            'User-Agent': 'SecureWebhookClient/1.0',
        }
        # Secure: full certificate validation against the trusted CA bundle.
        reply = requests.post(
            'https://external-webhook.com/notify',
            json=payload,
            headers=outbound_headers,
            timeout=15,
            verify=certifi.where(),
        )
        reply.raise_for_status()
        return {'status': 'success', 'response_code': reply.status_code}
    except requests.exceptions.SSLError as exc:
        # NOTE(review): exception text is returned to the caller - confirm
        # that exposing upstream details is acceptable.
        return {'error': f'SSL verification failed: {exc}'}, 400
    except requests.exceptions.RequestException as exc:
        return {'error': f'Webhook failed: {exc}'}, 500
# Secure: shared module-level session built by create_secure_session(), which
# configures the session to verify certificates against the certifi CA bundle.
secure_session = create_secure_session()
@app.route('/secure_session_example')
def secure_session_example():
    """Call an example endpoint through the shared ``secure_session``.

    The request uses a 10 second timeout; non-2xx responses are turned into
    errors by ``raise_for_status()``.

    Returns:
        The decoded JSON body on success, or ``({'error': <text>}, 500)``
        when any ``requests`` exception is raised (SSL errors included, as
        they are not handled separately here).
    """
    try:
        result = secure_session.get(
            'https://api.example.com/secure-endpoint',
            timeout=10,
        )
        result.raise_for_status()
        return result.json()
    except requests.exceptions.RequestException as failure:
        message = str(failure)
        return {'error': message}, 500
# Advanced: Certificate pinning
class CertificatePinningAdapter(HTTPAdapter):
    """Transport adapter that pins the server certificate by fingerprint.

    Connections made through this adapter use a TLS context that requires a
    verified certificate. In addition, urllib3 is given the pin through its
    ``assert_fingerprint`` option, so the digest of the certificate presented
    by the server is compared with ``expected_fingerprint`` and a mismatch
    fails the request with an SSL error.

    Args:
        expected_fingerprint: Hex digest of the pinned certificate. Colon
            separators are accepted, as is a leading algorithm label such as
            ``sha256:``.
        *args: Forwarded unchanged to ``HTTPAdapter``.
        **kwargs: Forwarded unchanged to ``HTTPAdapter``.

    Raises:
        ValueError: If ``expected_fingerprint`` is empty; an adapter without
            a pin would silently provide no pinning.
    """

    # Labels that may prefix the digest ("sha256:<hex>"). urllib3 picks the
    # hash function from the digest length, so the label itself is dropped.
    _ALGORITHM_LABELS = ('sha256', 'sha1', 'md5')

    def __init__(self, expected_fingerprint, *args, **kwargs):
        if not expected_fingerprint:
            raise ValueError('expected_fingerprint must be a non-empty certificate digest')
        # Must be assigned before super().__init__(): the HTTPAdapter
        # constructor calls init_poolmanager(), which reads this attribute.
        self.expected_fingerprint = expected_fingerprint
        super().__init__(*args, **kwargs)

    def _fingerprint_for_urllib3(self):
        """Return the pin without an algorithm label, as urllib3 expects."""
        label, separator, digest = self.expected_fingerprint.partition(':')
        if separator and label.lower() in self._ALGORITHM_LABELS:
            return digest
        return self.expected_fingerprint

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with strict TLS settings and the pin.

        All positional and keyword arguments are passed on to
        ``HTTPAdapter.init_poolmanager``; ``ssl_context`` and
        ``assert_fingerprint`` are set here and override any caller value.
        """
        ctx = create_urllib3_context()
        ctx.check_hostname = True
        ctx.verify_mode = ssl.CERT_REQUIRED
        kwargs['ssl_context'] = ctx
        # This is what actually enforces the pin: urllib3 compares the peer
        # certificate's digest with this value on every new connection.
        kwargs['assert_fingerprint'] = self._fingerprint_for_urllib3()
        return super().init_poolmanager(*args, **kwargs)
@app.route('/pinned_request')
def pinned_request():
    """Fetch sensitive data through a session using the pinning adapter.

    A dedicated session is created for the call, with a
    ``CertificatePinningAdapter`` mounted for ``https://critical-api.com``.
    The session is used as a context manager so that it, and the connection
    pool it owns, is closed on every exit path instead of being left open
    after each request.

    Returns:
        The decoded JSON body on success. On failure, an
        ``(error_dict, status)`` tuple: 400 for a ``requests`` SSL error,
        500 for any other ``requests`` exception.
    """
    # Pin certificate for critical connections
    expected_fingerprint = 'sha256:...'  # Your expected certificate fingerprint
    try:
        with requests.Session() as session:
            adapter = CertificatePinningAdapter(expected_fingerprint)
            session.mount('https://critical-api.com', adapter)
            response = session.get(
                'https://critical-api.com/sensitive-data',
                timeout=10,
            )
            response.raise_for_status()
            # The body has already been read, so decoding it here (before
            # the session closes) or later would give the same result.
            return response.json()
    except requests.exceptions.SSLError as e:
        return {'error': f'Certificate pinning failed: {str(e)}'}, 400
    except requests.exceptions.RequestException as e:
        return {'error': str(e)}, 500
# Configuration for internal CA
@app.route('/internal_ca_example')
def internal_ca_example():
    """Call an internal service, verifying it against a custom CA bundle.

    The bundle path is passed as ``verify`` so certificates are validated
    against that file. The request times out after 10 seconds.

    Returns:
        The decoded JSON body on success. On failure, an
        ``(error_dict, status)`` tuple: 400 for a ``requests`` SSL error,
        500 for any other ``requests`` exception.
    """
    # Custom CA bundle used to validate internal certificates.
    internal_bundle = '/etc/ssl/certs/internal-ca-bundle.pem'
    try:
        reply = requests.get(
            'https://internal-service.company.com/api',
            timeout=10,
            verify=internal_bundle,
        )
        reply.raise_for_status()
        return reply.json()
    except requests.exceptions.SSLError as exc:
        return {'error': f'Internal SSL verification failed: {exc}'}, 400
    except requests.exceptions.RequestException as exc:
        return {'error': str(exc)}, 500