import urllib.request
import os
from flask import request
@app.route('/download_update')
def download_update():
    """Download the application update archive over verified HTTPS.

    The archive is streamed into a uniquely named part-file next to the
    destination and only moved onto ``/tmp/update.zip`` once the transfer
    has completed, so an interrupted download never leaves a truncated
    archive at the final path.

    Returns:
        The string ``'Update downloaded'`` on success.

    Raises:
        ValueError: If the server redirects the request away from HTTPS.
        OSError: If the network transfer or a file operation fails
            (``urllib.error.URLError`` is an ``OSError`` subclass).
    """
    # Function-scope import keeps this handler self-contained.
    import ssl

    # HTTPS instead of plain HTTP so the archive cannot be swapped in transit.
    update_url = 'https://updates.example.com/app-update.zip'
    local_file = '/tmp/update.zip'
    # Random suffix + exclusive create ('xb') below: a file or symlink planted
    # in the shared /tmp directory cannot be written through.
    part_file = f'{local_file}.{os.urandom(8).hex()}.part'

    # Default context verifies the certificate chain and the hostname.
    context = ssl.create_default_context()

    # NOTE(review): TLS authenticates the server only. No known-good digest is
    # available in this handler, so the archive content itself is not verified.
    try:
        with urllib.request.urlopen(
            update_url, timeout=30, context=context
        ) as response:
            # urlopen follows redirects, including ones to http://.
            if not response.geturl().lower().startswith('https://'):
                raise ValueError(
                    'Update download was redirected away from HTTPS'
                )
            with open(part_file, 'xb') as out:
                for chunk in iter(lambda: response.read(64 * 1024), b''):
                    out.write(chunk)
        # Replaces the destination directory entry in one step.
        os.replace(part_file, local_file)
    finally:
        # No-op after a successful replace; removes leftovers on failure.
        if os.path.exists(part_file):
            os.remove(part_file)

    return 'Update downloaded'
@app.route('/fetch_resource')
def fetch_resource():
    """Download a caller-specified resource through the validated helper.

    Query parameters:
        url: Source URL. It is checked by ``secure_urlretrieve`` before any
            request is made.
        filename: Destination file name (default ``'download.tmp'``). Only
            alphanumerics plus ``-``, ``_`` and ``.`` are accepted.

    Returns:
        ``'Downloaded to <path>'`` on success, or a ``(message, status)``
        tuple describing the failure.
    """
    resource_url = request.args.get('url', '')
    filename = request.args.get('filename', 'download.tmp')

    if not resource_url:
        return 'URL required', 400

    # The filename ends up in a filesystem path and in the response text, so
    # restrict it to a conservative character set before using it anywhere.
    if not filename.replace('-', '').replace('_', '').replace('.', '').isalnum():
        return 'Invalid filename format', 400

    try:
        # Delegates URL validation, size limiting and file handling to the
        # shared helper instead of calling urlretrieve on raw user input.
        local_path, _ = secure_urlretrieve(
            resource_url,
            filename,
            max_size=10*1024*1024  # 10MB limit
        )
    except ValueError:
        # Exception text can contain caller-supplied data; do not echo it.
        return 'Invalid download request', 400
    except RuntimeError:
        return 'Download failed', 500

    return f'Downloaded to {local_path}'
import urllib.request
import urllib.parse
import ssl
import hashlib
import os
from flask import request
def validate_secure_download_url(url):
    """Validate that *url* is an HTTPS URL on an allowlisted download host.

    The host is matched case-insensitively and an explicit default port
    (``:443``) is accepted. URLs carrying credentials (``user@host``) or any
    other port are rejected.

    Args:
        url: Absolute URL string to check.

    Returns:
        The ``urllib.parse.ParseResult`` for *url*.

    Raises:
        ValueError: If *url* is not a string, is not HTTPS, or does not point
            at an allowlisted host.
    """
    if not isinstance(url, str):
        raise ValueError('Download URL must be a string')

    parsed = urllib.parse.urlparse(url)
    if parsed.scheme != 'https':
        raise ValueError('Only HTTPS downloads allowed')

    # Allowlist of permitted download domains
    allowed_domains = {
        'downloads.trusted.com',
        'secure.example.com',
        'cdn.verified.com',
    }

    # Userinfo is rejected outright rather than silently stripped.
    has_credentials = '@' in parsed.netloc
    # ``hostname`` is lower-cased and excludes the port, unlike ``netloc``.
    host = parsed.hostname or ''
    # ``port`` raises ValueError itself when the port is not a valid number.
    port_ok = parsed.port in (None, 443)

    if has_credentials or not port_ok or host not in allowed_domains:
        raise ValueError(f'Domain {parsed.netloc} not in allowlist')
    return parsed
