С днём ИБ! Парсим таргеты с Censys без ограничений

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Дорогие форумичани, хочу поздравить всех с праздником Информационной Безопасности! Желаю всем крепкого здоровья, благополучия в семье и по больше сетей =)

В данной статье я бы хотел поделиться скриптом и показать способ, с помощью которого можно парсить таргеты с Censys, без ограничений. Ну приступим, приятного прочтения!
Для начало давайте поймем, каким образом это осуществляется. Открываем документацию библиотеки https://censys-python.readthedocs.io/en/stable/usage-v2.html и выбираем 2 версию API. Есть 3 основных api эндпоинта, с которыми взаимодействует библиотека - search, view и aggregate. В нашем случае search-а достаточно. Для работы с этим эндпоинтом будем использовать класс CensysHosts и его метод search:
1701360375798.png



Переходим к методу в API Reference и видим интересный параметр - cursor:
1701360421706.png



Дело в том, что для нумерации страницы используется токен cursor-а, и в каждом ответе на наш запрос должны возвращаться токены курсора для предыдущий и следующий страницы. А данные токены можем передавать в аргументе нашего метода.
Сначала попробуем кинуть обычный запрос, чтобы увидеть токены курсоров:
Bash: Скопировать в буфер обмена
Код:
pip install censys
python3 -c "\
import os;\
import json;\
from censys.search import CensysHosts;\
os.environ['CENSYS_API_ID'] = '676cb3ab-2743-4bfe-a34d-2f89e213fea1';\
os.environ['CENSYS_API_SECRET'] = 'ST3SDT49R9znGcU0UbKjQuT0rcckj7FH';\
h = CensysHosts();\
query = h.search('dana-na', per_page=1);\
print(json.dumps(query(), sort_keys=True, indent=4))
"
1701360533416.png



Странно, токены corsor-ов нет, хотя в документации самого api-эндпоинта https://search.censys.io/api они есть - POST -> /v2/hosts/search:
1701360699461.png



Значит где-то, что-то режется. Открываем саму либу и ищем наш метод:
1701360747880.png



Сам метод в свою очередь возвращает класс CensysSearchAPIv2.Query. Если перейдем в него и посмотрим на метод __call__:
1701360780636.png



То увидим, что режется тело ответа и возвращается только массив таргетов hits. Поменяем на все тело ответа и сделаем запрос:
Bash: Скопировать в буфер обмена
Код:
pip show censys # pip покажет путь где лежит либа и по нему переходите
cd /usr/local/lib/python3.10/dist-packages/censys
vi search/v2/api.py
find ./ | grep -E "(/__pycache__$|\.pyc$|\.pyo$)" | xargs rm -rf
python3 -c "\
import os;\
import json;\
from censys.search import CensysHosts;\
os.environ['CENSYS_API_ID'] = '676cb3ab-2743-4bfe-a34d-2f89e213fea1';\
os.environ['CENSYS_API_SECRET'] = 'ST3SDT49R9znGcU0UbKjQuT0rcckj7FH';\
h = CensysHosts();\
query = h.search('dana-na', per_page=1);\
print(json.dumps(query(), sort_keys=True, indent=4))
"
1701360885623.png


1701360932360.png



Воо, теперь видим полный ответ - тут и наши токены cursor-ов, количество таргетов и код со статусом ответа =)) Имея эти токены курсоров, можно парсить таргеты с бесплатными аккаунтами без ограничений. А в случае, если в процессе парсинга с одного аккаунта будем упираться в лимиты или другие ограничения, то просто с других аккаунтов будем кидать запрос с курсором на следующую страницу и парсить дальше =))
А теперь перейдем к скрипту, но перед этим нужно зарегать аккаунты https://search.censys.io/register, забрать API ID и Secret https://search.censys.io/account/api и указать в файл config.py. Кстати, они по навешали фильтры для тэмп-мэйлов =( Вот сервис с gmail адресами - https://www.emailnator.com/
Python: Скопировать в буфер обмена
Код:
class Container:
    total_host_count = int()
    parsed_host_count = int()
    next_link_cursor = str()
    cis_countries = ['RU', 'UA', 'KZ', 'UZ', 'TJ', 'MD', 'KG', 'BY', 'AZ', 'AM']
    accounts = {
        "1": {
            "id": "7aec4000-e476-4e89-93c8-2298da95f68d",
            "secret": "gv0N9XJuSYbcTCTSO0tcvci4uwofzycp"
        },
        "2": {
            "id": "14245e42-310f-484f-ad6d-0fb437be316d",
            "secret": "IPPJfA9u0hhrzaTMBCdmAhlYWGVhbNRC"
        },
        "3": {
            "id": "6f3dc9e1-188d-4751-bef7-642f7708c4f7",
            "secret": "eDeuGHT0zRouofJzooIzOH019bBHnoZ1"
        },
        "4": {
            "id": "5d0ca1f2-71db-4cc6-bc75-59ed5ba5a5fe",
            "secret": "iSAIepiPW07121nigmj3zv9v1TSZRzV4"
        },
        "5": {
            "id": "676cb3ab-2743-4bfe-a34d-2f89e213fea1",
            "secret": "ST3SDT49R9znGcU0UbKjQuT0rcckj7FH"
        },
        "6": {
            "id": "eb44f799-f55b-40df-9a32-b8ce34893e1c",
            "secret": "WWxQxLFzvydVphW6LeXciz21hutmJgCo"
        },
        "7": {
            "id": "0396d4b2-82eb-4dd2-88ac-a64e03c50b31",
            "secret": "PZhQIsGGwehQzkm5aU24TzwpTOCFOE59"
        },
        "8": {
            "id": "0fd3cab3-5e83-4366-8250-6e30dc3629e9",
            "secret": "QT8N2v8PspJPm7OMjslwe7MBz7d4lGYS"
        }
    }

    def __init__(self):
        pass

