from sqlalchemy import create_engine, text
from flask import Flask, request
# Module-level SQLAlchemy engine bound to the local SQLite file ``app.db``.
engine = create_engine('sqlite:///app.db')
# The ``@app.route`` decorators that follow register views on ``app``, so the
# Flask application object must exist before the first view is defined.
app = Flask(__name__)
@app.route('/search_users')
def search_users():
    """Return every user whose name contains the ``q`` query parameter.

    The search term is sent to the database as a bound parameter, so it is
    never interpreted as SQL text.

    Returns:
        A list with one dict per matching ``users`` row.
    """
    # Default to '' so that a missing ``q`` is not rendered into the pattern
    # as the literal text "None".
    search_term = request.args.get('q', '')
    query = text("SELECT * FROM users WHERE name LIKE :pattern")
    with engine.connect() as conn:
        result = conn.execute(query, {'pattern': f'%{search_term}%'})
        # ``mappings()`` yields dict-like rows; SQLAlchemy 2.0 ``Row`` objects
        # are tuple-like and cannot be handed to dict() directly.
        return [dict(row) for row in result.mappings()]
@app.route('/get_user/<user_id>')
def get_user(user_id):
    """Return the ``users`` row whose id equals ``user_id``.

    Args:
        user_id: Id taken from the URL path; the route must declare it as a
            variable or Flask would call this view without the argument.

    Returns:
        A dict of the row's columns, ``({'error': ...}, 400)`` when the id is
        not an integer, or ``({'error': ...}, 404)`` when no row matches.
    """
    try:
        numeric_id = int(user_id)
    except ValueError:
        return {'error': 'Invalid user id'}, 400

    # The id travels as a bound parameter instead of being concatenated
    # into the statement text.
    query = text("SELECT * FROM users WHERE id = :user_id")
    with engine.connect() as conn:
        row = conn.execute(query, {'user_id': numeric_id}).mappings().first()

    # ``first()`` returns None when nothing matched; dict(None) would raise.
    if row is None:
        return {'error': 'User not found'}, 404
    return dict(row)
