from flask import Flask, request, jsonify
from pymongo import MongoClient
import json
import re
from typing import Dict, Any
app = Flask(__name__)
client = MongoClient('mongodb://localhost:27017/')
db = client.myapp
# Whitelist of allowed fields and operators
ALLOWED_FIELDS = ['username', 'email', 'age', 'status', 'created_at']
ALLOWED_OPERATORS = ['$eq', '$ne', '$gt', '$gte', '$lt', '$lte', '$in', '$nin']
ALLOWED_SORT_FIELDS = ['username', 'email', 'age', 'created_at']
def validate_filter(filter_obj: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and sanitize MongoDB filter object"""
if not isinstance(filter_obj, dict):
raise ValueError("Filter must be an object")
validated_filter = {}
for field, condition in filter_obj.items():
# Validate field names
if field not in ALLOWED_FIELDS:
raise ValueError(f"Field '{field}' not allowed")
# Handle simple equality
if isinstance(condition, (str, int, float, bool)):
if isinstance(condition, str):
# Sanitize string values
if len(condition) > 100:
raise ValueError("String value too long")
# Remove potentially dangerous characters
condition = re.sub(r'[{}()\[\]$]', '', condition)
validated_filter[field] = condition
# Handle operator objects
elif isinstance(condition, dict):
validated_condition = {}
for operator, value in condition.items():
if operator not in ALLOWED_OPERATORS:
raise ValueError(f"Operator '{operator}' not allowed")
# Validate operator values
if isinstance(value, str) and len(value) > 100:
raise ValueError("Operator value too long")
validated_condition[operator] = value
validated_filter[field] = validated_condition
else:
raise ValueError(f"Invalid condition type for field '{field}'")
return validated_filter
def validate_sort(sort_obj: Dict[str, Any]) -> Dict[str, Any]:
"""Validate and sanitize MongoDB sort object"""
if not isinstance(sort_obj, dict):
raise ValueError("Sort must be an object")
validated_sort = {}
for field, direction in sort_obj.items():
if field not in ALLOWED_SORT_FIELDS:
raise ValueError(f"Sort field '{field}' not allowed")
if direction not in [1, -1, "asc", "desc"]:
raise ValueError(f"Invalid sort direction for field '{field}'")
# Convert string directions to integers
if direction == "asc":
direction = 1
elif direction == "desc":
direction = -1
validated_sort[field] = direction
return validated_sort
@app.route('/users', methods=['GET'])
def get_users():
try:
filter_param = request.args.get('filter', '{}')
sort_param = request.args.get('sort', '{"created_at": -1}')
limit = min(int(request.args.get('limit', 50)), 100) # Max 100 results
# Parse and validate filter
try:
raw_filter = json.loads(filter_param)
user_filter = validate_filter(raw_filter)
except (json.JSONDecodeError, ValueError) as e:
return jsonify({'error': f'Invalid filter: {str(e)}'}), 400
# Parse and validate sort
try:
raw_sort = json.loads(sort_param)
user_sort = validate_sort(raw_sort)
except (json.JSONDecodeError, ValueError) as e:
return jsonify({'error': f'Invalid sort: {str(e)}'}), 400
# SECURE: Execute validated query with limits
users = list(db.users.find(
user_filter,
{'password': 0, '_id': 0} # Exclude sensitive fields
).sort(list(user_sort.items())).limit(limit))
return jsonify({
'users': users,
'count': len(users),
'limit': limit
})
except Exception as e:
# Log error for monitoring
app.logger.error(f'Database query error: {str(e)}')
return jsonify({'error': 'Query failed'}), 500
if __name__ == '__main__':
app.run(debug=False) # Never run debug in production