import hashlib
import hmac
import json
import os
import time
from dataclasses import dataclass, asdict
from typing import Dict, Any

from flask import Flask, request
from flask import make_response
# Secret key for HMAC signatures.
# NOTE(review): the literal below is only a placeholder fallback so existing
# setups keep working; set the SECRET_KEY environment variable to a long
# random value in any real deployment.
SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-here'

# Flask application object that the @app.route decorators in this module
# register their view functions on.
app = Flask(__name__)
@dataclass
class SafeDataObject:
    """Plain record with a fixed, known set of fields.

    Attributes:
        name: Text label; ``from_dict`` truncates it to 100 characters.
        value: Integer payload.
        active: Boolean flag.
    """

    name: str
    value: int
    active: bool

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SafeDataObject':
        """Build an instance from a loosely-typed dictionary.

        Missing keys fall back to ``''``, ``0`` and ``False``. ``name`` is
        coerced with ``str()`` and cut to 100 characters, ``value`` with
        ``int()``, and ``active`` with ``bool()`` (so any non-empty string,
        including ``'false'``, counts as true).

        Args:
            data: Dictionary that may contain ``name``, ``value``, ``active``.

        Returns:
            A new ``SafeDataObject``.

        Raises:
            ValueError: If ``data`` is not a dict or ``value`` cannot be
                converted to an integer.
        """
        if not isinstance(data, dict):
            raise ValueError('Data must be a dictionary')

        try:
            value = int(data.get('value', 0))
        except (TypeError, OverflowError) as exc:
            # int() raises TypeError for None/list/dict and OverflowError for
            # infinities; callers only expect ValueError for bad input.
            raise ValueError(f'Invalid value: {exc}') from exc

        return cls(
            name=str(data.get('name', ''))[:100],  # Limit length
            value=value,
            active=bool(data.get('active', False)),
        )
def create_signed_data(data):
    """Wrap ``data`` in a JSON envelope carrying an HMAC-SHA256 signature.

    The signature is the hex digest of the compact JSON encoding of ``data``
    (``','`` and ``':'`` separators, no extra whitespace), keyed with
    ``SECRET_KEY``.

    Args:
        data: Any JSON-serializable value.

    Returns:
        A JSON string for an object with the keys ``data`` and ``signature``.
    """
    compact_payload = json.dumps(data, separators=(',', ':')).encode()
    mac = hmac.new(SECRET_KEY.encode(), compact_payload, hashlib.sha256)
    envelope = {'data': data, 'signature': mac.hexdigest()}
    return json.dumps(envelope)
def verify_and_load_data(signed_data):
    """Check the HMAC signature of a signed JSON envelope and return its data.

    The envelope is a JSON object with the keys ``data`` and ``signature``.
    The expected signature is the HMAC-SHA256 hex digest of the compact JSON
    encoding of ``data``, keyed with ``SECRET_KEY``.

    Args:
        signed_data: JSON text (``str`` or ``bytes``) of the envelope.

    Returns:
        The decoded ``data`` value from the envelope.

    Raises:
        ValueError: ``'Invalid data format'`` if the input cannot be parsed
            into the expected envelope, ``'Invalid signature'`` if the
            signature does not match.
    """
    # Parsing only: everything that can go wrong here is a format problem.
    # ValueError covers JSONDecodeError and UnicodeDecodeError (bytes input);
    # RecursionError is raised by the decoder for very deeply nested input.
    try:
        parsed = json.loads(signed_data)
        data = parsed['data']
        signature = parsed['signature']
    except (ValueError, KeyError, TypeError, RecursionError) as exc:
        raise ValueError('Invalid data format') from exc

    # Re-serialize exactly as the signer does so the digests are comparable.
    canonical = json.dumps(data, separators=(',', ':'))
    expected_signature = hmac.new(
        SECRET_KEY.encode(),
        canonical.encode(),
        hashlib.sha256,
    ).hexdigest()

    try:
        # Constant-time comparison; raises TypeError when the supplied
        # signature is not an ASCII string.
        signature_ok = hmac.compare_digest(signature, expected_signature)
    except TypeError as exc:
        raise ValueError('Invalid data format') from exc

    if not signature_ok:
        raise ValueError('Invalid signature')
    return data