Я указал 5 валидных, 2 с полной квотой и 1 невалидный аккаунт, чтобы показать работоспособность скрипта. Сам скрипт:
Python: Скопировать в буфер обмена
Код:
#!/usr/bin/env python3
# coding=utf-8
# ******************************************************************

from censys.common.exceptions import CensysRateLimitExceededException
from censys.common.exceptions import CensysUnauthorizedException
from censys.common.exceptions import CensysSearchException
from censys.search import CensysHosts
from config import Container
import argparse
import time
import json
import sys
import os


if len(sys.argv) <= 1:
    print("%s -h for help." % (sys.argv[0]))
    exit(0)

# Argparser
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug",
                    dest="debug",
                    help="Debug mode - [default: False]",
                    action='store_true')
parser.add_argument("-q", "--query",
                    dest="query",
                    type=str,
                    help="Query for search - [Dork here]")
parser.add_argument("-c", "--cursor",
                    dest="cursor",
                    type=str,
                    help="Cursor token for next links")
parser.add_argument("-ca", "--check-accounts",
                    dest="check_accounts",
                    help="Check accounts information",
                    action='store_true')
args = parser.parse_args()


def set_account_data(account_number: str) -> None:
    os.environ["CENSYS_API_ID"] = Container.accounts.get(account_number).get("id")
    os.environ["CENSYS_API_SECRET"] = Container.accounts.get(account_number).get("secret")


def unset_account_data() -> None:
    del os.environ["CENSYS_API_ID"]
    del os.environ["CENSYS_API_SECRET"]


def show_account_info(account_number: str) -> None:
    try:
        c = CensysHosts()
        account = c.account()
        print("Account number -", account_number, ":", json.dumps(account, sort_keys=True, indent=4))
    except Exception as e:
        print("Account with number -", account_number, "not valid! Exception -", e)


def show_accounts_info() -> None:
    for account_number in Container.accounts:
        set_account_data(account_number.strip())
        show_account_info(account_number.strip())
        unset_account_data()


def get_domain(i) -> str:
    if i.get("dns"):
        reverse_dns = i.get("dns").get("reverse_dns")
        if reverse_dns:
            names = reverse_dns.get("names")
            if names:
                return names[0]
            else:
                return i.get("ip")
        else:
            return i.get("ip")
    else:
        return i.get("ip")


def get_matched_ports(i) -> list:
    ports = list()
    if i.get("services"):
        matched_services = i.get("services")
        for service in matched_services:
            ports.append(service.get("port"))
  
    return ports


def get_all_services(i) -> dict:
    all_services = dict()
    if i.get("services"):
        for service in i.get("services"):
            port = service.get("port")
            service_name = service.get("service_name")
            certificate = False
          
            if service.get("certificate"):
                certificate = True

            sub_dict = {
                "service_name": service_name,
                "certificate": certificate
            }

            all_services[port] = sub_dict

    return all_services


def write_into_file(file_name: str, value: str) -> None:
    path = os.path.dirname(os.path.abspath(__file__))
    file = path + "/" + file_name
    with open(file, "a") as my_file:
        my_file.write('\n' + value)


def extract_url(domain: str, matched_ports: list, all_services: dict, degub: bool) -> None:
    scheme = "http"
    if 80 in matched_ports and 443 in matched_ports:
        matched_ports.remove(80)

    for port in matched_ports:
        port_info = all_services.get(port)
        if port_info.get("service_name") == "HTTP" or port_info.get("service_name") == "HTTPS":
            # continue

            if port_info.get("service_name") == "HTTP":
                if port_info.get("certificate") or port == 443:
                    scheme = "https"

            if port == 80 or port == 443:     
                url = scheme + "://" + domain
            else:
                url = scheme + "://" + domain + ":" + str(port)

            if degub:
                print("Result -->", url, "\n")
          
            write_into_file("targets.txt", url)


