SQL injection via raw SQL queries in Django applications

Critical Risk injection
python, django, sql-injection, raw-sql, database-security, parameterization, orm-bypass

What it is

SQL injection vulnerabilities in Django occur when user input is directly concatenated or interpolated into raw SQL queries without proper parameterization. This allows attackers to manipulate database queries, potentially leading to unauthorized data access, data modification, privilege escalation, and complete database compromise.

# VULNERABLE: Django views with SQL injection flaws
from django.shortcuts import render
from django.http import JsonResponse, HttpResponse
from django.db import connection
from django.contrib.auth.models import User
from django.views.decorators.csrf import csrf_exempt
import json

# VULNERABLE: Direct string concatenation in raw SQL
def vulnerable_user_search(request):
    search_term = request.GET.get('q', '')
    
    # VULNERABLE: Direct concatenation allows SQL injection
    query = f"SELECT * FROM auth_user WHERE username LIKE '%{search_term}%'"
    
    with connection.cursor() as cursor:
        cursor.execute(query)  # VULNERABLE: Unsanitized input
        results = cursor.fetchall()
    
    return JsonResponse({'users': results})

# VULNERABLE: String formatting in raw queries
def vulnerable_get_user_posts(request, user_id):
    # VULNERABLE: String formatting allows injection
    query = "SELECT * FROM blog_post WHERE author_id = %s AND status = '%s'" % (
        user_id, 
        request.GET.get('status', 'published')
    )
    
    with connection.cursor() as cursor:
        cursor.execute(query)
        posts = cursor.fetchall()
    
    return JsonResponse({'posts': posts})

# VULNERABLE: Dynamic ORDER BY clause
def vulnerable_list_products(request):
    sort_by = request.GET.get('sort', 'name')
    order = request.GET.get('order', 'asc')
    
    # VULNERABLE: Dynamic ORDER BY without validation
    query = f"SELECT * FROM products ORDER BY {sort_by} {order}"
    
    with connection.cursor() as cursor:
        cursor.execute(query)
        products = cursor.fetchall()
    
    return JsonResponse({'products': products})

# VULNERABLE: Complex search with multiple injection points
@csrf_exempt
def vulnerable_advanced_search(request):
    if request.method == 'POST':
        data = json.loads(request.body)
        
        category = data.get('category', '')
        min_price = data.get('min_price', 0)
        max_price = data.get('max_price', 1000)
        search_text = data.get('search', '')
        
        # VULNERABLE: Multiple injection points
        query = f"""
            SELECT p.*, c.name as category_name 
            FROM products p 
            JOIN categories c ON p.category_id = c.id 
            WHERE c.name = '{category}' 
            AND p.price BETWEEN {min_price} AND {max_price}
            AND p.description LIKE '%{search_text}%'
            ORDER BY p.created_at DESC
        """
        
        with connection.cursor() as cursor:
            cursor.execute(query)
            results = cursor.fetchall()
        
        return JsonResponse({'results': results})

# VULNERABLE: Admin functionality with elevated privileges
def vulnerable_admin_user_management(request):
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)
    
    action = request.POST.get('action')
    user_id = request.POST.get('user_id')
    
    # VULNERABLE: Admin operations with string concatenation
    if action == 'delete':
        query = f"DELETE FROM auth_user WHERE id = {user_id}"
    elif action == 'activate':
        query = f"UPDATE auth_user SET is_active = 1 WHERE id = {user_id}"
    elif action == 'deactivate':
        query = f"UPDATE auth_user SET is_active = 0 WHERE id = {user_id}"
    else:
        return JsonResponse({'error': 'Invalid action'})
    
    with connection.cursor() as cursor:
        cursor.execute(query)  # VULNERABLE: Could affect multiple users
    
    return JsonResponse({'success': True})

# VULNERABLE: Report generation with dynamic queries
def vulnerable_generate_report(request):
    start_date = request.GET.get('start_date')
    end_date = request.GET.get('end_date')
    department = request.GET.get('department')
    
    # VULNERABLE: Date injection and department filtering
    query = f"""
        SELECT u.username, p.title, p.created_at
        FROM auth_user u
        JOIN blog_post p ON u.id = p.author_id
        WHERE p.created_at BETWEEN '{start_date}' AND '{end_date}'
        AND u.department = '{department}'
    """
    
    with connection.cursor() as cursor:
        cursor.execute(query)
        report_data = cursor.fetchall()
    
    return JsonResponse({'report': report_data})

# VULNERABLE: Custom authentication bypass
def vulnerable_custom_login(request):
    username = request.POST.get('username')
    password = request.POST.get('password')
    
    # VULNERABLE: Classic authentication bypass opportunity
    query = f"""
        SELECT id, username, is_staff 
        FROM auth_user 
        WHERE username = '{username}' AND password = '{password}'
    """
    
    with connection.cursor() as cursor:
        cursor.execute(query)
        user_data = cursor.fetchone()
    
    if user_data:
        # Login successful
        return JsonResponse({'success': True, 'user': user_data})
    else:
        return JsonResponse({'error': 'Invalid credentials'})

# VULNERABLE: Bulk operations
def vulnerable_bulk_update(request):
    if request.method == 'POST':
        updates = json.loads(request.body).get('updates', [])
        
        for update in updates:
            user_id = update.get('id')
            field = update.get('field')
            value = update.get('value')
            
            # VULNERABLE: Dynamic field updates
            query = f"UPDATE auth_user SET {field} = '{value}' WHERE id = {user_id}"
            
            with connection.cursor() as cursor:
                cursor.execute(query)
    
    return JsonResponse({'success': True})

# SECURE: Django views with proper SQL injection protection
from django.shortcuts import render
from django.http import JsonResponse, HttpResponse
from django.db import connection, models  # models is needed by the example model classes below
from django.contrib.auth.models import User
from django.views.decorators.csrf import csrf_exempt
from django.core.exceptions import ValidationError
from django.db.models import Q, Count, Avg
from django.contrib.auth import authenticate
from django.utils import timezone
from datetime import datetime
import json
import re

# SECURE: Parameterized raw SQL queries
def secure_user_search(request):
    search_term = request.GET.get('q', '').strip()
    
    # Input validation
    if len(search_term) > 50:
        return JsonResponse({'error': 'Search term too long'}, status=400)
    
    # SECURE: Parameterized query with %s placeholder
    query = "SELECT id, username, email, first_name, last_name FROM auth_user WHERE username LIKE %s"
    search_param = f'%{search_term}%'
    
    with connection.cursor() as cursor:
        cursor.execute(query, [search_param])  # SECURE: Parameters separate
        columns = [col[0] for col in cursor.description]
        results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    return JsonResponse({'users': results})

# SECURE: Better approach using Django ORM
def secure_user_search_orm(request):
    search_term = request.GET.get('q', '').strip()
    
    if len(search_term) > 50:
        return JsonResponse({'error': 'Search term too long'}, status=400)
    
    # SECURE: Django ORM with built-in protection
    users = User.objects.filter(
        Q(username__icontains=search_term) |
        Q(first_name__icontains=search_term) |
        Q(last_name__icontains=search_term)
    ).values('id', 'username', 'email', 'first_name', 'last_name')[:50]
    
    return JsonResponse({'users': list(users)})

# SECURE: Parameterized queries with validation
def secure_get_user_posts(request, user_id):
    # Input validation
    try:
        user_id = int(user_id)
    except (ValueError, TypeError):
        return JsonResponse({'error': 'Invalid user ID'}, status=400)
    
    status = request.GET.get('status', 'published')
    
    # Validate status parameter
    valid_statuses = ['published', 'draft', 'pending', 'archived']
    if status not in valid_statuses:
        return JsonResponse({'error': 'Invalid status'}, status=400)
    
    # SECURE: Parameterized query
    query = """
        SELECT id, title, content, created_at, status 
        FROM blog_post 
        WHERE author_id = %s AND status = %s 
        ORDER BY created_at DESC
    """
    
    with connection.cursor() as cursor:
        cursor.execute(query, [user_id, status])
        columns = [col[0] for col in cursor.description]
        posts = [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    return JsonResponse({'posts': posts})

# SECURE: Safe dynamic sorting
def secure_list_products(request):
    sort_by = request.GET.get('sort', 'name')
    order = request.GET.get('order', 'asc')
    
    # SECURE: Whitelist allowed sort fields
    allowed_sort_fields = {
        'name': 'name',
        'price': 'price',
        'created': 'created_at',
        'category': 'category_id'
    }
    
    # SECURE: Validate sort field
    if sort_by not in allowed_sort_fields:
        sort_by = 'name'
    
    # SECURE: Validate order direction
    if order.lower() not in ['asc', 'desc']:
        order = 'asc'
    
    # SECURE: Build safe ORDER BY clause
    order_clause = f"{allowed_sort_fields[sort_by]} {order.upper()}"
    
    # SECURE: Safe query construction
    query = f"SELECT id, name, price, category_id, created_at FROM products ORDER BY {order_clause}"
    
    with connection.cursor() as cursor:
        cursor.execute(query)  # Safe because order_clause is validated
        columns = [col[0] for col in cursor.description]
        products = [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    return JsonResponse({'products': products})

# SECURE: Better ORM-based sorting
def secure_list_products_orm(request):
    sort_by = request.GET.get('sort', 'name')
    order = request.GET.get('order', 'asc')
    
    # SECURE: Whitelist and map to model fields
    sort_mapping = {
        'name': 'name',
        'price': 'price',
        'created': 'created_at',
        'category': 'category__name'
    }
    
    sort_field = sort_mapping.get(sort_by, 'name')
    if order.lower() == 'desc':
        sort_field = f'-{sort_field}'
    
    # SECURE: Django ORM handles escaping
    products = Product.objects.select_related('category').order_by(sort_field).values(
        'id', 'name', 'price', 'category__name', 'created_at'
    )[:100]
    
    return JsonResponse({'products': list(products)})

# SECURE: Complex search with proper parameterization
@csrf_exempt
def secure_advanced_search(request):
    if request.method != 'POST':
        return JsonResponse({'error': 'Method not allowed'}, status=405)
    
    try:
        data = json.loads(request.body)
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    
    # SECURE: Input validation and sanitization
    category = data.get('category', '').strip()
    search_text = data.get('search', '').strip()
    
    try:
        min_price = float(data.get('min_price', 0))
        max_price = float(data.get('max_price', 1000))
    except (ValueError, TypeError):
        return JsonResponse({'error': 'Invalid price range'}, status=400)
    
    # Validate inputs
    if len(category) > 50 or len(search_text) > 100:
        return JsonResponse({'error': 'Search parameters too long'}, status=400)
    
    if min_price < 0 or max_price < min_price or max_price > 10000:
        return JsonResponse({'error': 'Invalid price range'}, status=400)
    
    # SECURE: Parameterized query with all values passed separately
    query = """
        SELECT p.id, p.name, p.price, p.description, c.name as category_name 
        FROM products p 
        JOIN categories c ON p.category_id = c.id 
        WHERE c.name = %s 
        AND p.price BETWEEN %s AND %s
        AND p.description LIKE %s
        ORDER BY p.created_at DESC
        LIMIT 100
    """
    
    search_param = f'%{search_text}%'
    
    with connection.cursor() as cursor:
        cursor.execute(query, [category, min_price, max_price, search_param])
        columns = [col[0] for col in cursor.description]
        results = [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    return JsonResponse({'results': results})

# SECURE: ORM-based advanced search
def secure_advanced_search_orm(request):
    if request.method != 'POST':
        return JsonResponse({'error': 'Method not allowed'}, status=405)
    
    try:
        data = json.loads(request.body)
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    
    # Input parsing (the length and range checks from secure_advanced_search also apply here)
    category = data.get('category', '').strip()
    search_text = data.get('search', '').strip()
    
    try:
        min_price = float(data.get('min_price', 0))
        max_price = float(data.get('max_price', 1000))
    except (ValueError, TypeError):
        return JsonResponse({'error': 'Invalid price range'}, status=400)
    
    # SECURE: Django ORM field lookups; the ORM passes these values as bound parameters
    products = Product.objects.select_related('category').filter(
        category__name=category,
        price__gte=min_price,
        price__lte=max_price,
        description__icontains=search_text
    ).values(
        'id', 'name', 'price', 'description', 'category__name'
    ).order_by('-created_at')[:100]
    
    return JsonResponse({'results': list(products)})

# SECURE: Admin functionality with proper validation
def secure_admin_user_management(request):
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)
    
    if request.method != 'POST':
        return JsonResponse({'error': 'Method not allowed'}, status=405)
    
    action = request.POST.get('action')
    user_id = request.POST.get('user_id')
    
    # Input validation
    try:
        user_id = int(user_id)
    except (ValueError, TypeError):
        return JsonResponse({'error': 'Invalid user ID'}, status=400)
    
    # Validate action
    valid_actions = ['delete', 'activate', 'deactivate']
    if action not in valid_actions:
        return JsonResponse({'error': 'Invalid action'}, status=400)
    
    # Check if user exists
    try:
        user = User.objects.get(id=user_id)
    except User.DoesNotExist:
        return JsonResponse({'error': 'User not found'}, status=404)
    
    # Prevent self-modification
    if user.id == request.user.id:
        return JsonResponse({'error': 'Cannot modify own account'}, status=400)
    
    # SECURE: Use Django ORM for safe operations
    if action == 'delete':
        user.delete()
    elif action == 'activate':
        user.is_active = True
        user.save()
    elif action == 'deactivate':
        user.is_active = False
        user.save()
    
    return JsonResponse({'success': True, 'message': f'User {action}d successfully'})

# SECURE: Report generation with proper parameterization
def secure_generate_report(request):
    start_date = request.GET.get('start_date')
    end_date = request.GET.get('end_date')
    department = request.GET.get('department')
    
    # SECURE: Date validation
    try:
        start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
        end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
    except (ValueError, TypeError):
        return JsonResponse({'error': 'Invalid date format. Use YYYY-MM-DD'}, status=400)
    
    if start_date > end_date:
        return JsonResponse({'error': 'Start date must be before end date'}, status=400)
    
    # Validate department
    if not department or len(department) > 50:
        return JsonResponse({'error': 'Invalid department'}, status=400)
    
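    # SECURE: Parameterized query
    # Note: the ::date cast is PostgreSQL syntax, and u.department assumes a custom
    # column that the stock auth_user table does not have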
    query = """
        SELECT u.username, p.title, p.created_at
        FROM auth_user u
        JOIN blog_post p ON u.id = p.author_id
        WHERE p.created_at::date BETWEEN %s AND %s
        AND u.department = %s
        ORDER BY p.created_at DESC
        LIMIT 1000
    """
    
    with connection.cursor() as cursor:
        cursor.execute(query, [start_date, end_date, department])
        columns = [col[0] for col in cursor.description]
        report_data = [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    return JsonResponse({'report': report_data})

# SECURE: Proper authentication implementation
def secure_custom_login(request):
    if request.method != 'POST':
        return JsonResponse({'error': 'Method not allowed'}, status=405)
    
    username = request.POST.get('username', '').strip()
    password = request.POST.get('password', '')
    
    # Input validation
    if not username or not password:
        return JsonResponse({'error': 'Username and password required'}, status=400)
    
    if len(username) > 150 or len(password) > 128:
        return JsonResponse({'error': 'Invalid credentials'}, status=400)
    
    # SECURE: Use Django's built-in authentication
    user = authenticate(request, username=username, password=password)
    
    if user is not None and user.is_active:
        # Login successful
        from django.contrib.auth import login
        login(request, user)
        
        return JsonResponse({
            'success': True, 
            'user': {
                'id': user.id,
                'username': user.username,
                'is_staff': user.is_staff
            }
        })
    else:
        return JsonResponse({'error': 'Invalid credentials'}, status=401)

# SECURE: Safe bulk operations
def secure_bulk_update(request):
    if not request.user.is_staff:
        return JsonResponse({'error': 'Unauthorized'}, status=403)
    
    if request.method != 'POST':
        return JsonResponse({'error': 'Method not allowed'}, status=405)
    
    try:
        data = json.loads(request.body)
        updates = data.get('updates', [])
    except json.JSONDecodeError:
        return JsonResponse({'error': 'Invalid JSON'}, status=400)
    
    # Limit number of updates
    if len(updates) > 100:
        return JsonResponse({'error': 'Too many updates'}, status=400)
    
    # SECURE: Whitelist allowed fields
    allowed_fields = {
        'first_name': 'first_name',
        'last_name': 'last_name',
        'email': 'email',
        'is_active': 'is_active'
    }
    
    successful_updates = 0
    errors = []
    
    for i, update in enumerate(updates):
        try:
            user_id = int(update.get('id'))
            field = update.get('field')
            value = update.get('value')
            
            # Validate field
            if field not in allowed_fields:
                errors.append(f'Update {i}: Invalid field {field}')
                continue
            
            # Get user
            user = User.objects.get(id=user_id)
            
            # SECURE: Use setattr with validated field
            setattr(user, allowed_fields[field], value)
            user.full_clean()  # Validate model
            user.save()
            
            successful_updates += 1
            
        except (ValueError, TypeError, User.DoesNotExist, ValidationError) as e:
            errors.append(f'Update {i}: {str(e)}')
            continue
    
    return JsonResponse({
        'success': True,
        'updated': successful_updates,
        'errors': errors
    })

# SECURE: Utility functions for validation
def validate_sql_identifier(identifier):
    """Validate SQL identifiers (table/column names)"""
    if not identifier or not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier):
        return False
    return len(identifier) <= 63  # PostgreSQL limit

def sanitize_search_term(term):
    """Strip punctuation from search terms (defense in depth, not a substitute for parameterization)"""
    if not term:
        return ''
    # Keep word characters, whitespace, and hyphens; drop everything else
    return re.sub(r'[^\w\s-]', '', term).strip()[:100]

# Example models (for reference)
class Product(models.Model):
    name = models.CharField(max_length=100)
    price = models.DecimalField(max_digits=10, decimal_places=2)
    description = models.TextField()
    category = models.ForeignKey('Category', on_delete=models.CASCADE)
    created_at = models.DateTimeField(auto_now_add=True)
    
class Category(models.Model):
    name = models.CharField(max_length=50)
    
class BlogPost(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    status = models.CharField(max_length=20, default='draft')
    created_at = models.DateTimeField(auto_now_add=True)

💡 Why This Fix Works

The vulnerable examples cover the common ways SQL injection enters Django code: string concatenation, f-string and %-formatting, and dynamically built clauses. The secure alternatives pass values through %s placeholders so the database driver treats them as data rather than as SQL. Placeholders can only bind values, not identifiers or keywords, so dynamic elements such as ORDER BY columns and sort direction are mapped through a whitelist instead. Where raw SQL is not needed, the ORM builds parameterized queries automatically, and authentication goes through Django's authenticate() rather than a hand-written password comparison.
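The difference between interpolating and binding can be seen in a minimal, self-contained sketch. It uses Python's standard-library sqlite3 module as a stand-in for Django's connection.cursor() so that it runs without a Django project; sqlite3 uses ? placeholders where Django uses %s. The users table and the payload are made up for illustration.

```python
import sqlite3

# In-memory database with a hypothetical users table (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, password TEXT)")
conn.executemany(
    "INSERT INTO users (username, password) VALUES (?, ?)",
    [("alice", "s3cret"), ("bob", "hunter2")],
)

payload = "' OR '1'='1"  # classic injection payload supplied as the "password"

# Interpolated: the payload becomes part of the SQL text and rewrites the WHERE clause.
unsafe_sql = f"SELECT username FROM users WHERE username = 'alice' AND password = '{payload}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# Bound: the payload is sent as a value and compared literally against the column.
safe_sql = "SELECT username FROM users WHERE username = ? AND password = ?"
safe_rows = conn.execute(safe_sql, ("alice", payload)).fetchall()

print(len(unsafe_rows))  # every row in the table matches
print(len(safe_rows))    # no row matches
```

In the interpolated query the trailing OR '1'='1' makes the condition true for every row, which is the same mechanism behind the authentication bypass in vulnerable_custom_login.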

Why it happens

Raw SQL bypasses the parameterization that Django's ORM applies automatically, so any user input spliced into the query string becomes part of the SQL itself. The root causes below describe how this typically comes about.

Root causes

String Concatenation in Raw Queries

Developers build SQL queries by concatenating user input directly into query strings, bypassing Django's ORM protections and creating injection opportunities.

Dynamic Query Construction

Complex business logic requires dynamic SQL queries, leading developers to use raw SQL with inadequate parameterization instead of leveraging Django's safe query methods.
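Dynamic queries do not have to give up parameterization. One possible approach, sketched below, is to assemble the WHERE clause only from fixed fragments and collect the user-supplied values in a parallel list. The ALLOWED_FILTERS mapping and the build_where helper are hypothetical names invented for this illustration, and the table aliases follow the products/categories query used earlier.

```python
from typing import Any

# Hypothetical mapping from public filter names to fixed SQL fragments.
# Only the fragments in this dict ever reach the query text.
ALLOWED_FILTERS = {
    "category": "c.name = %s",
    "min_price": "p.price >= %s",
    "max_price": "p.price <= %s",
    "search": "p.description LIKE %s",
}


def build_where(filters: dict[str, Any]) -> tuple[str, list[Any]]:
    """Return a WHERE clause with %s placeholders and the matching parameter list."""
    clauses: list[str] = []
    params: list[Any] = []
    for name, value in filters.items():
        fragment = ALLOWED_FILTERS.get(name)
        if fragment is None or value is None:
            continue  # unknown filter names are dropped, never interpolated
        clauses.append(fragment)
        params.append(f"%{value}%" if name == "search" else value)
    where = " AND ".join(clauses) if clauses else "1=1"
    return f"WHERE {where}", params


where_sql, params = build_where({"category": "books", "search": "django", "bogus; DROP": 1})
print(where_sql)
print(params)
```

The returned pair would then be handed to cursor.execute(base_query + " " + where_sql, params). Note that when parameters are passed, any literal percent sign in the query text itself has to be written as %%.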

Performance Optimization Attempts

Teams use raw SQL for perceived performance benefits without implementing proper parameter binding, exposing applications to injection attacks.
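Raw SQL written for speed can still bind its parameters. As a rough sketch, a batch insert can go through executemany, which Django's cursor wrapper also provides. The example uses the standard-library sqlite3 module (with ? placeholders instead of Django's %s) so it runs on its own, and the products rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (name TEXT, price REAL)")

# Hypothetical rows; the second name contains a quote and SQL keywords on purpose.
rows = [
    ("keyboard", 49.0),
    ("x'); DROP TABLE products; --", 1.0),
    ("monitor", 199.0),
]

# One statement, many parameter sets: the driver binds each tuple as data.
with conn:  # commits on success, rolls back on error
    conn.executemany("INSERT INTO products (name, price) VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
stored = conn.execute("SELECT name FROM products WHERE price = ?", (1.0,)).fetchone()[0]
print(count, stored)
```

The hostile-looking name is stored verbatim as text and the table is untouched, because the statement text never changes between rows.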

Fixes

1. Use Parameterized Raw Queries

When raw SQL is necessary, always use Django's parameter substitution with %s placeholders and pass values as a separate list or tuple to prevent injection.
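Binding a value stops injection, but inside a LIKE pattern the characters % and _ supplied by the user still act as wildcards. A minimal sketch of escaping them before binding is shown below. The escape_like helper is a hypothetical name, and sqlite3 with ? placeholders stands in for a Django cursor with %s so that the example runs without a project.

```python
import sqlite3


def escape_like(term: str, escape_char: str = "\\") -> str:
    """Escape LIKE wildcards so user text matches literally."""
    return (
        term.replace(escape_char, escape_char * 2)
        .replace("%", escape_char + "%")
        .replace("_", escape_char + "_")
    )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT)")
conn.executemany("INSERT INTO users VALUES (?)", [("alice",), ("a_b",), ("100%",)])


def search(term: str) -> list[str]:
    # The wildcards added here are ours; the ones inside term are escaped.
    pattern = f"%{escape_like(term)}%"
    rows = conn.execute(
        "SELECT username FROM users WHERE username LIKE ? ESCAPE '\\' ORDER BY username",
        (pattern,),
    ).fetchall()
    return [r[0] for r in rows]


print(search("%"))    # only the username containing a literal percent sign
print(search("_"))    # only the username containing a literal underscore
print(search("ali"))
```

ORM lookups such as icontains escape these characters automatically, which is one more reason to prefer the ORM when a raw query is not strictly required.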

2. Leverage Django ORM Features

Utilize Django's QuerySet API, Q objects, and database functions which provide built-in SQL injection protection while maintaining query flexibility.

3. Implement Input Validation

Validate and sanitize all user inputs before using them in database operations, and use Django forms and serializers for structured input validation.
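Structured validation means parsing raw strings into typed values once, at the edge, and rejecting anything that does not fit. The sketch below does this with a standard-library dataclass so it runs without Django; in a real project a forms.Form with DateField and ChoiceField would play the same role. ReportParams and ALLOWED_DEPARTMENTS are hypothetical names, and the department values are invented.

```python
from dataclasses import dataclass
from datetime import date, datetime

ALLOWED_DEPARTMENTS = {"engineering", "sales", "support"}  # hypothetical values


@dataclass(frozen=True)
class ReportParams:
    start_date: date
    end_date: date
    department: str

    @classmethod
    def from_query(cls, query: dict) -> "ReportParams":
        """Parse and validate raw query-string values; raise ValueError on bad input."""
        try:
            start = datetime.strptime(query.get("start_date") or "", "%Y-%m-%d").date()
            end = datetime.strptime(query.get("end_date") or "", "%Y-%m-%d").date()
        except ValueError as exc:
            raise ValueError("dates must use YYYY-MM-DD") from exc
        if start > end:
            raise ValueError("start_date must not be after end_date")
        department = (query.get("department") or "").strip().lower()
        if department not in ALLOWED_DEPARTMENTS:
            raise ValueError("unknown department")
        return cls(start, end, department)


params = ReportParams.from_query(
    {"start_date": "2024-01-01", "end_date": "2024-01-31", "department": "Sales"}
)
print(params)

try:
    ReportParams.from_query({"start_date": "2024-01-01' OR 1=1 --", "end_date": "2024-01-31"})
except ValueError as err:
    print("rejected:", err)
```

Validation of this kind narrows what can reach the database, but the validated values should still be passed as bound parameters rather than interpolated.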
