import hashlib
import logging
import os
import re
import secrets
import shutil
import subprocess
import tempfile
import time

import magic
from flask import Flask, jsonify, make_response, request, send_file
from PIL import Image, ImageFile
from werkzeug.utils import secure_filename
# Flask application object; the @app.route handlers in this module register on it.
app = Flask(__name__)
class SecureImageProcessor:
    """Validate untrusted image uploads and re-encode them with Pillow.

    Constructing an instance creates the upload directory and writes an
    ImageMagick ``policy.xml`` into a private temporary directory, which is
    then exported through ``MAGICK_CONFIGURE_PATH``.
    """

    # File extension written for each supported output format.
    _FORMAT_EXTENSIONS = {'JPEG': '.jpg', 'PNG': '.png', 'GIF': '.gif'}

    # Image modes Pillow's PNG writer accepts without conversion.
    _PNG_MODES = ('1', 'L', 'LA', 'I', 'I;16', 'P', 'RGB', 'RGBA')

    # ImageMagick coders that are disabled by the generated policy.
    _BLOCKED_CODERS = (
        'PS', 'EPS', 'PDF', 'XPS', 'MVG', 'MSL', 'TEXT', 'SHOW', 'WIN', 'PLT',
    )

    # Byte patterns searched for in uploaded files. They are matched against
    # the raw bytes: compressed image data contains arbitrary byte values, so
    # a bare "|" or "@...|" test would reject almost every genuine image. The
    # ImageMagick-related patterns therefore require the surrounding syntax
    # (a url(...) argument containing a pipe, or an "@file" indirect read).
    _MALICIOUS_PATTERNS = tuple(
        re.compile(pattern.encode('ascii'), re.IGNORECASE)
        for pattern in (
            r'<\?php',
            r'<script[^>]*>',
            r'<%[^>]*%>',
            r'javascript:',
            r'vbscript:',
            r'eval\s*\(',
            r'exec\s*\(',
            r'system\s*\(',
            r'shell_exec\s*\(',
            r'file_get_contents\s*\(',
            r'url\([^)\r\n]{0,512}\|',  # pipe inside url(...): delegate command injection
            r'(?:label|caption|text):@',  # "@file" indirect reads
        )
    )

    def __init__(self):
        self.upload_dir = os.path.abspath('secure_uploads')
        self.allowed_formats = {'JPEG', 'PNG', 'GIF'}
        self.max_file_size = 10 * 1024 * 1024  # 10MB
        self.max_dimensions = (8000, 8000)
        self.max_pixels = 50000000  # 50MP
        # Resource limits written into the ImageMagick policy file.
        self.imagemagick_policy = {
            'memory': '256MiB',
            'map': '512MiB',
            'area': '128MB',
            'disk': '1GiB',
            'time': '120',
        }
        self.ensure_upload_directory()
        self.setup_imagemagick_security()

    def ensure_upload_directory(self):
        """Create the upload directory if it does not exist yet."""
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(self.upload_dir, mode=0o755, exist_ok=True)

    def setup_imagemagick_security(self):
        """Write an ImageMagick policy file and point ImageMagick at it.

        The policy is stored as ``policy.xml`` inside a freshly created
        private directory, and ``MAGICK_CONFIGURE_PATH`` is set to that
        directory. ``self.policy_file`` keeps the (closed) file object so its
        ``.name`` gives the policy path; ``self.policy_dir`` is the directory.
        """
        coder_rules = '\n'.join(
            f'  <policy domain="coder" rights="none" pattern="{coder}" />'
            for coder in self._BLOCKED_CODERS
        )
        # Limits come from self.imagemagick_policy so there is one source of truth.
        resource_rules = '\n'.join(
            f'  <policy domain="resource" name="{name}" value="{value}"/>'
            for name, value in self.imagemagick_policy.items()
        )
        policy_content = (
            '<policymap>\n'
            '  <!-- Disable dangerous formats -->\n'
            f'{coder_rules}\n'
            '  <!-- Resource limits -->\n'
            f'{resource_rules}\n'
            '  <!-- Disable indirect reads -->\n'
            '  <policy domain="path" rights="none" pattern="@*"/>\n'
            '</policymap>\n'
        )
        # ImageMagick looks for a file literally named policy.xml in the
        # configure path, and that path must not be a shared temp directory
        # where other users could drop their own configuration files.
        self.policy_dir = tempfile.mkdtemp(prefix='magick_policy_')
        policy_path = os.path.join(self.policy_dir, 'policy.xml')
        with open(policy_path, 'w', encoding='utf-8') as policy_file:
            policy_file.write(policy_content)
        self.policy_file = policy_file
        os.environ['MAGICK_CONFIGURE_PATH'] = self.policy_dir

    def validate_image_comprehensive(self, file_path):
        """Run every validation step on the file at ``file_path``.

        Checks, in order: file size, libmagic MIME type, Pillow format,
        dimensions, pixel count and ``verify()``, then the content scan.

        Returns:
            True when all checks pass.

        Raises:
            ValueError: if any check fails.
        """
        # 1. File size check
        file_size = os.path.getsize(file_path)
        if file_size > self.max_file_size:
            raise ValueError(f"File too large: {file_size} bytes")

        # 2. Magic bytes validation
        mime_type = magic.from_file(file_path, mime=True)
        if mime_type not in ('image/jpeg', 'image/png', 'image/gif'):
            raise ValueError(f"Invalid MIME type: {mime_type}")

        # 3. PIL validation
        try:
            with Image.open(file_path) as img:
                if img.format not in self.allowed_formats:
                    raise ValueError(f"Unsupported format: {img.format}")
                max_width, max_height = self.max_dimensions
                if img.width > max_width or img.height > max_height:
                    raise ValueError("Image dimensions too large")
                if img.width * img.height > self.max_pixels:
                    raise ValueError("Image too large (pixels)")
                img.verify()
        except Exception as e:
            # Every failure in this step, including the ValueErrors raised
            # above, is reported with the "Invalid image" prefix.
            raise ValueError(f"Invalid image: {e}") from e

        # 4. Content scanning
        self.scan_for_malicious_content(file_path)
        return True

    def scan_for_malicious_content(self, file_path):
        """Scan the raw bytes of ``file_path`` for known malicious patterns.

        Raises:
            ValueError: naming the first pattern found in the file.
        """
        with open(file_path, 'rb') as f:
            content = f.read()
        for compiled in self._MALICIOUS_PATTERNS:
            if compiled.search(content):
                raise ValueError(
                    f"Malicious content detected: {compiled.pattern.decode('ascii')}"
                )

    def sanitize_filename(self, filename):
        """Return a generated ``<timestamp>_<16 hex chars><ext>`` file name.

        Only the extension of ``filename`` is used; anything other than
        .jpg/.jpeg/.png/.gif falls back to ``.jpg``.
        """
        ext = os.path.splitext(filename)[1].lower()
        if ext not in ('.jpg', '.jpeg', '.png', '.gif'):
            ext = '.jpg'  # Default to JPEG
        timestamp = int(time.time())
        # token_hex(8) yields 16 lowercase hex characters from the OS CSPRNG.
        return f"{timestamp}_{secrets.token_hex(8)}{ext}"

    def process_image_safely(self, input_path, operation='resize', **kwargs):
        """Validate ``input_path`` and run ``operation`` on it with Pillow.

        Args:
            input_path: path of the uploaded file.
            operation: ``'resize'`` or ``'convert'``.
            **kwargs: options passed to the operation (``width``, ``height``,
                ``quality``, ``format``).

        Returns:
            Path of the file written inside ``self.upload_dir``.

        Raises:
            ValueError: on failed validation, unknown operation or bad options.
        """
        self.validate_image_comprehensive(input_path)

        if operation == 'resize':
            handler = self.resize_image_pil
            output_format = 'JPEG'  # resize always writes JPEG
        elif operation == 'convert':
            handler = self.convert_image_pil
            output_format = kwargs.get('format', 'JPEG').upper()
            if output_format not in self._FORMAT_EXTENSIONS:
                raise ValueError(f"Unsupported target format: {output_format}")
        else:
            raise ValueError(f"Unsupported operation: {operation}")

        # Name the output after the format actually written, not after the
        # extension of the temporary upload.
        output_filename = self.sanitize_filename(
            'output' + self._FORMAT_EXTENSIONS[output_format]
        )
        output_path = os.path.join(self.upload_dir, 'processed_' + output_filename)
        try:
            return handler(input_path, output_path, kwargs)
        except Exception:
            # Do not leave a partially written output behind.
            try:
                os.remove(output_path)
            except FileNotFoundError:
                pass
            raise

    @staticmethod
    def _strip_metadata(img):
        """Return a pixel copy of ``img`` without its ancillary metadata.

        The palette travels with the copy; only the ``transparency`` entry of
        ``info`` is kept because it is needed to interpret the pixels.
        """
        clean = img.copy()
        clean.info = {
            key: value for key, value in img.info.items() if key == 'transparency'
        }
        return clean

    @staticmethod
    def _flatten_for_jpeg(img):
        """Return ``img`` in a mode the JPEG writer accepts, alpha on white."""
        has_alpha = img.mode in ('RGBA', 'LA') or (
            img.mode == 'P' and 'transparency' in img.info
        )
        if has_alpha:
            rgba = img.convert('RGBA')
            background = Image.new('RGB', rgba.size, (255, 255, 255))
            background.paste(rgba, mask=rgba.split()[-1])
            return background
        if img.mode not in ('RGB', 'L', 'CMYK'):
            return img.convert('RGB')
        return img

    def resize_image_pil(self, input_path, output_path, options):
        """Resize the image to ``width`` x ``height`` and save it as JPEG.

        Args:
            input_path: source image path.
            output_path: destination path.
            options: mapping with optional ``width`` (800), ``height`` (600)
                and ``quality`` (85).

        Returns:
            ``output_path``.

        Raises:
            ValueError: if a dimension is below 1 or above 5000.
        """
        width = options.get('width', 800)
        height = options.get('height', 600)
        quality = options.get('quality', 85)

        if width < 1 or height < 1:
            raise ValueError("Output dimensions must be positive")
        if width > 5000 or height > 5000:
            raise ValueError("Output dimensions too large")

        with Image.open(input_path) as img:
            # Flatten before resizing so palette and alpha images are
            # resampled in RGB and can be written as JPEG.
            clean = self._flatten_for_jpeg(self._strip_metadata(img))
            resized = clean.resize((width, height), Image.Resampling.LANCZOS)
            resized.save(output_path, 'JPEG', quality=quality, optimize=True)
        return output_path

    def convert_image_pil(self, input_path, output_path, options):
        """Re-encode the image as JPEG, PNG or GIF.

        Args:
            input_path: source image path.
            output_path: destination path.
            options: mapping with optional ``format`` ('JPEG') and
                ``quality`` (85, JPEG only).

        Returns:
            ``output_path``.

        Raises:
            ValueError: if the target format is not supported.
        """
        target_format = options.get('format', 'JPEG').upper()
        quality = options.get('quality', 85)

        if target_format not in self._FORMAT_EXTENSIONS:
            raise ValueError(f"Unsupported target format: {target_format}")

        with Image.open(input_path) as img:
            clean = self._strip_metadata(img)
            if target_format == 'JPEG':
                self._flatten_for_jpeg(clean).save(
                    output_path, 'JPEG', quality=quality, optimize=True
                )
            else:
                if target_format == 'PNG' and clean.mode not in self._PNG_MODES:
                    clean = clean.convert('RGB')
                clean.save(output_path, target_format)
        return output_path

    def calculate_file_hash(self, file_path):
        """Return the SHA-256 hex digest of the file, read in 4096-byte chunks."""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest()

    def cleanup_temp_files(self):
        """Remove the generated policy file and its directory (best effort)."""
        policy_file = getattr(self, 'policy_file', None)
        if policy_file is not None:
            try:
                os.unlink(policy_file.name)
            except OSError:
                pass
        policy_dir = getattr(self, 'policy_dir', None)
        if policy_dir:
            shutil.rmtree(policy_dir, ignore_errors=True)
