import subprocess
import hashlib
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from flask import Flask, request, jsonify
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
class SecureSystemOperations:
ALLOWED_OPERATIONS = {
'system_info': {
'uptime': {'command': ['uptime'], 'args': []},
'disk_usage': {'command': ['df', '-h'], 'args': []},
'memory': {'command': ['free', '-h'], 'args': []}
},
'file_ops': {
'info': {'method': 'native'},
'hash': {'method': 'native'},
'count': {'method': 'subprocess', 'command': ['wc', '-l']}
}
}
SAFE_DIRECTORIES = [
Path('/secure/uploads').resolve(),
Path('/secure/documents').resolve()
]
def __init__(self):
# Ensure safe directories exist
for directory in self.SAFE_DIRECTORIES:
directory.mkdir(parents=True, exist_ok=True)
def get_system_info(self, info_type: str) -> Dict[str, Any]:
if info_type not in self.ALLOWED_OPERATIONS['system_info']:
raise ValueError(f'Info type not allowed: {info_type}')
config = self.ALLOWED_OPERATIONS['system_info'][info_type]
try:
# SECURE: subprocess with fixed commands
result = subprocess.run(
config['command'],
capture_output=True,
text=True,
timeout=10
)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr
}
except subprocess.TimeoutExpired:
raise RuntimeError('Command timeout')
def process_file(self, operation: str, filename: str) -> Dict[str, Any]:
if operation not in self.ALLOWED_OPERATIONS['file_ops']:
raise ValueError(f'File operation not allowed: {operation}')
# Validate and resolve file path
file_path = self._validate_file_path(filename)
config = self.ALLOWED_OPERATIONS['file_ops'][operation]
if config['method'] == 'native':
return self._native_file_operation(operation, file_path)
elif config['method'] == 'subprocess':
return self._subprocess_file_operation(config['command'], file_path)
else:
raise ValueError('Unknown operation method')
def _validate_file_path(self, filename: str) -> Path:
import re
# Basic validation
if not re.match(r'^[a-zA-Z0-9._-]+$', filename):
raise ValueError('Invalid filename format')
if len(filename) > 255:
raise ValueError('Filename too long')
# Find file in safe directories
for safe_dir in self.SAFE_DIRECTORIES:
candidate_path = (safe_dir / filename).resolve()
# Ensure path is within safe directory
if str(candidate_path).startswith(str(safe_dir)):
if candidate_path.exists() and candidate_path.is_file():
return candidate_path
raise FileNotFoundError('File not found in safe directories')
def _native_file_operation(self, operation: str, file_path: Path) -> Dict[str, Any]:
if operation == 'info':
stat = file_path.stat()
return {
'operation': 'info',
'filename': file_path.name,
'size': stat.st_size,
'modified': stat.st_mtime,
'permissions': oct(stat.st_mode)[-3:],
'is_readable': os.access(file_path, os.R_OK)
}
elif operation == 'hash':
hasher = hashlib.sha256()
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hasher.update(chunk)
return {
'operation': 'hash',
'filename': file_path.name,
'sha256': hasher.hexdigest(),
'size': file_path.stat().st_size
}
else:
raise ValueError(f'Unknown native operation: {operation}')
def _subprocess_file_operation(self, command: List[str],
file_path: Path) -> Dict[str, Any]:
full_command = command + [str(file_path)]
try:
# SECURE: subprocess with argument list
result = subprocess.run(
full_command,
capture_output=True,
text=True,
timeout=30,
cwd=str(file_path.parent)
)
return {
'operation': 'subprocess',
'command': ' '.join(command),
'filename': file_path.name,
'success': result.returncode == 0,
'output': result.stdout.strip(),
'error': result.stderr.strip()
}
except subprocess.TimeoutExpired:
raise RuntimeError('Command execution timeout')
# Initialize secure operations
secure_ops = SecureSystemOperations()
# SECURE: Flask endpoints with validation
@app.route('/system-info')
def system_info():
try:
info_type = request.args.get('type', '').strip()
if not info_type:
return jsonify({'error': 'Info type required'}), 400
result = secure_ops.get_system_info(info_type)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
except Exception as e:
logger.error(f'System info error: {e}')
return jsonify({'error': 'Operation failed'}), 500
@app.route('/file-info/<filename>')
def file_info(filename):
try:
result = secure_ops.process_file('info', filename)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
except FileNotFoundError as e:
return jsonify({'error': str(e)}), 404
except Exception as e:
logger.error(f'File info error: {e}')
return jsonify({'error': 'Operation failed'}), 500
@app.route('/file-hash/<filename>')
def file_hash(filename):
try:
result = secure_ops.process_file('hash', filename)
return jsonify(result)
except ValueError as e:
return jsonify({'error': str(e)}), 400
except FileNotFoundError as e:
return jsonify({'error': str(e)}), 404
except Exception as e:
logger.error(f'File hash error: {e}')
return jsonify({'error': 'Operation failed'}), 500
@app.route('/operations')
def list_operations():
return jsonify({
'system_info_types': list(secure_ops.ALLOWED_OPERATIONS['system_info'].keys()),
'file_operations': list(secure_ops.ALLOWED_OPERATIONS['file_ops'].keys()),
'safe_directories': [str(d) for d in secure_ops.SAFE_DIRECTORIES]
})
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Endpoint not found'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
app.run(host='127.0.0.1', port=5000, debug=False)