# SECURE: Django views without subprocess injection
import hashlib
import logging
import os
import re
from collections import deque
from pathlib import Path

from django import forms
from django.contrib.auth.decorators import permission_required
from django.core.exceptions import ValidationError
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_protect
logger = logging.getLogger(__name__)
# SECURE: Form validation
class FileOperationForm(forms.Form):
    """Validate a request to run one whitelisted operation on a named file.

    Fields:
        operation: One of the keys listed in ``OPERATION_CHOICES``.
        filename: A bare file name of at most 255 characters; further
            restricted by ``clean_filename``.
    """

    OPERATION_CHOICES = [
        ('info', 'File Information'),
        ('hash', 'Calculate Hash'),
        ('lines', 'Count Lines'),
        ('size', 'File Size'),
    ]

    operation = forms.ChoiceField(choices=OPERATION_CHOICES)
    filename = forms.CharField(max_length=255)

    def clean_filename(self):
        """Return the submitted filename if it passes the strict whitelist.

        Returns:
            The filename unchanged.

        Raises:
            ValidationError: If the name contains a character outside
                ``[a-zA-Z0-9._-]``, is longer than 255 characters,
                contains ``..``, or starts with a dot.
        """
        filename = self.cleaned_data['filename']

        # Strict validation. ``fullmatch`` is used instead of ``match`` with
        # a trailing ``$`` because ``$`` also matches just before a final
        # newline, which would let a name such as "report.txt\n" through.
        if not re.fullmatch(r'[a-zA-Z0-9._-]+', filename):
            raise ValidationError('Invalid filename format')
        if len(filename) > 255:
            raise ValidationError('Filename too long')
        # The character whitelist still permits dots, so reject parent
        # directory sequences and hidden (dot-prefixed) names explicitly.
        if '..' in filename or filename.startswith('.'):
            raise ValidationError('Invalid filename')
        return filename
class LogAnalysisForm(forms.Form):
    """Validate a request to view the tail of one whitelisted log file.

    Fields:
        log_file: One of the paths listed in ``LOG_FILE_CHOICES``.
        filter_pattern: Optional text of at most 100 characters; further
            restricted by ``clean_filter_pattern``.
        lines: Integer between 1 and 1000; the form's initial value is 100.
    """

    LOG_FILE_CHOICES = [
        ('/var/log/app.log', 'Application Log'),
        ('/var/log/error.log', 'Error Log'),
        ('/var/log/access.log', 'Access Log'),
    ]

    log_file = forms.ChoiceField(choices=LOG_FILE_CHOICES)
    filter_pattern = forms.CharField(max_length=100, required=False)
    lines = forms.IntegerField(min_value=1, max_value=1000, initial=100)

    def clean_filter_pattern(self):
        """Return the filter text, rejecting anything outside the whitelist.

        An empty (falsy) value is returned as-is without further checks.

        Raises:
            ValidationError: If a non-empty value contains characters other
                than letters, digits, whitespace, ``.``, ``_`` or ``-``.
        """
        candidate = self.cleaned_data.get('filter_pattern', '')
        if not candidate:
            return candidate

        # Allow only alphanumeric and basic characters
        if re.match(r'^[a-zA-Z0-9\s._-]+$', candidate) is None:
            raise ValidationError('Invalid filter pattern')
        return candidate
