Flask Command Injection via os.system()

Critical Risk Command Injection
flaskpythoncommand-injectionos-systemremote-code-executionsystem-commands

What it is

The Flask application uses os.system() or similar functions with user-controlled input, leading to command injection vulnerabilities. Attackers can execute arbitrary system commands on the server, potentially gaining full control of the system, accessing sensitive files, or compromising the entire infrastructure by injecting malicious commands through user input that gets passed to system command execution functions.

# Vulnerable: os.system() with user input in Flask import os import subprocess from flask import Flask, request, jsonify app = Flask(__name__) # Extremely dangerous: os.system() with user input @app.route('/execute', methods=['POST']) def execute_command(): command = request.json.get('command', '') # CRITICAL: Command injection possible result = os.system(command) return jsonify({'result': result}) # Another dangerous pattern @app.route('/file-operations', methods=['POST']) def file_operations(): operation = request.json.get('operation', '') filename = request.json.get('filename', '') # Dangerous: User controls command arguments command = f"{operation} {filename}" try: os.system(command) return jsonify({'status': 'command_executed'}) except Exception as e: return jsonify({'error': str(e)}) # Backup functionality with command injection @app.route('/backup', methods=['POST']) def create_backup(): backup_path = request.json.get('path', '') backup_name = request.json.get('name', '') # Dangerous: User-controlled backup command backup_command = f"tar -czf {backup_name} {backup_path}" try: os.system(backup_command) return jsonify({'status': 'backup_created'}) except Exception as e: return jsonify({'error': str(e)}) # File conversion with subprocess @app.route('/convert', methods=['POST']) def convert_file(): input_file = request.json.get('input_file', '') output_file = request.json.get('output_file', '') format_type = request.json.get('format', '') # Dangerous: subprocess with shell=True try: convert_cmd = f"convert {input_file} {output_file}.{format_type}" subprocess.run(convert_cmd, shell=True, check=True) return jsonify({'status': 'converted'}) except subprocess.CalledProcessError as e: return jsonify({'error': str(e)}) # Log processing with grep @app.route('/search-logs', methods=['GET']) def search_logs(): log_file = request.args.get('log_file', '') search_pattern = request.args.get('pattern', '') # Dangerous: grep command with user input try: grep_command = f"grep '{search_pattern}' {log_file}" result = os.popen(grep_command).read() return jsonify({'matches': result}) except Exception as e: return jsonify({'error': str(e)}) # System information gathering @app.route('/system-info', methods=['GET']) def get_system_info(): info_type = request.args.get('type', '') # Dangerous: Direct system command execution commands = { 'disk': 'df -h', 'memory': 'free -m', 'processes': 'ps aux', 'network': 'netstat -tuln' } if info_type in commands: try: result = os.popen(commands[info_type]).read() return jsonify({'info': result}) except Exception as e: return jsonify({'error': str(e)}) return jsonify({'error': 'Invalid info type'}) # File download with wget @app.route('/download', methods=['POST']) def download_file(): url = request.json.get('url', '') destination = request.json.get('destination', '') # Dangerous: wget command with user input download_command = f"wget '{url}' -O {destination}" try: os.system(download_command) return jsonify({'status': 'downloaded'}) except Exception as e: return jsonify({'error': str(e)}) # Database backup @app.route('/db-backup', methods=['POST']) def backup_database(): db_name = request.json.get('database', '') backup_file = request.json.get('backup_file', '') # Dangerous: Database backup command backup_cmd = f"mysqldump {db_name} > {backup_file}" try: os.system(backup_cmd) return jsonify({'status': 'database_backup_created'}) except Exception as e: return jsonify({'error': str(e)}) # Image processing @app.route('/process-image', methods=['POST']) def process_image(): image_path = request.json.get('image_path', '') operations = request.json.get('operations', []) # Dangerous: ImageMagick commands for operation in operations: command = f"convert {image_path} {operation} {image_path}" try: os.system(command) except Exception as e: return jsonify({'error': str(e)}) return jsonify({'status': 'image_processed'}) # Archive extraction @app.route('/extract', methods=['POST']) def extract_archive(): archive_path = request.json.get('archive', '') extract_path = request.json.get('extract_to', '') # Dangerous: Archive extraction command extract_command = f"tar -xzf {archive_path} -C {extract_path}" try: os.system(extract_command) return jsonify({'status': 'extracted'}) except Exception as e: return jsonify({'error': str(e)})
# Secure: Safe alternatives to os.system() in Flask import subprocess import shutil import os from pathlib import Path from flask import Flask, request, jsonify from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError import re import tarfile import zipfile app = Flask(__name__) # Safe: Input validation schemas class FileOperationSchema(Schema): operation = fields.Str(validate=lambda x: x in ['copy', 'move', 'delete']) filename = fields.Str(required=True) class BackupSchema(Schema): path = fields.Str(required=True) name = fields.Str(required=True) # Safe: No direct command execution @app.route('/execute', methods=['POST']) def safe_execute_operation(): try: # Validate input operation_data = validate_operation_request(request.json or {}) # Execute safe operation result = execute_safe_operation(operation_data) return jsonify(result) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_operation_request(data): operation_type = data.get('type', '') # Only allow specific safe operations allowed_operations = ['file_info', 'directory_list', 'system_status'] if operation_type not in allowed_operations: raise ValueError('Operation not allowed') return {'type': operation_type, 'parameters': data.get('parameters', {})} def execute_safe_operation(operation_data): operation_type = operation_data['type'] parameters = operation_data['parameters'] if operation_type == 'file_info': return get_safe_file_info(parameters) elif operation_type == 'directory_list': return get_safe_directory_list(parameters) elif operation_type == 'system_status': return get_safe_system_status() def get_safe_file_info(parameters): filename = parameters.get('filename', '') # Validate filename if not filename or '..' in filename: raise ValueError('Invalid filename') # Safe: Use pathlib safe_base_dir = Path('/app/data') file_path = safe_base_dir / filename try: # Ensure file is within safe directory file_path.resolve().relative_to(safe_base_dir.resolve()) if file_path.exists(): stat = file_path.stat() return { 'exists': True, 'size': stat.st_size, 'modified': stat.st_mtime } else: return {'exists': False} except (ValueError, OSError): raise ValueError('File access denied') def get_safe_directory_list(parameters): directory = parameters.get('directory', '') # Validate directory if not directory or '..' in directory: raise ValueError('Invalid directory') safe_base_dir = Path('/app/data') dir_path = safe_base_dir / directory try: # Ensure directory is within safe path dir_path.resolve().relative_to(safe_base_dir.resolve()) if dir_path.is_dir(): files = [] for item in dir_path.iterdir(): files.append({ 'name': item.name, 'type': 'directory' if item.is_dir() else 'file', 'size': item.stat().st_size if item.is_file() else None }) return {'files': files} else: return {'error': 'Directory not found'} except (ValueError, OSError): raise ValueError('Directory access denied') def get_safe_system_status(): # Safe: Use Python libraries instead of system commands import psutil return { 'cpu_percent': psutil.cpu_percent(), 'memory_percent': psutil.virtual_memory().percent, 'disk_usage': { 'total': psutil.disk_usage('/').total, 'used': psutil.disk_usage('/').used, 'free': psutil.disk_usage('/').free } } # Safe: File operations with validation @app.route('/file-operations', methods=['POST']) def safe_file_operations(): try: # Validate input schema = FileOperationSchema() data = schema.load(request.json or {}) # Execute safe file operation result = execute_safe_file_operation(data) return jsonify(result) except MarshmallowValidationError as e: return jsonify({'error': e.messages}), 400 except ValueError as e: return jsonify({'error': str(e)}), 400 def execute_safe_file_operation(data): operation = data['operation'] filename = data['filename'] # Validate filename if not re.match(r'^[a-zA-Z0-9._-]+$', filename): raise ValueError('Invalid filename characters') if '..' in filename: raise ValueError('Path traversal not allowed') # Define safe base directory safe_dir = Path('/app/data/user_files') file_path = safe_dir / filename try: # Ensure file is within safe directory file_path.resolve().relative_to(safe_dir.resolve()) except ValueError: raise ValueError('File path outside allowed directory') if operation == 'copy': backup_path = safe_dir / 'backups' / filename backup_path.parent.mkdir(exist_ok=True) shutil.copy2(file_path, backup_path) return {'status': 'File copied successfully'} elif operation == 'move': archive_path = safe_dir / 'archive' / filename archive_path.parent.mkdir(exist_ok=True) shutil.move(file_path, archive_path) return {'status': 'File moved successfully'} elif operation == 'delete': if file_path.exists(): file_path.unlink() return {'status': 'File deleted successfully'} else: return {'status': 'File not found'} # Safe: Backup functionality @app.route('/backup', methods=['POST']) def safe_create_backup(): try: # Validate input schema = BackupSchema() data = schema.load(request.json or {}) # Create backup safely result = create_safe_backup(data) return jsonify(result) except MarshmallowValidationError as e: return jsonify({'error': e.messages}), 400 except ValueError as e: return jsonify({'error': str(e)}), 400 def create_safe_backup(data): backup_path = data['path'] backup_name = data['name'] # Validate inputs if not re.match(r'^[a-zA-Z0-9._-]+$', backup_name): raise ValueError('Invalid backup name') if '..' in backup_path: raise ValueError('Invalid backup path') # Define safe directories safe_source_dir = Path('/app/data') safe_backup_dir = Path('/app/backups') source_path = safe_source_dir / backup_path backup_file_path = safe_backup_dir / f'{backup_name}.tar.gz' try: # Ensure source is within safe directory source_path.resolve().relative_to(safe_source_dir.resolve()) # Create backup using Python libraries safe_backup_dir.mkdir(exist_ok=True) with tarfile.open(backup_file_path, 'w:gz') as tar: if source_path.exists(): tar.add(source_path, arcname=backup_path) return { 'status': 'Backup created successfully', 'backup_file': backup_file_path.name } except (ValueError, OSError) as e: raise ValueError(f'Backup creation failed: {str(e)}') # Safe: File conversion using Python libraries @app.route('/convert', methods=['POST']) def safe_convert_file(): try: # Get uploaded file if 'file' not in request.files: raise ValueError('No file provided') uploaded_file = request.files['file'] output_format = request.form.get('format', '') # Validate file and format validated_data = validate_conversion_request(uploaded_file, output_format) # Perform safe conversion result = perform_safe_conversion(validated_data) return jsonify(result) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_conversion_request(uploaded_file, output_format): # Validate file type allowed_types = ['image/jpeg', 'image/png', 'image/gif'] if uploaded_file.content_type not in allowed_types: raise ValueError('File type not allowed') # Validate file size max_size = 10 * 1024 * 1024 # 10MB if len(uploaded_file.read()) > max_size: raise ValueError('File too large') uploaded_file.seek(0) # Reset file pointer # Validate output format allowed_formats = ['jpeg', 'png', 'webp'] if output_format not in allowed_formats: raise ValueError('Output format not allowed') return { 'file': uploaded_file, 'output_format': output_format } def perform_safe_conversion(data): # Safe: Use Pillow for image conversion from PIL import Image import io uploaded_file = data['file'] output_format = data['output_format'] try: # Open and validate image image = Image.open(uploaded_file) image.verify() # Verify it's a valid image # Reopen for processing uploaded_file.seek(0) image = Image.open(uploaded_file) # Convert format output_io = io.BytesIO() image.save(output_io, format=output_format.upper()) # Save converted file output_filename = f'converted_{uploaded_file.filename}.{output_format}' output_path = Path('/app/data/converted') / output_filename output_path.parent.mkdir(exist_ok=True) with open(output_path, 'wb') as f: f.write(output_io.getvalue()) return { 'status': 'Conversion successful', 'output_file': output_filename } except Exception as e: raise ValueError(f'Conversion failed: {str(e)}') # Safe: Log search using Python @app.route('/search-logs', methods=['GET']) def safe_search_logs(): try: log_type = request.args.get('log_type', '') search_pattern = request.args.get('pattern', '') # Validate inputs validated_search = validate_log_search(log_type, search_pattern) # Perform safe log search results = search_logs_safely(validated_search) return jsonify({'matches': results}) except ValueError as e: return jsonify({'error': str(e)}), 400 def validate_log_search(log_type, pattern): # Validate log type allowed_log_types = ['application', 'error', 'access'] if log_type not in allowed_log_types: raise ValueError('Invalid log type') # Validate search pattern if not pattern or len(pattern) > 100: raise ValueError('Invalid search pattern') # Only allow alphanumeric and safe characters if not re.match(r'^[a-zA-Z0-9\s._-]+$', pattern): raise ValueError('Pattern contains invalid characters') return { 'log_type': log_type, 'pattern': pattern } def search_logs_safely(search_data): log_type = search_data['log_type'] pattern = search_data['pattern'] # Define safe log file path log_file = Path('/app/logs') / f'{log_type}.log' if not log_file.exists(): return [] matches = [] try: # Safe: Read and search file using Python with open(log_file, 'r') as f: for line_num, line in enumerate(f, 1): if pattern.lower() in line.lower(): matches.append({ 'line_number': line_num, 'content': line.strip()[:200] # Limit content length }) # Limit results if len(matches) >= 100: break except Exception: raise ValueError('Log search failed') return matches if __name__ == '__main__': app.run(debug=False)