def exclude_cis_countries(query: str) -> str:
    query = "(" + query + ")"
    for i in Container.cis_countries:
        query += ' and not location.country_code: %s' % i
    return query


def run_parser(query: str, next_link_cursor: str, account_number: str) -> None:
    print("Parsing with account number -", account_number, "...")
    h = CensysHosts()
  
    if next_link_cursor:
        print("Query with cursor -", Container.next_link_cursor) 
        query_pages = h.search(query, cursor=next_link_cursor, pages=100)
    else:
        query_pages = h.search(query, pages=100)

    for idx, page in enumerate(query_pages):
        if page["code"] == 200:
            for i in page["result"]["hits"]:
                domain = get_domain(i)
                matched_ports = get_matched_ports(i)
                all_services = get_all_services(i)

                if args.debug:
                    print("!------------Target info------------!")
                    for key, value in i.items():
                        print(key, "-->", value)
                    print("\n" + "Domain -->", domain)
                    print("Matched ports", len(matched_ports), "-->", matched_ports)
                    print("All services on target:")
                    for key, value in all_services.items():
                        print(key, "-->", value)

                extract_url(domain, matched_ports, all_services, args.debug)

            Container.parsed_host_count += len(page["result"]["hits"])
            Container.next_link_cursor = page["result"]["links"]["next"]
            print("Parsed hosts count -", Container.parsed_host_count)
          
            if len(page["result"]["hits"]) < 100:
                print("Available targets count from this page -", len(page["result"]["hits"]))
                return

            time.sleep(1)
            # print("Cursor when ending parser -", Container.next_link_cursor) # Расскомментировать для отладки 
        else:
            print(json.dumps(page, sort_keys=True, indent=4))


def check_query_info(query: str) -> bool:
    h = CensysHosts()
    query_pages = h.search(query)
    for page in query_pages:
        Container.total_host_count = int(page["result"]["total"])
        print("Status -", page["code"], page["status"])
        print("Total -", page["result"]["total"])
        print("Query -", page["result"]["query"])

    return True


def run_controller() -> None:
    query_info = bool()
  
    for account_number in Container.accounts:
        set_account_data(account_number.strip())
        query = exclude_cis_countries(args.query.strip())
  
        try:
            if not query_info:
                query_info = check_query_info(query)
          
            run_parser(query, Container.next_link_cursor, account_number)

            if Container.parsed_host_count > Container.total_host_count - 50: # Дополнительная проверка, а 50-это погрешность. При парсинге может сократиться количество таргетов.
                print("Completed! Result in ./targets.txt")
                return

            time.sleep(1)
        except CensysUnauthorizedException as unauth_e:
            print("Exeption with account number", account_number, "-", unauth_e)
            print("Trying next account...")
            unset_account_data()
            time.sleep(1)
            continue
        except CensysRateLimitExceededException as raitelimit_e:
            print("Exeption with account number", account_number, "-", raitelimit_e)
            print("Trying next account...")
            unset_account_data()
            time.sleep(1)
            continue
        except CensysSearchException as search_e:
            print("Exeption with incorrect search", search_e)
            print("Incorrect query!!!")
            unset_account_data()
            time.sleep(1)
            return
        except Exception as e:
            print("Unexpected exeption", e)
            print("Trying next account...")
            unset_account_data()
            time.sleep(1)
            continue

    print("Parsing completed! Parsed targets count -", Container.parsed_host_count, "Check ./targets.txt")


def main() -> None:
    if args.cursor:
        Container.next_link_cursor = args.cursor

    if args.check_accounts:
        show_accounts_info()

    if args.query:
        run_controller()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\nKeyboardInterrupt Detected.")
        print("Exiting...")
        print("Your next link cursor -", Container.next_link_cursor) 
        exit(0)

Данный скрипт дополняет query с ограничениями по РУ и под аккаунтом тянет таргеты, а когда упирается в лимит или в другую ошибку, переключается на другой, и используя токен курсора, продолжает тянуть таргеты. Затем начинает парсить таргеты - определяет доменные имена, порты на котором крутится HTTP(S) и в формате <scheme://domain/ip:port> записывает в файлик targets.txt.
chmod +x main.py ./main.py -q "query"
1701361420036.png


1701361454956.png



Проверка аккаунтов:
./main.py -ca
1701361539786.png



Также есть возможность указать курсор. Только потом счетчик сбросится и руками потребуется отсортировывать.
./main.py -q "query" -c "Cursor token"

В итоге на выходе получится готовый файл, который можно подавать на вход чекеру или автоэксплойтеру. На этом все, всем приятного использования =))
 
Сверху Снизу