import urllib.request
from flask import request
@app.route('/download_file')
def download_file():
    """Fetch ``file.txt`` from the FTP server and return its raw bytes.

    NOTE(security): plain FTP transmits the credentials embedded in the URL
    and the file contents unencrypted, and the password is hard-coded in
    source. Prefer an encrypted transport and load secrets from
    configuration instead.

    Returns:
        The bytes read from the remote file.
    """
    # Vulnerable: FTP URL exposes credentials and data
    ftp_url = 'ftp://user:password@ftp.example.com/file.txt'
    # The context manager closes the connection even when read() raises;
    # previously the response object was never closed.
    with urllib.request.urlopen(ftp_url) as response:
        return response.read()
@app.route('/fetch_ftp')
def fetch_ftp():
    """Fetch a file over anonymous FTP from a caller-supplied host.

    Query parameters:
        host: FTP server to contact.
        file: Path of the file on that server.

    Returns:
        The file's bytes on success, ``('... required', 400)`` when a
        parameter is missing, or ``('FTP error', 500)`` when the transfer
        fails.

    NOTE(security): the host is fully user-controlled, so this endpoint can
    be used to make the server connect to arbitrary machines (SSRF), and
    plain FTP is unencrypted. Restrict hosts to an allowlist before
    exposing this route.
    """
    # Vulnerable: User-controlled FTP access
    ftp_host = request.args.get('host')
    filename = request.args.get('file')
    # Without this guard a missing parameter was formatted as the literal
    # string "None" and a connection to that bogus host was attempted.
    if not ftp_host or not filename:
        return 'host and file parameters are required', 400

    # Dangerous: FTP with potential credential exposure
    ftp_url = f'ftp://anonymous@{ftp_host}/{filename}'
    try:
        # Context manager guarantees the connection is closed on every path.
        with urllib.request.urlopen(ftp_url) as response:
            return response.read()
    except Exception:
        # Still a catch-all at the view boundary, but no longer a bare
        # ``except:`` that would also swallow SystemExit/KeyboardInterrupt.
        return 'FTP error', 500
import urllib.request
import urllib.parse
from flask import request
import ftplib
import ssl
def validate_secure_url(url):
    """Parse *url* and insist on an encrypted transport scheme.

    Returns the ``urllib.parse.urlparse`` result when the scheme is one of
    https, sftp or ftps; raises ``ValueError`` for any other scheme.
    """
    secure_schemes = ['https', 'sftp', 'ftps']
    parts = urllib.parse.urlparse(url)
    if parts.scheme in secure_schemes:
        return parts
    raise ValueError(f'Insecure protocol {parts.scheme} not allowed. Use: {secure_schemes}')
@app.route('/download_file')
def download_file():
    """Secure file download using HTTPS.

    Downloads a fixed file over TLS with certificate verification and a
    30-second timeout.

    Returns:
        The file's bytes on success, or ``('Download failed: ...', 500)``
        when URL validation or the request fails.
    """
    # Secure: Use HTTPS instead of FTP
    https_url = 'https://secure.example.com/api/files/file.txt'
    try:
        validate_secure_url(https_url)
        # Create secure SSL context (default context verifies certificates)
        context = ssl.create_default_context()
        # The context manager closes the connection on every path;
        # previously the response was never closed.
        with urllib.request.urlopen(
            https_url,
            timeout=30,
            context=context,
        ) as response:
            return response.read()
    except (ValueError, urllib.error.URLError) as e:
        return f'Download failed: {str(e)}', 500
@app.route('/fetch_ftp')
def fetch_ftp():
    """Refuse every request: FTP fetching is turned off on this endpoint.

    Always answers with an error payload and HTTP status 400.
    """
    payload = {'error': 'FTP access disabled for security. Use HTTPS API instead.'}
    status = 400
    return payload, status