@app.route('/load_object', methods=['POST'])
def load_object():
    """Load a ``SafeDataObject`` from a signed JSON request body.

    The raw request body is passed to ``verify_and_load_data``; the verified
    payload is then converted with ``SafeDataObject.from_dict``.

    Returns:
        ``{'status': 'success', 'object': {...}}`` on success,
        ``({'error': ...}, 400)`` for invalid or badly-typed input, and
        ``({'error': 'Failed to load object'}, 500)`` for anything unexpected.
    """
    try:
        raw_body = request.get_data(as_text=True)
        # Verify and load data safely
        verified_data = verify_and_load_data(raw_body)
        # A validly signed payload can still be a list/number/string; reject
        # it as bad input instead of failing later with AttributeError.
        if not isinstance(verified_data, dict):
            raise ValueError('Data must be a JSON object')
        # Create safe object with validation
        obj = SafeDataObject.from_dict(verified_data)
    except ValueError as e:
        return {'error': str(e)}, 400
    except (TypeError, OverflowError):
        # Field of the wrong type (e.g. value: null) is a client error.
        return {'error': 'Invalid field value'}, 400
    except Exception:
        # Keep the response generic but record the cause for operators.
        app.logger.exception('Unexpected failure while loading object')
        return {'error': 'Failed to load object'}, 500

    return {
        'status': 'success',
        'object': asdict(obj),
    }
@app.route('/restore_session')
def restore_session():
    """Restore a session from the signed ``session`` cookie.

    The cookie value is verified with ``verify_and_load_data`` and must decode
    to an object containing ``username``, ``user_id`` and a numeric
    ``expires`` timestamp that lies in the future.

    Returns:
        ``{'message': 'No session found'}`` when the cookie is absent or
        empty, a welcome message plus ``user_id`` for a valid session,
        ``({'error': 'Invalid session'}, 401)`` for any invalid, malformed or
        expired session, and ``({'error': 'Session restore failed'}, 500)``
        for unexpected errors.
    """
    session_data = request.cookies.get('session', '')
    if not session_data:
        return {'message': 'No session found'}

    try:
        # Verify and decode session data
        session = verify_and_load_data(session_data)

        # Validate session structure
        if not isinstance(session, dict):
            raise ValueError('Invalid session structure')
        required_fields = ('username', 'user_id', 'expires')
        if not all(field in session for field in required_fields):
            raise ValueError('Invalid session structure')

        # Check expiration. A non-numeric value would raise TypeError on
        # comparison, so reject it as a structural problem first.
        expires = session['expires']
        if not isinstance(expires, (int, float)):
            raise ValueError('Invalid session structure')
        # Phrased as "not >=" so that NaN, which compares false to
        # everything, is treated as expired rather than never expiring.
        if not expires >= time.time():
            raise ValueError('Session expired')
    except ValueError:
        return {'error': 'Invalid session'}, 401
    except Exception:
        app.logger.exception('Unexpected failure while restoring session')
        return {'error': 'Session restore failed'}, 500

    return {
        'message': f'Welcome back, {session["username"]}',
        'user_id': session['user_id'],
    }
