Python SQLAlchemy Raw Query SQL Injection Vulnerability

Critical Risk SQL Injection
Tags: Python, SQLAlchemy, SQL Injection, Database, Raw Query, Data Security

What it is

The application uses SQLAlchemy to execute raw SQL queries built from user input. This creates SQL injection vulnerabilities that can lead to data disclosure, data modification, or compromise of the whole database.

    # Vulnerable example
    from sqlalchemy import create_engine, text
    from flask import Flask, request

    app = Flask(__name__)
    engine = create_engine('sqlite:///app.db')

    @app.route('/search_users')
    def search_users():
        # Vulnerable: Direct string interpolation
        search_term = request.args.get('q')
        # DANGEROUS: SQL injection vulnerability
        query = text(f"SELECT * FROM users WHERE name LIKE '%{search_term}%'")
        with engine.connect() as conn:
            result = conn.execute(query)
            # row._mapping gives a dict-like view of the row (SQLAlchemy 1.4+)
            return [dict(row._mapping) for row in result]

    @app.route('/get_user/<user_id>')
    def get_user(user_id):
        # Vulnerable: String concatenation
        sql = "SELECT * FROM users WHERE id = " + user_id
        with engine.connect() as conn:
            result = conn.execute(text(sql))
            return dict(result.fetchone()._mapping)

    @app.route('/delete_user', methods=['POST'])
    def delete_user():
        # Vulnerable: User input in DELETE query
        user_id = request.form.get('user_id')
        # Extremely dangerous: Can delete any data
        query = text(f"DELETE FROM users WHERE id = {user_id}")
        with engine.connect() as conn:
            conn.execute(query)
            conn.commit()
        return 'User deleted'
    # Secure example
    from flask import Flask, request
    from sqlalchemy import Column, Integer, String, create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker

    app = Flask(__name__)
    engine = create_engine('sqlite:///app.db')
    Session = sessionmaker(bind=engine)
    Base = declarative_base()

    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(100))
        email = Column(String(100))

    def validate_user_input(value, field_type='string', max_length=100):
        """Validate user input based on expected type."""
        if not value:
            raise ValueError(f'{field_type} cannot be empty')
        if field_type == 'integer':
            try:
                return int(value)
            except ValueError:
                raise ValueError('Invalid integer format') from None
        elif field_type == 'string':
            if len(value) > max_length:
                raise ValueError(f'String too long (max {max_length})')
            # Basic XSS prevention
            if '<' in value or '>' in value:
                raise ValueError('Invalid characters in input')
            return str(value)
        else:
            raise ValueError(f'Unsupported field type: {field_type}')

    @app.route('/search_users')
    def search_users():
        """Secure user search using parameterized queries."""
        try:
            search_term = request.args.get('q', '')
            # Validate input
            safe_search = validate_user_input(search_term, 'string', 50)
            # Secure: Parameterized query
            query = text("SELECT id, name, email FROM users WHERE name LIKE :search_term")
            with engine.connect() as conn:
                result = conn.execute(query, {'search_term': f'%{safe_search}%'})
                return [dict(row._mapping) for row in result]
        except ValueError as e:
            return {'error': str(e)}, 400
        except Exception:
            return {'error': 'Search failed'}, 500

    @app.route('/get_user/<user_id>')
    def get_user(user_id):
        """Secure user retrieval using ORM."""
        try:
            # Validate input
            safe_user_id = validate_user_input(user_id, 'integer')
            # Secure: Use ORM instead of raw SQL; the session closes on exit
            with Session() as session:
                user = session.query(User).filter(User.id == safe_user_id).first()
                if not user:
                    return {'error': 'User not found'}, 404
                return {'id': user.id, 'name': user.name, 'email': user.email}
        except ValueError as e:
            return {'error': str(e)}, 400
        except Exception:
            return {'error': 'Failed to retrieve user'}, 500

    @app.route('/delete_user', methods=['POST'])
    def delete_user():
        """Secure user deletion with validation."""
        try:
            user_id = request.form.get('user_id', '')
            # Validate input
            safe_user_id = validate_user_input(user_id, 'integer')
            # Additional authorization check (implement as needed)
            if not is_authorized_to_delete(safe_user_id):
                return {'error': 'Not authorized'}, 403
            # Secure: Use ORM with proper validation
            with Session() as session:
                user = session.query(User).filter(User.id == safe_user_id).first()
                if not user:
                    return {'error': 'User not found'}, 404
                session.delete(user)
                session.commit()
            return {'message': 'User deleted successfully'}
        except ValueError as e:
            return {'error': str(e)}, 400
        except Exception:
            return {'error': 'Deletion failed'}, 500

    # Alternative: Secure raw SQL with dynamically assembled bound parameters
    @app.route('/advanced_search')
    def advanced_search():
        """Advanced search using bound parameters for dynamic queries."""
        try:
            name_filter = request.args.get('name', '')
            email_filter = request.args.get('email', '')
            limit = request.args.get('limit', '10')
            # Validate inputs
            safe_limit = min(validate_user_input(limit, 'integer'), 100)
            conditions = []
            params = {'limit': safe_limit}
            if name_filter:
                safe_name = validate_user_input(name_filter, 'string', 50)
                conditions.append("name LIKE :name_pattern")
                params['name_pattern'] = f'%{safe_name}%'
            if email_filter:
                safe_email = validate_user_input(email_filter, 'string', 100)
                conditions.append("email LIKE :email_pattern")
                params['email_pattern'] = f'%{safe_email}%'
            # Build query dynamically but safely: only fixed fragments are joined
            full_query = "SELECT id, name, email FROM users"
            if conditions:
                full_query += " WHERE " + " AND ".join(conditions)
            full_query += " LIMIT :limit"
            # Execute with parameters
            with engine.connect() as conn:
                result = conn.execute(text(full_query), params)
                return [dict(row._mapping) for row in result]
        except ValueError as e:
            return {'error': str(e)}, 400
        except Exception:
            return {'error': 'Advanced search failed'}, 500

    # Helper function (implement based on your auth system)
    def is_authorized_to_delete(user_id):
        """Check if current user can delete the specified user."""
        # Implement your authorization logic
        return True  # Placeholder: replace before use, this allows every deletion

💡 Why This Fix Works

See the Fixes section below for a detailed explanation.

Why it happens

SQL injection happens when user input is formatted into the SQL string before SQLAlchemy ever sees it. f-strings, .format(), % formatting, and + concatenation are all vulnerable, because the database receives one string and cannot tell the intended query apart from attacker-supplied SQL. The root causes below are variations of this pattern.

Root causes

Using session.execute() with String-Formatted SQL

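Code executes raw SQL built with string formatting, for example session.execute(f'SELECT * FROM users WHERE id = {user_id}'). String formatting does not parameterize anything: the value of user_id becomes part of the SQL text. f-strings, .format(), and + concatenation are all equally vulnerable. execute() does not detect or reject such strings. SQLAlchemy 2.0 requires a plain string to be wrapped in text(), but wrapping alone does not make it safe; only passing values as bound parameters does.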
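The difference can be seen in a minimal sketch, assuming an in-memory SQLite database and two hypothetical helpers, find_user_unsafe and find_user_safe, that are not part of any library. The same attacker-controlled string is passed to both.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite database with two rows, used only for illustration.
engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        text("INSERT INTO users (id, name) VALUES (:id, :name)"),
        [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}],
    )


def find_user_unsafe(conn, user_id):
    # Vulnerable: user_id becomes part of the SQL text itself.
    query = text(f"SELECT id, name FROM users WHERE id = {user_id}")
    return conn.execute(query).fetchall()


def find_user_safe(conn, user_id):
    # Safe: user_id travels separately from the SQL as a bound parameter.
    query = text("SELECT id, name FROM users WHERE id = :user_id")
    return conn.execute(query, {"user_id": user_id}).fetchall()


payload = "1 OR 1=1"  # attacker-controlled "id"
with engine.connect() as conn:
    leaked = find_user_unsafe(conn, payload)
    matched = find_user_safe(conn, payload)

print(len(leaked))   # 2: the injected OR clause matches every row
print(len(matched))  # 0: the payload is compared as a plain value
```

In the unsafe helper the payload rewrites the WHERE clause; in the safe helper it is only ever a value that happens to match no id.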

Using text() with String Concatenation or Formatting

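Formatting the string passed to text(), for example: from sqlalchemy import text; session.execute(text(f'SELECT * FROM {table} WHERE name = "{name}"')). text() supports bound parameters, but formatting the string first bypasses them: by the time text() receives the string, the user input is already part of the SQL. The same applies to user-controlled table or column names. Any dynamic SQL construction of this kind is an injection vector.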
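One way to see that formatting bypasses text() is to inspect the parameters of the compiled statement. The sketch below needs no database; the payload value is a made-up example.

```python
from sqlalchemy import text

name = 'x" OR "1"="1'  # attacker-controlled value

# Formatted before text() sees it: the payload is already inside the SQL string.
formatted = text(f'SELECT * FROM users WHERE name = "{name}"')

# Bound: the SQL keeps a :name placeholder and the value is carried alongside.
bound = text("SELECT * FROM users WHERE name = :name").bindparams(name=name)

print(formatted.compile().params)  # empty: nothing was left for text() to bind
print(bound.compile().params)      # the payload, held as a parameter value
print(str(bound))                  # the SQL text still contains only :name
```

The formatted statement has no parameters at all, so nothing separates the payload from the query.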

Not Using Parameterized Queries with execute()

Executing without parameters: query = 'SELECT * FROM products WHERE category = "' + category + '"'; session.execute(text(query)). Use this instead: session.execute(text('SELECT * FROM products WHERE category = :cat'), {'cat': category}). text() takes named :param placeholders, and the values are supplied separately as a dictionary. Passing values this way is what provides the protection.

Building Dynamic SQL from User Input

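User input that shapes the query structure: if sort_order == 'asc': query = 'ORDER BY name ASC'; else: query = 'ORDER BY name DESC'; session.execute(text('SELECT * FROM users ' + query)). This branch is safe only because every fragment is a hard-coded constant and the user's value merely selects between them. The pattern becomes injectable as soon as the user's value itself reaches the string (for example 'ORDER BY ' + sort_column), a step that is easy to take when more options are added. Structural parts of a query cannot be bound as parameters, so they need strict allowlisting.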
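A minimal sketch of keeping user choices out of the SQL text: the user's value is used only as a dictionary key, and the fragments that reach the query are constants. The names SORT_COLUMNS, SORT_DIRECTIONS, and build_order_by are hypothetical, as are the column names.

```python
# Hypothetical mapping from user-facing sort keys to fixed SQL fragments.
SORT_COLUMNS = {"name": "name", "email": "email", "newest": "created_at"}
SORT_DIRECTIONS = {"asc": "ASC", "desc": "DESC"}


def build_order_by(sort_key: str, direction: str) -> str:
    """Return an ORDER BY clause assembled only from hard-coded fragments."""
    try:
        column = SORT_COLUMNS[sort_key]
        order = SORT_DIRECTIONS[direction.lower()]
    except KeyError:
        # Unknown keys are rejected instead of being passed through.
        raise ValueError("unsupported sort option") from None
    return f"ORDER BY {column} {order}"


print(build_order_by("newest", "DESC"))  # ORDER BY created_at DESC
```

Because an unknown key raises an error, a value such as "name; DROP TABLE users" never reaches the query string.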

Using Raw SQL Instead of SQLAlchemy ORM

Bypassing the ORM in favor of raw SQL: session.execute(text(sql_string)). The ORM provides safe query construction, for example session.query(User).filter(User.id == user_id), and its methods bind values automatically. Raw SQL leaves parameterization to the developer and is therefore more error-prone. Developers often choose raw SQL out of familiarity or for perceived performance gains.

Fixes

1. Always Use Parameterized Queries with Named Parameters

Use bound parameters: from sqlalchemy import text; session.execute(text('SELECT * FROM users WHERE id = :user_id'), {'user_id': user_id}). Write placeholders with the :param syntax and pass the values in a dictionary. The values are handed to the database driver separately from the SQL text, so they are never parsed as SQL. This works for SELECT, INSERT, UPDATE, and DELETE statements alike. Never concatenate user input into the query.
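The sketch below shows three common cases under the assumption of an in-memory SQLite database: a LIKE pattern, an IN list (using an expanding bind parameter), and an UPDATE that stores a hostile-looking string. Table and values are illustrative.

```python
from sqlalchemy import bindparam, create_engine, text

engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(
        text("INSERT INTO users (id, name) VALUES (:id, :name)"),
        [
            {"id": 1, "name": "alice"},
            {"id": 2, "name": "bob"},
            {"id": 3, "name": "carol"},
        ],
    )

    # LIKE: wildcards are added to the value, never to the SQL text.
    like_stmt = text("SELECT name FROM users WHERE name LIKE :pattern ORDER BY id")
    like_names = conn.execute(like_stmt, {"pattern": "%o%"}).scalars().all()

    # IN: an "expanding" bind parameter accepts a list of values.
    in_stmt = text("SELECT name FROM users WHERE id IN :ids ORDER BY id").bindparams(
        bindparam("ids", expanding=True)
    )
    in_names = conn.execute(in_stmt, {"ids": [1, 3]}).scalars().all()

    # UPDATE with a hostile-looking value: it is stored as plain data.
    hostile = "x'; DROP TABLE users; --"
    conn.execute(
        text("UPDATE users SET name = :name WHERE id = :id"),
        {"name": hostile, "id": 2},
    )
    stored = conn.execute(
        text("SELECT name FROM users WHERE id = :id"), {"id": 2}
    ).scalar_one()

print(like_names)  # ['bob', 'carol']
print(in_names)    # ['alice', 'carol']
print(stored == hostile)  # True: the table still exists and holds the string
```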

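2. Use SQLAlchemy ORM Instead of Raw SQL

Use ORM query methods: session.query(User).filter(User.id == user_id).first(). Values compared against mapped columns are bound automatically, which removes the injection risk for those values and gives readable, database-independent code. The guarantee does not extend to raw SQL strings passed into ORM methods (for example text() fragments inside filter()), which need the same care as any raw query. Prefer the ORM, and fall back to execute() with text() only when the ORM cannot express the query.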
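A minimal sketch of the ORM approach, assuming a small User model and an in-memory SQLite database. It uses session.query() as in the text above; the final line compiles an equivalent select() only to show the generated SQL.

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(100))


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

payload = "' OR '1'='1"  # classic injection attempt

with Session(engine) as session:
    session.add_all([User(id=1, name="alice"), User(id=2, name="bob")])
    session.commit()

    # The comparison value is bound automatically by the ORM.
    found = session.query(User).filter(User.name == "alice").first()
    found_name = found.name

    # The payload is just a name that matches no row.
    injected = session.query(User).filter(User.name == payload).all()

# The generated SQL contains a placeholder, not the payload.
sql = str(select(User).where(User.name == payload))
print(found_name, len(injected))
print(sql)
```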

3. Use Allowlists for Dynamic Table or Column Names

Identifiers (table and column names) cannot be passed as bound parameters, so validate them against an allowlist: ALLOWED_COLUMNS = {'name', 'email', 'created_at'}; if column not in ALLOWED_COLUMNS: abort(400); query = text(f'SELECT {column} FROM users WHERE id = :id'); session.execute(query, {'id': user_id}). Only after the check is it safe to place the identifier in the SQL text. Values such as the id are still passed as bound parameters.
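The same idea as a runnable sketch, assuming an in-memory SQLite database. The helper select_column is hypothetical, and it raises ValueError where a Flask view would call abort(400).

```python
from sqlalchemy import create_engine, text

ALLOWED_COLUMNS = {"name", "email", "created_at"}


def select_column(conn, column, user_id):
    """Fetch one allowlisted column for a user; the id is still bound."""
    if column not in ALLOWED_COLUMNS:
        raise ValueError(f"column not allowed: {column!r}")
    # Safe to interpolate: `column` is now known to be one of our constants.
    query = text(f"SELECT {column} FROM users WHERE id = :id")
    return conn.execute(query, {"id": user_id}).scalar_one_or_none()


engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(
        text(
            "CREATE TABLE users "
            "(id INTEGER PRIMARY KEY, name TEXT, email TEXT, created_at TEXT)"
        )
    )
    conn.execute(
        text("INSERT INTO users (id, name, email) VALUES (:id, :name, :email)"),
        {"id": 1, "name": "alice", "email": "alice@example.com"},
    )

    email = select_column(conn, "email", 1)
    try:
        select_column(conn, "email FROM users; --", 1)
        rejected = False
    except ValueError:
        rejected = True

print(email)     # alice@example.com
print(rejected)  # True
```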

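4. Use Query Builder for Complex Dynamic Queries

Use the SQLAlchemy query builder: from sqlalchemy import select, table, column, bindparam; stmt = select(column('name')).select_from(table('users')).where(column('id') == bindparam('user_id')); session.execute(stmt, {'user_id': user_id}). Note that select() takes columns as positional arguments; the older list form select([...]) was removed in SQLAlchemy 2.0. The query is constructed programmatically from expression objects, so values are always bound and cannot alter the statement's structure.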

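5. Validate and Sanitize Input Before Parameterized Queries

Validate input as defense in depth. Request values arrive as strings, so a check such as isinstance(user_id, int) rejects every request value unless a route converter has already produced an integer. Convert and check instead: try: user_id = int(user_id); except ValueError: abort(400). Check formats with a full match: if not re.fullmatch(r'[a-zA-Z0-9_]+', username): abort(400). Reject unexpected input, and always combine validation with parameterization rather than relying on it alone.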
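A small sketch of such validators using only the standard library. The function names and the 50-character limit are assumptions for illustration; in a Flask view the ValueError would be translated into abort(400).

```python
import re

# Letters, digits, and underscore only; 1 to 50 characters (assumed limit).
USERNAME_RE = re.compile(r"[a-zA-Z0-9_]{1,50}")


def parse_user_id(raw: str) -> int:
    """Convert a request string to a positive integer or raise ValueError."""
    if not isinstance(raw, str) or not raw.isascii() or not raw.isdigit():
        raise ValueError("user_id must be a positive integer")
    value = int(raw)
    if value < 1:
        raise ValueError("user_id must be a positive integer")
    return value


def validate_username(raw: str) -> str:
    """Return the username unchanged if it matches the expected format."""
    # fullmatch() requires the whole string to match, including no trailing newline.
    if not isinstance(raw, str) or not USERNAME_RE.fullmatch(raw):
        raise ValueError("invalid username")
    return raw


print(parse_user_id("42"))            # 42
print(validate_username("alice_01"))  # alice_01
```

The validated values should still be passed as bound parameters; validation narrows what reaches the query but does not replace parameterization.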

6. Scan for Raw SQL Execution and Review All Usage

Find execute calls with grep -r '\.execute(' --include="*.py" and review each one for parameterization. Run the Bandit security scanner, whose B608 check flags SQL built by string formatting. Include database code in code review, add automated tests that send SQL injection payloads, and use static analysis to detect raw SQL patterns.
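As a complement to grep, a rough scan can be written with Python's ast module. The sketch below is a heuristic, not a replacement for Bandit: it flags execute() and text() calls whose first argument is an f-string, a + or % expression, or a .format() call. The function names and the SINK_NAMES set are assumptions.

```python
import ast

SINK_NAMES = {"execute", "text"}  # assumption: calls worth reviewing


def _is_dynamic_string(node: ast.AST) -> bool:
    """True for f-strings, '+' or '%' expressions, and .format() calls."""
    if isinstance(node, ast.JoinedStr):
        # f-string with at least one interpolated value
        return any(isinstance(v, ast.FormattedValue) for v in node.values)
    if isinstance(node, ast.BinOp) and isinstance(node.op, (ast.Add, ast.Mod)):
        return True
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
        return node.func.attr == "format"
    return False


def find_suspicious_calls(source: str):
    """Return sorted (line, sink) pairs for calls with a dynamically built first argument."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call) or not node.args:
            continue
        func = node.func
        name = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
        if name in SINK_NAMES and _is_dynamic_string(node.args[0]):
            findings.append((node.lineno, name))
    return sorted(findings)


sample = '''
session.execute(text(f"SELECT * FROM users WHERE id = {user_id}"))
session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id})
conn.execute("SELECT * FROM t WHERE c = '" + category + "'")
'''
print(find_suspicious_calls(sample))  # [(2, 'text'), (4, 'execute')]
```

The scan only inspects the argument written directly in the call, so a query string assembled in a variable beforehand is not detected; manual review is still needed.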
