# Vulnerable: subprocess with user input in Flask
import subprocess
from flask import Flask, request, jsonify
app = Flask(__name__)
# Extremely dangerous: subprocess with shell=True
@app.route('/execute', methods=['POST'])
def execute_command():
command = request.json.get('command', '')
# CRITICAL: Command injection via shell=True
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return jsonify({
'stdout': result.stdout,
'stderr': result.stderr,
'returncode': result.returncode
})
except Exception as e:
return jsonify({'error': str(e)})
# Another dangerous pattern
@app.route('/file-ops', methods=['POST'])
def file_operations():
operation = request.json.get('operation', '')
filename = request.json.get('filename', '')
# Dangerous: User controls command arguments
command = [operation, filename]
try:
# Still dangerous if operation is user-controlled
result = subprocess.run(command, capture_output=True, text=True)
return jsonify({
'output': result.stdout,
'status': 'executed'
})
except Exception as e:
return jsonify({'error': str(e)})
# Git operations with subprocess
@app.route('/git', methods=['POST'])
def git_operations():
git_command = request.json.get('git_command', '')
repository = request.json.get('repository', '')
# Dangerous: Git command injection
command = f"git {git_command} {repository}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return jsonify({
'git_output': result.stdout,
'git_error': result.stderr
})
except Exception as e:
return jsonify({'error': str(e)})
# Image processing with ImageMagick
@app.route('/convert-image', methods=['POST'])
def convert_image():
input_file = request.json.get('input_file', '')
output_file = request.json.get('output_file', '')
options = request.json.get('options', '')
# Dangerous: ImageMagick command injection
command = f"convert {options} {input_file} {output_file}"
try:
subprocess.run(command, shell=True, check=True)
return jsonify({'status': 'image_converted'})
except subprocess.CalledProcessError as e:
return jsonify({'error': str(e)})
# System monitoring
@app.route('/system', methods=['GET'])
def system_info():
info_type = request.args.get('type', '')
options = request.args.get('options', '')
# Dangerous: System command with user options
commands = {
'ps': f'ps {options}',
'netstat': f'netstat {options}',
'top': f'top {options}',
'df': f'df {options}'
}
if info_type in commands:
try:
result = subprocess.run(
commands[info_type],
shell=True,
capture_output=True,
text=True
)
return jsonify({'output': result.stdout})
except Exception as e:
return jsonify({'error': str(e)})
return jsonify({'error': 'Invalid system info type'})
# Archive operations
@app.route('/archive', methods=['POST'])
def archive_files():
archive_type = request.json.get('type', 'tar')
source_path = request.json.get('source', '')
archive_name = request.json.get('archive_name', '')
compression = request.json.get('compression', '')
# Dangerous: Archive command injection
if archive_type == 'tar':
command = f"tar {compression} -cf {archive_name} {source_path}"
elif archive_type == 'zip':
command = f"zip {compression} {archive_name} {source_path}"
try:
subprocess.run(command, shell=True, check=True)
return jsonify({'status': 'archive_created'})
except subprocess.CalledProcessError as e:
return jsonify({'error': str(e)})
# Network operations
@app.route('/network', methods=['POST'])
def network_operations():
operation = request.json.get('operation', '')
target = request.json.get('target', '')
options = request.json.get('options', '')
# Dangerous: Network command injection
network_commands = {
'ping': f'ping {options} {target}',
'traceroute': f'traceroute {options} {target}',
'nslookup': f'nslookup {options} {target}',
'curl': f'curl {options} {target}'
}
if operation in network_commands:
try:
result = subprocess.run(
network_commands[operation],
shell=True,
capture_output=True,
text=True,
timeout=30
)
return jsonify({
'output': result.stdout,
'error': result.stderr
})
except Exception as e:
return jsonify({'error': str(e)})
return jsonify({'error': 'Invalid network operation'})
# Database operations
@app.route('/database', methods=['POST'])
def database_operations():
db_command = request.json.get('command', '')
database = request.json.get('database', '')
query = request.json.get('query', '')
# Dangerous: Database command injection
command = f"{db_command} -D {database} -e '{query}'"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return jsonify({
'db_output': result.stdout,
'db_error': result.stderr
})
except Exception as e:
return jsonify({'error': str(e)})
# Package management
@app.route('/packages', methods=['POST'])
def package_operations():
package_manager = request.json.get('manager', 'apt')
action = request.json.get('action', 'install')
package_name = request.json.get('package', '')
options = request.json.get('options', '')
# Dangerous: Package manager command injection
command = f"{package_manager} {action} {options} {package_name}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
return jsonify({
'package_output': result.stdout,
'package_error': result.stderr
})
except Exception as e:
return jsonify({'error': str(e)})
# Secure: Safe subprocess usage in Flask
import subprocess
import shutil
from flask import Flask, request, jsonify
from pathlib import Path
import re
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError
import os
app = Flask(__name__)
# Safe: Configuration with allowed operations
app.config['ALLOWED_OPERATIONS'] = {
'file_info': {'command': ['stat'], 'max_args': 1},
'directory_list': {'command': ['ls', '-la'], 'max_args': 1},
'file_count': {'command': ['wc', '-l'], 'max_args': 1}
}
# Input validation schemas
class FileOperationSchema(Schema):
operation = fields.Str(required=True, validate=lambda x: x in ['file_info', 'directory_list', 'file_count'])
target = fields.Str(required=True)
class SystemInfoSchema(Schema):
info_type = fields.Str(required=True, validate=lambda x: x in ['disk_usage', 'memory_info', 'process_count'])
# Safe: No direct command execution
@app.route('/execute', methods=['POST'])
def safe_execute_operation():
try:
# Validate input
schema = FileOperationSchema()
data = schema.load(request.json or {})
# Execute safe operation
result = execute_safe_operation(data)
return jsonify(result)
except (ValueError, MarshmallowValidationError) as e:
return jsonify({'error': str(e)}), 400
def execute_safe_operation(data):
operation = data['operation']
target = data['target']
# Validate target path
validated_target = validate_target_path(target)
# Get operation configuration
op_config = app.config['ALLOWED_OPERATIONS'][operation]
# Build safe command
command = op_config['command'] + [validated_target]
try:
# Safe: No shell=True, use argument list
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=10,
check=False # Don't raise on non-zero exit
)
return {
'operation': operation,
'output': result.stdout[:1000], # Limit output
'error': result.stderr[:1000] if result.stderr else None,
'return_code': result.returncode
}
except subprocess.TimeoutExpired:
raise ValueError('Operation timed out')
except Exception as e:
raise ValueError(f'Operation failed: {str(e)}')
def validate_target_path(target):
"""Validate and sanitize target path"""
if not target:
raise ValueError('Target path is required')
# Remove dangerous characters
if re.search(r'[;&|`$(){}\[\]\*\?<>\"\\]', target):
raise ValueError('Target contains invalid characters')
# Prevent path traversal
if '..' in target or target.startswith('/'):
raise ValueError('Invalid target path')
# Limit to safe directory
safe_base = '/app/data'
target_path = Path(safe_base) / target
try:
# Ensure target is within safe directory
target_path.resolve().relative_to(Path(safe_base).resolve())
return str(target_path)
except ValueError:
raise ValueError('Target path outside allowed directory')
# Safe: File operations using Python libraries
@app.route('/file-ops', methods=['POST'])
def safe_file_operations():
try:
operation = request.json.get('operation', '') if request.json else ''
filename = request.json.get('filename', '') if request.json else ''
# Validate inputs
validated_data = validate_file_operation(operation, filename)
# Execute safe file operation
result = execute_safe_file_operation(validated_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_file_operation(operation, filename):
# Validate operation
allowed_operations = ['copy', 'move', 'delete', 'info']
if operation not in allowed_operations:
raise ValueError('Operation not allowed')
# Validate filename
if not filename or len(filename) > 100:
raise ValueError('Invalid filename')
if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
raise ValueError('Filename contains invalid characters')
return {'operation': operation, 'filename': filename}
def execute_safe_file_operation(data):
operation = data['operation']
filename = data['filename']
# Define safe base directory
safe_dir = Path('/app/data/user_files')
file_path = safe_dir / filename
try:
# Ensure file is within safe directory
file_path.resolve().relative_to(safe_dir.resolve())
except ValueError:
raise ValueError('File path outside allowed directory')
if operation == 'copy':
if not file_path.exists():
raise ValueError('Source file not found')
backup_dir = safe_dir / 'backups'
backup_dir.mkdir(exist_ok=True)
backup_path = backup_dir / filename
shutil.copy2(file_path, backup_path)
return {'status': 'File copied successfully'}
elif operation == 'move':
if not file_path.exists():
raise ValueError('Source file not found')
archive_dir = safe_dir / 'archive'
archive_dir.mkdir(exist_ok=True)
archive_path = archive_dir / filename
shutil.move(file_path, archive_path)
return {'status': 'File moved successfully'}
elif operation == 'delete':
if file_path.exists():
file_path.unlink()
return {'status': 'File deleted successfully'}
else:
return {'status': 'File not found'}
elif operation == 'info':
if file_path.exists():
stat = file_path.stat()
return {
'exists': True,
'size': stat.st_size,
'modified': stat.st_mtime,
'is_file': file_path.is_file(),
'is_dir': file_path.is_dir()
}
else:
return {'exists': False}
# Safe: System information using Python libraries
@app.route('/system', methods=['GET'])
def safe_system_info():
try:
# Validate input
schema = SystemInfoSchema()
data = schema.load(request.args)
# Get system information safely
result = get_safe_system_info(data['info_type'])
return jsonify(result)
except (ValueError, MarshmallowValidationError) as e:
return jsonify({'error': str(e)}), 400
def get_safe_system_info(info_type):
"""Get system information using Python libraries"""
import psutil
if info_type == 'disk_usage':
disk = psutil.disk_usage('/')
return {
'total': disk.total,
'used': disk.used,
'free': disk.free,
'percent': (disk.used / disk.total) * 100
}
elif info_type == 'memory_info':
memory = psutil.virtual_memory()
return {
'total': memory.total,
'available': memory.available,
'used': memory.used,
'percent': memory.percent
}
elif info_type == 'process_count':
return {
'total_processes': len(psutil.pids()),
'cpu_count': psutil.cpu_count(),
'cpu_percent': psutil.cpu_percent()
}
# Safe: Image processing using Python libraries
@app.route('/convert-image', methods=['POST'])
def safe_convert_image():
try:
# Get uploaded file
if 'file' not in request.files:
raise ValueError('No file provided')
uploaded_file = request.files['file']
output_format = request.form.get('format', 'PNG')
# Validate inputs
validated_data = validate_image_conversion(uploaded_file, output_format)
# Convert image safely
result = convert_image_safely(validated_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_image_conversion(uploaded_file, output_format):
# Validate file type
allowed_types = ['image/jpeg', 'image/png', 'image/gif']
if uploaded_file.content_type not in allowed_types:
raise ValueError('File type not allowed')
# Validate file size
max_size = 10 * 1024 * 1024 # 10MB
uploaded_file.seek(0, 2) # Seek to end
file_size = uploaded_file.tell()
uploaded_file.seek(0) # Reset
if file_size > max_size:
raise ValueError('File too large')
# Validate output format
allowed_formats = ['PNG', 'JPEG', 'GIF', 'WEBP']
if output_format.upper() not in allowed_formats:
raise ValueError('Output format not allowed')
return {
'file': uploaded_file,
'output_format': output_format.upper()
}
def convert_image_safely(data):
"""Convert image using Pillow"""
from PIL import Image
import io
uploaded_file = data['file']
output_format = data['output_format']
try:
# Open and validate image
image = Image.open(uploaded_file)
image.verify() # Verify it's a valid image
# Reopen for processing (verify() closes the file)
uploaded_file.seek(0)
image = Image.open(uploaded_file)
# Convert format
output_io = io.BytesIO()
# Convert to RGB if necessary (for JPEG)
if output_format == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
image = image.convert('RGB')
image.save(output_io, format=output_format)
# Save converted file
output_filename = f'converted_{uploaded_file.filename}.{output_format.lower()}'
output_path = Path('/app/data/converted') / output_filename
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'wb') as f:
f.write(output_io.getvalue())
return {
'status': 'Image converted successfully',
'output_file': output_filename,
'original_format': image.format,
'new_format': output_format
}
except Exception as e:
raise ValueError(f'Image conversion failed: {str(e)}')
# Safe: Archive operations using Python libraries
@app.route('/archive', methods=['POST'])
def safe_archive_files():
try:
archive_type = request.json.get('type', 'tar') if request.json else ''
source_name = request.json.get('source', '') if request.json else ''
archive_name = request.json.get('archive_name', '') if request.json else ''
# Validate inputs
validated_data = validate_archive_operation(archive_type, source_name, archive_name)
# Create archive safely
result = create_archive_safely(validated_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_archive_operation(archive_type, source_name, archive_name):
# Validate archive type
allowed_types = ['tar', 'zip']
if archive_type not in allowed_types:
raise ValueError('Archive type not allowed')
# Validate names
for name in [source_name, archive_name]:
if not name or not re.match(r'^[a-zA-Z0-9._-]+$', name):
raise ValueError('Invalid file/archive name')
return {
'type': archive_type,
'source': source_name,
'archive_name': archive_name
}
def create_archive_safely(data):
"""Create archive using Python libraries"""
import tarfile
import zipfile
archive_type = data['type']
source_name = data['source']
archive_name = data['archive_name']
# Define safe directories
source_dir = Path('/app/data/user_files')
archive_dir = Path('/app/data/archives')
archive_dir.mkdir(exist_ok=True)
source_path = source_dir / source_name
if not source_path.exists():
raise ValueError('Source file/directory not found')
# Ensure source is within safe directory
try:
source_path.resolve().relative_to(source_dir.resolve())
except ValueError:
raise ValueError('Source path outside allowed directory')
if archive_type == 'tar':
archive_path = archive_dir / f'{archive_name}.tar.gz'
with tarfile.open(archive_path, 'w:gz') as tar:
tar.add(source_path, arcname=source_name)
elif archive_type == 'zip':
archive_path = archive_dir / f'{archive_name}.zip'
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
if source_path.is_file():
zipf.write(source_path, source_name)
else:
for file_path in source_path.rglob('*'):
if file_path.is_file():
relative_path = file_path.relative_to(source_path.parent)
zipf.write(file_path, str(relative_path))
return {
'status': 'Archive created successfully',
'archive_file': archive_path.name,
'archive_type': archive_type
}
# Safe: Network operations using Python libraries
@app.route('/network', methods=['POST'])
def safe_network_operations():
try:
operation = request.json.get('operation', '') if request.json else ''
target = request.json.get('target', '') if request.json else ''
# Validate inputs
validated_data = validate_network_operation(operation, target)
# Execute safe network operation
result = execute_safe_network_operation(validated_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_network_operation(operation, target):
# Validate operation
allowed_operations = ['ping', 'resolve_hostname']
if operation not in allowed_operations:
raise ValueError('Network operation not allowed')
# Validate target
if not target:
raise ValueError('Target is required')
# Basic hostname/IP validation
if not re.match(r'^[a-zA-Z0-9.-]+$', target):
raise ValueError('Invalid target format')
return {'operation': operation, 'target': target}
def execute_safe_network_operation(data):
"""Execute network operations using Python libraries"""
import socket
import platform
operation = data['operation']
target = data['target']
if operation == 'ping':
# Safe: Use platform-specific ping command with controlled arguments
if platform.system().lower() == 'windows':
command = ['ping', '-n', '4', target]
else:
command = ['ping', '-c', '4', target]
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=10
)
return {
'operation': 'ping',
'target': target,
'output': result.stdout[:500], # Limit output
'success': result.returncode == 0
}
except subprocess.TimeoutExpired:
return {
'operation': 'ping',
'target': target,
'error': 'Ping timed out',
'success': False
}
elif operation == 'resolve_hostname':
try:
ip_address = socket.gethostbyname(target)
return {
'operation': 'resolve_hostname',
'hostname': target,
'ip_address': ip_address,
'success': True
}
except socket.gaierror as e:
return {
'operation': 'resolve_hostname',
'hostname': target,
'error': str(e),
'success': False
}
if __name__ == '__main__':
app.run(debug=False)