💡 Why This Fix Works

See fix suggestions for detailed explanation.

Why it happens

Flask views execute commands with user data: os.system(f'ping {request.args["host"]}') or os.system('convert ' + filename). Shell interprets special characters like ; && | allowing command chaining. Attackers inject: host=example.com; cat /etc/passwd executing arbitrary commands with application privileges.

Root causes

Passing User Input Directly to os.system() Calls

Flask views execute commands with user data: os.system(f'ping {request.args["host"]}') or os.system('convert ' + filename). Shell interprets special characters like ; && | allowing command chaining. Attackers inject: host=example.com; cat /etc/passwd executing arbitrary commands with application privileges.

Using Shell=True with subprocess and Unvalidated Input

Code passes user input to subprocess with shell: subprocess.call(f'ls {directory}', shell=True). Shell parsing enables injection through metacharacters. Even subprocess.run() vulnerable with shell=True. Command substitution $(command), pipes, redirects, and wildcards all exploitable. Shell=True makes any user input dangerous.

Constructing Shell Commands Through String Concatenation

Building commands with concatenation or formatting: cmd = 'tar -czf backup.tar.gz ' + path; os.system(cmd). String operations don't sanitize shell metacharacters. Even with partial validation, complex payloads bypass filters. Combining multiple user inputs creates injection opportunities through concatenation order.

