# Vulnerable: os.system() with user input in Flask
import os
import subprocess
from flask import Flask, request, jsonify
app = Flask(__name__)
# Extremely dangerous: os.system() with user input
@app.route('/execute', methods=['POST'])
def execute_command():
command = request.json.get('command', '')
# CRITICAL: Command injection possible
result = os.system(command)
return jsonify({'result': result})
# Another dangerous pattern
@app.route('/file-operations', methods=['POST'])
def file_operations():
operation = request.json.get('operation', '')
filename = request.json.get('filename', '')
# Dangerous: User controls command arguments
command = f"{operation} {filename}"
try:
os.system(command)
return jsonify({'status': 'command_executed'})
except Exception as e:
return jsonify({'error': str(e)})
# Backup functionality with command injection
@app.route('/backup', methods=['POST'])
def create_backup():
backup_path = request.json.get('path', '')
backup_name = request.json.get('name', '')
# Dangerous: User-controlled backup command
backup_command = f"tar -czf {backup_name} {backup_path}"
try:
os.system(backup_command)
return jsonify({'status': 'backup_created'})
except Exception as e:
return jsonify({'error': str(e)})
# File conversion with subprocess
@app.route('/convert', methods=['POST'])
def convert_file():
input_file = request.json.get('input_file', '')
output_file = request.json.get('output_file', '')
format_type = request.json.get('format', '')
# Dangerous: subprocess with shell=True
try:
convert_cmd = f"convert {input_file} {output_file}.{format_type}"
subprocess.run(convert_cmd, shell=True, check=True)
return jsonify({'status': 'converted'})
except subprocess.CalledProcessError as e:
return jsonify({'error': str(e)})
# Log processing with grep
@app.route('/search-logs', methods=['GET'])
def search_logs():
log_file = request.args.get('log_file', '')
search_pattern = request.args.get('pattern', '')
# Dangerous: grep command with user input
try:
grep_command = f"grep '{search_pattern}' {log_file}"
result = os.popen(grep_command).read()
return jsonify({'matches': result})
except Exception as e:
return jsonify({'error': str(e)})
# System information gathering
@app.route('/system-info', methods=['GET'])
def get_system_info():
info_type = request.args.get('type', '')
# Dangerous: Direct system command execution
commands = {
'disk': 'df -h',
'memory': 'free -m',
'processes': 'ps aux',
'network': 'netstat -tuln'
}
if info_type in commands:
try:
result = os.popen(commands[info_type]).read()
return jsonify({'info': result})
except Exception as e:
return jsonify({'error': str(e)})
return jsonify({'error': 'Invalid info type'})
# File download with wget
@app.route('/download', methods=['POST'])
def download_file():
url = request.json.get('url', '')
destination = request.json.get('destination', '')
# Dangerous: wget command with user input
download_command = f"wget '{url}' -O {destination}"
try:
os.system(download_command)
return jsonify({'status': 'downloaded'})
except Exception as e:
return jsonify({'error': str(e)})
# Database backup
@app.route('/db-backup', methods=['POST'])
def backup_database():
db_name = request.json.get('database', '')
backup_file = request.json.get('backup_file', '')
# Dangerous: Database backup command
backup_cmd = f"mysqldump {db_name} > {backup_file}"
try:
os.system(backup_cmd)
return jsonify({'status': 'database_backup_created'})
except Exception as e:
return jsonify({'error': str(e)})
# Image processing
@app.route('/process-image', methods=['POST'])
def process_image():
image_path = request.json.get('image_path', '')
operations = request.json.get('operations', [])
# Dangerous: ImageMagick commands
for operation in operations:
command = f"convert {image_path} {operation} {image_path}"
try:
os.system(command)
except Exception as e:
return jsonify({'error': str(e)})
return jsonify({'status': 'image_processed'})
# Archive extraction
@app.route('/extract', methods=['POST'])
def extract_archive():
archive_path = request.json.get('archive', '')
extract_path = request.json.get('extract_to', '')
# Dangerous: Archive extraction command
extract_command = f"tar -xzf {archive_path} -C {extract_path}"
try:
os.system(extract_command)
return jsonify({'status': 'extracted'})
except Exception as e:
return jsonify({'error': str(e)})
# Secure: Safe alternatives to os.system() in Flask
import subprocess
import shutil
import os
from pathlib import Path
from flask import Flask, request, jsonify
from marshmallow import Schema, fields, ValidationError as MarshmallowValidationError
import re
import tarfile
import zipfile
app = Flask(__name__)
# Safe: Input validation schemas
class FileOperationSchema(Schema):
operation = fields.Str(validate=lambda x: x in ['copy', 'move', 'delete'])
filename = fields.Str(required=True)
class BackupSchema(Schema):
path = fields.Str(required=True)
name = fields.Str(required=True)
# Safe: No direct command execution
@app.route('/execute', methods=['POST'])
def safe_execute_operation():
try:
# Validate input
operation_data = validate_operation_request(request.json or {})
# Execute safe operation
result = execute_safe_operation(operation_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_operation_request(data):
operation_type = data.get('type', '')
# Only allow specific safe operations
allowed_operations = ['file_info', 'directory_list', 'system_status']
if operation_type not in allowed_operations:
raise ValueError('Operation not allowed')
return {'type': operation_type, 'parameters': data.get('parameters', {})}
def execute_safe_operation(operation_data):
operation_type = operation_data['type']
parameters = operation_data['parameters']
if operation_type == 'file_info':
return get_safe_file_info(parameters)
elif operation_type == 'directory_list':
return get_safe_directory_list(parameters)
elif operation_type == 'system_status':
return get_safe_system_status()
def get_safe_file_info(parameters):
filename = parameters.get('filename', '')
# Validate filename
if not filename or '..' in filename:
raise ValueError('Invalid filename')
# Safe: Use pathlib
safe_base_dir = Path('/app/data')
file_path = safe_base_dir / filename
try:
# Ensure file is within safe directory
file_path.resolve().relative_to(safe_base_dir.resolve())
if file_path.exists():
stat = file_path.stat()
return {
'exists': True,
'size': stat.st_size,
'modified': stat.st_mtime
}
else:
return {'exists': False}
except (ValueError, OSError):
raise ValueError('File access denied')
def get_safe_directory_list(parameters):
directory = parameters.get('directory', '')
# Validate directory
if not directory or '..' in directory:
raise ValueError('Invalid directory')
safe_base_dir = Path('/app/data')
dir_path = safe_base_dir / directory
try:
# Ensure directory is within safe path
dir_path.resolve().relative_to(safe_base_dir.resolve())
if dir_path.is_dir():
files = []
for item in dir_path.iterdir():
files.append({
'name': item.name,
'type': 'directory' if item.is_dir() else 'file',
'size': item.stat().st_size if item.is_file() else None
})
return {'files': files}
else:
return {'error': 'Directory not found'}
except (ValueError, OSError):
raise ValueError('Directory access denied')
def get_safe_system_status():
# Safe: Use Python libraries instead of system commands
import psutil
return {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_usage': {
'total': psutil.disk_usage('/').total,
'used': psutil.disk_usage('/').used,
'free': psutil.disk_usage('/').free
}
}
# Safe: File operations with validation
@app.route('/file-operations', methods=['POST'])
def safe_file_operations():
try:
# Validate input
schema = FileOperationSchema()
data = schema.load(request.json or {})
# Execute safe file operation
result = execute_safe_file_operation(data)
return jsonify(result)
except MarshmallowValidationError as e:
return jsonify({'error': e.messages}), 400
except ValueError as e:
return jsonify({'error': str(e)}), 400
def execute_safe_file_operation(data):
operation = data['operation']
filename = data['filename']
# Validate filename
if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
raise ValueError('Invalid filename characters')
if '..' in filename:
raise ValueError('Path traversal not allowed')
# Define safe base directory
safe_dir = Path('/app/data/user_files')
file_path = safe_dir / filename
try:
# Ensure file is within safe directory
file_path.resolve().relative_to(safe_dir.resolve())
except ValueError:
raise ValueError('File path outside allowed directory')
if operation == 'copy':
backup_path = safe_dir / 'backups' / filename
backup_path.parent.mkdir(exist_ok=True)
shutil.copy2(file_path, backup_path)
return {'status': 'File copied successfully'}
elif operation == 'move':
archive_path = safe_dir / 'archive' / filename
archive_path.parent.mkdir(exist_ok=True)
shutil.move(file_path, archive_path)
return {'status': 'File moved successfully'}
elif operation == 'delete':
if file_path.exists():
file_path.unlink()
return {'status': 'File deleted successfully'}
else:
return {'status': 'File not found'}
# Safe: Backup functionality
@app.route('/backup', methods=['POST'])
def safe_create_backup():
try:
# Validate input
schema = BackupSchema()
data = schema.load(request.json or {})
# Create backup safely
result = create_safe_backup(data)
return jsonify(result)
except MarshmallowValidationError as e:
return jsonify({'error': e.messages}), 400
except ValueError as e:
return jsonify({'error': str(e)}), 400
def create_safe_backup(data):
backup_path = data['path']
backup_name = data['name']
# Validate inputs
if not re.match(r'^[a-zA-Z0-9._-]+$', backup_name):
raise ValueError('Invalid backup name')
if '..' in backup_path:
raise ValueError('Invalid backup path')
# Define safe directories
safe_source_dir = Path('/app/data')
safe_backup_dir = Path('/app/backups')
source_path = safe_source_dir / backup_path
backup_file_path = safe_backup_dir / f'{backup_name}.tar.gz'
try:
# Ensure source is within safe directory
source_path.resolve().relative_to(safe_source_dir.resolve())
# Create backup using Python libraries
safe_backup_dir.mkdir(exist_ok=True)
with tarfile.open(backup_file_path, 'w:gz') as tar:
if source_path.exists():
tar.add(source_path, arcname=backup_path)
return {
'status': 'Backup created successfully',
'backup_file': backup_file_path.name
}
except (ValueError, OSError) as e:
raise ValueError(f'Backup creation failed: {str(e)}')
# Safe: File conversion using Python libraries
@app.route('/convert', methods=['POST'])
def safe_convert_file():
try:
# Get uploaded file
if 'file' not in request.files:
raise ValueError('No file provided')
uploaded_file = request.files['file']
output_format = request.form.get('format', '')
# Validate file and format
validated_data = validate_conversion_request(uploaded_file, output_format)
# Perform safe conversion
result = perform_safe_conversion(validated_data)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_conversion_request(uploaded_file, output_format):
# Validate file type
allowed_types = ['image/jpeg', 'image/png', 'image/gif']
if uploaded_file.content_type not in allowed_types:
raise ValueError('File type not allowed')
# Validate file size
max_size = 10 * 1024 * 1024 # 10MB
if len(uploaded_file.read()) > max_size:
raise ValueError('File too large')
uploaded_file.seek(0) # Reset file pointer
# Validate output format
allowed_formats = ['jpeg', 'png', 'webp']
if output_format not in allowed_formats:
raise ValueError('Output format not allowed')
return {
'file': uploaded_file,
'output_format': output_format
}
def perform_safe_conversion(data):
# Safe: Use Pillow for image conversion
from PIL import Image
import io
uploaded_file = data['file']
output_format = data['output_format']
try:
# Open and validate image
image = Image.open(uploaded_file)
image.verify() # Verify it's a valid image
# Reopen for processing
uploaded_file.seek(0)
image = Image.open(uploaded_file)
# Convert format
output_io = io.BytesIO()
image.save(output_io, format=output_format.upper())
# Save converted file
output_filename = f'converted_{uploaded_file.filename}.{output_format}'
output_path = Path('/app/data/converted') / output_filename
output_path.parent.mkdir(exist_ok=True)
with open(output_path, 'wb') as f:
f.write(output_io.getvalue())
return {
'status': 'Conversion successful',
'output_file': output_filename
}
except Exception as e:
raise ValueError(f'Conversion failed: {str(e)}')
# Safe: Log search using Python
@app.route('/search-logs', methods=['GET'])
def safe_search_logs():
try:
log_type = request.args.get('log_type', '')
search_pattern = request.args.get('pattern', '')
# Validate inputs
validated_search = validate_log_search(log_type, search_pattern)
# Perform safe log search
results = search_logs_safely(validated_search)
return jsonify({'matches': results})
except ValueError as e:
return jsonify({'error': str(e)}), 400
def validate_log_search(log_type, pattern):
# Validate log type
allowed_log_types = ['application', 'error', 'access']
if log_type not in allowed_log_types:
raise ValueError('Invalid log type')
# Validate search pattern
if not pattern or len(pattern) > 100:
raise ValueError('Invalid search pattern')
# Only allow alphanumeric and safe characters
if not re.match(r'^[a-zA-Z0-9\s._-]+$', pattern):
raise ValueError('Pattern contains invalid characters')
return {
'log_type': log_type,
'pattern': pattern
}
def search_logs_safely(search_data):
log_type = search_data['log_type']
pattern = search_data['pattern']
# Define safe log file path
log_file = Path('/app/logs') / f'{log_type}.log'
if not log_file.exists():
return []
matches = []
try:
# Safe: Read and search file using Python
with open(log_file, 'r') as f:
for line_num, line in enumerate(f, 1):
if pattern.lower() in line.lower():
matches.append({
'line_number': line_num,
'content': line.strip()[:200] # Limit content length
})
# Limit results
if len(matches) >= 100:
break
except Exception:
raise ValueError('Log search failed')
return matches
if __name__ == '__main__':
app.run(debug=False)