# Secure alternative using SFTP
class SecureFTPClient:
    """SFTP download client restricted to an allowlist of hosts.

    Connections use key-based authentication and strict host key checking
    against the system ``known_hosts`` file.
    """

    # Hosts this client may connect to; any other host is rejected in
    # ``__init__`` before a connection is ever attempted.
    ALLOWED_HOSTS = ('sftp.trusted.com', 'secure-ftp.partner.com')

    def __init__(self, host, username, key_file=None):
        """Store connection settings and validate the host.

        Args:
            host: SFTP server name; must be one of ``ALLOWED_HOSTS``.
            username: Account used to authenticate.
            key_file: Optional path to the private key file.

        Raises:
            ValueError: If ``host`` is not in the allowlist.
        """
        self.host = host
        self.username = username
        self.key_file = key_file
        # Validate host is in allowlist
        if host not in self.ALLOWED_HOSTS:
            raise ValueError(f'Host {host} not in allowlist')

    def download_file(self, remote_path, max_size=10*1024*1024):
        """Securely download file via SFTP.

        Args:
            remote_path: Path of the file on the server.
            max_size: Largest file size, in bytes, that will be returned.

        Returns:
            The file contents as bytes.

        Raises:
            RuntimeError: If paramiko is not installed, or if connecting,
                the size check, or the transfer fails (the original
                exception is chained as ``__cause__``).
        """
        try:
            import paramiko  # Requires paramiko for SFTP
        except ImportError as exc:
            raise RuntimeError('paramiko library required for SFTP') from exc

        ssh = None
        sftp = None
        try:
            # Create SSH client
            ssh = paramiko.SSHClient()
            # RejectPolicy refuses every host that is not already known, so
            # the known hosts must be loaded first; without this call no
            # connection could ever succeed.
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.RejectPolicy())  # Strict host key checking
            # Connect with key-based authentication
            ssh.connect(
                self.host,
                username=self.username,
                key_filename=self.key_file,
                timeout=30,
            )
            # Create SFTP client
            sftp = ssh.open_sftp()
            # Check file size before download. The server may not report a
            # size, so a missing value is tolerated here and the bounded
            # read below enforces the limit instead.
            file_stat = sftp.stat(remote_path)
            if file_stat.st_size is not None and file_stat.st_size > max_size:
                raise ValueError(f'File too large: {file_stat.st_size} bytes')
            # Read at most one byte past the limit so a file that grew after
            # the stat call cannot exhaust memory.
            with sftp.open(remote_path, 'rb') as remote_file:
                content = remote_file.read(max_size + 1)
            if len(content) > max_size:
                raise ValueError(f'File too large: more than {max_size} bytes')
            return content
        except Exception as e:
            raise RuntimeError(f'SFTP download failed: {str(e)}') from e
        finally:
            # Release the session on success and on every failure path;
            # previously an exception left both handles open.
            if sftp is not None:
                sftp.close()
            if ssh is not None:
                ssh.close()
@app.route('/secure_file_download')
def secure_file_download():
    """Example of secure file download.

    Reads the ``file`` query parameter, fetches ``/secure/<file>`` from the
    trusted SFTP host and returns it as an attachment.

    Returns:
        ``(content, 200, headers)`` on success, ``('Invalid filename', 400)``
        for a rejected name, or ``('Download error: ...', 500)`` when the
        SFTP client fails.
    """
    filename = request.args.get('file', '')
    # Validate filename
    if not filename or '..' in filename or filename.startswith('/'):
        return 'Invalid filename', 400
    # Control characters (CR, LF, NUL, ...) must never reach a response
    # header or the remote path.
    if any(ord(ch) < 32 or ord(ch) == 127 for ch in filename):
        return 'Invalid filename', 400

    try:
        # Use secure SFTP client
        sftp_client = SecureFTPClient(
            host='sftp.trusted.com',
            username='secure_user',
            key_file='/path/to/private/key'
        )
        content = sftp_client.download_file(f'/secure/{filename}')
    except (ValueError, RuntimeError) as e:
        return f'Download error: {str(e)}', 500

    # Offer only the final path component as the download name, and emit it
    # as a quoted-string so spaces, semicolons or quotes in the name cannot
    # add or alter Content-Disposition parameters.
    download_name = filename.rsplit('/', 1)[-1]
    escaped_name = download_name.replace('\\', '\\\\').replace('"', '\\"')
    return content, 200, {
        'Content-Type': 'application/octet-stream',
        'Content-Disposition': f'attachment; filename="{escaped_name}"',
    }
# Alternative: Cloud storage approach
@app.route('/cloud_download')
def cloud_download():
    """Secure cloud storage download.

    Reads the ``file`` query parameter and fetches that object from the
    storage bucket over HTTPS with a bearer token.

    Returns:
        The object's bytes on success, ``('Filename required', 400)`` or
        ``('Invalid filename', 400)`` for a bad name, or
        ``('Cloud download failed: ...', 500)`` when the request fails.
    """
    filename = request.args.get('file', '')
    if not filename:
        return 'Filename required', 400
    # The request carries credentials, so the caller must not be able to
    # steer it outside the bucket via path traversal.
    if '..' in filename or filename.startswith('/'):
        return 'Invalid filename', 400

    # Use cloud storage API instead of FTP. Percent-encode the name so
    # characters such as '?', '#' or spaces stay part of the object path
    # instead of changing the URL's structure; '/' is kept for nested keys.
    encoded_name = urllib.parse.quote(filename, safe='/')
    cloud_url = f'https://storage.cloud.com/secure-bucket/{encoded_name}'
    try:
        validate_secure_url(cloud_url)
        # Add authentication headers
        # NOTE(security): the token below is a hard-coded placeholder; load
        # the real credential from configuration, not from source.
        request_obj = urllib.request.Request(
            cloud_url,
            headers={
                'Authorization': 'Bearer YOUR_SECURE_TOKEN',
                'User-Agent': 'SecureApp/1.0'
            }
        )
        context = ssl.create_default_context()
        # The context manager closes the connection on every path;
        # previously the response was never closed.
        with urllib.request.urlopen(request_obj, timeout=30, context=context) as response:
            return response.read()
    except (ValueError, urllib.error.URLError) as e:
        return f'Cloud download failed: {str(e)}', 500