Using popen, exec, or spawn Functions with Unsanitized Data

Legacy functions execute user input: os.popen(command), os.exec*(command), or os.spawn*(). These invoke shell interpreters. Code like os.popen(f'grep {pattern} file.txt') allows injection. Pattern value like '. /etc/passwd #' changes command behavior. All os.exec and spawn variants vulnerable without array syntax.

Insufficient Input Validation Before Command Execution

Weak validation attempts fail: if ';' not in command: os.system(command). Incomplete blocklists miss alternatives: && || ` $() newlines, etc. Regex validation errors allow bypasses. Even allowlist validation fails with path traversal or argument injection. Validation alone insufficient for shell command construction.

Fixes

1

Never Use os.system(), Use subprocess Without shell=True

Replace os.system() with subprocess.run(['ping', '-c', '4', host], shell=False, check=True). Pass arguments as list, never string. Each list element is argument, preventing shell interpretation. No parsing of metacharacters. Use capture_output=True for output. This prevents all shell injection.

2

Validate Input Against Strict Allowlists Before Execution

For limited commands, use allowlist: ALLOWED_HOSTS = ['example.com', 'test.com']; if host in ALLOWED_HOSTS: subprocess.run(['ping', host]). Validate format with regex: re.match(r'^[a-z0-9.-]+$', host). Reject rather than sanitize. Combine allowlist with subprocess list syntax for defense-in-depth.

3

Use Specialized Libraries Instead of Shell Commands

Replace shell commands with Python libraries: requests instead of curl, zipfile/tarfile instead of tar/unzip, Pillow instead of imagemagick convert, gitpython instead of git commands. Libraries provide safe APIs without shell interpretation. Eliminate command injection surface by avoiding subprocess entirely.

4

Implement Command Argument Escaping with shlex.quote()

If shell absolutely required, use shlex.quote(): from shlex import quote; cmd = f'ping {quote(host)}'. Escapes shell metacharacters with proper quoting. However, prefer subprocess list syntax over escaping. shlex.quote() defense-in-depth only, not primary protection. Never trust it alone.

5

Run Commands with Least Privilege and Sandboxing

Execute subprocesses with reduced privileges: subprocess.run(command, user='nobody', group='nogroup'). Use containers, chroot, or seccomp for isolation. Set resource limits: timeout=10, rlimit parameters. Drop capabilities with setuid/setgid. Even if injection occurs, limit damage through privilege separation and sandboxing.

6

Use Indirect References Instead of Direct Command Construction

Map user input to predefined commands: COMMANDS = {'backup': ['tar', '-czf', 'backup.tar.gz', '/data'], 'restore': ['tar', '-xzf', 'backup.tar.gz']}; cmd = COMMANDS.get(request.args['action']). Users select from safe options. Application controls all command components. No user data in command construction.

Detect This Vulnerability in Your Code

Sourcery automatically identifies flask command injection via os.system() and many other security issues in your codebase.