class _AllowlistRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-validates every redirect target."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Raises ValueError for a non-HTTPS or non-allowlisted target, which
        # aborts the request before the redirect is followed.
        validate_secure_download_url(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def _stream_to_file(response, out_path, max_size):
    """Copy *response* into a new file at *out_path*; return its SHA-256 hex digest."""
    # Check content length
    content_length = response.headers.get('Content-Length')
    if content_length and int(content_length) > max_size:
        raise ValueError(f'File too large: {content_length} bytes')

    # Download with size limit. The header may be absent or wrong, so the
    # byte count is enforced while streaming as well.
    downloaded = 0
    hash_obj = hashlib.sha256()
    # 'xb' creates the file exclusively and fails if the path already exists.
    with open(out_path, 'xb') as f:
        for chunk in iter(lambda: response.read(8192), b''):
            downloaded += len(chunk)
            if downloaded > max_size:
                raise ValueError(
                    f'Download exceeded size limit: {max_size} bytes'
                )
            hash_obj.update(chunk)
            f.write(chunk)
    return hash_obj.hexdigest()


def secure_urlretrieve(url, filename, expected_hash=None, max_size=50*1024*1024):
    """Download *url* into the secure download directory with validation.

    The data is streamed into a uniquely named part-file and moved onto the
    final path only after the size limit and (optional) hash check have
    passed, so a failed download never replaces or deletes a file that was
    already stored under the same name.

    Args:
        url: HTTPS URL on an allowlisted host.
        filename: Destination name relative to ``/var/secure_downloads``.
        expected_hash: Optional SHA-256 hex digest the download must match
            (compared case-insensitively).
        max_size: Maximum number of bytes to accept.

    Returns:
        A ``(local_path, sha256_hex_digest)`` tuple.

    Raises:
        ValueError: If *url* or *filename* fails validation.
        RuntimeError: If the download, size check, integrity check or file
            handling fails; the original exception is chained as the cause.
    """
    # Validate URL
    validate_secure_download_url(url)

    # Validate filename: non-empty, no traversal, not absolute, and it must
    # name a file rather than end in a path separator.
    if (
        not filename
        or '\x00' in filename
        or '..' in filename
        or filename.startswith('/')
        or os.path.isabs(filename)
        or not os.path.basename(filename)
    ):
        raise ValueError('Invalid filename')

    # Create secure download directory
    download_dir = '/var/secure_downloads'
    os.makedirs(download_dir, exist_ok=True)
    local_path = os.path.join(download_dir, filename)
    # Same directory as the target so the final os.replace stays on one
    # filesystem; random suffix avoids clashes between concurrent downloads.
    part_path = f'{local_path}.{os.urandom(8).hex()}.part'

    # Create secure SSL context
    context = ssl.create_default_context()
    # The default redirect handler would follow redirects to hosts outside
    # the allowlist (or to plain HTTP); this opener validates each hop.
    opener = urllib.request.build_opener(
        urllib.request.HTTPSHandler(context=context),
        _AllowlistRedirectHandler(),
    )

    try:
        with opener.open(url, timeout=30) as response:
            file_hash = _stream_to_file(response, part_path, max_size)

        # Verify hash if provided
        if expected_hash and file_hash != expected_hash.lower():
            raise ValueError('File integrity check failed')

        os.replace(part_path, local_path)
        return local_path, file_hash
    except Exception as e:
        # Clean up on error. Best effort: the part-file may never have been
        # created, and a cleanup failure must not mask the real error.
        try:
            os.remove(part_path)
        except OSError:
            pass
        raise RuntimeError(f'Secure download failed: {str(e)}') from e
@app.route('/download_update')
def download_update():
    """Fetch the application update and report where it was stored.

    Delegates to ``secure_urlretrieve`` with a pinned hash. On success the
    response carries the stored path and the computed hash; a ``ValueError``
    or ``RuntimeError`` from the helper is reported as an error with
    status 500.
    """
    source = 'https://secure.example.com/app-update.zip'
    # NOTE(review): this literal looks like a placeholder rather than a full
    # SHA-256 hex digest - confirm the real value is supplied before release.
    pinned_digest = 'a1b2c3d4e5f6...'  # Known good hash

    try:
        saved_path, digest = secure_urlretrieve(
            source, 'app-update.zip', expected_hash=pinned_digest
        )
    except (ValueError, RuntimeError) as exc:
        return {'error': str(exc)}, 500

    return {'status': 'success', 'file': saved_path, 'hash': digest}
@app.route('/fetch_resource')
def fetch_resource():
    """Download a caller-specified resource with validation.

    Query parameters:
        url: Source URL, validated by ``secure_urlretrieve``.
        filename: Destination file name; only alphanumerics plus ``-``,
            ``_`` and ``.`` are accepted.

    Returns:
        A dict with ``status``, ``file``, ``size`` and ``hash`` on success.
        On failure, an ``({'error': ...}, status)`` tuple: 400 when the
        request itself is invalid, 500 when the download fails.
    """
    resource_url = request.args.get('url', '')
    filename = request.args.get('filename', '')
    if not resource_url or not filename:
        return {'error': 'URL and filename required'}, 400

    # Validate filename format
    if not filename.replace('-', '').replace('_', '').replace('.', '').isalnum():
        return {'error': 'Invalid filename format'}, 400

    try:
        local_file, file_hash = secure_urlretrieve(
            resource_url,
            filename,
            max_size=10*1024*1024  # 10MB limit
        )
    except ValueError as e:
        # The helper raises ValueError when the URL or filename is rejected,
        # which is a client error like the checks above, not a server fault.
        return {'error': str(e)}, 400
    except RuntimeError as e:
        return {'error': str(e)}, 500

    return {
        'status': 'downloaded',
        'file': os.path.basename(local_file),
        'size': os.path.getsize(local_file),
        'hash': file_hash
    }
# Additional security: File type validation
def validate_file_type(filename, allowed_types):
    """Validate a file's extension against an allowlist, ignoring case.

    Args:
        filename: File name whose final extension is checked.
        allowed_types: Iterable of permitted extensions including the
            leading dot, e.g. ``['.pdf', '.txt']``.

    Raises:
        ValueError: If the extension (possibly empty) is not permitted.
    """
    ext = os.path.splitext(filename)[1].lower()
    # Lower-case the allowlist too, so an entry such as '.PDF' still matches
    # the lower-cased extension.
    permitted = {allowed.lower() for allowed in allowed_types}
    if ext not in permitted:
        raise ValueError(f'File type {ext} not allowed')
@app.route('/download_document')
def download_document():
    """Download a document after checking its file type.

    Query parameters:
        url: Source URL, validated by ``secure_urlretrieve``.
        filename: Destination file name; its extension must be one of
            ``.pdf``, ``.txt``, ``.docx`` or ``.xlsx``.

    Returns:
        A dict with ``status``, ``document`` and ``hash`` on success.
        On failure, an ``({'error': ...}, status)`` tuple: 400 when the
        request itself is invalid, 500 when the download fails.
    """
    doc_url = request.args.get('url', '')
    filename = request.args.get('filename', '')
    # Same missing-parameter response as the fetch_resource handler.
    if not doc_url or not filename:
        return {'error': 'URL and filename required'}, 400

    try:
        # Validate file type
        allowed_types = ('.pdf', '.txt', '.docx', '.xlsx')
        validate_file_type(filename, allowed_types)

        local_file, file_hash = secure_urlretrieve(
            doc_url,
            filename,
            max_size=5*1024*1024  # 5MB limit for documents
        )
    except ValueError as e:
        # Raised for a disallowed file type, URL or filename: the request is
        # at fault, so report a client error instead of a server error.
        return {'error': str(e)}, 400
    except RuntimeError as e:
        return {'error': str(e)}, 500

    return {
        'status': 'downloaded',
        'document': os.path.basename(local_file),
        'hash': file_hash
    }