import zipfile
import os
# UNSAFE for untrusted archives: no limit on entry count or uncompressed
# size, so a zip bomb can exhaust disk space.
def extract_zip(zip_path, extract_to):
    """Extract every member of a ZIP archive into a directory.

    Path handling is delegated to the standard library: ``zipfile`` strips
    leading slashes / drive prefixes and drops ``..`` components from member
    names, so a member named ``../../../etc/passwd`` is written *inside*
    ``extract_to`` rather than over a system file.  What this function does
    NOT provide is any bound on the number of members or on the amount of
    data written; use ``SecureZipExtractor`` for archives you do not trust.

    Args:
        zip_path: Path of the archive to read.
        extract_to: Destination directory (created if missing).

    Raises:
        zipfile.BadZipFile: If ``zip_path`` is not a valid ZIP archive.
        OSError: If the archive cannot be read or a member cannot be written.
    """
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        # extractall() performs the same per-member extract() the manual
        # loop did, including the member-name sanitisation described above.
        zip_ref.extractall(extract_to)
    print(f"Extracted {zip_path} to {extract_to}")
# Manual extraction that refuses entries resolving outside the destination.
def manual_extract(zip_path, extract_to):
    """Extract a ZIP archive member by member, rejecting path traversal.

    Every member's destination is resolved and checked against
    ``extract_to`` before anything is written, so an archive containing a
    single hostile name is rejected as a whole.  Data is copied in chunks
    rather than read into memory at once.

    NOTE: no limit is placed on the number of members or on the amount of
    data written; use ``SecureZipExtractor`` when zip bombs are a concern.

    Args:
        zip_path: Path of the archive to read.
        extract_to: Destination directory (created if missing).

    Raises:
        ValueError: If a member name would resolve outside ``extract_to``
            (``..`` components, absolute paths, symlinked parents) or would
            replace the destination directory itself.
        zipfile.BadZipFile: If ``zip_path`` is not a valid ZIP archive.
        OSError: If a file or directory cannot be created.
    """
    base_dir = os.path.realpath(extract_to)
    with zipfile.ZipFile(zip_path, 'r') as zip_file:
        # Pass 1: resolve and vet every destination before touching the disk.
        planned = []
        for info in zip_file.infolist():
            # realpath() collapses ".." and follows existing symlinks, so the
            # comparison below sees where the write would really land.
            file_path = os.path.realpath(
                os.path.join(base_dir, info.filename)
            )
            escapes = os.path.commonpath([base_dir, file_path]) != base_dir
            if escapes or (file_path == base_dir and not info.is_dir()):
                raise ValueError(
                    f"Unsafe path in archive: {info.filename!r}"
                )
            planned.append((info, file_path))

        # Pass 2: write the vetted entries.
        for info, file_path in planned:
            if info.is_dir():
                os.makedirs(file_path, exist_ok=True)
                continue
            # Archives are not required to list parent directories first.
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with zip_file.open(info) as source, \
                    open(file_path, 'wb') as target:
                for chunk in iter(lambda: source.read(64 * 1024), b''):
                    target.write(chunk)