# Initialize processor
# Module-level instance used by the route handlers below. Constructing it
# creates the upload directory and writes the ImageMagick policy file.
processor = SecureImageProcessor()
@app.route('/process-image', methods=['POST'])
def process_image():
    """Handle an image upload: validate it, process it and report the result.

    Form fields: ``file`` (required upload), ``operation`` (default
    ``resize``), ``width`` (default 800), ``height`` (default 600) and
    ``quality`` (default 85).

    Returns:
        JSON with the output file name, size and SHA-256 hash on success;
        a 400 response for invalid input (any ``ValueError``); a 500
        response for every other failure.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400
        upload = request.files['file']
        if upload.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        # Reserve a temp path and close the handle straight away, so the
        # upload is written through a single handle.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.tmp') as temp_file:
            temp_path = temp_file.name
        try:
            # Saving happens inside the try so the temp file is removed even
            # when writing the upload fails.
            upload.save(temp_path)

            # Get processing parameters (int() raising ValueError yields a 400)
            operation = request.form.get('operation', 'resize')
            width = int(request.form.get('width', 800))
            height = int(request.form.get('height', 600))
            quality = int(request.form.get('quality', 85))

            # Validate parameters
            if not (100 <= width <= 2000 and 100 <= height <= 2000):
                return jsonify({'error': 'Invalid dimensions'}), 400
            if not (50 <= quality <= 100):
                return jsonify({'error': 'Invalid quality'}), 400

            # Process image
            output_path = processor.process_image_safely(
                temp_path,
                operation=operation,
                width=width,
                height=height,
                quality=quality,
            )

            return jsonify({
                'success': True,
                'message': 'Image processed successfully',
                'output': os.path.basename(output_path),
                'size': os.path.getsize(output_path),
                'hash': processor.calculate_file_hash(output_path),
            })
        finally:
            # Clean up temp file
            try:
                os.remove(temp_path)
            except FileNotFoundError:
                pass
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        # logging.exception records the traceback as well as the message.
        logging.exception("Image processing error: %s", e)
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/image/<filename>')
def serve_image(filename):
    """Serve a processed image from the upload directory.

    Only names of the form ``processed_<digits>_<16 hex chars>.<ext>`` are
    accepted.

    Returns:
        The file with ``nosniff`` and inline-disposition headers, a 400
        response for any other file name, or a 404 response when no such
        file exists.
    """
    # fullmatch rejects a trailing newline, which "$" with re.match would allow.
    if not re.fullmatch(r'processed_\d+_[a-f0-9]{16}\.(jpg|jpeg|png|gif)', filename):
        return jsonify({'error': 'Invalid filename'}), 400

    file_path = os.path.join(processor.upload_dir, filename)
    if not os.path.isfile(file_path):
        return jsonify({'error': 'File not found'}), 404

    # Security headers
    response = make_response(send_file(file_path))
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['Content-Disposition'] = 'inline'
    return response
if __name__ == '__main__':
    import atexit

    # Remove the generated ImageMagick policy file when the interpreter exits.
    atexit.register(processor.cleanup_temp_files)

    # Local-only server with the debugger disabled.
    app.run(host='127.0.0.1', debug=False)