# Vulnerable: Raw SQL with user input in Django
from django.http import JsonResponse
from django.views import View
from django.db import connection
import json
# Dangerous: String concatenation in raw SQL
class UserSearchView(View):
def get(self, request):
username = request.GET.get('username', '')
email = request.GET.get('email', '')
# CRITICAL: SQL injection in raw query
with connection.cursor() as cursor:
query = f"SELECT * FROM auth_user WHERE username = '{username}' OR email = '{email}'"
cursor.execute(query)
results = cursor.fetchall()
users = []
for row in results:
users.append({
'id': row[0],
'username': row[1],
'email': row[2]
})
return JsonResponse({'users': users})
# Another dangerous pattern with formatting
def product_search(request):
category = request.GET.get('category', '')
min_price = request.GET.get('min_price', '0')
max_price = request.GET.get('max_price', '1000')
order_by = request.GET.get('order', 'name')
# Dangerous: Multiple user inputs in SQL
query = """
SELECT p.id, p.name, p.price, c.name as category
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE c.name = '{}'
AND p.price BETWEEN {} AND {}
ORDER BY {}
""".format(category, min_price, max_price, order_by)
with connection.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
return JsonResponse({'products': list(results)})
# Complex query with user input
def advanced_search(request):
search_data = json.loads(request.body)
table_name = search_data.get('table', '')
columns = search_data.get('columns', [])
conditions = search_data.get('conditions', [])
# Dangerous: User controls table, columns, and conditions
column_list = ', '.join(columns)
condition_clauses = []
for condition in conditions:
field = condition['field']
operator = condition['operator']
value = condition['value']
condition_clauses.append(f"{field} {operator} '{value}'")
where_clause = ' AND '.join(condition_clauses)
query = f"SELECT {column_list} FROM {table_name} WHERE {where_clause}"
with connection.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
return JsonResponse({'data': list(results)})
# Report generation with user input
def generate_report(request):
report_type = request.POST.get('report', '')
date_from = request.POST.get('from', '')
date_to = request.POST.get('to', '')
group_by = request.POST.get('group_by', '')
# Dangerous: Dynamic SQL construction
if report_type == 'sales':
query = f"""
SELECT {group_by}, SUM(amount) as total
FROM sales
WHERE date >= '{date_from}' AND date <= '{date_to}'
GROUP BY {group_by}
"""
elif report_type == 'users':
query = f"""
SELECT {group_by}, COUNT(*) as count
FROM users
WHERE created_at >= '{date_from}' AND created_at <= '{date_to}'
GROUP BY {group_by}
"""
with connection.cursor() as cursor:
cursor.execute(query)
results = cursor.fetchall()
return JsonResponse({'report': list(results)})
# Authentication bypass
def custom_login(request):
username = request.POST.get('username', '')
password = request.POST.get('password', '')
# Dangerous: Authentication check with raw SQL
query = f"""
SELECT id, username, is_staff
FROM auth_user
WHERE username = '{username}' AND password = '{password}'
"""
with connection.cursor() as cursor:
cursor.execute(query)
user_data = cursor.fetchone()
if user_data:
return JsonResponse({'success': True, 'user_id': user_data[0]})
else:
return JsonResponse({'success': False})
# Data export with user-controlled query
def export_data(request):
export_query = request.POST.get('query', '')
format_type = request.POST.get('format', 'json')
# Dangerous: User provides entire SQL query
with connection.cursor() as cursor:
try:
cursor.execute(export_query)
results = cursor.fetchall()
if format_type == 'json':
return JsonResponse({'data': list(results)})
except Exception as e:
return JsonResponse({'error': str(e)})
# Secure: Safe raw SQL usage in Django
from django.http import JsonResponse
from django.views import View
from django.db import connection
from django.core.exceptions import ValidationError
import json
import re
# Safe: Parameterized queries
class SafeUserSearchView(View):
def get(self, request):
username = request.GET.get('username', '')
email = request.GET.get('email', '')
try:
# Validate inputs
validated_data = self.validate_search_params(username, email)
# Safe: Parameterized query
users = self.search_users_safely(validated_data)
return JsonResponse({'users': users})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_search_params(self, username, email):
# Validate username
if username and (len(username) > 150 or not re.match(r'^[a-zA-Z0-9_.-]+$', username)):
raise ValidationError('Invalid username format')
# Validate email
if email and (len(email) > 254 or '@' not in email):
raise ValidationError('Invalid email format')
return {'username': username, 'email': email}
def search_users_safely(self, validated_data):
with connection.cursor() as cursor:
# Safe: Parameterized query
query = """
SELECT id, username, email, first_name, last_name
FROM auth_user
WHERE (%s = '' OR username = %s)
AND (%s = '' OR email = %s)
LIMIT 100
"""
cursor.execute(query, [
validated_data['username'], validated_data['username'],
validated_data['email'], validated_data['email']
])
results = cursor.fetchall()
users = []
for row in results:
users.append({
'id': row[0],
'username': row[1],
'email': row[2],
'full_name': f"{row[3]} {row[4]}".strip()
})
return users
# Safe: Product search with validation
def safe_product_search(request):
category = request.GET.get('category', '')
min_price = request.GET.get('min_price', '0')
max_price = request.GET.get('max_price', '1000')
order_by = request.GET.get('order', 'name')
try:
# Validate all inputs
validated_data = validate_product_search_params(category, min_price, max_price, order_by)
# Search products safely
products = search_products_safely(validated_data)
return JsonResponse({'products': products})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_product_search_params(category, min_price, max_price, order_by):
# Validate category
allowed_categories = ['electronics', 'clothing', 'books', 'home', 'sports']
if category and category not in allowed_categories:
raise ValidationError('Invalid category')
# Validate prices
try:
min_price = float(min_price) if min_price else 0
max_price = float(max_price) if max_price else 10000
if min_price < 0 or max_price < 0 or min_price > max_price:
raise ValidationError('Invalid price range')
except ValueError:
raise ValidationError('Invalid price format')
# Validate order field
allowed_order_fields = ['name', 'price', 'created_at', 'category']
if order_by not in allowed_order_fields:
raise ValidationError('Invalid sort field')
return {
'category': category,
'min_price': min_price,
'max_price': max_price,
'order_by': order_by
}
def search_products_safely(validated_data):
with connection.cursor() as cursor:
# Safe: Parameterized query with validated order field
base_query = """
SELECT p.id, p.name, p.price, c.name as category, p.description
FROM products p
JOIN categories c ON p.category_id = c.id
WHERE (%s = '' OR c.name = %s)
AND p.price BETWEEN %s AND %s
"""
# Safe: Validated order field (allowlist checked)
order_field = validated_data['order_by']
if order_field == 'category':
order_field = 'c.name'
else:
order_field = f'p.{order_field}'
query = f"{base_query} ORDER BY {order_field} LIMIT 100"
cursor.execute(query, [
validated_data['category'], validated_data['category'],
validated_data['min_price'], validated_data['max_price']
])
results = cursor.fetchall()
products = []
for row in results:
products.append({
'id': row[0],
'name': row[1],
'price': float(row[2]),
'category': row[3],
'description': row[4][:200] if row[4] else '' # Limit description
})
return products
# Safe: Advanced search with structured parameters
def safe_advanced_search(request):
try:
search_data = json.loads(request.body)
# Validate search structure
validated_search = validate_advanced_search(search_data)
# Execute safe search
results = execute_safe_advanced_search(validated_search)
return JsonResponse({'data': results})
except (json.JSONDecodeError, ValidationError) as e:
return JsonResponse({'error': 'Invalid search parameters'}, status=400)
def validate_advanced_search(search_data):
# Validate table
allowed_tables = ['products', 'users', 'orders']
table = search_data.get('table', '')
if table not in allowed_tables:
raise ValidationError('Table not allowed')
# Validate columns
allowed_columns = {
'products': ['id', 'name', 'price', 'category', 'description'],
'users': ['id', 'username', 'email', 'first_name', 'last_name'],
'orders': ['id', 'user_id', 'total', 'status', 'created_at']
}
columns = search_data.get('columns', [])
table_columns = allowed_columns[table]
validated_columns = []
for col in columns:
if col in table_columns:
validated_columns.append(col)
if not validated_columns:
validated_columns = ['id'] # Default safe column
# Validate conditions
conditions = search_data.get('conditions', [])
validated_conditions = []
for condition in conditions:
if not isinstance(condition, dict):
continue
field = condition.get('field', '')
operator = condition.get('operator', '')
value = condition.get('value', '')
# Validate field
if field not in table_columns:
continue
# Validate operator
allowed_operators = ['=', '!=', '>', '<', '>=', '<=', 'LIKE']
if operator not in allowed_operators:
continue
# Validate value
if isinstance(value, (str, int, float)) and len(str(value)) <= 100:
validated_conditions.append({
'field': field,
'operator': operator,
'value': value
})
return {
'table': table,
'columns': validated_columns,
'conditions': validated_conditions
}
def execute_safe_advanced_search(search_params):
table = search_params['table']
columns = search_params['columns']
conditions = search_params['conditions']
# Build safe query
column_list = ', '.join(columns)
# Build WHERE clause with parameters
where_parts = []
params = []
for condition in conditions:
where_parts.append(f"{condition['field']} {condition['operator']} %s")
params.append(condition['value'])
where_clause = ' AND '.join(where_parts) if where_parts else '1=1'
query = f"SELECT {column_list} FROM {table} WHERE {where_clause} LIMIT 100"
with connection.cursor() as cursor:
cursor.execute(query, params)
results = cursor.fetchall()
columns_names = [desc[0] for desc in cursor.description]
# Convert to list of dictionaries
data = []
for row in results:
row_dict = dict(zip(columns_names, row))
data.append(row_dict)
return data
# Safe: Report generation with predefined queries
def safe_generate_report(request):
report_type = request.POST.get('report', '')
date_from = request.POST.get('from', '')
date_to = request.POST.get('to', '')
try:
# Validate inputs
validated_data = validate_report_params(report_type, date_from, date_to)
# Generate report safely
report_data = generate_report_safely(validated_data)
return JsonResponse({'report': report_data})
except ValidationError as e:
return JsonResponse({'error': str(e)}, status=400)
def validate_report_params(report_type, date_from, date_to):
# Validate report type
allowed_reports = ['sales_by_category', 'user_registrations', 'order_summary']
if report_type not in allowed_reports:
raise ValidationError('Invalid report type')
# Validate dates
from datetime import datetime
try:
if date_from:
from_date = datetime.strptime(date_from, '%Y-%m-%d').date()
else:
from_date = None
if date_to:
to_date = datetime.strptime(date_to, '%Y-%m-%d').date()
else:
to_date = None
if from_date and to_date and from_date > to_date:
raise ValidationError('Invalid date range')
except ValueError:
raise ValidationError('Invalid date format. Use YYYY-MM-DD')
return {
'report_type': report_type,
'date_from': from_date,
'date_to': to_date
}
def generate_report_safely(validated_data):
report_type = validated_data['report_type']
date_from = validated_data['date_from']
date_to = validated_data['date_to']
with connection.cursor() as cursor:
if report_type == 'sales_by_category':
query = """
SELECT c.name as category, COUNT(o.id) as order_count, SUM(o.total) as total_sales
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
JOIN categories c ON p.category_id = c.id
WHERE (%s IS NULL OR DATE(o.created_at) >= %s)
AND (%s IS NULL OR DATE(o.created_at) <= %s)
GROUP BY c.id, c.name
ORDER BY total_sales DESC
LIMIT 50
"""
cursor.execute(query, [date_from, date_from, date_to, date_to])
elif report_type == 'user_registrations':
query = """
SELECT DATE(date_joined) as registration_date, COUNT(*) as new_users
FROM auth_user
WHERE (%s IS NULL OR DATE(date_joined) >= %s)
AND (%s IS NULL OR DATE(date_joined) <= %s)
GROUP BY DATE(date_joined)
ORDER BY registration_date DESC
LIMIT 100
"""
cursor.execute(query, [date_from, date_from, date_to, date_to])
elif report_type == 'order_summary':
query = """
SELECT
DATE(created_at) as order_date,
COUNT(*) as order_count,
AVG(total) as avg_order_value,
SUM(total) as total_revenue
FROM orders
WHERE (%s IS NULL OR DATE(created_at) >= %s)
AND (%s IS NULL OR DATE(created_at) <= %s)
GROUP BY DATE(created_at)
ORDER BY order_date DESC
LIMIT 100
"""
cursor.execute(query, [date_from, date_from, date_to, date_to])
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Convert to list of dictionaries
report_data = []
for row in results:
row_dict = dict(zip(columns, row))
# Convert Decimal to float for JSON serialization
for key, value in row_dict.items():
if hasattr(value, '__float__'):
row_dict[key] = float(value)
report_data.append(row_dict)
return report_data
# Safe: Authentication with parameterized queries
def safe_custom_login(request):
username = request.POST.get('username', '')
password = request.POST.get('password', '')
try:
# Validate inputs
if not username or not password:
raise ValidationError('Username and password required')
if len(username) > 150 or len(password) > 128:
raise ValidationError('Invalid credentials')
# Safe: Use Django's built-in authentication
from django.contrib.auth import authenticate
user = authenticate(username=username, password=password)
if user:
return JsonResponse({
'success': True,
'user_id': user.id,
'username': user.username
})
else:
return JsonResponse({'success': False, 'error': 'Invalid credentials'})
except ValidationError as e:
return JsonResponse({'success': False, 'error': str(e)})