import zipfile
import os
import tempfile
from pathlib import Path
import time
class SecureZipExtractor:
    """Extract ZIP archives from untrusted sources with defensive limits.

    Entry names are validated so nothing is written outside the destination
    directory, and entry count, nesting depth, per-file size, total size and
    elapsed time are capped.  Violations raise ``SecurityError``.

    Attributes:
        max_size: Maximum total uncompressed bytes per archive.
        max_file_size: Maximum uncompressed bytes for a single entry.
        max_entries: Maximum number of entries in an archive.
        max_depth: Maximum number of path components in an entry name.
        timeout: Maximum extraction time in seconds.
    """

    def __init__(self, max_size=100 * 1024 * 1024,
                 max_file_size=50 * 1024 * 1024, max_entries=1000,
                 max_depth=10, timeout=30):
        """Store the extraction limits.

        Args:
            max_size: Total uncompressed byte budget (default 100 MB).
            max_file_size: Per-entry uncompressed byte budget (default 50 MB).
            max_entries: Maximum number of archive entries (default 1000).
            max_depth: Maximum directory nesting of an entry (default 10).
            timeout: Extraction time budget in seconds (default 30).
        """
        self.max_size = max_size
        self.max_file_size = max_file_size
        self.max_entries = max_entries
        self.max_depth = max_depth
        self.timeout = timeout

    def extract_zip_securely(self, zip_path, extract_to):
        """Extract ``zip_path`` into ``extract_to`` while enforcing all limits.

        All entry names are validated before the first byte is written, so an
        archive with a hostile name anywhere in it leaves the destination
        untouched.  Progress is printed every 100 entries.

        Args:
            zip_path: Path of the archive to read.
            extract_to: Destination directory (created if missing).

        Returns:
            The total number of bytes written.

        Raises:
            SecurityError: If any limit or path rule is violated.
            ValueError: If the destination is not a directory.
            zipfile.BadZipFile: If ``zip_path`` is not a valid ZIP archive.
            OSError: If a file or directory cannot be created.
        """
        extract_path = Path(extract_to).resolve()
        extract_path.mkdir(parents=True, exist_ok=True)
        if not extract_path.is_dir():
            raise ValueError("Destination must be a directory")

        # Monotonic clock: wall-clock adjustments must not affect the budget.
        started = time.monotonic()
        with zipfile.ZipFile(zip_path, 'r') as zip_file:
            entries = zip_file.infolist()
            if len(entries) > self.max_entries:
                raise SecurityError(f"Too many entries: {len(entries)}")

            # Vet every name up front so a bad entry late in the archive
            # rejects the whole archive before anything is written.
            targets = [
                self._validate_entry_path(entry.filename, extract_path)
                for entry in entries
            ]

            total_size = 0
            for i, (entry, safe_path) in enumerate(zip(entries, targets)):
                # Checked between entries only; one entry can overrun it.
                if time.monotonic() - started > self.timeout:
                    raise SecurityError("Extraction timeout exceeded")

                if entry.is_dir():
                    safe_path.mkdir(parents=True, exist_ok=True)
                    self._set_secure_permissions(safe_path)
                else:
                    # Refuse before writing when the declared size alone
                    # would already break the total budget.
                    projected = total_size + entry.file_size
                    if projected > self.max_size:
                        raise SecurityError(
                            f"Total extraction size exceeded: {projected}"
                        )
                    file_size = self._extract_file_securely(
                        zip_file, entry, safe_path
                    )
                    total_size += file_size
                    if total_size > self.max_size:
                        raise SecurityError(
                            f"Total extraction size exceeded: {total_size}"
                        )

                # Progress reporting
                if i % 100 == 0:
                    print(f"Processed {i+1}/{len(entries)} entries")

        print(f"Secure extraction completed. Total size: {total_size} bytes")
        return total_size

    def _validate_entry_path(self, entry_name, base_path):
        """Map an archive entry name to a safe path below ``base_path``.

        Args:
            entry_name: The member name as stored in the archive.
            base_path: Resolved destination directory (``pathlib.Path``).

        Returns:
            The resolved destination path, guaranteed to be inside
            ``base_path``.

        Raises:
            SecurityError: If the name is empty (before or after
                sanitisation), contains traversal sequences, is absolute,
                is nested too deeply, or resolves outside ``base_path``.
        """
        if not entry_name or not entry_name.strip():
            raise SecurityError("Empty entry name")

        # Backslashes are treated as separators throughout, matching what
        # _sanitize_filename() does to the name that is actually written.
        normalized = entry_name.replace('\\', '/')
        sanitized = self._sanitize_filename(entry_name)
        if not sanitized:
            raise SecurityError(
                f"Entry name is empty after sanitization: {entry_name!r}"
            )

        # Check both forms: stripping characters during sanitisation can
        # turn something like ".:./x" into "../x".
        candidates = (normalized, sanitized)
        for candidate in candidates:
            if '../' in candidate or '/..' in candidate:
                raise SecurityError(f"Path traversal detected: {entry_name}")
        for candidate in candidates:
            has_drive = len(candidate) > 1 and candidate[1] == ':'
            if candidate.startswith('/') or has_drive:
                raise SecurityError(f"Absolute path detected: {entry_name}")

        # Measure depth on the sanitised name so backslash-separated names
        # cannot slip past the limit.
        depth = len([p for p in sanitized.split('/') if p and p != '.'])
        if depth > self.max_depth:
            raise SecurityError(f"Directory nesting too deep: {depth}")

        # Authoritative check: resolve (following existing symlinks) and
        # require the result to stay under the destination.
        safe_path = (base_path / sanitized).resolve()
        try:
            safe_path.relative_to(base_path)
        except ValueError:
            raise SecurityError(
                f"Path traversal attempt: {safe_path} not within {base_path}"
            ) from None
        return safe_path

    def _sanitize_filename(self, filename):
        """Return ``filename`` with unsafe characters removed.

        Drops control characters (code points below 32) and the characters
        ``< > : " | ? *``, converts backslashes to forward slashes, collapses
        repeated slashes and strips surrounding whitespace.
        """
        import re
        # Remove null bytes and control characters
        sanitized = ''.join(char for char in filename if ord(char) >= 32)
        # Remove dangerous characters
        sanitized = re.sub(r'[<>:"|?*]', '', sanitized)
        # Normalize path separators
        sanitized = sanitized.replace('\\', '/')
        # Remove multiple consecutive slashes
        sanitized = re.sub(r'/+', '/', sanitized)
        return sanitized.strip()

    def _extract_file_securely(self, zip_file, entry, output_path):
        """Write one archive entry to ``output_path`` with size enforcement.

        Args:
            zip_file: The open ``zipfile.ZipFile``.
            entry: The ``ZipInfo`` of the member to extract.
            output_path: Validated destination path (``pathlib.Path``).

        Returns:
            The number of bytes written.

        Raises:
            SecurityError: If the declared or actual size exceeds
                ``max_file_size``, or the destination is a directory.
        """
        declared_size = entry.file_size
        if declared_size > self.max_file_size:
            raise SecurityError(f"File too large: {declared_size} bytes")
        if output_path.is_dir():
            raise SecurityError(
                f"Entry would overwrite a directory: {output_path.name}"
            )

        # Ensure parent directory exists
        output_path.parent.mkdir(parents=True, exist_ok=True)

        total_bytes = 0
        chunk_size = 8192
        with zip_file.open(entry) as source:
            target = open(output_path, 'wb')
            try:
                with target:
                    while True:
                        chunk = source.read(chunk_size)
                        if not chunk:
                            break
                        total_bytes += len(chunk)
                        # Do not trust the declared size; count real bytes.
                        if total_bytes > self.max_file_size:
                            raise SecurityError(
                                "File size limit exceeded during extraction"
                            )
                        target.write(chunk)
            except BaseException:
                # The file is closed by now (required for unlink on
                # Windows); remove the partial output on any failure,
                # then let the error propagate.
                try:
                    output_path.unlink()
                except OSError:
                    pass
                raise

        # Set secure permissions
        self._set_secure_permissions(output_path)

        # Verify final size
        if declared_size > 0 and total_bytes != declared_size:
            print(f"Warning: Size mismatch for {output_path.name}. "
                  f"Declared: {declared_size}, Actual: {total_bytes}")
        return total_bytes

    def _set_secure_permissions(self, path):
        """Restrict ``path`` to its owner (0o600 files, 0o700 directories).

        Best effort: ``OSError`` is ignored so filesystems without POSIX
        permission support do not abort the extraction.
        """
        try:
            if path.is_file():
                # Read/write for owner only
                path.chmod(0o600)
            elif path.is_dir():
                # Read/write/execute for owner only
                path.chmod(0o700)
        except OSError:
            # Permissions not supported on this filesystem
            pass

    def validate_zip_before_extraction(self, zip_path):
        """Check an archive against the limits without extracting it.

        The metadata checks run first; the integrity test, which has to
        decompress every member, runs only once the declared sizes are known
        to be within budget.

        Args:
            zip_path: Path of the archive to inspect.

        Returns:
            ``True`` when every check passes.

        Raises:
            SecurityError: If the file is not a valid ZIP, is corrupted,
                exceeds a limit, or contains a suspicious entry name.
        """
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_file:
                # Check number of entries
                entries = zip_file.infolist()
                if len(entries) > self.max_entries:
                    raise SecurityError(f"Too many entries: {len(entries)}")

                # Check declared sizes, per file and in total
                total_declared = 0
                for entry in entries:
                    if entry.file_size > self.max_file_size:
                        raise SecurityError(
                            f"File too large: {entry.file_size} bytes"
                        )
                    total_declared += entry.file_size
                if total_declared > self.max_size:
                    raise SecurityError(
                        f"Total size too large: {total_declared}"
                    )

                # Check for suspicious patterns
                for entry in entries:
                    if '../' in entry.filename or '..\\' in entry.filename:
                        raise SecurityError(
                            f"Suspicious entry detected: {entry.filename}"
                        )

                # testzip() reads and CRC-checks every member, so it must
                # come after the size limits to avoid inflating a zip bomb.
                if zip_file.testzip() is not None:
                    raise SecurityError("Corrupted ZIP file")
            return True
        except zipfile.BadZipFile as exc:
            raise SecurityError("Invalid ZIP file") from exc
class SecurityError(Exception):
    """Raised when an archive violates an extraction safety rule."""
# Example usage
def main(zip_path='archive.zip', extract_to='safe_extraction_dir'):
    """Validate and extract an archive, reporting the outcome on stdout.

    Args:
        zip_path: Archive to extract (defaults to ``archive.zip``).
        extract_to: Destination directory (defaults to
            ``safe_extraction_dir``).

    Returns:
        ``0`` when the archive was extracted, ``1`` when validation or
        extraction failed, so callers can use it as a process exit status.
    """
    extractor = SecureZipExtractor()
    try:
        # Validate before extraction
        extractor.validate_zip_before_extraction(zip_path)
        # Perform secure extraction
        total_size = extractor.extract_zip_securely(zip_path, extract_to)
    except SecurityError as e:
        print(f"Security violation: {e}")
        return 1
    except Exception as e:
        # Top-level boundary: report any other failure (missing file,
        # I/O error, ...) instead of dumping a traceback.
        print(f"Extraction failed: {e}")
        return 1
    print(f"Successfully extracted {total_size} bytes")
    return 0
if __name__ == '__main__':
    # Forward whatever main() returns as the process exit status
    # (None is treated as success by SystemExit).
    raise SystemExit(main())