# SECURE: Parameterized queries with input validation
import aiopg
import asyncio
from aiohttp import web
from psycopg2 import sql
import bcrypt
import re
from datetime import datetime
from typing import Optional, Dict, List, Tuple
DSN = 'dbname=myapp user=appuser password=password host=127.0.0.1'
# Input validation functions
def validate_username(username: str) -> str:
if not username or not re.match(r'^[a-zA-Z0-9_]{3,30}$', username):
raise ValueError("Invalid username format")
return username
def validate_date(date_str: str) -> datetime:
try:
return datetime.fromisoformat(date_str)
except (ValueError, TypeError):
raise ValueError("Invalid date format")
def validate_account_type(account_type: str) -> str:
allowed_types = ['checking', 'savings', 'credit', 'investment']
if account_type not in allowed_types:
raise ValueError("Invalid account type")
return account_type
def validate_log_level(level: str) -> str:
allowed_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
if level not in allowed_levels:
raise ValueError("Invalid log level")
return level
# Secure database functions with parameterized queries
async def authenticate_user(username: str, password: str) -> Optional[Dict]:
try:
username = validate_username(username)
except ValueError:
return None
async with aiopg.create_pool(DSN) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# Parameterized query
query = """
SELECT id, username, email, role, password_hash
FROM users
WHERE username = %s
"""
await cur.execute(query, (username,))
user = await cur.fetchone()
if user:
# Verify password with bcrypt
if bcrypt.checkpw(password.encode('utf-8'), user[4].encode('utf-8')):
return {
'id': user[0],
'username': user[1],
'email': user[2],
'role': user[3]
}
return None
async def get_user_transactions(
user_id: int,
account_type: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None
) -> List[Tuple]:
# Validate inputs
if not isinstance(user_id, int) or user_id <= 0:
raise ValueError("Invalid user ID")
async with aiopg.create_pool(DSN) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
query = """
SELECT t.id, t.amount, t.transaction_date,
t.description, a.account_name, a.account_type
FROM transactions t
JOIN accounts a ON t.account_id = a.id
WHERE t.user_id = %s
"""
params = [user_id]
if account_type:
account_type = validate_account_type(account_type)
query += " AND a.account_type = %s"
params.append(account_type)
if start_date:
start_dt = validate_date(start_date)
query += " AND t.transaction_date >= %s"
params.append(start_dt)
if end_date:
end_dt = validate_date(end_date)
query += " AND t.transaction_date <= %s"
params.append(end_dt)
query += " ORDER BY t.transaction_date DESC"
await cur.execute(query, tuple(params))
return await cur.fetchall()
async def update_account_balance(
account_id: int,
amount: float,
transaction_type: str,
description: str
) -> bool:
# Validate inputs
if not isinstance(account_id, int) or account_id <= 0:
raise ValueError("Invalid account ID")
if not isinstance(amount, (int, float)) or amount <= 0:
raise ValueError("Invalid amount")
if transaction_type not in ['credit', 'debit']:
raise ValueError("Invalid transaction type")
if not description or len(description) > 500:
raise ValueError("Invalid description")
async with aiopg.create_pool(DSN) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
# Start transaction
await conn.begin()
# Get current balance with row lock
balance_query = """
SELECT balance FROM accounts
WHERE id = %s
FOR UPDATE
"""
await cur.execute(balance_query, (account_id,))
current = await cur.fetchone()
if not current:
await conn.rollback()
return False
# Calculate new balance
current_balance = current[0]
if transaction_type == 'credit':
new_balance = current_balance + amount
else:
new_balance = current_balance - amount
if new_balance < 0:
await conn.rollback()
raise ValueError("Insufficient funds")
# Update balance
update_query = """
UPDATE accounts
SET balance = %s,
last_transaction = NOW()
WHERE id = %s
"""
await cur.execute(update_query, (new_balance, account_id))
# Log transaction
log_query = """
INSERT INTO transaction_log
(account_id, amount, type, description, timestamp)
VALUES (%s, %s, %s, %s, NOW())
"""
await cur.execute(log_query, (
account_id, amount, transaction_type, description
))
# Commit transaction
await conn.commit()
return True
except Exception as e:
await conn.rollback()
raise
async def search_logs(
search_term: Optional[str] = None,
log_level: Optional[str] = None,
start_time: Optional[str] = None,
limit: int = 100
) -> List[Tuple]:
# Validate limit
if not isinstance(limit, int) or limit <= 0 or limit > 1000:
limit = 100
async with aiopg.create_pool(DSN) as pool:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
query = "SELECT * FROM application_logs WHERE 1=1"
params = []
if search_term:
# Sanitize search term
if len(search_term) > 100:
search_term = search_term[:100]
query += " AND message ILIKE %s"
params.append(f"%{search_term}%")
if log_level:
log_level = validate_log_level(log_level)
query += " AND level = %s"
params.append(log_level)
if start_time:
start_dt = validate_date(start_time)
query += " AND timestamp >= %s"
params.append(start_dt)
query += " ORDER BY timestamp DESC LIMIT %s"
params.append(limit)
await cur.execute(query, tuple(params))
return await cur.fetchall()
# Secure aiohttp routes
async def login(request):
try:
data = await request.json()
username = data.get('username', '')
password = data.get('password', '')
if not username or not password:
return web.json_response(
{'error': 'Username and password required'},
status=400
)
user = await authenticate_user(username, password)
if user:
# In production, create secure session/JWT
return web.json_response({'success': True, 'user': user})
return web.json_response(
{'error': 'Invalid credentials'},
status=401
)
except Exception as e:
return web.json_response(
{'error': 'Internal server error'},
status=500
)