import os
import ssl
import tempfile
import urllib.parse
import urllib.request

from flask import request
def validate_secure_download_protocol(url):
    """Parse *url* and accept it only if its scheme is an encrypted transport.

    Args:
        url: The URL string to inspect.

    Returns:
        The ``urllib.parse.urlparse`` result for *url*.

    Raises:
        ValueError: If the scheme is not one of ``https``, ``sftp`` or
            ``ftps``.
    """
    secure_protocols = ['https', 'sftp', 'ftps']
    components = urllib.parse.urlparse(url)
    # Guard-return on the accepted case; anything else is rejected below.
    if components.scheme in secure_protocols:
        return components
    raise ValueError(
        f'Insecure protocol {components.scheme}. Use: {secure_protocols}'
    )
@app.route('/download_backup')
def download_backup():
    """Fetch the latest backup archive over HTTPS into the backup directory.

    The archive is streamed into a temporary file inside
    ``/var/secure_backups`` and only renamed to ``backup.zip`` once the whole
    body has been received and is within the size limit. A failed, truncated
    or oversized transfer therefore never replaces an existing backup and
    never leaves a partial file behind.

    Returns:
        On success, a dict with ``status``, ``file`` and ``size`` keys.
        On failure, a ``({'error': ...}, 500)`` tuple.
    """
    backup_api_url = 'https://secure-backup.company.com/api/backups/latest'
    download_dir = '/var/secure_backups'
    max_size = 100 * 1024 * 1024  # 100MB
    tmp_path = None  # set while a staged, not-yet-promoted file exists
    try:
        validate_secure_download_protocol(backup_api_url)
        # NOTE(review): the bearer token below is a hard-coded placeholder;
        # it presumably should come from configuration - confirm.
        request_obj = urllib.request.Request(
            backup_api_url,
            headers={
                'Authorization': 'Bearer YOUR_SECURE_TOKEN',
                'User-Agent': 'SecureBackupClient/1.0',
            },
        )
        context = ssl.create_default_context()
        with urllib.request.urlopen(
            request_obj, timeout=300, context=context
        ) as response:
            # Compare only the media type so that a header carrying
            # parameters ("application/zip; charset=binary") is accepted.
            content_type = response.headers.get('Content-Type') or ''
            media_type = content_type.split(';', 1)[0].strip().lower()
            if media_type != 'application/zip':
                raise ValueError('Invalid backup file type')

            os.makedirs(download_dir, exist_ok=True, mode=0o700)
            local_file = os.path.join(download_dir, 'backup.zip')

            # Stage in the same directory so os.replace() is a plain rename.
            fd, tmp_path = tempfile.mkstemp(
                dir=download_dir, prefix='backup.', suffix='.part'
            )
            downloaded = 0
            with os.fdopen(fd, 'wb') as staged:
                for chunk in iter(lambda: response.read(8192), b''):
                    downloaded += len(chunk)
                    if downloaded > max_size:
                        raise ValueError('Backup file too large')
                    staged.write(chunk)

        # Promote the finished download over any previous backup.
        os.replace(tmp_path, local_file)
        tmp_path = None
        return {
            'status': 'success',
            'file': local_file,
            'size': downloaded
        }
    except (ValueError, OSError) as e:
        # URLError is an OSError subclass; catching OSError also covers read
        # timeouts and filesystem errors raised while streaming.
        return {'error': f'Backup download failed: {str(e)}'}, 500
    finally:
        # Drop the staged file on any failure path.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
@app.route('/fetch_file')
def fetch_file():
    """Refuse the request: this endpoint always answers with HTTP 400.

    Returns:
        A ``(body, 400)`` tuple whose body carries a fixed error message
        saying FTP downloads are disabled.
    """
    rejection = {
        'error': 'FTP downloads disabled for security. Use secure HTTPS API endpoints.'
    }
    return rejection, 400