@app.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the ``users`` row whose id is in the ``user_id`` form field.

    Returns:
        ``'User deleted'`` on success, or ``({'error': ...}, 400)`` when
        ``user_id`` is missing or not an integer.
    """
    # NOTE(review): no authorization check is performed in this view.
    try:
        numeric_id = int(request.form.get('user_id', ''))
    except ValueError:
        return {'error': 'Invalid user id'}, 400

    # Bound parameter: the id can no longer alter the DELETE statement.
    query = text("DELETE FROM users WHERE id = :user_id")
    # ``engine.begin()`` commits when the block exits normally and rolls
    # back if the statement raises.
    with engine.begin() as conn:
        conn.execute(query, {'user_id': numeric_id})
    return 'User deleted'
from sqlalchemy import create_engine, text, bindparam
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from flask import Flask, request
import re
# Engine for the local SQLite database file ``app.db``.
engine = create_engine('sqlite:///app.db')
# Factory for ORM sessions bound to that engine.
Session = sessionmaker(bind=engine)
# Declarative base class that the ORM models inherit from.
Base = declarative_base()
# Flask application that the ``@app.route`` decorators register views on;
# it has to exist before the first decorated view is defined.
app = Flask(__name__)
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key.
    id = Column(Integer, primary_key=True)
    # Both text columns are declared with a length of 100.
    name = Column(String(length=100))
    email = Column(String(length=100))
def validate_user_input(value, field_type='string', max_length=100):
    """Validate a raw input value and return it coerced to ``field_type``.

    Args:
        value: Raw value, normally a string taken from the request.
        field_type: ``'integer'`` or ``'string'``.
        max_length: Maximum accepted length for ``'string'`` values.

    Returns:
        An ``int`` for ``'integer'``, a ``str`` for ``'string'``.

    Raises:
        ValueError: If the value is empty, is not a valid integer, is too
            long, contains ``<`` or ``>``, or ``field_type`` is unsupported.
    """
    # A bare truthiness test would also reject the legitimate integer 0.
    if not value and value != 0:
        raise ValueError(f'{field_type} cannot be empty')

    if field_type == 'integer':
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            # int() raises TypeError for objects such as lists and
            # OverflowError for infinities; callers only catch ValueError.
            raise ValueError('Invalid integer format') from exc

    if field_type == 'string':
        # Coerce first so the length and character checks also work for
        # non-str values instead of failing inside len().
        candidate = str(value)
        if len(candidate) > max_length:
            raise ValueError(f'String too long (max {max_length})')
        # Basic XSS prevention
        if '<' in candidate or '>' in candidate:
            raise ValueError('Invalid characters in input')
        return candidate

    raise ValueError(f'Unsupported field type: {field_type}')
@app.route('/search_users')
def search_users():
    """Search users by name with a parameterized ``LIKE`` query.

    Reads the ``q`` query parameter, validates it (non-empty, at most 50
    characters, no ``<`` or ``>``) and returns the matching rows.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``email`` on
        success, ``({'error': ...}, 400)`` when validation fails, or
        ``({'error': 'Search failed'}, 500)`` on any other error.
    """
    try:
        search_term = request.args.get('q', '')
        safe_search = validate_user_input(search_term, 'string', 50)

        # The search term is bound as a parameter, never spliced into SQL.
        query = text(
            "SELECT id, name, email FROM users WHERE name LIKE :search_term"
        )
        with engine.connect() as conn:
            result = conn.execute(query, {'search_term': f'%{safe_search}%'})
            # ``mappings()`` yields dict-like rows; SQLAlchemy 2.0 ``Row``
            # objects are tuple-like and cannot be passed to dict() directly.
            return [dict(row) for row in result.mappings()]
    except ValueError as e:
        return {'error': str(e)}, 400
    except Exception:
        # Keep the client response generic, but record the real failure.
        app.logger.exception('Search failed')
        return {'error': 'Search failed'}, 500
@app.route('/get_user/<user_id>')
def get_user(user_id):
    """Look up a single user by id through the ORM.

    Args:
        user_id: Id taken from the URL path; the route must declare it as a
            variable or Flask would call this view without the argument.

    Returns:
        A dict with the keys ``id``, ``name`` and ``email`` on success,
        ``({'error': ...}, 400)`` for an invalid id,
        ``({'error': 'User not found'}, 404)`` when no row matches, or
        ``({'error': 'Failed to retrieve user'}, 500)`` on any other error.
    """
    try:
        safe_user_id = validate_user_input(user_id, 'integer')

        # The context manager closes the session even if the query raises.
        with Session() as session:
            user = session.query(User).filter(User.id == safe_user_id).first()
            if user is None:
                return {'error': 'User not found'}, 404
            return {
                'id': user.id,
                'name': user.name,
                'email': user.email,
            }
    except ValueError as e:
        return {'error': str(e)}, 400
    except Exception:
        # Keep the client response generic, but record the real failure.
        app.logger.exception('Failed to retrieve user')
        return {'error': 'Failed to retrieve user'}, 500
@app.route('/delete_user', methods=['POST'])
def delete_user():
    """Delete the user whose id is in the ``user_id`` form field.

    Returns:
        ``{'message': 'User deleted successfully'}`` on success,
        ``({'error': ...}, 400)`` for an invalid id,
        ``({'error': 'Not authorized'}, 403)`` when the authorization check
        fails, ``({'error': 'User not found'}, 404)`` when no row matches, or
        ``({'error': 'Deletion failed'}, 500)`` on any other error.
    """
    try:
        user_id = request.form.get('user_id', '')
        safe_user_id = validate_user_input(user_id, 'integer')

        if not is_authorized_to_delete(safe_user_id):
            return {'error': 'Not authorized'}, 403

        # The context manager closes the session on every exit path,
        # including an exception raised by delete() or commit().
        with Session() as session:
            user = session.query(User).filter(User.id == safe_user_id).first()
            if user is None:
                return {'error': 'User not found'}, 404
            session.delete(user)
            session.commit()
        return {'message': 'User deleted successfully'}
    except ValueError as e:
        return {'error': str(e)}, 400
    except Exception:
        # Keep the client response generic, but record the real failure.
        app.logger.exception('Deletion failed')
        return {'error': 'Deletion failed'}, 500
# Alternative: secure raw SQL assembled from fixed fragments plus bound parameters
@app.route('/advanced_search')
def advanced_search():
    """Search users by optional name/email filters with a row limit.

    Query parameters:
        name: Optional substring to match against ``name`` (max 50 chars).
        email: Optional substring to match against ``email`` (max 100 chars).
        limit: Maximum number of rows; defaults to 10 and is capped at 100.

    Only fixed SQL fragments are concatenated; every user-supplied value is
    passed as a bound parameter.

    Returns:
        A list of dicts with the keys ``id``, ``name`` and ``email`` on
        success, ``({'error': ...}, 400)`` when validation fails, or
        ``({'error': 'Advanced search failed'}, 500)`` on any other error.
    """
    try:
        name_filter = request.args.get('name', '')
        email_filter = request.args.get('email', '')
        limit = request.args.get('limit', '10')

        safe_limit = min(validate_user_input(limit, 'integer'), 100)
        if safe_limit < 0:
            # SQLite treats a negative LIMIT as "no limit", which would
            # silently bypass the cap of 100 rows.
            raise ValueError('Limit must not be negative')

        conditions = []
        params = {'limit': safe_limit}

        if name_filter:
            safe_name = validate_user_input(name_filter, 'string', 50)
            conditions.append("name LIKE :name_pattern")
            params['name_pattern'] = f'%{safe_name}%'

        if email_filter:
            safe_email = validate_user_input(email_filter, 'string', 100)
            conditions.append("email LIKE :email_pattern")
            params['email_pattern'] = f'%{safe_email}%'

        # Build the statement from constant fragments only.
        full_query = "SELECT id, name, email FROM users"
        if conditions:
            full_query += " WHERE " + " AND ".join(conditions)
        full_query += " LIMIT :limit"

        with engine.connect() as conn:
            result = conn.execute(text(full_query), params)
            # ``mappings()`` yields dict-like rows; SQLAlchemy 2.0 ``Row``
            # objects are tuple-like and cannot be passed to dict() directly.
            return [dict(row) for row in result.mappings()]
    except ValueError as e:
        return {'error': str(e)}, 400
    except Exception:
        # Keep the client response generic, but record the real failure.
        app.logger.exception('Advanced search failed')
        return {'error': 'Advanced search failed'}, 500
# Authorization helper: replace the body with a check against your auth system.
def is_authorized_to_delete(user_id):
    """Report whether the current user may delete the user ``user_id``.

    Placeholder implementation: no check is performed and every request is
    allowed.
    """
    # TODO: implement real authorization logic; this currently fails open.
    authorized = True
    return authorized