SQL injection from AWS Lambda event data in psycopg cursor execute

Critical Risk SQL Injection
Tags: python, aws-lambda, psycopg, postgresql, sql-injection, serverless

What it is

SQL injection vulnerability in Python AWS Lambda functions where event data is concatenated into SQL strings and passed to psycopg cursor.execute without parameterization, allowing attackers to manipulate PostgreSQL database operations.

import json
import psycopg2
import os
from typing import Dict, Any, List

def get_db_connection():
    """Open a new psycopg2 connection from environment configuration.

    DB_HOST, DB_NAME, DB_USER and DB_PASSWORD must be set (KeyError
    otherwise); DB_PORT is optional and falls back to 5432.
    """
    env = os.environ
    settings = dict(
        host=env['DB_HOST'],
        database=env['DB_NAME'],
        user=env['DB_USER'],
        password=env['DB_PASSWORD'],
        port=env.get('DB_PORT', 5432),
    )
    return psycopg2.connect(**settings)

def lambda_handler(event, context):
    """Dispatch a Lambda event to the project operation named by ``action``.

    Unknown or missing actions get a 400 response with a plain-text body.
    """
    requested = event.get('action')

    routes = (
        ('get_project', lambda: get_project(event)),
        ('search_projects', lambda: search_projects(event)),
        ('update_status', lambda: update_project_status(event)),
    )
    for name, run in routes:
        if requested == name:
            return run()

    return {'statusCode': 400, 'body': 'Invalid action'}

def get_project(event):
    """Fetch one project by ``event['projectId']`` -- INTENTIONALLY VULNERABLE.

    This is the "before" example for this article; do not copy it.
    """
    # VULNERABLE: Direct event data in SQL
    project_id = event.get('projectId')

    conn = get_db_connection()
    cursor = conn.cursor()

    # project_id is pasted unquoted into the SQL text, so a value such as
    # "1 OR 1=1 --" turns the WHERE clause into a condition that matches
    # every row (fetchone() then returns whichever row comes first).
    query = f"SELECT * FROM projects WHERE id = {project_id}"
    cursor.execute(query)

    result = cursor.fetchone()
    # No try/finally: if execute() raises, the cursor and connection are
    # never closed.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def search_projects(event):
    """Search projects by name/owner/budget -- INTENTIONALLY VULNERABLE.

    This is the "before" example for this article; do not copy it.
    """
    # VULNERABLE: String formatting
    name = event.get('name', '')
    owner = event.get('owner', '')
    min_budget = event.get('minBudget', 0)

    conn = get_db_connection()
    cursor = conn.cursor()

    # name and owner sit inside single-quoted SQL literals, so a quote in
    # either value closes the literal and whatever follows is parsed as SQL
    # (including extra ';'-separated statements). min_budget is not quoted
    # at all, so any SQL expression can be appended directly.
    query = ("SELECT id, name, owner FROM projects WHERE "
             f"name ILIKE '%{name}%' AND owner = '{owner}' "
             f"AND budget >= {min_budget}")

    cursor.execute(query)
    results = cursor.fetchall()

    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def update_project_status(event):
    """Update a project's status and notes -- INTENTIONALLY VULNERABLE.

    This is the "before" example for this article; do not copy it.
    """
    # VULNERABLE: % formatting
    project_id = event.get('projectId')
    new_status = event.get('newStatus')
    notes = event.get('notes', '')

    conn = get_db_connection()
    cursor = conn.cursor()

    # These '%s' markers are Python's % operator, not psycopg placeholders:
    # the values are spliced into the SQL text before execute() sees it.
    # The surrounding quotes give no protection -- a value containing a
    # quote closes the literal and the rest runs as SQL.
    query = ("UPDATE projects SET status = '%s', notes = '%s' "
             "WHERE id = %s") % (new_status, notes, project_id)

    cursor.execute(query)
    conn.commit()

    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Status updated'})
    }

# Malicious Lambda event examples:
# {"action": "get_project", "projectId": "1 OR 1=1 --"}
# {"action": "search_projects", "name": "'; DROP TABLE projects; --"}
# {"action": "update_status", "newStatus": "'; DELETE FROM projects WHERE '1'='1'; --"}
import json
import psycopg2
import psycopg2.extras
import os
import logging
from typing import Dict, Any, List, Optional

# Module-wide logger: the root logger, with its threshold set to INFO.
logger = logging.getLogger(None)
logger.setLevel(logging.INFO)

def get_db_connection():
    """Open a new connection whose cursors return rows as dicts.

    Settings come from the DB_HOST, DB_NAME, DB_USER and DB_PASSWORD
    environment variables (required; KeyError if unset) and DB_PORT
    (optional, default 5432). RealDictCursor makes fetch* return dict-like
    rows keyed by column name, which the handlers convert with dict().
    """
    env = os.environ
    options = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'port': env.get('DB_PORT', 5432),
        'cursor_factory': psycopg2.extras.RealDictCursor,
    }
    return psycopg2.connect(**options)

def validate_project_id(project_id: Any) -> int:
    """Convert an event-supplied project ID to a positive ``int``.

    Accepts ints, integral floats (e.g. ``3.0``) and integer strings
    (surrounding whitespace allowed, as ``int()`` permits).

    Raises:
        ValueError: "Invalid project ID: ..." for missing, boolean,
            non-numeric, non-integral or non-finite values, and
            "Project ID must be positive" for zero or negative IDs.
    """
    # bool is a subclass of int, so True would otherwise become ID 1.
    if isinstance(project_id, bool):
        raise ValueError(f"Invalid project ID: {project_id}")
    # int() would silently truncate 1.9 to 1 and target the wrong row;
    # is_integer() is also False for inf and nan.
    if isinstance(project_id, float) and not project_id.is_integer():
        raise ValueError(f"Invalid project ID: {project_id}")
    try:
        pid = int(project_id)
    except (ValueError, TypeError, OverflowError) as exc:
        raise ValueError(f"Invalid project ID: {project_id}") from exc
    # Checked outside the try so this message is not caught and replaced
    # by the generic "Invalid project ID" error.
    if pid <= 0:
        raise ValueError("Project ID must be positive")
    return pid

def validate_email(email: Any) -> str:
    """Return ``email`` stripped of surrounding whitespace if it looks valid.

    Deliberately basic. After stripping, the value must be a string of at
    most 100 characters with no whitespace. It must also have non-empty
    text on both sides of its last '@'.

    Raises:
        ValueError: "Invalid email format" for anything else, including
            non-string input.
    """
    if not isinstance(email, str):
        raise ValueError("Invalid email format")
    # Strip first so the length and content checks apply to the value
    # that is actually returned and used in the query.
    email = email.strip()
    local, _, domain = email.rpartition('@')
    if (not local or not domain or len(email) > 100
            or any(ch.isspace() for ch in email)):
        raise ValueError("Invalid email format")
    return email

def lambda_handler(event, context):
    """AWS Lambda entry point that routes PostgreSQL operations by ``action``.

    Supported actions are ``get_project``, ``search_projects`` and
    ``update_status``. The return value is always a response dict with
    ``statusCode`` and a JSON ``body``:

    - the selected handler's response on success;
    - 400 for a non-object event, an unknown action, or any ``ValueError``
      (whose message is returned to the caller);
    - 500 with a generic message for anything else. The details are
      logged, not returned.
    """
    # Lambda can be invoked directly with any JSON value; reject
    # non-objects up front instead of failing on .get() with a 500.
    if not isinstance(event, dict):
        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid event'})
        }

    try:
        action = event.get('action')

        if action == 'get_project':
            return get_project(event)
        elif action == 'search_projects':
            return search_projects(event)
        elif action == 'update_status':
            return update_project_status(event)
        else:
            return {
                'statusCode': 400,
                'body': json.dumps({'error': 'Invalid action'})
            }

    except ValueError as e:
        # A rejected request is a client problem, not a server failure.
        logger.warning("Validation error: %s", e)
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
    except Exception:
        # logger.exception also records the traceback, which is needed to
        # diagnose unexpected failures; the caller only sees a generic error.
        logger.exception("Unexpected error")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

def get_project(event):
    """Return one project, looked up by ``event['projectId']``.

    Returns a 200 response with the project's columns as a JSON object, or
    a 404 response when no row has that ID.

    Raises:
        ValueError: if ``projectId`` fails ``validate_project_id``.
    """
    import datetime
    import decimal

    def _json_default(value):
        # created_at (and any other date/time column) arrives as a datetime
        # object, and NUMERIC columns -- presumably budget -- as Decimal.
        # The json module cannot serialize either on its own.
        if isinstance(value, (datetime.date, datetime.time)):
            return value.isoformat()
        if isinstance(value, decimal.Decimal):
            return str(value)  # str keeps the exact value; float may round
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    project_id = validate_project_id(event.get('projectId'))

    conn = get_db_connection()
    try:
        # Parameterized: project_id is sent as a value, never as SQL text.
        # The cursor is closed when the with-block exits.
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, name, owner, status, budget, created_at "
                "FROM projects WHERE id = %s",
                (project_id,),
            )
            result = cursor.fetchone()
    finally:
        # A psycopg2 connection's own context manager only ends the
        # transaction, so close explicitly; this also runs if cursor() fails.
        conn.close()

    if not result:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'Project not found'})
        }

    return {
        'statusCode': 200,
        'body': json.dumps(dict(result), default=_json_default)
    }

def search_projects(event):
    """Search projects by optional name substring, owner email and minimum budget.

    Event fields (all optional):
        name: case-insensitive substring of the project name (<= 100 chars
            after stripping); ``%`` and ``_`` are matched literally.
        owner: exact owner email, checked with ``validate_email``.
        minBudget: lower bound on ``budget``. Unparseable, negative or
            non-finite values mean "no lower bound".

    Returns a 200 response whose JSON body is a list of at most 100
    projects, ordered by ``created_at`` descending.

    Raises:
        ValueError: if ``name``/``owner`` are not strings, the name is too
            long, or the owner email is invalid.
    """
    import datetime
    import decimal
    import math

    def _json_default(value):
        # NUMERIC columns -- presumably budget -- arrive as Decimal, and any
        # date/time column arrives as a datetime object. The json module
        # cannot serialize either on its own.
        if isinstance(value, (datetime.date, datetime.time)):
            return value.isoformat()
        if isinstance(value, decimal.Decimal):
            return str(value)  # str keeps the exact value; float may round
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def _escape_like(term):
        # Make % and _ in the user's term match literally. Backslash is
        # PostgreSQL's default LIKE escape character, so escape it first.
        return term.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')

    # A JSON null yields None; treat it as "not provided" instead of
    # crashing on .strip().
    name = event.get('name') or ''
    owner = event.get('owner') or ''
    if not isinstance(name, str) or not isinstance(owner, str):
        raise ValueError("name and owner must be strings")
    name = name.strip()
    owner = owner.strip()

    if len(name) > 100:
        raise ValueError("Name search term too long")

    if owner:
        owner = validate_email(owner)

    # Invalid budgets fall back to 0, which disables the budget filter.
    try:
        min_budget = float(event.get('minBudget', 0))
    except (ValueError, TypeError, OverflowError):
        min_budget = 0.0
    if not math.isfinite(min_budget) or min_budget < 0:
        min_budget = 0.0

    # Only fixed SQL fragments are joined; every user value goes in params.
    clauses = []
    params = []
    if name:
        clauses.append("name ILIKE %s")
        params.append(f"%{_escape_like(name)}%")
    if owner:
        clauses.append("owner = %s")
        params.append(owner)
    if min_budget > 0:
        clauses.append("budget >= %s")
        params.append(min_budget)

    query = "SELECT id, name, owner, status, budget FROM projects"
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    query += " ORDER BY created_at DESC LIMIT 100"

    conn = get_db_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            rows = cursor.fetchall()
    finally:
        # Close even if cursor() or execute() fails.
        conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps([dict(row) for row in rows], default=_json_default)
    }

def update_project_status(event):
    """Change a project's status and notes and record the change in an audit log.

    Reads ``projectId``, ``newStatus`` and optional ``notes`` from the event.
    Returns 404 if the project does not exist. Otherwise it commits the audit
    insert into ``project_status_log`` and the update in one transaction,
    then returns 200.

    Raises:
        ValueError: for an invalid ID, an unknown or non-string status,
            non-string notes, or notes longer than 1000 characters.
        psycopg2.Error: re-raised after rollback if a database call fails.
    """
    project_id = validate_project_id(event.get('projectId'))

    # A JSON null yields None; treat it like a missing field instead of
    # crashing on .strip().
    new_status = event.get('newStatus') or ''
    notes = event.get('notes') or ''
    if not isinstance(new_status, str):
        raise ValueError(f"Invalid status: {new_status}")
    if not isinstance(notes, str):
        raise ValueError("Notes must be a string")
    new_status = new_status.strip()
    notes = notes.strip()

    valid_statuses = {'planning', 'active', 'completed', 'cancelled', 'on_hold'}
    if new_status not in valid_statuses:
        raise ValueError(f"Invalid status: {new_status}")

    if len(notes) > 1000:
        raise ValueError("Notes too long")

    conn = get_db_connection()
    try:
        # Lock, audit insert and update run as a single transaction.
        conn.autocommit = False
        with conn.cursor() as cursor:
            # FOR UPDATE locks the row, so no concurrent writer can change
            # its status between the audit read and the UPDATE below.
            cursor.execute(
                "SELECT id FROM projects WHERE id = %s FOR UPDATE",
                (project_id,),
            )
            if not cursor.fetchone():
                conn.rollback()
                return {
                    'statusCode': 404,
                    'body': json.dumps({'error': 'Project not found'})
                }

            # Write the audit row BEFORE updating, so the SELECT captures
            # the pre-update status as old_status.
            cursor.execute(
                "INSERT INTO project_status_log "
                "(project_id, old_status, new_status, notes, changed_at) "
                "SELECT id, status, %s, %s, CURRENT_TIMESTAMP "
                "FROM projects WHERE id = %s",
                (new_status, notes, project_id),
            )
            cursor.execute(
                "UPDATE projects SET status = %s, notes = %s, "
                "updated_at = CURRENT_TIMESTAMP WHERE id = %s",
                (new_status, notes, project_id),
            )
        conn.commit()

    except psycopg2.Error as e:
        conn.rollback()
        logger.error("Database error: %s", e)
        raise

    finally:
        conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'Status updated successfully',
            'projectId': project_id,
            'newStatus': new_status
        })
    }

# Example with connection pooling for high-frequency operations
import psycopg2.pool

connection_pool = None

def get_pooled_connection():
    """Borrow a connection from a lazily created, process-wide pool.

    The pool holds 1-10 connections. It is built on the first call from the
    same environment variables as ``get_db_connection``. It is then kept in
    the module global ``connection_pool`` and reused for the rest of the
    process. Connections use RealDictCursor, like ``get_db_connection``, so
    handlers that call ``dict(row)`` work with either source. Give each
    connection back with ``return_pooled_connection``.

    NOTE: psycopg2's SimpleConnectionPool must not be shared across threads.
    """
    global connection_pool
    if connection_pool is None:
        connection_pool = psycopg2.pool.SimpleConnectionPool(
            1, 10,  # min and max connections
            host=os.environ['DB_HOST'],
            database=os.environ['DB_NAME'],
            user=os.environ['DB_USER'],
            password=os.environ['DB_PASSWORD'],
            port=os.environ.get('DB_PORT', 5432),
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
    return connection_pool.getconn()

def return_pooled_connection(conn):
    """Give ``conn`` back to the pool, or close it if no pool exists."""
    # Only reading the module global here, so no ``global`` statement.
    if connection_pool is None:
        # There is nowhere to return it; close instead of leaking the socket.
        conn.close()
        return
    connection_pool.putconn(conn)

💡 Why This Fix Works

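The vulnerable code uses f-strings, string formatting, and % formatting to splice Lambda event data directly into the SQL text. Any quotes, comment markers (--), or semicolons in that data therefore become part of the statement itself. The fixed version keeps every SQL string constant and passes the values separately to cursor.execute through %s placeholders. psycopg then adapts each value as a properly quoted literal, so it can never change the structure of the query. On top of parameterization, the fix adds input validation, error handling that hides internal details from callers, transactions for consistency, and an example of connection pooling for performance.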

Why it happens

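Lambda event fields often come from untrusted callers (for example, an API Gateway request body). Handlers nevertheless concatenate these fields into SQL strings and pass the result to cursor.execute without parameterization. Attacker-controlled fragments then become part of the SQL that PostgreSQL executes.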

Root causes

Lambda Event Data Concatenation

Event data is concatenated into SQL strings and passed to cursor.execute without parameterization, allowing attacker-controlled SQL fragments.

Preview example – PYTHON
# VULNERABLE: Event data concatenation
import psycopg2

def lambda_handler(event, context):
    """Query users by ID and status -- INTENTIONALLY VULNERABLE example.

    NOTE(review): ``cursor`` is not defined in this snippet; it assumes an
    open psycopg2 cursor supplied by surrounding code.
    """
    user_id = event.get('userId')
    status = event.get('status')

    # Direct concatenation into SQL: user_id is unquoted, so any SQL
    # expression can be appended; a quote in status closes the literal.
    query = f"SELECT * FROM users WHERE id = {user_id} AND status = '{status}'"

    cursor.execute(query)
    return cursor.fetchall()

String Formatting with psycopg

Using Python string formatting (str.format(), f-strings, or the % operator) to insert Lambda event fields directly into SQL queries executed by psycopg.

Preview example – PYTHON
# VULNERABLE: String formatting
def update_project_status(event, context):
    """Update a project's status -- INTENTIONALLY VULNERABLE example.

    NOTE(review): ``cursor`` is not defined in this snippet and no commit is
    shown; both are assumed to be handled by surrounding code.
    """
    project_id = event['projectId']
    new_status = event['newStatus']

    # str.format() splices the values into the SQL text before psycopg sees
    # it. The quotes around '{}' do not protect anything, because the
    # attacker's value can contain its own closing quote.
    query = "UPDATE projects SET status = '{}' WHERE id = {}".format(
        new_status, project_id
    )
    cursor.execute(query)
    # Allows injection: newStatus = "'; DROP TABLE projects; --"

Fixes

1

Use Parameterized Queries with psycopg

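Build static SQL with placeholders and pass the values separately, e.g., cursor.execute("SELECT * FROM projects WHERE status = %s", (status,)). Avoid f-strings, string formatting, or concatenation for SQL values. Note that psycopg's %s placeholder is not Python's % operator. Always pass the values as the second argument to execute(), and never wrap the placeholder in quotes ('%s'), because psycopg adds the quoting itself.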

View implementation – PYTHON
# SECURE: Parameterized query
import psycopg2

def lambda_handler(event, context):
    """Look up users by ID and status with a parameterized query.

    The SQL text is a constant, and the event values travel separately as
    parameters, so psycopg adapts them as literals that cannot change the
    statement.

    NOTE(review): ``cursor`` is not defined in this snippet; it assumes an
    open psycopg2 cursor supplied by surrounding code.
    """
    params = (event.get('userId'), event.get('status'))
    sql = "SELECT * FROM users WHERE id = %s AND status = %s"
    cursor.execute(sql, params)
    rows = cursor.fetchall()
    return rows
2

Validate Lambda Event Data

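Implement comprehensive input validation for all Lambda event fields before using them in database operations. Validation (type, range, and allow-list checks) is defense in depth: it rejects malformed requests early, but it complements parameterized queries rather than replacing them.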

View implementation – PYTHON
# SECURE: Input validation with parameterized queries
def validate_user_id(user_id):
    """Convert an event-supplied user ID to a positive ``int``.

    Raises:
        ValueError: "Invalid user ID format" for missing, boolean,
            non-numeric, non-integral or non-finite values, and
            "User ID must be positive" for zero or negative IDs.
    """
    # bool is an int subclass; True would otherwise silently become ID 1.
    if isinstance(user_id, bool):
        raise ValueError("Invalid user ID format")
    # int() would truncate 2.7 to 2 and target the wrong user; is_integer()
    # is also False for inf and nan.
    if isinstance(user_id, float) and not user_id.is_integer():
        raise ValueError("Invalid user ID format")
    try:
        uid = int(user_id)
    except (ValueError, TypeError, OverflowError) as exc:
        raise ValueError("Invalid user ID format") from exc
    # Checked outside the try so this message is not caught and replaced
    # by the generic format error.
    if uid <= 0:
        raise ValueError("User ID must be positive")
    return uid

def lambda_handler(event, context):
    """Look up a user by validated ID and allow-listed status.

    Raises:
        ValueError: if ``userId`` is invalid (see ``validate_user_id``) or
            ``status`` is not one of 'active', 'inactive', 'pending'.

    NOTE(review): ``cursor`` is not defined in this snippet; it assumes an
    open psycopg2 cursor supplied by surrounding code.
    """
    # Validate inputs
    user_id = validate_user_id(event.get('userId'))
    valid_statuses = {'active', 'inactive', 'pending'}
    status = event.get('status')

    # Reject non-strings first: an unhashable value such as a list would
    # make the set-membership test raise TypeError instead of ValueError.
    if not isinstance(status, str) or status not in valid_statuses:
        raise ValueError(f"Invalid status: {status}")

    query = "SELECT * FROM users WHERE id = %s AND status = %s"
    cursor.execute(query, (user_id, status))
    return cursor.fetchall()

Detect This Vulnerability in Your Code

Sourcery automatically identifies SQL injection from AWS Lambda event data in psycopg cursor.execute calls, along with many other security issues in your codebase.