# Secure alternative: allowlisted downloads over HTTPS or SFTP
class SecureFileDownloader:
    """Download files from allowlisted hosts over HTTPS or SFTP.

    Every download is staged in a temporary file inside ``DOWNLOAD_DIR`` and
    renamed to its final name only after the transfer succeeded, so a failed
    transfer neither leaves a partial file nor damages an existing one.
    """

    # Directory that receives every downloaded file.
    DOWNLOAD_DIR = '/var/secure_downloads'
    # Media types accepted by download_via_https (compared exactly).
    SAFE_CONTENT_TYPES = frozenset({
        'application/octet-stream',
        'application/zip',
        'text/plain',
        'application/pdf',
    })
    # Largest remote file download_via_sftp will fetch (50MB).
    SFTP_MAX_SIZE = 50 * 1024 * 1024

    def __init__(self):
        # Hosts that downloads may be fetched from.
        self.allowed_hosts = [
            'secure-files.company.com',
            'backup.trusted.com'
        ]

    @staticmethod
    def _validate_filename(filename):
        """Raise ValueError unless *filename* is a plain, single-component name."""
        if (
            not isinstance(filename, str)
            or filename in ('', '.')
            or '..' in filename
            or '/' in filename
            or '\\' in filename
            or '\x00' in filename
        ):
            raise ValueError('Invalid filename')

    @staticmethod
    def _discard_partial(path):
        """Best-effort removal of a staged file; *path* may be None."""
        if path is None:
            return
        try:
            os.remove(path)
        except OSError:
            pass

    def download_via_https(self, url, filename, max_size=50*1024*1024):
        """Download *url* into the download directory as *filename*.

        Args:
            url: HTTPS URL whose network location is in ``allowed_hosts``.
            filename: Plain file name (no path components) for the result.
            max_size: Maximum number of bytes accepted.

        Returns:
            A ``(local_path, bytes_downloaded)`` tuple.

        Raises:
            ValueError: If the URL scheme, the host or *filename* is rejected
                before any network access.
            RuntimeError: If anything fails once the download has started
                (bad content type, size exceeded, redirect outside the
                allowlist, network or filesystem error).
        """
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme != 'https':
            raise ValueError('Only HTTPS downloads allowed')
        if parsed.netloc not in self.allowed_hosts:
            raise ValueError(f'Host {parsed.netloc} not in allowlist')
        self._validate_filename(filename)

        context = ssl.create_default_context()
        tmp_path = None
        try:
            with urllib.request.urlopen(url, timeout=60, context=context) as response:
                # urlopen follows redirects; refuse content that ended up
                # being served from outside the allowlist.
                final = urllib.parse.urlparse(response.url)
                if final.scheme != 'https' or final.netloc not in self.allowed_hosts:
                    raise ValueError(f'Redirected to disallowed URL: {response.url}')

                # Match the media type exactly, ignoring any parameters.
                content_type = response.headers.get('Content-Type', '')
                media_type = content_type.split(';', 1)[0].strip().lower()
                if media_type not in self.SAFE_CONTENT_TYPES:
                    raise ValueError(f'Unsafe content type: {content_type}')

                # Early rejection when the server announces the size.
                content_length = response.headers.get('Content-Length')
                if content_length and int(content_length) > max_size:
                    raise ValueError('File too large')

                os.makedirs(self.DOWNLOAD_DIR, exist_ok=True, mode=0o700)
                local_path = os.path.join(self.DOWNLOAD_DIR, filename)

                fd, tmp_path = tempfile.mkstemp(dir=self.DOWNLOAD_DIR, suffix='.part')
                downloaded = 0
                with os.fdopen(fd, 'wb') as staged:
                    # Content-Length may be absent or wrong, so the limit is
                    # enforced on the bytes actually received as well.
                    for chunk in iter(lambda: response.read(8192), b''):
                        downloaded += len(chunk)
                        if downloaded > max_size:
                            raise ValueError('Download size exceeded')
                        staged.write(chunk)

            os.replace(tmp_path, local_path)
            tmp_path = None
            return local_path, downloaded
        except Exception as e:
            raise RuntimeError(f'Download failed: {str(e)}') from e
        finally:
            self._discard_partial(tmp_path)

    def download_via_sftp(self, host, remote_path, local_filename, username, key_file):
        """Download *remote_path* from *host* over SFTP as *local_filename*.

        Args:
            host: Server name; must be in ``allowed_hosts``.
            remote_path: Path of the file on the server.
            local_filename: Plain file name (no path components) to store
                the file under in the download directory.
            username: SSH user name.
            key_file: Path of the private key used to authenticate.

        Returns:
            A ``(local_path, remote_size)`` tuple.

        Raises:
            ValueError: If *host* or *local_filename* is rejected before any
                network access.
            RuntimeError: If paramiko is missing or the transfer fails.
        """
        if host not in self.allowed_hosts:
            raise ValueError(f'Host {host} not allowed')
        # Without this check a name such as "../x" or "/etc/x" would be
        # written outside the download directory.
        self._validate_filename(local_filename)

        tmp_path = None
        try:
            import paramiko

            ssh = paramiko.SSHClient()
            # RejectPolicy refuses every host that is not already known, so
            # the known-hosts file has to be loaded for any connect to work.
            ssh.load_system_host_keys()
            ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
            try:
                ssh.connect(
                    host,
                    username=username,
                    key_filename=key_file,
                    timeout=30,
                    compress=True
                )
                sftp = ssh.open_sftp()
                try:
                    file_stat = sftp.stat(remote_path)
                    if file_stat.st_size > self.SFTP_MAX_SIZE:
                        raise ValueError('File too large for SFTP download')

                    os.makedirs(self.DOWNLOAD_DIR, exist_ok=True, mode=0o700)
                    local_path = os.path.join(self.DOWNLOAD_DIR, local_filename)

                    fd, tmp_path = tempfile.mkstemp(dir=self.DOWNLOAD_DIR, suffix='.part')
                    os.close(fd)  # sftp.get() opens the path itself
                    sftp.get(remote_path, tmp_path)
                finally:
                    sftp.close()
            finally:
                # Close the connection on failure paths too.
                ssh.close()

            os.replace(tmp_path, local_path)
            tmp_path = None
            return local_path, file_stat.st_size
        except ImportError as e:
            raise RuntimeError('paramiko library required for SFTP') from e
        except Exception as e:
            raise RuntimeError(f'SFTP download failed: {str(e)}') from e
        finally:
            self._discard_partial(tmp_path)
@app.route('/secure_download')
def secure_download():
    """Download the file named by the ``url`` query parameter over HTTPS.

    Query parameters:
        url: Source URL, passed to ``SecureFileDownloader.download_via_https``.
        filename: Name to store the file under.

    Returns:
        On success, a dict with ``status``, ``file`` (base name only) and
        ``size``. A missing parameter or a request the downloader rejects
        during validation (``ValueError``) yields ``({'error': ...}, 400)``;
        a failure during the transfer itself (``RuntimeError``) yields
        ``({'error': ...}, 500)``.
    """
    download_url = request.args.get('url', '')
    filename = request.args.get('filename', '')
    if not download_url or not filename:
        return {'error': 'URL and filename required'}, 400

    try:
        downloader = SecureFileDownloader()
        local_path, size = downloader.download_via_https(download_url, filename)
    except ValueError as e:
        # Raised for a rejected scheme, host or filename: the caller sent a
        # bad request, so this is not a server error.
        return {'error': str(e)}, 400
    except RuntimeError as e:
        return {'error': str(e)}, 500

    return {
        'status': 'downloaded',
        'file': os.path.basename(local_path),
        'size': size
    }