SQL injection from variable SQL string concatenation in aiopg execute

Critical Risk SQL Injection
Tags: python · aiopg · postgresql · sql-injection · async

What it is

SQL injection vulnerability in Python async applications using aiopg where SQL queries are assembled using string concatenation or formatting with variables instead of bound parameters.

# VULNERABLE: aiopg with string concatenation
import aiopg
import asyncio
from aiohttp import web
import json

DSN = 'dbname=myapp user=appuser password=password host=127.0.0.1'

# Vulnerable database functions
async def authenticate_user(username, password):
    """Look up a user by credentials (intentionally vulnerable example).

    Both ``username`` and ``password`` are interpolated into the SQL text
    with an f-string, so a value containing a single quote can change the
    structure of the statement (for example, commenting out the password
    check).

    Returns:
        The row returned by ``fetchone()`` for the executed query.
    """
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # SQL injection vulnerability: the values become part of the
                # statement text instead of being passed as bound parameters.
                query = f"""
                    SELECT id, username, email, role
                    FROM users
                    WHERE username = '{username}'
                    AND password = crypt('{password}', password)
                """
                await cur.execute(query)
                return await cur.fetchone()

async def get_user_transactions(user_id, account_type, start_date, end_date):
    """List a user's transactions with optional filters (intentionally vulnerable).

    Every argument is spliced into the SQL text: ``user_id`` unquoted, the
    three optional filters inside single quotes. Each one is therefore an
    injection point.

    Returns:
        All rows from ``fetchall()``; the query orders by transaction date,
        newest first.
    """
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Multiple injection points: user_id is embedded unquoted, so
                # no quote character is even needed to break out of it.
                query = f"""
                    SELECT t.*, a.account_name
                    FROM transactions t
                    JOIN accounts a ON t.account_id = a.id
                    WHERE t.user_id = {user_id}
                """

                # Each optional filter appends another string-built clause.
                if account_type:
                    query += f" AND a.account_type = '{account_type}'"
                if start_date:
                    query += f" AND t.transaction_date >= '{start_date}'"
                if end_date:
                    query += f" AND t.transaction_date <= '{end_date}'"

                query += " ORDER BY t.transaction_date DESC"

                await cur.execute(query)
                return await cur.fetchall()

async def update_account_balance(account_id, amount, transaction_type, description):
    """Apply a credit or debit and log it (intentionally vulnerable example).

    Reads the current balance, computes the new one in Python, then issues an
    UPDATE and an INSERT. All three statements are built with f-strings, so
    every argument is an injection point.

    Returns:
        True when the account row was found and the statements were issued,
        False when the balance lookup returned no row.
    """
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Get current balance (account_id is embedded unquoted; the
                # SELECT takes no row lock before the later UPDATE).
                balance_query = f"SELECT balance FROM accounts WHERE id = {account_id}"
                await cur.execute(balance_query)
                current = await cur.fetchone()

                if current:
                    # Anything other than 'credit' is treated as a debit.
                    new_balance = current[0] + amount if transaction_type == 'credit' else current[0] - amount

                    # Update balance
                    update_query = f"""
                        UPDATE accounts
                        SET balance = {new_balance},
                            last_transaction = NOW()
                        WHERE id = {account_id}
                    """
                    await cur.execute(update_query)

                    # Log transaction: transaction_type and description are
                    # free text placed inside quotes, the classic injection spot.
                    log_query = f"""
                        INSERT INTO transaction_log (account_id, amount, type, description, timestamp)
                        VALUES ({account_id}, {amount}, '{transaction_type}', '{description}', NOW())
                    """
                    await cur.execute(log_query)

                    # NOTE(review): aiopg documents commit() as unsupported on
                    # its always-autocommit connections; confirm this call works.
                    await conn.commit()
                    return True
                return False

async def search_logs(search_term, log_level, start_time, limit=100):
    """Search ``application_logs`` with optional filters (intentionally vulnerable).

    The query starts from ``WHERE 1=1`` and each supplied filter appends a
    clause built by string formatting. ``limit`` is also interpolated
    directly, so it is an injection point as well.

    Returns:
        All rows from ``fetchall()``, newest first, capped by ``limit``.
    """
    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Complex dynamic query building
                query = "SELECT * FROM application_logs WHERE 1=1"

                if search_term:
                    # search_term lands inside a quoted LIKE pattern.
                    query += f" AND message LIKE '%{search_term}%'"
                if log_level:
                    query += f" AND level = '{log_level}'"
                if start_time:
                    query += f" AND timestamp >= '{start_time}'"

                # limit is unquoted and unvalidated.
                query += f" ORDER BY timestamp DESC LIMIT {limit}"

                await cur.execute(query)
                return await cur.fetchall()

# Aiohttp routes using vulnerable functions
async def login(request):
    """aiohttp handler that passes raw JSON fields to the vulnerable lookup.

    ``username`` and ``password`` are taken from the request body without any
    validation and handed to ``authenticate_user``.

    Returns:
        A JSON response with the user on success, or a 401 JSON error.
    """
    data = await request.json()
    # Missing keys yield None; nothing is validated before the query runs.
    username = data.get('username')
    password = data.get('password')

    user = await authenticate_user(username, password)
    if user:
        # NOTE(review): dict(user) presumably expects a mapping-like row;
        # confirm against the cursor type actually in use.
        return web.json_response({'success': True, 'user': dict(user)})
    return web.json_response({'error': 'Invalid credentials'}, status=401)

# Attack examples:
# username = "admin' --"
# search_term = "'; DROP TABLE users; --"
# SECURE: Parameterized queries with input validation
import aiopg
import asyncio
from aiohttp import web
from psycopg2 import sql
import bcrypt
import re
from datetime import datetime
from typing import Optional, Dict, List, Tuple

DSN = 'dbname=myapp user=appuser password=password host=127.0.0.1'

# Input validation functions
def validate_username(username: str) -> str:
    """Return ``username`` unchanged if it is a well-formed account name.

    A valid username is a ``str`` made of 3 to 30 ASCII letters, digits or
    underscores, with nothing before or after.

    Raises:
        ValueError: if ``username`` is not a string or does not match.
    """
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let "admin\n" through. The isinstance
    # check turns non-string input into ValueError rather than a TypeError
    # raised from inside the regex engine.
    if not isinstance(username, str) or not re.fullmatch(r'[a-zA-Z0-9_]{3,30}', username):
        raise ValueError("Invalid username format")
    return username

def validate_date(date_str: str) -> datetime:
    """Parse an ISO 8601 date or date-time string into a ``datetime``.

    Raises:
        ValueError: if ``date_str`` is not a string or is not in a format
            accepted by ``datetime.fromisoformat``. The original parsing
            error is attached as the cause.
    """
    try:
        return datetime.fromisoformat(date_str)
    except (ValueError, TypeError) as exc:
        # Chain explicitly so the underlying parse failure stays visible as
        # the direct cause rather than an incidental "during handling" context.
        raise ValueError("Invalid date format") from exc

def validate_account_type(account_type: str) -> str:
    """Return ``account_type`` if it is one of the known account kinds.

    Raises:
        ValueError: for any value outside the fixed set below.
    """
    # Whitelist check: only these exact, case-sensitive values are accepted.
    if account_type in ('checking', 'savings', 'credit', 'investment'):
        return account_type
    raise ValueError("Invalid account type")

def validate_log_level(level: str) -> str:
    """Return ``level`` if it names a recognised log severity.

    Raises:
        ValueError: for any value outside the fixed set below.
    """
    # Whitelist check: upper-case names only, matched exactly.
    known_levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    if level in known_levels:
        return level
    raise ValueError("Invalid log level")

# Secure database functions with parameterized queries
async def authenticate_user(username: str, password: str) -> Optional[Dict]:
    """Verify a username/password pair against the ``users`` table.

    The username is validated and then passed as a bound parameter; the
    password is never placed in SQL, only compared against the stored bcrypt
    hash.

    Args:
        username: account name; must pass ``validate_username``.
        password: plaintext password supplied by the client.

    Returns:
        A dict with ``id``, ``username``, ``email`` and ``role`` when the
        credentials match, otherwise ``None``.
    """
    # Request bodies can carry numbers or objects where a string is expected;
    # treat those as failed logins instead of crashing on .encode() below.
    if not isinstance(password, str):
        return None
    try:
        username = validate_username(username)
    except (ValueError, TypeError):
        return None

    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Parameterized query: username is sent as data, not SQL text.
                query = """
                    SELECT id, username, email, role, password_hash
                    FROM users
                    WHERE username = %s
                """
                await cur.execute(query, (username,))
                user = await cur.fetchone()

    # The connection is released before hashing so it is not held idle.
    if not user:
        return None

    user_id, name, email, role, stored_hash = user

    # bcrypt is deliberately slow and CPU-bound; run it in the default
    # executor so other requests keep being served by the event loop.
    loop = asyncio.get_running_loop()
    password_ok = await loop.run_in_executor(
        None,
        bcrypt.checkpw,
        password.encode('utf-8'),
        stored_hash.encode('utf-8'),
    )
    if not password_ok:
        return None

    return {
        'id': user_id,
        'username': name,
        'email': email,
        'role': role
    }

async def get_user_transactions(
    user_id: int,
    account_type: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None
) -> List[Tuple]:
    """Fetch a user's transactions, newest first, using bound parameters.

    Args:
        user_id: positive integer id of the user.
        account_type: optional filter, checked by ``validate_account_type``.
        start_date: optional inclusive lower bound, parsed by ``validate_date``.
        end_date: optional inclusive upper bound, parsed by ``validate_date``.

    Returns:
        All matching rows from ``fetchall()``.

    Raises:
        ValueError: if ``user_id`` is not a positive int, or a supplied
            filter fails its validator.
    """
    id_is_valid = isinstance(user_id, int) and user_id > 0
    if not id_is_valid:
        raise ValueError("Invalid user ID")

    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                statement = """
                    SELECT t.id, t.amount, t.transaction_date,
                           t.description, a.account_name, a.account_type
                    FROM transactions t
                    JOIN accounts a ON t.account_id = a.id
                    WHERE t.user_id = %s
                """
                bind_values = [user_id]

                # (raw value, validator, clause) for each optional filter,
                # applied in this order; falsy raw values are skipped.
                optional_filters = (
                    (account_type, validate_account_type, " AND a.account_type = %s"),
                    (start_date, validate_date, " AND t.transaction_date >= %s"),
                    (end_date, validate_date, " AND t.transaction_date <= %s"),
                )
                for raw_value, validator, clause in optional_filters:
                    if not raw_value:
                        continue
                    bind_values.append(validator(raw_value))
                    statement += clause

                statement += " ORDER BY t.transaction_date DESC"

                await cur.execute(statement, tuple(bind_values))
                rows = await cur.fetchall()
                return rows

async def update_account_balance(
    account_id: int,
    amount: float,
    transaction_type: str,
    description: str
) -> bool:
    """Apply a credit or debit to an account and record it, atomically.

    All statements use bound parameters and run inside one explicit
    transaction; the balance row is locked with ``FOR UPDATE`` while the new
    balance is computed.

    Args:
        account_id: positive integer id of the account.
        amount: positive, finite amount to credit or debit.
        transaction_type: ``'credit'`` or ``'debit'``.
        description: non-empty text of at most 500 characters.

    Returns:
        True when the balance was updated and logged, False when no account
        with ``account_id`` exists.

    Raises:
        ValueError: on invalid arguments, or when a debit would make the
            balance negative ("Insufficient funds").
    """
    import math
    from decimal import Decimal

    # Validate inputs
    if not isinstance(account_id, int) or account_id <= 0:
        raise ValueError("Invalid account ID")
    if (
        not isinstance(amount, (int, float))
        # NaN compares False against everything, so "amount <= 0" alone would
        # let NaN (and +inf) through and corrupt the stored balance.
        or (isinstance(amount, float) and not math.isfinite(amount))
        or amount <= 0
    ):
        raise ValueError("Invalid amount")
    if transaction_type not in ['credit', 'debit']:
        raise ValueError("Invalid transaction type")
    if not description or len(description) > 500:
        raise ValueError("Invalid description")

    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # aiopg connections always run in autocommit mode and do not
                # support commit()/rollback(), so the transaction is delimited
                # with explicit BEGIN / COMMIT / ROLLBACK statements.
                await cur.execute("BEGIN")
                try:
                    # Get current balance with row lock
                    balance_query = """
                        SELECT balance FROM accounts
                        WHERE id = %s
                        FOR UPDATE
                    """
                    await cur.execute(balance_query, (account_id,))
                    current = await cur.fetchone()

                    if not current:
                        applied = False
                    else:
                        current_balance = current[0]

                        # psycopg2 returns NUMERIC columns as Decimal, which
                        # refuses arithmetic with float; convert via str to
                        # keep the decimal digits the caller wrote.
                        delta = amount
                        if isinstance(current_balance, Decimal):
                            delta = Decimal(str(amount))

                        # Calculate new balance
                        if transaction_type == 'credit':
                            new_balance = current_balance + delta
                        else:
                            new_balance = current_balance - delta
                            if new_balance < 0:
                                raise ValueError("Insufficient funds")

                        # Update balance
                        update_query = """
                            UPDATE accounts
                            SET balance = %s,
                                last_transaction = NOW()
                            WHERE id = %s
                        """
                        await cur.execute(update_query, (new_balance, account_id))

                        # Log transaction
                        log_query = """
                            INSERT INTO transaction_log
                            (account_id, amount, type, description, timestamp)
                            VALUES (%s, %s, %s, %s, NOW())
                        """
                        await cur.execute(log_query, (
                            account_id, amount, transaction_type, description
                        ))
                        applied = True
                except Exception:
                    # Single rollback point for every failure, including the
                    # "Insufficient funds" ValueError raised above.
                    await cur.execute("ROLLBACK")
                    raise

                # Nothing was written when the account is missing, so that
                # path ends the transaction with ROLLBACK as well.
                await cur.execute("COMMIT" if applied else "ROLLBACK")
                return applied

async def search_logs(
    search_term: Optional[str] = None,
    log_level: Optional[str] = None,
    start_time: Optional[str] = None,
    limit: int = 100
) -> List[Tuple]:
    """Search ``application_logs`` with optional filters, newest first.

    Every value, including ``LIMIT``, is passed as a bound parameter.

    Args:
        search_term: text to find anywhere in ``message`` (case-insensitive,
            matched literally); truncated to 100 characters.
        log_level: optional level, checked by ``validate_log_level``.
        start_time: optional inclusive lower bound, parsed by ``validate_date``.
        limit: maximum rows to return; anything that is not an int in
            1..1000 falls back to 100.

    Returns:
        All matching rows from ``fetchall()``.

    Raises:
        ValueError: if ``log_level`` or ``start_time`` fails validation.
    """
    # Validate limit
    if not isinstance(limit, int) or limit <= 0 or limit > 1000:
        limit = 100

    async with aiopg.create_pool(DSN) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                query = "SELECT * FROM application_logs WHERE 1=1"
                params = []

                if search_term:
                    # Cap the length first so escaping cannot be cut in half.
                    search_term = search_term[:100]
                    # Binding prevents SQL injection, but '%' and '_' inside
                    # the value are still LIKE wildcards. Escape them (and the
                    # escape character itself) so the term matches literally;
                    # backslash is PostgreSQL's default LIKE escape character.
                    literal_term = (
                        search_term
                        .replace("\\", "\\\\")
                        .replace("%", "\\%")
                        .replace("_", "\\_")
                    )
                    query += " AND message ILIKE %s"
                    params.append(f"%{literal_term}%")

                if log_level:
                    log_level = validate_log_level(log_level)
                    query += " AND level = %s"
                    params.append(log_level)

                if start_time:
                    start_dt = validate_date(start_time)
                    query += " AND timestamp >= %s"
                    params.append(start_dt)

                query += " ORDER BY timestamp DESC LIMIT %s"
                params.append(limit)

                await cur.execute(query, tuple(params))
                return await cur.fetchall()

# Secure aiohttp routes
async def login(request):
    """aiohttp handler for JSON username/password login.

    Responses:
        200 with ``{'success': True, 'user': ...}`` on valid credentials.
        400 when the body is not valid JSON, is not a JSON object, or the
            username/password are missing or not strings.
        401 when the credentials do not match.
        500 when authentication fails unexpectedly (the error is logged,
            never echoed to the client).
    """
    import logging

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError is a ValueError: a malformed body is the
        # client's mistake, not a server failure.
        return web.json_response(
            {'error': 'Invalid JSON body'},
            status=400
        )

    # A JSON array or scalar has no .get(); treat it like missing fields.
    if isinstance(data, dict):
        username = data.get('username', '')
        password = data.get('password', '')
    else:
        username = password = ''

    credentials_present = (
        isinstance(username, str) and isinstance(password, str)
        and username and password
    )
    if not credentials_present:
        return web.json_response(
            {'error': 'Username and password required'},
            status=400
        )

    try:
        user = await authenticate_user(username, password)
    except Exception:
        # Boundary handler: keep details out of the response but record the
        # traceback so the failure is not silently lost.
        logging.getLogger(__name__).exception("Unexpected error during login")
        return web.json_response(
            {'error': 'Internal server error'},
            status=500
        )

    if user:
        # In production, create secure session/JWT
        return web.json_response({'success': True, 'user': user})

    return web.json_response(
        {'error': 'Invalid credentials'},
        status=401
    )

💡 Why This Fix Works

The vulnerable code builds SQL with string concatenation and f-strings, so user input becomes part of the statement text and can change its structure. The fixed version keeps the SQL text static and passes values separately through %s placeholders, so the driver always treats them as data. It also adds input validation, explicit transaction handling, and bcrypt password verification as defence in depth.

Why it happens

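When a query is assembled with string concatenation or formatting instead of bound parameters, variable values are spliced into the SQL text before aiopg sends it to PostgreSQL. The database then cannot tell data from code, which enables injection.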

Root causes

String Concatenation in Async PostgreSQL Queries

SQL queries are assembled using string concatenation or formatting with variables instead of bound parameters in aiopg, enabling injection.

Preview example – PYTHON
# VULNERABLE: String concatenation in aiopg
import aiopg

async def get_user(user_id):
    """Fetch one ``users`` row by id (intentionally vulnerable example).

    ``user_id`` is formatted straight into the SQL text, unquoted, so any
    SQL fragment passed in becomes part of the statement.
    """
    # NOTE: ``dsn`` is not defined in this snippet; it is expected to come
    # from the surrounding module.
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # String concatenation vulnerability
                query = f"SELECT * FROM users WHERE id = {user_id}"
                await cur.execute(query)
                return await cur.fetchone()

async def search_products(name, category, min_price):
    """Search ``products`` by name with optional filters (intentionally vulnerable).

    ``name`` is inserted with ``%`` string formatting, and ``category`` and
    ``min_price`` with f-strings; all three are injection points.

    Returns:
        All rows from ``fetchall()``.
    """
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Format string vulnerability: each "%%" renders a literal
                # "%", so name ends up between two LIKE wildcards inside the
                # quoted SQL literal.
                query = "SELECT * FROM products WHERE name LIKE '%%%s%%'" % name
                if category:
                    query += f" AND category = '{category}'"
                if min_price:
                    # Unquoted numeric position: no quote is needed to inject.
                    query += f" AND price >= {min_price}"

                await cur.execute(query)
                return await cur.fetchall()

F-strings and .format() with User Input

Using Python f-strings or .format() to build SQL queries with user-controlled values directly embedded in the query string.

Preview example – PYTHON
# VULNERABLE: F-strings with user input
async def update_user_status(user_id, status, notes):
    """Update a user's status and notes (intentionally vulnerable example).

    ``status`` and ``notes`` are embedded inside single quotes and
    ``user_id`` unquoted, all through an f-string. The function returns
    nothing.
    """
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # F-string vulnerability: a quote in status or notes ends the
                # SQL literal early and the rest is parsed as SQL.
                query = f"""
                    UPDATE users
                    SET status = '{status}',
                        notes = '{notes}',
                        updated_at = NOW()
                    WHERE id = {user_id}
                """
                await cur.execute(query)

async def get_orders_by_date(start_date, end_date, customer_id):
    """List a customer's orders within a date range (intentionally vulnerable).

    The three values are substituted with ``str.format`` in the order
    ``customer_id``, ``start_date``, ``end_date``.

    Returns:
        All rows from ``fetchall()``.
    """
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # .format() vulnerability: identical to an f-string in effect,
                # because the values are merged into the SQL text before
                # execute().
                query = """
                    SELECT * FROM orders
                    WHERE customer_id = {}
                    AND order_date BETWEEN '{}' AND '{}'
                """.format(customer_id, start_date, end_date)

                await cur.execute(query)
                return await cur.fetchall()

Fixes

1

Use Parameterized Queries with %s Placeholders

Keep SQL static and pass values separately using %s placeholders. aiopg/psycopg2 will handle proper escaping and prevent injection.

View implementation – PYTHON
# SECURE: Parameterized queries with aiopg
import aiopg

async def get_user(user_id):
    """Fetch one ``users`` row by id using a bound parameter.

    Returns:
        The row from ``fetchone()``.
    """
    async with aiopg.create_pool(dsn) as pool, \
            pool.acquire() as conn, \
            conn.cursor() as cur:
        # The SQL text is a constant; user_id is sent separately as data.
        await cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))
        row = await cur.fetchone()
        return row

async def search_products(name, category, min_price):
    """Search ``products`` with optional filters, using bound parameters only.

    Args:
        name: text to find anywhere in ``name`` (case-insensitive, matched
            literally); skipped when falsy.
        category: exact category to match; skipped when falsy.
        min_price: inclusive lower bound for ``price``; skipped when falsy.

    Returns:
        All matching rows from ``fetchall()``.
    """
    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                query = "SELECT * FROM products WHERE 1=1"
                params = []

                if name:
                    # Binding prevents SQL injection, but '%' and '_' inside
                    # the value are still LIKE wildcards. Escape them (and the
                    # escape character itself) so the text matches literally;
                    # backslash is PostgreSQL's default LIKE escape character.
                    literal_name = (
                        name
                        .replace("\\", "\\\\")
                        .replace("%", "\\%")
                        .replace("_", "\\_")
                    )
                    query += " AND name ILIKE %s"
                    params.append(f"%{literal_name}%")
                if category:
                    query += " AND category = %s"
                    params.append(category)
                if min_price:
                    query += " AND price >= %s"
                    params.append(min_price)

                await cur.execute(query, tuple(params))
                return await cur.fetchall()
2

Use Named Parameters and Query Building Helpers

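For complex queries, pass data values as named parameters (%(name)s placeholders with a dict of values) and use the psycopg2.sql module (sql.SQL, sql.Identifier) for identifiers such as table and column names, which cannot be sent as bound parameters. Validate identifiers against a whitelist as well.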

View implementation – PYTHON
# SECURE: Using psycopg2.sql for safe query building
import aiopg
from psycopg2 import sql

async def dynamic_query(table_name, filters, order_by):
    """Run ``SELECT *`` on a whitelisted table with equality filters.

    Identifiers are composed with ``psycopg2.sql.Identifier``; filter values
    are passed as bound parameters.

    Args:
        table_name: must be one of the allowed tables.
        filters: mapping of column name to required value; columns outside
            the allowed set are ignored.
        order_by: column to sort by; falls back to ``'id'`` when it is not
            an allowed column.

    Returns:
        All rows from ``fetchall()``.

    Raises:
        ValueError: if ``table_name`` is not allowed.
    """
    # Table whitelist
    if table_name not in ('users', 'products', 'orders'):
        raise ValueError("Invalid table name")

    # Column whitelist, shared by filtering and ordering
    known_columns = ('id', 'name', 'created_at', 'updated_at')
    sort_column = order_by if order_by in known_columns else 'id'

    async with aiopg.create_pool(dsn) as pool:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Collect the statement as composable fragments and stitch
                # them together once at the end.
                fragments = [
                    sql.SQL("SELECT * FROM {} WHERE 1=1").format(
                        sql.Identifier(table_name)
                    )
                ]
                bind_values = []

                for column, wanted in filters.items():
                    if column not in known_columns:
                        continue
                    fragments.append(
                        sql.SQL(" AND {} = %s").format(sql.Identifier(column))
                    )
                    bind_values.append(wanted)

                fragments.append(
                    sql.SQL(" ORDER BY {}").format(sql.Identifier(sort_column))
                )
                statement = sql.SQL("").join(fragments)

                await cur.execute(statement, tuple(bind_values))
                rows = await cur.fetchall()
                return rows

Detect This Vulnerability in Your Code

Sourcery automatically identifies SQL injection caused by string-built SQL passed to aiopg's execute(), along with many other security issues in your codebase.