# SECURE: File processor using native Python
class SecureFileProcessor:
    """Run read-only inspections on files inside a single safe directory.

    Every operation is implemented with the Python standard library; no
    external process is started.
    """

    def __init__(self):
        # Only files that resolve to a location under this directory may be
        # processed.
        self.safe_directory = Path('/secure/uploads')
        # Files larger than this many bytes are rejected.
        self.max_file_size = 10 * 1024 * 1024  # 10MB

    def process_file(self, operation, filename):
        """Validate ``filename`` and run ``operation`` on it.

        Args:
            operation: One of ``'info'``, ``'hash'``, ``'lines'``, ``'size'``.
            filename: Name of a file expected to live in the safe directory.

        Returns:
            The result dictionary of the selected operation.

        Raises:
            ValueError: If the path escapes the safe directory, is not a
                regular file, is too large, or (for ``'lines'``) is not
                valid UTF-8.
            FileNotFoundError: If the file does not exist.
            KeyError: If ``operation`` is not one of the supported names.
        """
        # Get validated file path
        file_path = self.get_safe_file_path(filename)
        operations = {
            'info': self.get_file_info,
            'hash': self.calculate_hash,
            'lines': self.count_lines,
            'size': self.get_file_size,
        }
        return operations[operation](file_path)

    def get_safe_file_path(self, filename):
        """Resolve ``filename`` inside the safe directory and vet the result.

        Args:
            filename: File name (or relative path) supplied by the caller.

        Returns:
            The fully resolved ``Path`` of the file.

        Raises:
            ValueError: If the resolved path is outside the safe directory,
                is not a regular file, or exceeds ``max_file_size``.
            FileNotFoundError: If nothing exists at the resolved path.
        """
        base_dir = self.safe_directory.resolve()
        file_path = (self.safe_directory / filename).resolve()

        # Compare path components, not string prefixes: a plain
        # ``startswith`` check would accept a sibling directory such as
        # "/secure/uploads_evil" because it shares the textual prefix.
        try:
            file_path.relative_to(base_dir)
        except ValueError:
            raise ValueError('File not in safe directory') from None

        if not file_path.exists():
            raise FileNotFoundError('File not found')
        # Directories (including the safe directory itself) and special
        # files are not valid targets for the operations of this class.
        if not file_path.is_file():
            raise ValueError('Not a regular file')
        if file_path.stat().st_size > self.max_file_size:
            raise ValueError('File too large')
        return file_path

    def get_file_info(self, file_path):
        """Return name, size, mtime, permission digits and suffix of a file."""
        st = file_path.stat()
        return {
            'name': file_path.name,
            'size': st.st_size,
            'modified': st.st_mtime,
            # Last three octal digits of the mode, e.g. "644".
            'permissions': oct(st.st_mode)[-3:],
            'extension': file_path.suffix,
        }

    def calculate_hash(self, file_path):
        """Return the MD5 and SHA-256 hex digests and byte length of a file."""
        with open(file_path, 'rb') as f:
            content = f.read()
        return {
            'md5': hashlib.md5(content).hexdigest(),
            'sha256': hashlib.sha256(content).hexdigest(),
            'size': len(content),
        }

    def count_lines(self, file_path):
        """Return the number of lines in a UTF-8 text file.

        Raises:
            ValueError: If the file cannot be decoded as UTF-8.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = sum(1 for _ in f)
        except UnicodeDecodeError:
            # Re-raise with a fixed message so that byte offsets and raw
            # bytes from the decoder error are not passed on to callers.
            raise ValueError('File contains invalid characters') from None
        return {'lines': lines}

    def get_file_size(self, file_path):
        """Return the file size in bytes and as a human-readable string."""
        size = file_path.stat().st_size
        return {
            'size_bytes': size,
            'size_human': self.format_size(size),
        }

    def format_size(self, size):
        """Format a byte count with two decimals and a B/KB/MB/GB/TB unit."""
        units = ['B', 'KB', 'MB', 'GB']
        for unit in units:
            if size < 1024:
                return f'{size:.2f} {unit}'
            size /= 1024
        return f'{size:.2f} TB'
# SECURE: Log analyzer using native Python
class SecureLogAnalyzer:
    """Return the filtered tail of a whitelisted log file using plain file I/O."""

    def __init__(self):
        # Whitelist mapping log path -> display label; only these exact
        # paths may be read.
        self.allowed_log_files = {
            '/var/log/app.log': 'Application Log',
            '/var/log/error.log': 'Error Log',
            '/var/log/access.log': 'Access Log',
        }

    def analyze_log(self, log_file, filter_pattern, lines):
        """Return the last ``lines`` lines of ``log_file`` that match a filter.

        Args:
            log_file: Path that must be a key of ``allowed_log_files``.
            filter_pattern: Plain substring matched case-insensitively; an
                empty or ``None`` value disables filtering.
            lines: How many trailing lines of the file to inspect; must be
                at least 1.

        Returns:
            A dict with ``log_file``, ``total_lines`` (number of matching
            lines), ``filter_applied`` and ``lines`` (the matching lines,
            stripped of surrounding whitespace).

        Raises:
            ValueError: If the log file is not whitelisted, ``lines`` is
                below 1, or the file is not valid UTF-8.
            FileNotFoundError: If the log file does not exist.
        """
        if log_file not in self.allowed_log_files:
            raise ValueError('Log file not allowed')
        # A count of 0 used to return the whole file because ``[-0:]`` is
        # the full slice; reject non-positive counts explicitly.
        if lines < 1:
            raise ValueError('Line count must be at least 1')

        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                # Stream the file through a bounded deque so that only the
                # newest ``lines`` lines are ever held in memory.
                recent_lines = deque(f, maxlen=lines)
        except FileNotFoundError:
            # Opening directly (instead of checking existence first) avoids
            # a race between the check and the read.
            raise FileNotFoundError('Log file not found') from None
        except UnicodeDecodeError:
            raise ValueError('Log file contains invalid characters') from None

        # Lower-case the filter once rather than once per line.
        needle = filter_pattern.lower() if filter_pattern else ''
        matching_lines = [
            line.strip()
            for line in recent_lines
            if not needle or needle in line.lower()
        ]

        return {
            'log_file': log_file,
            'total_lines': len(matching_lines),
            'filter_applied': bool(filter_pattern),
            'lines': matching_lines,
        }
# Initialize processors
# Module-level instances created once at import time and reused by the views
# below; both constructors only assign configuration attributes.
file_processor = SecureFileProcessor()
log_analyzer = SecureLogAnalyzer()
# SECURE: Views with proper validation
@csrf_protect
@permission_required('app.can_manage_files')
def secure_file_operations(request):
    """Run a validated file operation (POST) or show the operation form.

    On POST the submitted data is validated with ``FileOperationForm`` and
    handed to the module-level ``file_processor``; the outcome is returned
    as JSON with a ``success`` flag. Any other method renders
    ``secure_file_ops.html`` with an unbound form.
    """
    if request.method != 'POST':
        # GET request - show form
        blank_form = FileOperationForm()
        return render(request, 'secure_file_ops.html', {'form': blank_form})

    form = FileOperationForm(request.POST)
    if not form.is_valid():
        return JsonResponse({
            'success': False,
            'errors': form.errors,
        })

    try:
        data = form.cleaned_data
        outcome = file_processor.process_file(data['operation'], data['filename'])
        # Log successful operation
        logger.info(f'File operation {data["operation"]} on {data["filename"]} by {request.user}')
        return JsonResponse({
            'success': True,
            'operation': data['operation'],
            'result': outcome,
        })
    except (ValueError, FileNotFoundError) as known_error:
        # Expected validation / lookup failures: report the message as-is.
        return JsonResponse({
            'success': False,
            'error': str(known_error),
        })
    except Exception as unexpected:
        # Anything else is logged with a traceback and reported generically.
        logger.exception(f'File operation error: {unexpected}')
        return JsonResponse({
            'success': False,
            'error': 'Operation failed',
        })
@csrf_protect
@permission_required('app.can_view_logs')
def secure_log_analysis(request):
    """Return a filtered log tail as JSON (POST) or show the analysis form.

    On POST the submitted data is validated with ``LogAnalysisForm`` and
    handed to the module-level ``log_analyzer``; the outcome is returned as
    JSON with a ``success`` flag. Any other method renders
    ``secure_log_analysis.html`` with an unbound form.
    """
    if request.method != 'POST':
        # GET request
        blank_form = LogAnalysisForm()
        return render(request, 'secure_log_analysis.html', {'form': blank_form})

    form = LogAnalysisForm(request.POST)
    if not form.is_valid():
        return JsonResponse({
            'success': False,
            'errors': form.errors,
        })

    try:
        data = form.cleaned_data
        outcome = log_analyzer.analyze_log(
            data['log_file'],
            data['filter_pattern'],
            data['lines'],
        )
        # Log access
        logger.info(f'Log analysis on {data["log_file"]} by {request.user}')
        return JsonResponse({
            'success': True,
            'result': outcome,
        })
    except (ValueError, FileNotFoundError) as known_error:
        # Expected validation / lookup failures: report the message as-is.
        return JsonResponse({
            'success': False,
            'error': str(known_error),
        })
    except Exception as unexpected:
        # Anything else is logged with a traceback and reported generically.
        logger.exception(f'Log analysis error: {unexpected}')
        return JsonResponse({
            'success': False,
            'error': 'Analysis failed',
        })
# Health check endpoint
def health_check(request):
    """Return a static JSON status document.

    The payload is constant: it does not inspect ``request`` or probe any
    resource. No decorators are applied to this view.
    """
    operations = ['info', 'hash', 'lines', 'size']
    features = [
        'Input validation',
        'CSRF protection',
        'Permission checks',
        'Native Python operations',
        'No subprocess execution',
    ]
    payload = {
        'status': 'healthy',
        'available_operations': operations,
        'security_features': features,
    }
    return JsonResponse(payload)
# SECURE: URLs with proper naming
# Route table for this module: each entry maps a URL prefix to one of the
# views defined above and gives the route a name.
from django.urls import path
urlpatterns = [
# File operation form / JSON endpoint (requires app.can_manage_files).
path('file-ops/', secure_file_operations, name='secure_file_operations'),
# Log analysis form / JSON endpoint (requires app.can_view_logs).
path('logs/', secure_log_analysis, name='secure_log_analysis'),
# Static status document; the view has no permission decorator.
path('health/', health_check, name='health_check'),
]