# Vulnerable: os.system() with user input in Django
import os
import subprocess
from django.http import JsonResponse
from django.views import View
from django.core.files.storage import default_storage
# Extremely dangerous: os.system() with user input
class FileOperationView(View):
def post(self, request):
filename = request.POST.get('filename', '')
operation = request.POST.get('operation', '')
try:
# CRITICAL: Command injection possible
command = f"{operation} {filename}"
result = os.system(command)
return JsonResponse({'result': result})
except Exception as e:
return JsonResponse({'error': str(e)})
# Another dangerous pattern
def backup_user_data(request):
user_id = request.GET.get('user_id', '')
backup_path = request.GET.get('path', '')
# Dangerous: User controls command arguments
backup_command = f"tar -czf {backup_path}/user_{user_id}.tar.gz /data/users/{user_id}/"
try:
os.system(backup_command)
return JsonResponse({'status': 'backup_completed'})
except Exception as e:
return JsonResponse({'error': str(e)})
# File conversion with user input
def convert_file(request):
input_file = request.POST.get('input_file', '')
output_format = request.POST.get('format', '')
output_file = request.POST.get('output_file', '')
# Dangerous: subprocess with shell=True
try:
convert_cmd = f"convert {input_file} {output_file}.{output_format}"
subprocess.run(convert_cmd, shell=True, check=True)
return JsonResponse({'status': 'converted'})
except subprocess.CalledProcessError as e:
return JsonResponse({'error': str(e)})
# Log analysis with user input
def analyze_logs(request):
log_file = request.GET.get('log_file', '')
search_pattern = request.GET.get('pattern', '')
# Dangerous: grep command with user input
try:
grep_command = f"grep '{search_pattern}' {log_file}"
result = os.popen(grep_command).read()
return JsonResponse({'matches': result})
except Exception as e:
return JsonResponse({'error': str(e)})
# System maintenance
def system_maintenance(request):
if request.method == 'POST':
action = request.POST.get('action', '')
target = request.POST.get('target', '')
# Dangerous: Direct system command execution
maintenance_commands = {
'cleanup': f"rm -rf {target}",
'backup': f"cp -r {target} /backups/",
'restart': f"systemctl restart {target}"
}
if action in maintenance_commands:
try:
os.system(maintenance_commands[action])
return JsonResponse({'status': f'{action}_completed'})
except Exception as e:
return JsonResponse({'error': str(e)})
# Secure: Safe alternatives to os.system() in Django
import subprocess
import shutil
import os
from pathlib import Path
from django.http import JsonResponse
from django.views import View
from django.core.files.storage import default_storage
from django.core.exceptions import ValidationError
from django.conf import settings
import re
# Safe: Validated file operations without system commands
class SafeFileOperationView(View):
def post(self, request):
filename = request.POST.get('filename', '')
operation = request.POST.get('operation', '')
try:
# Validate inputs
validated_data = self.validate_file_operation(filename, operation)
# Execute safe operation
result = self.execute_safe_operation(validated_data)
return JsonResponse({'result': result})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_file_operation(self, filename, operation):
# Validate filename
if not filename or len(filename) > 255:
raise ValidationError('Invalid filename')
# Only allow safe filename characters
if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
raise ValidationError('Filename contains invalid characters')
# Prevent directory traversal
if '..' in filename or filename.startswith('/'):
raise ValidationError('Invalid file path')
# Validate operation
allowed_operations = ['copy', 'move', 'delete', 'info']
if operation not in allowed_operations:
raise ValidationError('Operation not allowed')
return {'filename': filename, 'operation': operation}
def execute_safe_operation(self, data):
filename = data['filename']
operation = data['operation']
# Define safe base directory
base_dir = Path(settings.MEDIA_ROOT) / 'user_files'
file_path = base_dir / filename
# Ensure file is within base directory
try:
file_path.resolve().relative_to(base_dir.resolve())
except ValueError:
raise ValidationError('File path outside allowed directory')
# Execute safe operations using Python
if operation == 'copy':
backup_path = base_dir / 'backups' / filename
backup_path.parent.mkdir(exist_ok=True)
shutil.copy2(file_path, backup_path)
return 'File copied successfully'
elif operation == 'move':
archive_path = base_dir / 'archive' / filename
archive_path.parent.mkdir(exist_ok=True)
shutil.move(file_path, archive_path)
return 'File moved successfully'
elif operation == 'delete':
if file_path.exists():
file_path.unlink()
return 'File deleted successfully'
return 'File not found'
elif operation == 'info':
if file_path.exists():
stat = file_path.stat()
return {
'size': stat.st_size,
'modified': stat.st_mtime,
'exists': True
}
return {'exists': False}
# Safe: File backup without system commands
def safe_backup_user_data(request):
user_id = request.GET.get('user_id', '')
try:
# Validate user ID
if not user_id.isdigit():
raise ValidationError('Invalid user ID')
user_id = int(user_id)
# Check if user exists and belongs to current user
if request.user.id != user_id and not request.user.is_staff:
raise ValidationError('Access denied')
# Create backup using Python libraries
result = create_user_backup(user_id)
return JsonResponse({'status': 'backup_completed', 'backup_id': result})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def create_user_backup(user_id):
import tarfile
import tempfile
from datetime import datetime
# Define safe paths
user_data_dir = Path(settings.MEDIA_ROOT) / 'users' / str(user_id)
backup_dir = Path(settings.MEDIA_ROOT) / 'backups'
backup_dir.mkdir(exist_ok=True)
# Create backup filename
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_filename = f'user_{user_id}_{timestamp}.tar.gz'
backup_path = backup_dir / backup_filename
# Create tar archive using Python
with tarfile.open(backup_path, 'w:gz') as tar:
if user_data_dir.exists():
tar.add(user_data_dir, arcname=f'user_{user_id}')
return backup_filename
# Safe: File conversion using Python libraries
def safe_convert_file(request):
try:
# Get uploaded file
if 'file' not in request.FILES:
raise ValidationError('No file provided')
uploaded_file = request.FILES['file']
output_format = request.POST.get('format', '')
# Validate file and format
validated_data = validate_conversion_request(uploaded_file, output_format)
# Perform safe conversion
result = perform_safe_conversion(validated_data)
return JsonResponse({'status': 'converted', 'output_file': result})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_conversion_request(uploaded_file, output_format):
# Validate file type
allowed_types = ['image/jpeg', 'image/png', 'image/gif']
if uploaded_file.content_type not in allowed_types:
raise ValidationError('File type not allowed')
# Validate file size
max_size = 10 * 1024 * 1024 # 10MB
if uploaded_file.size > max_size:
raise ValidationError('File too large')
# Validate output format
allowed_formats = ['jpeg', 'png', 'webp']
if output_format not in allowed_formats:
raise ValidationError('Output format not allowed')
return {
'file': uploaded_file,
'output_format': output_format
}
def perform_safe_conversion(data):
from PIL import Image
import io
uploaded_file = data['file']
output_format = data['output_format']
# Convert using PIL/Pillow
try:
# Open and validate image
image = Image.open(uploaded_file)
image.verify() # Verify it's a valid image
# Reopen for processing (verify() closes the file)
uploaded_file.seek(0)
image = Image.open(uploaded_file)
# Convert format
output_io = io.BytesIO()
image.save(output_io, format=output_format.upper())
# Save converted file
output_filename = f'converted_{uploaded_file.name}.{output_format}'
output_path = Path(settings.MEDIA_ROOT) / 'converted' / output_filename
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'wb') as f:
f.write(output_io.getvalue())
return output_filename
except Exception as e:
raise ValidationError('Conversion failed')
# Safe: Log analysis without shell commands
def safe_analyze_logs(request):
search_pattern = request.GET.get('pattern', '')
try:
# Validate search pattern
validated_pattern = validate_search_pattern(search_pattern)
# Perform safe log search
results = search_logs_safely(validated_pattern)
return JsonResponse({'matches': results})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_search_pattern(pattern):
if not pattern or len(pattern) > 100:
raise ValidationError('Invalid search pattern')
# Only allow alphanumeric and safe characters
if not re.match(r'^[a-zA-Z0-9\s._-]+$', pattern):
raise ValidationError('Pattern contains invalid characters')
return pattern
def search_logs_safely(pattern):
import re
# Define safe log file path
log_file = Path(settings.BASE_DIR) / 'logs' / 'application.log'
if not log_file.exists():
return []
matches = []
try:
# Read and search file using Python
with open(log_file, 'r') as f:
for line_num, line in enumerate(f, 1):
if pattern.lower() in line.lower():
matches.append({
'line_number': line_num,
'content': line.strip()[:200] # Limit content length
})
# Limit results
if len(matches) >= 100:
break
except Exception:
raise ValidationError('Log search failed')
return matches
# Safe: System maintenance without shell commands
def safe_system_maintenance(request):
if request.method == 'POST':
action = request.POST.get('action', '')
try:
# Validate action
if action not in get_allowed_maintenance_actions():
raise ValidationError('Action not allowed')
# Execute safe maintenance
result = execute_safe_maintenance(action)
return JsonResponse({'status': result})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def get_allowed_maintenance_actions():
return ['cleanup_temp', 'clear_cache', 'compress_logs']
def execute_safe_maintenance(action):
if action == 'cleanup_temp':
# Clean temporary files using Python
temp_dir = Path(settings.MEDIA_ROOT) / 'temp'
if temp_dir.exists():
for file_path in temp_dir.glob('*'):
if file_path.is_file():
file_path.unlink()
return 'Temporary files cleaned'
elif action == 'clear_cache':
# Clear Django cache
from django.core.cache import cache
cache.clear()
return 'Cache cleared'
elif action == 'compress_logs':
# Compress old logs using Python
import gzip
log_dir = Path(settings.BASE_DIR) / 'logs'
for log_file in log_dir.glob('*.log'):
if log_file.stat().st_size > 10 * 1024 * 1024: # 10MB
compressed_path = log_file.with_suffix('.log.gz')
with open(log_file, 'rb') as f_in:
with gzip.open(compressed_path, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
log_file.unlink() # Remove original
return 'Logs compressed'
return 'Maintenance completed'