# Vulnerable: Unsafe pickle deserialization in Lambda
import pickle
import json
import boto3
import base64
def lambda_handler(event, context):
    # Dangerous: Deserializing user-provided pickle data
    pickle_data = event.get('data', '')
    try:
        # Extremely dangerous: pickle.loads can execute arbitrary code
        decoded_data = base64.b64decode(pickle_data)
        obj = pickle.loads(decoded_data)
        return {
            'statusCode': 200,
            'body': json.dumps({'result': str(obj)})
        }
    except Exception as e:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }

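# Illustrative exploit sketch (the function name is hypothetical; shown for
# demonstration only): pickle calls an object's __reduce__ during loading, so
# an attacker-supplied payload makes pickle.loads() run arbitrary commands
# with the Lambda's execution role.
def build_malicious_payload_demo() -> str:
    import os

    class Exploit:
        # __reduce__ tells pickle to call os.system('id') on deserialization
        def __reduce__(self):
            return (os.system, ('id',))

    # Sent as event['data'], this executes the moment pickle.loads() runs
    return base64.b64encode(pickle.dumps(Exploit())).decode()
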
# Another vulnerable pattern - S3 pickle files
def process_s3_pickle(event, context):
    bucket = event['bucket']
    key = event['key']
    s3_client = boto3.client('s3')
    try:
        # Download pickle file
        response = s3_client.get_object(Bucket=bucket, Key=key)
        pickle_data = response['Body'].read()
        # Dangerous: deserializing pickle from S3 without validation;
        # anyone who can write to the bucket can execute code here
        obj = pickle.loads(pickle_data)
        # Process the object
        result = process_object(obj)
        return {'result': result}
    except Exception as e:
        return {'error': str(e)}

# Session data deserialization
def load_session(event, context):
    session_id = event.get('session_id', '')
    # Load session data from database or cache
    session_data = get_session_from_db(session_id)
    if session_data:
        # Dangerous: pickle deserialization of session data
        session_obj = pickle.loads(session_data)
        return {'session': session_obj.__dict__}
    return {'error': 'Session not found'}

# Caching with pickle
def get_cached_data(event, context):
    cache_key = event.get('cache_key', '')
    # Check cache (Redis, DynamoDB, etc.)
    cached_data = get_from_cache(cache_key)
    if cached_data:
        # Dangerous: deserializing cached pickle data
        return pickle.loads(cached_data)
    # Generate and cache new data
    data = generate_data()
    pickled_data = pickle.dumps(data)
    set_cache(cache_key, pickled_data)
    return data

# Secure: Safe alternatives to pickle deserialization
import json
import boto3
import base64
import hmac
import hashlib
import os
from typing import Any, Dict, Optional
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
def lambda_handler(event, context):
    data_format = event.get('format', 'json')
    data_content = event.get('data', '')
    # Use safe formats instead of pickle
    safe_formats = {
        'json': deserialize_json,
        'msgpack': deserialize_msgpack,
        'signed_json': deserialize_signed_json
    }
    if data_format not in safe_formats:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Unsupported data format'})
        }
    try:
        result = safe_formats[data_format](data_content)
        return {
            'statusCode': 200,
            'body': json.dumps({'result': result}, default=str)
        }
    except Exception:
        # Return a generic error; do not leak internals to the caller
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Deserialization failed'})
        }

# Safe JSON deserialization
def deserialize_json(data: str) -> Dict[str, Any]:
    try:
        # JSON is safe for deserialization: it yields only basic Python types
        obj = json.loads(data)
        # Validate structure if needed
        if isinstance(obj, dict):
            return validate_object_structure(obj)
        return {'data': obj}
    except json.JSONDecodeError:
        raise ValueError('Invalid JSON format')

# Signed JSON for integrity
def deserialize_signed_json(data: str) -> Dict[str, Any]:
    try:
        # Expected format: base64(json_data).signature
        parts = data.split('.')
        if len(parts) != 2:
            raise ValueError('Invalid signed data format')
        encoded_data, signature = parts
        # Verify the HMAC-SHA256 signature before touching the payload
        secret_key = os.environ.get('SIGNING_KEY', '').encode()
        if not secret_key:
            raise ValueError('Signing key not configured')
        expected_signature = hmac.new(
            secret_key,
            encoded_data.encode(),
            hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(signature, expected_signature):
            raise ValueError('Invalid signature')
        # Signature checks out: decode and parse the JSON payload
        json_data = base64.b64decode(encoded_data).decode('utf-8')
        return json.loads(json_data)
    except Exception:
        raise ValueError('Signed data verification failed')

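# Producer-side counterpart (not shown in the original; a minimal sketch
# assuming the consumer above and the producer share the same SIGNING_KEY
# environment variable):
def serialize_signed_json(obj: Dict[str, Any]) -> str:
    secret_key = os.environ.get('SIGNING_KEY', '').encode()
    if not secret_key:
        raise ValueError('Signing key not configured')
    encoded_data = base64.b64encode(json.dumps(obj).encode()).decode()
    signature = hmac.new(secret_key, encoded_data.encode(), hashlib.sha256).hexdigest()
    # Produces the base64(json_data).signature format expected above
    return f'{encoded_data}.{signature}'
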
# MessagePack alternative (if available)
def deserialize_msgpack(data: str) -> Dict[str, Any]:
    try:
        import msgpack
        decoded_data = base64.b64decode(data)
        obj = msgpack.unpackb(decoded_data, raw=False, strict_map_key=False)
        return {'data': obj}
    except ImportError:
        raise ValueError('MessagePack not available')
    except Exception:
        raise ValueError('Invalid MessagePack format')

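# If legacy pickle data truly cannot be avoided during a migration, the
# standard library's documented escape hatch is to subclass pickle.Unpickler
# and allowlist classes in find_class. This narrows the attack surface but is
# NOT a complete defense; treat it as a stopgap. A minimal sketch:
import builtins
import io
import pickle

class RestrictedUnpickler(pickle.Unpickler):
    # Only plain builtin scalars and containers may be reconstructed
    SAFE_BUILTINS = {'dict', 'list', 'set', 'frozenset', 'tuple',
                     'str', 'bytes', 'int', 'float', 'complex', 'bool'}

    def find_class(self, module, name):
        if module == 'builtins' and name in self.SAFE_BUILTINS:
            return getattr(builtins, name)
        raise pickle.UnpicklingError(f'{module}.{name} is forbidden')

def restricted_loads(data: bytes) -> Any:
    return RestrictedUnpickler(io.BytesIO(data)).load()
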
# Data validation
def validate_object_structure(obj: Dict[str, Any]) -> Dict[str, Any]:
    # Define allowed structure
    allowed_keys = {'id', 'name', 'value', 'timestamp', 'metadata'}
    validated = {}
    for key, value in obj.items():
        if key in allowed_keys:
            # Type validation per field
            if key == 'id' and isinstance(value, (int, str)):
                validated[key] = str(value)
            elif key == 'name' and isinstance(value, str):
                validated[key] = value[:100]  # Limit length
            elif key == 'value' and isinstance(value, (int, float)):
                validated[key] = value
            elif key == 'timestamp' and isinstance(value, str):
                # Validate ISO-8601 timestamp format
                try:
                    datetime.fromisoformat(value.replace('Z', '+00:00'))
                    validated[key] = value
                except ValueError:
                    pass  # Skip invalid timestamps
            elif key == 'metadata' and isinstance(value, dict):
                validated[key] = {
                    k: v for k, v in value.items()
                    if isinstance(k, str) and isinstance(v, (str, int, float))
                }
    return validated

# Safe S3 data processing
def process_s3_data_secure(event, context):
    bucket = event.get('bucket', '')
    key = event.get('key', '')
    # Validate inputs
    if not bucket or not key:
        raise ValueError('Bucket and key required')
    # Validate file extension against an allowlist
    allowed_extensions = ['.json', '.csv', '.txt']
    if not any(key.endswith(ext) for ext in allowed_extensions):
        raise ValueError('File type not allowed')
    s3_client = boto3.client('s3')
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        file_content = response['Body'].read().decode('utf-8')
        # Process based on file type
        if key.endswith('.json'):
            data = json.loads(file_content)
            return process_json_data(data)
        elif key.endswith('.csv'):
            return process_csv_data(file_content)
        elif key.endswith('.txt'):
            return process_text_data(file_content)
    except Exception as e:
        raise RuntimeError('Failed to process S3 file') from e

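# Optional hardening (an assumption, not part of the original flow): check the
# object's size with head_object before downloading, so an oversized file
# cannot exhaust the Lambda's memory.
def check_s3_object_size(bucket: str, key: str,
                         max_bytes: int = 10 * 1024 * 1024) -> None:
    s3_client = boto3.client('s3')
    head = s3_client.head_object(Bucket=bucket, Key=key)
    if head['ContentLength'] > max_bytes:
        raise ValueError('File too large to process')
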
# Secure session management
@dataclass
class SessionData:
    user_id: str
    created_at: datetime
    last_accessed: datetime
    data: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SessionData':
        return cls(
            user_id=str(data['user_id']),
            created_at=datetime.fromisoformat(data['created_at']),
            last_accessed=datetime.fromisoformat(data['last_accessed']),
            data=data.get('data', {})
        )

def load_session_secure(event, context):
    session_id = event.get('session_id', '')
    if not session_id:
        raise ValueError('Session ID required')
    # Load session as JSON from database
    session_json = get_session_from_db(session_id)
    if session_json:
        try:
            session_data = json.loads(session_json)
            session_obj = SessionData.from_dict(session_data)
            # Update last accessed (timezone-aware; datetime.utcnow() is deprecated)
            session_obj.last_accessed = datetime.now(timezone.utc)
            # Save updated session
            save_session_to_db(session_id, json.dumps(session_obj.to_dict(), default=str))
            return {'session': session_obj.to_dict()}
        except (json.JSONDecodeError, KeyError, ValueError):
            # Invalid session data: remove it rather than trust it
            delete_session_from_db(session_id)
            raise ValueError('Invalid session data')
    raise ValueError('Session not found')

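# Usage sketch (create_session_secure and the uuid-based ID are hypothetical
# additions, assuming the same JSON-backed store): new sessions go through the
# same dataclass and json.dumps path, never pickle.
import uuid

def create_session_secure(user_id: str) -> Dict[str, Any]:
    now = datetime.now(timezone.utc)
    session_id = uuid.uuid4().hex
    session_obj = SessionData(user_id=user_id, created_at=now, last_accessed=now)
    save_session_to_db(session_id, json.dumps(session_obj.to_dict(), default=str))
    return {'session_id': session_id, 'session': session_obj.to_dict()}
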
# Safe caching with JSON
def get_cached_data_secure(event, context):
    cache_key = event.get('cache_key', '')
    if not cache_key:
        raise ValueError('Cache key required')
    # Validate cache key format
    if not cache_key.isalnum():
        raise ValueError('Invalid cache key format')
    # Check cache
    cached_json = get_from_cache(cache_key)
    if cached_json:
        try:
            return json.loads(cached_json)
        except json.JSONDecodeError:
            # Invalid cached data, remove it and fall through to regenerate
            delete_from_cache(cache_key)
    # Generate new data
    data = generate_data()
    # Cache as JSON
    set_cache(cache_key, json.dumps(data, default=str))
    return data

# Helper functions (implement based on your infrastructure)
def get_session_from_db(session_id: str) -> Optional[str]:
    # Implement database lookup
    pass

def save_session_to_db(session_id: str, session_data: str) -> None:
    # Implement database save
    pass

def delete_session_from_db(session_id: str) -> None:
    # Implement database delete
    pass

def get_from_cache(key: str) -> Optional[str]:
    # Implement cache lookup
    pass

def set_cache(key: str, value: str) -> None:
    # Implement cache set
    pass

def delete_from_cache(key: str) -> None:
    # Implement cache delete
    pass
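
# Stubs for the remaining helpers referenced above (implement based on your data)
def process_json_data(data: Any) -> Any:
    pass

def process_csv_data(content: str) -> Any:
    pass

def process_text_data(content: str) -> Any:
    pass

def generate_data() -> Dict[str, Any]:
    pass

# Example cache backend: a minimal sketch assuming Redis via the redis-py
# client (the redis_* names are illustrative; wire them into get_from_cache
# and friends, or swap in DynamoDB/ElastiCache as your infrastructure dictates)
import redis

_redis = redis.Redis(host=os.environ.get('REDIS_HOST', 'localhost'),
                     port=6379, decode_responses=True)

def redis_get_from_cache(key: str) -> Optional[str]:
    # decode_responses=True makes get() return str instead of bytes
    return _redis.get(key)

def redis_set_cache(key: str, value: str, ttl_seconds: int = 3600) -> None:
    # Expire cached entries so stale data ages out
    _redis.set(key, value, ex=ttl_seconds)

def redis_delete_from_cache(key: str) -> None:
    _redis.delete(key)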