Python AWS Lambda psycopg SQL Injection Vulnerability

Critical Risk SQL Injection
Tags: python, aws-lambda, postgresql, psycopg, sql-injection, serverless, cloud, database, injection, user-input, parameterized-queries, event-driven

What it is

A critical vulnerability that occurs when AWS Lambda functions using Python and psycopg (PostgreSQL adapter) construct SQL queries by concatenating user input from Lambda events directly into query strings. This serverless environment vulnerability allows attackers to inject malicious SQL code through Lambda event parameters, potentially leading to data theft, database corruption, or unauthorized access to PostgreSQL databases and cloud resources.

import json
import psycopg2
import os
from typing import Dict, Any, List

def get_db_connection():
    """Open a new psycopg2 connection configured from DB_* environment variables."""
    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        # DB_PORT is optional; 5432 is used when it is not set.
        'port': env.get('DB_PORT', 5432),
    }
    return psycopg2.connect(**settings)

def lambda_handler(event, context):
    """Route a Lambda event to the operation named by its ``action`` field."""
    requested = event.get('action')

    if requested == 'get_project':
        return get_project(event)
    if requested == 'search_projects':
        return search_projects(event)
    if requested == 'update_status':
        return update_project_status(event)

    # Any other value (including a missing action) is rejected.
    return {'statusCode': 400, 'body': 'Invalid action'}

def get_project(event):
    """Fetch one project row selected by the event's ``projectId`` field.

    Intentionally vulnerable "before" example: the event value is placed
    into the SQL text without validation or parameter binding.
    """
    # VULNERABLE: Direct event data in SQL
    project_id = event.get('projectId')

    conn = get_db_connection()
    cursor = conn.cursor()

    # The raw value becomes part of the statement itself, so a payload such as
    # "1 OR 1=1 --" changes the WHERE clause instead of being compared as data.
    query = f"SELECT * FROM projects WHERE id = {project_id}"
    cursor.execute(query)

    result = cursor.fetchone()
    # Not inside a finally block: these are skipped if execute()/fetchone() raises.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(result)
    }

def search_projects(event):
    """Search projects by name fragment, owner and minimum budget.

    Intentionally vulnerable "before" example: all three event values are
    interpolated into the SQL text with f-strings.
    """
    # VULNERABLE: String formatting
    name = event.get('name', '')
    owner = event.get('owner', '')
    min_budget = event.get('minBudget', 0)

    conn = get_db_connection()
    cursor = conn.cursor()

    # Hand-written quotes around {name} and {owner} are not escaping: a value
    # containing a single quote ends the literal and the rest is parsed as SQL.
    # {min_budget} is not quoted at all.
    query = ("SELECT id, name, owner FROM projects WHERE "
             f"name ILIKE '%{name}%' AND owner = '{owner}' "
             f"AND budget >= {min_budget}")

    cursor.execute(query)
    results = cursor.fetchall()

    # Not inside a finally block: skipped if the query raises.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps(results)
    }

def update_project_status(event):
    """Set the status and notes of the project named by ``projectId``.

    Intentionally vulnerable "before" example: the statement is assembled
    with Python's ``%`` string operator before psycopg2 ever sees it.
    """
    # VULNERABLE: % formatting
    project_id = event.get('projectId')
    new_status = event.get('newStatus')
    notes = event.get('notes', '')

    conn = get_db_connection()
    cursor = conn.cursor()

    # This is Python string formatting, not psycopg2 parameter binding: the
    # values are substituted into the text here and execute() receives one
    # finished string with no separate parameters.
    query = ("UPDATE projects SET status = '%s', notes = '%s' "
             "WHERE id = %s") % (new_status, notes, project_id)

    cursor.execute(query)
    conn.commit()

    # Not inside a finally block: skipped if execute()/commit() raises.
    cursor.close()
    conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps({'message': 'Status updated'})
    }

# Malicious Lambda event examples:
# {"action": "get_project", "projectId": "1 OR 1=1 --"}
# {"action": "search_projects", "name": "'; DROP TABLE projects; --"}
# {"action": "update_status", "newStatus": "'; DELETE FROM projects WHERE '1'='1'; --"}
import json
import psycopg2
import psycopg2.extras
import os
import logging
from typing import Dict, Any, List, Optional

# getLogger() with no name returns the root logger; INFO and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_db_connection():
    """Open a new psycopg2 connection whose cursors default to RealDictCursor.

    Connection settings are read from the DB_* environment variables.
    """
    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        # DB_PORT is optional; 5432 is used when it is not set.
        'port': env.get('DB_PORT', 5432),
        'cursor_factory': psycopg2.extras.RealDictCursor,
    }
    return psycopg2.connect(**settings)

def validate_project_id(project_id: Any) -> int:
    """Validate and convert a project ID to a positive ``int``.

    Args:
        project_id: Raw value taken from the Lambda event.

    Returns:
        The project ID as a positive integer.

    Raises:
        ValueError: If the value is not an integer ("Invalid project ID: ...")
            or is zero or negative ("Project ID must be positive").
    """
    # bool is a subclass of int; without this check True would become project 1.
    if isinstance(project_id, bool):
        raise ValueError(f"Invalid project ID: {project_id}")

    try:
        pid = int(project_id)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float('inf')).
        raise ValueError(f"Invalid project ID: {project_id}") from None

    # int() truncates floats; reject values such as 1.9 instead of reading them as 1.
    if isinstance(project_id, float) and pid != project_id:
        raise ValueError(f"Invalid project ID: {project_id}")

    # Checked outside the try block so this message is not replaced by the
    # generic "Invalid project ID" error.
    if pid <= 0:
        raise ValueError("Project ID must be positive")
    return pid

def validate_email(email: str) -> str:
    """Perform basic email validation and return the trimmed address.

    Args:
        email: Candidate address; must be a string.

    Returns:
        The address with surrounding whitespace removed.

    Raises:
        ValueError: If the value is not a string, is empty, has no ``@``,
            or is longer than 100 characters after trimming.
    """
    # Event data may be any JSON type; report non-strings as a validation
    # error instead of failing with TypeError/AttributeError.
    if not isinstance(email, str):
        raise ValueError("Invalid email format")

    # Trim first so the checks apply to the value that is actually returned.
    cleaned = email.strip()
    if not cleaned or '@' not in cleaned or len(cleaned) > 100:
        raise ValueError("Invalid email format")
    return cleaned

def lambda_handler(event, context):
    """AWS Lambda handler for PostgreSQL operations.

    Dispatches on the event's ``action`` field and converts exceptions into
    HTTP-style responses.

    Args:
        event: Lambda event; read with ``event.get``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` and a JSON-encoded ``body``. ``ValueError``
        raised by a handler becomes a 400 carrying the error message; any
        other exception becomes a generic 500.
    """
    try:
        action = event.get('action')

        if action == 'get_project':
            return get_project(event)
        if action == 'search_projects':
            return search_projects(event)
        if action == 'update_status':
            return update_project_status(event)

        return {
            'statusCode': 400,
            'body': json.dumps({'error': 'Invalid action'})
        }

    except ValueError as e:
        logger.error("Validation error: %s", e)
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
    except Exception as e:
        # logger.exception records the traceback; the client only ever sees
        # the generic message below.
        logger.exception("Unexpected error: %s", e)
        return {
            'statusCode': 500,
            'body': json.dumps({'error': 'Internal server error'})
        }

def get_project(event):
    """Return one project, looked up by the event's ``projectId``.

    Args:
        event: Lambda event containing ``projectId``.

    Returns:
        A 200 response whose body is the JSON-encoded row, or a 404 response
        when no row matches.

    Raises:
        ValueError: If ``projectId`` fails validation.
    """
    # Input validation
    project_id = validate_project_id(event.get('projectId'))

    conn = get_db_connection()
    try:
        # Created inside the try so the connection is closed even if
        # cursor() itself raises.
        cursor = conn.cursor()
        try:
            # SECURE: Parameterized query
            query = "SELECT id, name, owner, status, budget, created_at FROM projects WHERE id = %s"
            cursor.execute(query, (project_id,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    finally:
        conn.close()

    if not result:
        return {
            'statusCode': 404,
            'body': json.dumps({'error': 'Project not found'})
        }

    return {
        'statusCode': 200,
        # default=str is deliberate: created_at (and budget, if the column is
        # NUMERIC) presumably come back as datetime/Decimal, which json.dumps
        # cannot encode on its own.
        'body': json.dumps(dict(result), default=str)
    }

def search_projects(event):
    """Search projects by optional name fragment, owner email and minimum budget.

    Args:
        event: Lambda event with optional ``name``, ``owner`` and
            ``minBudget`` fields.

    Returns:
        A 200 response whose body is a JSON list of at most 100 matching rows.

    Raises:
        ValueError: If ``name`` or ``owner`` is not a string, ``name`` is
            longer than 100 characters, or ``owner`` fails email validation.
    """
    # Input validation. ``or ''`` also covers an explicit JSON null.
    name = event.get('name') or ''
    owner = event.get('owner') or ''
    min_budget = event.get('minBudget', 0)

    if not isinstance(name, str) or not isinstance(owner, str):
        raise ValueError("Search terms must be strings")
    name = name.strip()
    owner = owner.strip()

    if len(name) > 100:
        raise ValueError("Name search term too long")

    if owner:
        owner = validate_email(owner)

    # Validate budget: anything unusable falls back to "no minimum".
    try:
        min_budget = float(min_budget)
        if min_budget < 0:
            min_budget = 0
    except (ValueError, TypeError, OverflowError):
        min_budget = 0

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        # SECURE: Parameterized query with dynamic conditions
        conditions = []
        params = []

        base_query = "SELECT id, name, owner, status, budget FROM projects WHERE 1=1"

        if name:
            # Binding prevents SQL injection, but % and _ are still pattern
            # wildcards inside the bound value. Escape them (backslash is the
            # default LIKE escape character) so the term matches literally.
            escaped = (name.replace('\\', '\\\\')
                           .replace('%', '\\%')
                           .replace('_', '\\_'))
            conditions.append(" AND name ILIKE %s")
            params.append(f"%{escaped}%")

        if owner:
            conditions.append(" AND owner = %s")
            params.append(owner)

        if min_budget > 0:
            conditions.append(" AND budget >= %s")
            params.append(min_budget)

        query = base_query + "".join(conditions) + " ORDER BY created_at DESC LIMIT 100"
        cursor.execute(query, params)

        results = cursor.fetchall()

        return {
            'statusCode': 200,
            # default=str is deliberate: budget is presumably NUMERIC and
            # would come back as Decimal, which json.dumps cannot encode.
            'body': json.dumps([dict(row) for row in results], default=str)
        }

    finally:
        cursor.close()
        conn.close()

def update_project_status(event):
    """Update a project's status and notes and record the change in the status log.

    The log row and the update are written in a single transaction.

    Args:
        event: Lambda event with ``projectId``, ``newStatus`` and optional
            ``notes``.

    Returns:
        A 200 response on success, or a 404 response when the project does
        not exist.

    Raises:
        ValueError: If the project ID, status or notes fail validation.
        psycopg2.Error: Re-raised after rollback when a statement fails.
    """
    # Input validation. ``or ''`` also covers an explicit JSON null.
    project_id = validate_project_id(event.get('projectId'))
    new_status = event.get('newStatus') or ''
    notes = event.get('notes') or ''

    if not isinstance(new_status, str) or not isinstance(notes, str):
        raise ValueError("newStatus and notes must be strings")
    new_status = new_status.strip()
    notes = notes.strip()

    # Validate status
    valid_statuses = {'planning', 'active', 'completed', 'cancelled', 'on_hold'}
    if new_status not in valid_statuses:
        raise ValueError(f"Invalid status: {new_status}")

    if len(notes) > 1000:
        raise ValueError("Notes too long")

    conn = get_db_connection()
    cursor = conn.cursor()

    try:
        # SECURE: Parameterized query with transaction
        conn.autocommit = False

        # Check if project exists first
        check_query = "SELECT id FROM projects WHERE id = %s"
        cursor.execute(check_query, (project_id,))

        if not cursor.fetchone():
            return {
                'statusCode': 404,
                'body': json.dumps({'error': 'Project not found'})
            }

        # Log the change BEFORE updating: the subselect reads the current
        # status, so it must run while the row still holds the old value.
        # Running it after the UPDATE would store the new status as old_status.
        log_query = ("""
            INSERT INTO project_status_log (project_id, old_status, new_status, notes, changed_at)
            VALUES (%s, (SELECT status FROM projects WHERE id = %s), %s, %s, CURRENT_TIMESTAMP)
        """)
        cursor.execute(log_query, (project_id, project_id, new_status, notes))

        # Update with parameterized query
        update_query = ("""
            UPDATE projects
            SET status = %s, notes = %s, updated_at = CURRENT_TIMESTAMP
            WHERE id = %s
        """)
        cursor.execute(update_query, (new_status, notes, project_id))

        conn.commit()

        return {
            'statusCode': 200,
            'body': json.dumps({
                'message': 'Status updated successfully',
                'projectId': project_id,
                'newStatus': new_status
            })
        }

    except psycopg2.Error as e:
        conn.rollback()
        logger.error(f"Database error: {e}")
        raise

    finally:
        cursor.close()
        conn.close()

# Example with connection pooling for high-frequency operations
import psycopg2.pool

# Module-level pool, created on first use by get_pooled_connection().
connection_pool = None


def get_pooled_connection():
    """Return a connection from the module-level pool, creating the pool if needed."""
    global connection_pool
    if connection_pool is not None:
        return connection_pool.getconn()

    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'port': env.get('DB_PORT', 5432),
    }
    # min and max connections
    connection_pool = psycopg2.pool.SimpleConnectionPool(1, 10, **settings)
    return connection_pool.getconn()


def return_pooled_connection(conn):
    """Hand ``conn`` back to the pool; does nothing if no pool exists yet."""
    if not connection_pool:
        return
    connection_pool.putconn(conn)

💡 Why This Fix Works

The vulnerable code uses f-strings, string formatting, and % formatting to insert Lambda event data directly into psycopg SQL queries. The fixed version uses parameterized queries with %s placeholders, implements comprehensive input validation, proper error handling, transactions for consistency, and demonstrates connection pooling for performance.

Why it happens

Using f-strings to format PostgreSQL queries with data from Lambda events creates direct injection vulnerabilities. Lambda event data is completely untrusted and can be manipulated by attackers to inject malicious SQL code specific to PostgreSQL syntax.

Root causes

F-String Formatting with Lambda Event Data

Using f-strings to format PostgreSQL queries with data from Lambda events creates direct injection vulnerabilities. Lambda event data is completely untrusted and can be manipulated by attackers to inject malicious SQL code specific to PostgreSQL syntax.

Preview example – PYTHON
# VULNERABLE: f-string with Lambda event data
def lambda_handler(event, context):
    """Vulnerable preview: interpolate ``projectId`` from the event into SQL text."""
    # NOTE(review): `cursor` is not defined in this snippet; presumably an
    # open psycopg cursor created elsewhere.
    project_id = event.get('projectId')  # Could be: "1 OR 1=1 --"
    query = f"SELECT * FROM projects WHERE id = {project_id}"
    cursor.execute(query)
    # Results in: SELECT * FROM projects WHERE id = 1 OR 1=1 --

Percent (%) Formatting in PostgreSQL Queries

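Using % formatting to build PostgreSQL queries with Lambda event data creates injection vulnerabilities. This is particularly easy to get wrong with psycopg, because %s is both a Python string-formatting specifier and psycopg's parameter placeholder: `"... %s" % (value,)` builds the string in Python with no escaping, whereas `cursor.execute("... %s", (value,))` lets psycopg bind the value safely.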

Preview example – PYTHON
# VULNERABLE: % formatting (confusing with psycopg parameters)
def search_projects(event):
    """Vulnerable preview: build the search query with Python's ``%`` operator."""
    # NOTE(review): `cursor` is not defined in this snippet; presumably an
    # open psycopg cursor created elsewhere.
    name = event.get('name')  # Could be: "'; DROP TABLE projects; --"
    owner = event.get('owner')
    # '%%' is a literal percent sign for Python's % operator, so the text sent
    # to the database reads ILIKE '%<name>%'. Both values are substituted
    # before execute() is called; nothing is bound as a parameter.
    query = "SELECT * FROM projects WHERE name ILIKE '%%%s%%' AND owner = '%s'" % (name, owner)
    cursor.execute(query)

Dynamic Query Building with Event Parameters

Building PostgreSQL queries dynamically using Lambda event parameters without proper validation creates multiple injection points. This is especially risky in serverless environments where events can come from various sources.

Preview example – PYTHON
# VULNERABLE: Dynamic query building
def update_project_status(event):
    """Vulnerable preview: assemble an UPDATE statement from raw event fields."""
    # NOTE(review): `cursor` is not defined in this snippet; presumably an
    # open psycopg cursor created elsewhere.
    project_id = event.get('projectId')
    new_status = event.get('newStatus')
    notes = event.get('notes', '')
    # % formatting creates confusion with psycopg's %s syntax
    # (the values are substituted by Python here, not bound by psycopg)
    query = ("UPDATE projects SET status = '%s', notes = '%s' "
             "WHERE id = %s") % (new_status, notes, project_id)
    cursor.execute(query)

PostgreSQL-Specific Injection Vectors

PostgreSQL offers unique injection opportunities through features like COPY commands, stored procedures, and JSON operations. Lambda functions processing complex event data may be vulnerable to PostgreSQL-specific attacks.

Preview example – PYTHON
# VULNERABLE: PostgreSQL JSON operations
def search_projects_by_metadata(event):
    """Vulnerable preview: interpolate a JSON key and value into a ``->>`` lookup."""
    # NOTE(review): `cursor` is not defined in this snippet; presumably an
    # open psycopg cursor created elsewhere.
    metadata_key = event.get('key')
    metadata_value = event.get('value')
    # Vulnerable to PostgreSQL JSON injection
    # (both the key and the value sit inside hand-written quotes in the SQL text)
    query = f"SELECT * FROM projects WHERE metadata->>'{metadata_key}' = '{metadata_value}'"
    cursor.execute(query)
    # Malicious: key = "'::text; DROP TABLE projects; --"

Fixes

1

Use psycopg Parameterized Queries with %s Placeholders

Always use psycopg's parameterized queries with %s placeholders instead of Python string formatting. Note that psycopg uses %s for all parameter types, unlike other database adapters that might use different placeholder styles.

View implementation – PYTHON
# SECURE: psycopg parameterized query
import psycopg2
import psycopg2.extras

def lambda_handler(event, context):
    """Look up one project by ``projectId`` using a parameterized query.

    Args:
        event: Lambda event containing ``projectId``.
        context: Lambda context object (unused).

    Returns:
        A 200 response whose body is the JSON-encoded row (``null`` when no
        row matches), or a 400 response when a ``ValueError`` is raised.
    """
    try:
        # Input validation
        project_id = validate_project_id(event.get('projectId'))

        conn = get_db_connection()
        try:
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            try:
                # Safe parameterized query - psycopg uses %s for all types
                query = "SELECT id, name, owner, status FROM projects WHERE id = %s"
                cursor.execute(query, (project_id,))
                result = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Always release the connection; otherwise each invocation of a
            # warm Lambda container leaves a session open on the database.
            conn.close()

        return {
            'statusCode': 200,
            'body': json.dumps(dict(result) if result else None)
        }
    except ValueError as e:
        return {
            'statusCode': 400,
            'body': json.dumps({'error': str(e)})
        }
2

Implement PostgreSQL-Specific Input Validation

Validate Lambda event parameters with PostgreSQL-specific considerations. Be especially careful with JSON operations, array operations, and PostgreSQL's rich type system.

View implementation – PYTHON
# PostgreSQL-specific validation functions
def validate_project_id(project_id):
    """Validate a project ID against PostgreSQL's ``integer`` range.

    Args:
        project_id: Raw value taken from the Lambda event.

    Returns:
        The ID as an int in the range 1..2147483647.

    Raises:
        ValueError: "Project ID is required" for None, "Invalid project ID
            format: ..." for non-integers, and "Project ID out of valid
            range" for integers outside the range.
    """
    if project_id is None:
        raise ValueError("Project ID is required")
    # bool is a subclass of int; without this check True would become project 1.
    if isinstance(project_id, bool):
        raise ValueError(f"Invalid project ID format: {project_id}")
    try:
        pid = int(project_id)
    except (ValueError, TypeError, OverflowError):
        # OverflowError covers int(float('inf')).
        raise ValueError(f"Invalid project ID format: {project_id}") from None
    # int() truncates floats; reject values such as 1.9 instead of reading them as 1.
    if isinstance(project_id, float) and pid != project_id:
        raise ValueError(f"Invalid project ID format: {project_id}")
    # Checked outside the try block so this message is not replaced by the
    # generic format error.
    if pid <= 0 or pid > 2147483647:  # PostgreSQL integer limit
        raise ValueError("Project ID out of valid range")
    return pid

def validate_json_key(key):
    """Validate a key intended for a PostgreSQL JSON ``->>`` lookup.

    Args:
        key: Candidate key; must be a non-empty string of at most 100
            characters.

    Returns:
        The key, unchanged.

    Raises:
        ValueError: If the key is not a string, is empty or too long, or
            contains a quote, semicolon, hyphen, slash or asterisk.
    """
    # Validate JSON key for PostgreSQL JSON operations.
    # The type check comes first: an int would crash in len(), and a list
    # would pass the character test below without being a usable key.
    if not isinstance(key, str) or not key or len(key) > 100:
        raise ValueError("Invalid JSON key")
    # Prevent JSON injection
    if any(char in key for char in ("'", '"', ';', '-', '/', '*')):
        raise ValueError("JSON key contains invalid characters")
    return key

def validate_email(email):
    """Perform basic email validation and return a normalised address.

    Args:
        email: Candidate address; must be a string.

    Returns:
        The address trimmed and lower-cased.

    Raises:
        ValueError: If the value is not a string, is empty, has no ``@``,
            or is longer than 254 characters after trimming.
    """
    # Event data may be any JSON type; report non-strings as a validation
    # error instead of failing with TypeError/AttributeError.
    if not isinstance(email, str):
        raise ValueError("Invalid email format")
    # Trim first so the checks apply to the value that is actually returned.
    cleaned = email.strip()
    if not cleaned or '@' not in cleaned or len(cleaned) > 254:  # RFC limit
        raise ValueError("Invalid email format")
    return cleaned.lower()
3

Use PostgreSQL Transactions and Error Handling

Implement proper PostgreSQL transaction management in Lambda functions. Use psycopg's transaction features and handle PostgreSQL-specific exceptions appropriately.

View implementation – PYTHON
# SECURE: PostgreSQL transaction management
import psycopg2
from psycopg2 import sql

def update_project_with_transaction(event):
    """Update a project's status and notes, and log the change, in one transaction.

    Raises ValueError when the project does not exist. psycopg2 errors are
    rolled back, logged and re-raised.
    """
    project_id = validate_project_id(event.get('projectId'))
    new_status = validate_status(event.get('newStatus'))
    raw_notes = event.get('notes', '')
    notes = raw_notes.strip()[:1000]  # notes are trimmed, then capped at 1000 characters

    conn = get_db_connection()
    cursor = conn.cursor()

    # Statements used below; every value is bound through %s placeholders.
    exists_sql = "SELECT id FROM projects WHERE id = %s"
    update_sql = """
            UPDATE projects 
            SET status = %s, notes = %s, updated_at = CURRENT_TIMESTAMP 
            WHERE id = %s
        """
    log_sql = """
            INSERT INTO project_status_log (project_id, new_status, notes, changed_at)
            VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
        """

    try:
        # Explicit transaction: nothing is persisted until commit().
        conn.autocommit = False

        cursor.execute(exists_sql, (project_id,))
        existing = cursor.fetchone()
        if not existing:
            raise ValueError("Project not found")

        cursor.execute(update_sql, (new_status, notes, project_id))
        cursor.execute(log_sql, (project_id, new_status, notes))

        conn.commit()

        response_body = json.dumps({'message': 'Project updated successfully'})
        return {'statusCode': 200, 'body': response_body}

    except psycopg2.Error as e:
        conn.rollback()
        logger.error(f"PostgreSQL error: {e}")
        raise
    finally:
        cursor.close()
        conn.close()
4

Safe PostgreSQL JSON and Array Operations

When working with PostgreSQL's JSON and array features in Lambda functions, use proper parameterization and validation to prevent injection through these PostgreSQL-specific data types.

View implementation – PYTHON
# SECURE: PostgreSQL JSON operations
def search_projects_by_metadata_safe(event):
    """Find projects whose ``metadata`` JSON has the given key set to the given value.

    The key is validated and both key and value are bound as parameters.
    Raises ValueError when the value is empty after trimming.
    """
    key = validate_json_key(event.get('key'))
    value = event.get('value', '').strip()[:500]  # trimmed, then capped at 500 characters

    if not value:
        raise ValueError("Metadata value is required")

    conn = get_db_connection()
    cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

    try:
        # Key and value are both passed as bound parameters, never as SQL text.
        cursor.execute(
            "SELECT id, name, owner FROM projects WHERE metadata->>%s = %s",
            (key, value),
        )
        rows = list(map(dict, cursor.fetchall()))
        return {'statusCode': 200, 'body': json.dumps(rows)}
    finally:
        cursor.close()
        conn.close()

# SECURE: PostgreSQL array operations
def search_projects_by_tags(event):
    """Find projects whose ``tags`` array overlaps the requested tags.

    Args:
        event: Lambda event with a ``tags`` list of at most 10 entries.

    Returns:
        A 200 response whose body is a JSON list of matching rows, or a 400
        response when no usable tag remains after validation.

    Raises:
        ValueError: If ``tags`` is not a list or has more than 10 entries.
    """
    tags = event.get('tags', [])
    if not isinstance(tags, list) or len(tags) > 10:
        raise ValueError("Invalid tags array")

    # Keep only non-empty strings of at most 50 characters after trimming;
    # an empty tag can never be a meaningful search term.
    validated_tags = [
        tag.strip()
        for tag in tags
        if isinstance(tag, str) and 0 < len(tag.strip()) <= 50
    ]

    if not validated_tags:
        return {'statusCode': 400, 'body': json.dumps({'error': 'No valid tags'})}

    conn = get_db_connection()
    try:
        cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        try:
            # psycopg2 adapts the Python list to a PostgreSQL ARRAY; && is the
            # array-overlap operator.
            # NOTE(review): assumes projects.tags is a text[] column - confirm.
            query = "SELECT id, name FROM projects WHERE tags && %s"
            cursor.execute(query, (validated_tags,))
            results = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    return {
        'statusCode': 200,
        'body': json.dumps([dict(row) for row in results])
    }
5

Use Connection Pooling with psycopg

Implement connection pooling for psycopg in Lambda functions to improve performance and manage database connections efficiently in the serverless environment.

View implementation – PYTHON
# SECURE: psycopg connection pooling for Lambda
import psycopg2.pool
import threading

# Each thread keeps its own pool object on this thread-local holder.
local_data = threading.local()


def get_connection_pool():
    """Return the calling thread's connection pool, creating it on first use."""
    try:
        return local_data.pool
    except AttributeError:
        # No pool has been created on this thread yet.
        pass

    env = os.environ
    settings = {
        'host': env['DB_HOST'],
        'database': env['DB_NAME'],
        'user': env['DB_USER'],
        'password': env['DB_PASSWORD'],
        'port': env.get('DB_PORT', 5432),
    }
    # min and max connections
    local_data.pool = psycopg2.pool.SimpleConnectionPool(1, 5, **settings)
    return local_data.pool

def get_db_connection():
    """Check a connection out of the pool returned by get_connection_pool()."""
    return get_connection_pool().getconn()

def return_db_connection(conn):
    """Give ``conn`` back to the pool returned by get_connection_pool()."""
    get_connection_pool().putconn(conn)

def lambda_handler(event, context):
    """Skeleton handler showing the borrow/return pattern for a pooled connection."""
    conn = get_db_connection()
    try:
        # Use connection safely
        pass
    finally:
        # Runs on success and on error, so the connection always goes back
        # to the pool.
        return_db_connection(conn)

Detect This Vulnerability in Your Code

Sourcery automatically identifies Python AWS Lambda psycopg SQL injection vulnerabilities and many other security issues in your codebase.