import json
import mysql.connector
import os
import logging
from typing import Dict, Any, Optional
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_db_connection():
return mysql.connector.connect(
host=os.environ['DB_HOST'],
database=os.environ['DB_NAME'],
user=os.environ['DB_USER'],
password=os.environ['DB_PASSWORD'],
autocommit=False
)
def validate_user_id(user_id: Any) -> int:
"""Validate and convert user ID"""
try:
uid = int(user_id)
if uid <= 0:
raise ValueError("User ID must be positive")
return uid
except (ValueError, TypeError):
raise ValueError(f"Invalid user ID: {user_id}")
def validate_email(email: str) -> str:
"""Basic email validation"""
if not email or '@' not in email or len(email) > 100:
raise ValueError("Invalid email format")
return email.strip()
def lambda_handler(event, context):
"""AWS Lambda handler for user operations"""
try:
action = event.get('action')
if action == 'get_user':
return get_user(event)
elif action == 'search_users':
return search_users(event)
elif action == 'update_status':
return update_user_status(event)
else:
return {
'statusCode': 400,
'body': json.dumps({'error': 'Invalid action'})
}
except ValueError as e:
logger.error(f"Validation error: {e}")
return {
'statusCode': 400,
'body': json.dumps({'error': str(e)})
}
except Exception as e:
logger.error(f"Unexpected error: {e}")
return {
'statusCode': 500,
'body': json.dumps({'error': 'Internal server error'})
}
def get_user(event):
# Input validation
user_id = validate_user_id(event.get('userId'))
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
try:
# SECURE: Parameterized query
query = "SELECT id, name, email, department, status FROM users WHERE id = %s"
cursor.execute(query, (user_id,))
result = cursor.fetchone()
if not result:
return {
'statusCode': 404,
'body': json.dumps({'error': 'User not found'})
}
return {
'statusCode': 200,
'body': json.dumps(result)
}
finally:
cursor.close()
conn.close()
def search_users(event):
# Input validation
name = event.get('name', '').strip()
department = event.get('department', '').strip()
min_salary = event.get('minSalary', 0)
if len(name) > 100:
raise ValueError("Name search term too long")
# Validate department
valid_departments = {'IT', 'HR', 'Finance', 'Marketing', 'Sales'}
if department and department not in valid_departments:
raise ValueError(f"Invalid department: {department}")
# Validate salary
try:
min_salary = float(min_salary)
if min_salary < 0:
min_salary = 0
except (ValueError, TypeError):
min_salary = 0
conn = get_db_connection()
cursor = conn.cursor(dictionary=True)
try:
# SECURE: Parameterized query with dynamic conditions
conditions = []
params = []
base_query = "SELECT id, name, email, department FROM users WHERE 1=1"
if name:
conditions.append(" AND name LIKE %s")
params.append(f"%{name}%")
if department:
conditions.append(" AND department = %s")
params.append(department)
if min_salary > 0:
conditions.append(" AND salary >= %s")
params.append(min_salary)
query = base_query + "".join(conditions) + " LIMIT 100"
cursor.execute(query, params)
results = cursor.fetchall()
return {
'statusCode': 200,
'body': json.dumps(results)
}
finally:
cursor.close()
conn.close()
def update_user_status(event):
# Input validation
user_email = validate_email(event.get('userEmail', ''))
new_status = event.get('newStatus', '').strip()
reason = event.get('reason', '').strip()
# Validate status
valid_statuses = {'active', 'inactive', 'suspended', 'pending'}
if new_status not in valid_statuses:
raise ValueError(f"Invalid status: {new_status}")
if len(reason) > 500:
raise ValueError("Reason too long")
conn = get_db_connection()
cursor = conn.cursor()
try:
# SECURE: Parameterized query
query = ("UPDATE users SET status = %s, status_reason = %s, "
"updated_at = NOW() WHERE email = %s")
cursor.execute(query, (new_status, reason, user_email))
if cursor.rowcount == 0:
return {
'statusCode': 404,
'body': json.dumps({'error': 'User not found'})
}
conn.commit()
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Status updated successfully',
'rowsAffected': cursor.rowcount
})
}
except mysql.connector.Error as e:
conn.rollback()
logger.error(f"Database error: {e}")
raise
finally:
cursor.close()
conn.close()