@app.route('/upload_data', methods=['POST'])
def upload_data():
    """Accept an uploaded JSON file and return a sanitized copy of its fields.

    The file is read from the ``data`` form field, limited to 1 MB, parsed as
    JSON, and must be an object containing only ``name``, ``value``,
    ``description`` and ``tags``.

    Returns:
        ``{'status': 'success', 'processed_data': {...}}`` on success,
        ``({'error': ...}, 400)`` for a missing file, invalid JSON or invalid
        fields, ``({'error': 'File too large'}, 413)`` for oversized uploads,
        and ``({'error': 'Data processing failed'}, 500)`` otherwise.
    """
    max_bytes = 1024 * 1024  # 1MB upload limit

    if 'data' not in request.files:
        return {'error': 'No file uploaded'}, 400
    upload = request.files['data']

    # Cheap early rejection when the part declares its own length.
    if upload.content_length and upload.content_length > max_bytes:
        return {'error': 'File too large'}, 413

    try:
        # Read one byte past the limit: the declared length may be absent or
        # zero, and a plain read(max_bytes) would silently truncate a larger
        # file instead of rejecting it.
        content = upload.read(max_bytes + 1)
        if len(content) > max_bytes:
            return {'error': 'File too large'}, 413

        # Parse as JSON only (no arbitrary code execution)
        data = json.loads(content)

        # Validate data structure
        if not isinstance(data, dict):
            raise ValueError('Data must be a JSON object')

        # Validate specific fields
        allowed_fields = ('name', 'value', 'description', 'tags')
        for key in data:
            if key not in allowed_fields:
                raise ValueError(f'Field "{key}" not allowed')

        tags = data.get('tags', [])
        if not isinstance(tags, list):
            raise ValueError('Field "tags" must be a list')

        value = None
        if 'value' in data:
            try:
                value = float(data['value'])
            except (TypeError, ValueError, OverflowError):
                raise ValueError('Field "value" must be a number') from None

        # Process data safely
        processed_data = {
            'name': str(data.get('name', ''))[:100],
            'value': value,
            'description': str(data.get('description', ''))[:500],
            'tags': [str(tag)[:50] for tag in tags[:10]],
        }
    except (json.JSONDecodeError, UnicodeDecodeError, RecursionError):
        # Undecodable bytes and absurdly deep nesting are malformed JSON too.
        return {'error': 'Invalid JSON format'}, 400
    except ValueError as e:
        return {'error': str(e)}, 400
    except Exception:
        app.logger.exception('Unexpected failure while processing upload')
        return {'error': 'Data processing failed'}, 500

    return {
        'status': 'success',
        'processed_data': processed_data,
    }
# Alternative: Use msgpack for safe binary serialization
@app.route('/msgpack_example', methods=['POST'])
def msgpack_example():
    """Decode a msgpack request body and echo it back if it is a dictionary.

    Returns:
        ``{'status': 'success', 'data': {...}}`` on success,
        ``({'error': 'msgpack library not available'}, 500)`` when msgpack
        cannot be imported, and ``({'error': 'Invalid msgpack data: ...'},
        400)`` when the body cannot be unpacked or is not a dictionary.
    """
    # Import separately so the handler below can safely name msgpack's
    # exception classes.
    try:
        import msgpack
    except ImportError:
        return {'error': 'msgpack library not available'}, 500

    data = request.get_data()
    try:
        # msgpack is safer than pickle/jsonpickle
        # NOTE(review): strict_map_key=False accepts map keys of any type;
        # confirm that is intended for untrusted input.
        unpacked = msgpack.unpackb(data, strict_map_key=False)
        # Still validate the structure
        if not isinstance(unpacked, dict):
            raise ValueError('Expected dictionary')
    except (msgpack.exceptions.UnpackException, ValueError) as e:
        # UnpackException is msgpack's base unpack error; ExtraData and the
        # format errors derive from ValueError. The previously listed
        # msgpack.exceptions.InvalidDocument does not exist, which turned
        # every handled error into an AttributeError.
        return {'error': f'Invalid msgpack data: {str(e)}'}, 400

    return {
        'status': 'success',
        'data': unpacked,
    }
# Helper: Create session with signature
@app.route('/create_session', methods=['POST'])
def create_session():
    """Create a signed session cookie from the submitted form fields.

    Reads ``username`` and ``user_id`` from the form, builds a session that
    expires one hour after creation, signs it with ``create_signed_data`` and
    stores it in the ``session`` cookie (``HttpOnly`` and ``Secure``).

    Returns:
        A response with ``{'status': 'session created'}`` and the cookie set,
        or ``({'error': ...}, 400)`` when a field is missing or ``user_id`` is
        not an integer.
    """
    # NOTE(review): this signs whatever username/user_id the client submits;
    # no credential check is visible in this block. Confirm it is not exposed
    # as-is.
    username = request.form.get('username', '')
    user_id = request.form.get('user_id', '')
    if not username or not user_id:
        return {'error': 'Username and user_id required'}, 400

    try:
        numeric_user_id = int(user_id)
    except ValueError:
        # A non-numeric id is a client error, not a server failure.
        return {'error': 'user_id must be an integer'}, 400

    # Sample the clock once so expires is exactly created + 3600.
    now = time.time()
    session_data = {
        'username': username[:50],
        'user_id': numeric_user_id,
        'created': now,
        'expires': now + 3600,  # 1 hour
    }
    signed_session = create_signed_data(session_data)

    response = make_response({'status': 'session created'})
    response.set_cookie('session', signed_session, httponly=True, secure=True)
    return response