import subprocess
from flask import Flask, request
@app.route('/run_command')
def run_command():
# Vulnerable: shell=True with user input
user_cmd = request.args.get('cmd')
result = subprocess.run(user_cmd, shell=True, capture_output=True)
return result.stdout.decode()
@app.route('/list_files')
def list_files():
# Vulnerable: Command injection via directory parameter
directory = request.args.get('dir', '.')
command = f'ls -la {directory}' # User input in command string
result = subprocess.run(command, shell=True, capture_output=True)
return result.stdout.decode()
@app.route('/backup')
def backup_files():
# Vulnerable: Multiple user inputs in shell command
source = request.args.get('source')
dest = request.args.get('dest')
# Extremely dangerous: User controls entire command
backup_cmd = f'tar -czf {dest} {source}'
subprocess.run(backup_cmd, shell=True)
return 'Backup completed'
import subprocess
import shlex
import os
import re
from flask import Flask, request
# Safe command allowlist
SAFE_COMMANDS = {
'list': '/bin/ls',
'date': '/bin/date',
'whoami': '/usr/bin/whoami',
'pwd': '/bin/pwd'
}
def validate_path(path):
"""Validate file path for safety."""
if not path:
raise ValueError('Path cannot be empty')
# Prevent directory traversal
if '..' in path or path.startswith('/'):
raise ValueError('Invalid path: directory traversal detected')
# Allow only alphanumeric, dash, underscore, dot
if not re.match(r'^[a-zA-Z0-9._/-]+$', path):
raise ValueError('Invalid characters in path')
return path
def safe_subprocess_run(command_list, timeout=10):
"""Safely run subprocess without shell."""
try:
result = subprocess.run(
command_list,
shell=False, # Never use shell=True
capture_output=True,
text=True,
timeout=timeout,
check=False
)
return result
except subprocess.TimeoutExpired:
raise RuntimeError('Command timeout')
except FileNotFoundError:
raise RuntimeError('Command not found')
@app.route('/run_command')
def run_command():
"""Secure command execution with allowlist."""
user_cmd = request.args.get('cmd', '')
# Validate against allowlist
if user_cmd not in SAFE_COMMANDS:
return {'error': f'Command "{user_cmd}" not allowed'}, 400
try:
# Safe: Use full path, no shell
command_path = SAFE_COMMANDS[user_cmd]
result = safe_subprocess_run([command_path])
return {
'output': result.stdout,
'returncode': result.returncode
}
except RuntimeError as e:
return {'error': str(e)}, 500
@app.route('/list_files')
def list_files():
"""Secure file listing with path validation."""
directory = request.args.get('dir', '.')
try:
# Validate directory path
safe_dir = validate_path(directory)
# Ensure directory exists and is within allowed paths
allowed_base_dirs = ['/var/www', '/tmp/uploads', '/opt/data']
full_path = os.path.abspath(safe_dir)
if not any(full_path.startswith(base) for base in allowed_base_dirs):
return {'error': 'Directory not in allowed paths'}, 403
if not os.path.isdir(full_path):
return {'error': 'Directory not found'}, 404
# Safe: Use argument list, no shell
result = safe_subprocess_run(['/bin/ls', '-la', full_path])
return {
'directory': full_path,
'listing': result.stdout,
'returncode': result.returncode
}
except ValueError as e:
return {'error': str(e)}, 400
except RuntimeError as e:
return {'error': str(e)}, 500
@app.route('/backup')
def backup_files():
"""Secure backup with strict validation."""
source = request.args.get('source', '')
dest = request.args.get('dest', '')
if not source or not dest:
return {'error': 'Source and destination required'}, 400
try:
# Validate paths
safe_source = validate_path(source)
safe_dest = validate_path(dest)
# Validate source exists
source_path = os.path.abspath(safe_source)
if not os.path.exists(source_path):
return {'error': 'Source path not found'}, 404
# Ensure destination is in safe directory
dest_path = os.path.abspath(safe_dest)
allowed_backup_dir = '/var/backups'
if not dest_path.startswith(allowed_backup_dir):
return {'error': 'Destination not in allowed backup directory'}, 403
# Create backup directory if needed
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
# Safe: Use argument list with full paths
result = safe_subprocess_run([
'/bin/tar',
'-czf',
dest_path,
'-C',
os.path.dirname(source_path),
os.path.basename(source_path)
], timeout=60)
if result.returncode == 0:
return {
'status': 'success',
'backup_file': dest_path,
'size': os.path.getsize(dest_path)
}
else:
return {
'error': 'Backup failed',
'stderr': result.stderr
}, 500
except ValueError as e:
return {'error': str(e)}, 400
except RuntimeError as e:
return {'error': str(e)}, 500
# Alternative: Use shlex.quote for shell escaping (if shell=True is unavoidable)
def safe_shell_command(base_command, user_args):
"""Safely construct shell command with proper escaping."""
if not isinstance(user_args, list):
user_args = [user_args]
# Escape all arguments
escaped_args = [shlex.quote(str(arg)) for arg in user_args]
# Construct safe command
command = f"{base_command} {' '.join(escaped_args)}"
return command
@app.route('/safe_shell_example')
def safe_shell_example():
"""Example of safer shell usage (still discouraged)."""
filename = request.args.get('filename', '')
try:
validate_path(filename)
# If shell=True is absolutely necessary, use proper escaping
safe_command = safe_shell_command('/bin/cat', [filename])
result = subprocess.run(
safe_command,
shell=True, # Still risky, prefer shell=False
capture_output=True,
text=True,
timeout=10
)
return {
'content': result.stdout,
'command_used': safe_command
}
except ValueError as e:
return {'error': str(e)}, 400
except subprocess.TimeoutExpired:
return {'error': 'Command timeout'}, 408
# Best practice: Command builder class
class SecureCommandBuilder:
"""Secure command builder that prevents injection."""
def __init__(self, base_command):
self.base_command = base_command
self.args = []
# Validate base command
if base_command not in SAFE_COMMANDS.values():
raise ValueError(f'Base command {base_command} not allowed')
def add_argument(self, arg, validator=None):
"""Add argument with optional validation."""
if validator:
arg = validator(arg)
# Basic safety check
if isinstance(arg, str) and len(arg) > 1000:
raise ValueError('Argument too long')
self.args.append(str(arg))
return self
def execute(self, timeout=10):
"""Execute the built command safely."""
command_list = [self.base_command] + self.args
return safe_subprocess_run(command_list, timeout)
@app.route('/builder_example')
def builder_example():
"""Example using secure command builder."""
directory = request.args.get('dir', '.')
try:
# Build command securely
builder = SecureCommandBuilder('/bin/ls')
builder.add_argument('-la')
builder.add_argument(directory, validator=validate_path)
result = builder.execute()
return {'output': result.stdout}
except ValueError as e:
return {'error': str(e)}, 400
except RuntimeError as e:
return {'error': str(e)}, 500