TXT MultiTool или комбайн для работы со строками в текстовых файлах

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Написал: rand
Эксклюзивно для: XSS.is

Версия Python: 3.12
Для работы требуется установить:
Bash: Скопировать в буфер обмена
pip install colorlog==6.9.0

Для работы модуля копирования строк из файла:
Bash: Скопировать в буфер обмена
pip install pyperclip==1.9.0

Для работы модуля изменения кодировки файла:
Bash: Скопировать в буфер обмена
pip install chardet==5.2.0

Всем привет, после того как был написан алгоритм по удалению дублей в больших файлах, меня некоторые просили сделать мультитул для работы с текстовиками. В одной ветке мне показали мультитул от разработчика под ником Lays, но судя по Virus Total он протроянен. Поэтому была поставлена задача перед собой повторить его функционал на петухоне. Над продуктом я работаю уже плотно около недели, и получилось уже большую часть функций реализовать. Остальные попробую сделать в течении месяца и буду обновлять этот продукт через редактирование поста (исправление найденных багов, добавление функционала). Продукт тестировался на сгенерированных строках, и может некорректно работать с вашими файлами, по найденым багам и пожеланиям просьба отписаться в ветке.

Структура ПО:
multitool_txt.py
Это основной файл который отрисовывает консольный интерфейс и вызывает основные функции из модулей:

Как выглядит интерфейс:
1733558464849.png


Код:
Python: Скопировать в буфер обмена
Код:
import os
import time
from colorama import init, Fore, Style

# Инициализация colorama
init(autoreset=True)

# Импорт всех функций
from utils.remove_duplicates import remove_duplicates
from utils.compare_databases import compare_files
from utils.normalize_base import process_lines as normalize
from utils.extract_email import extract_emails
from utils.extract_logins import extract_logins
from utils.extract_passwords import extract_passwords
from utils.sort_by_domains import sort_domains
from utils.remove_domains import remove_domains
from utils.merge_with_passwords import append_passwords
from utils.merge_databases import merge_files_in_directory
from utils.split_database import split_file
from utils.randomize import randomize_file
from utils.copy_to_clipboard import copy_to_clipboard
from utils.remove_non_latin import clean_file
from utils.format_database import convert_file_encoding, POPULAR_ENCODINGS

def main_multitool():
    def display_menu():
        print(Fore.CYAN + Style.BRIGHT + "\nВыберите операцию:")
        print(Fore.GREEN + "1. Удалить дубликаты")
        print(Fore.YELLOW + "2. Сравнить базы данных")
        print(Fore.MAGENTA + "3. Найти и извлечь Email из строк")
        print(Fore.BLUE + "4. Извлечь логины")
        print(Fore.RED + "5. Извлечь пароли")
        print(Fore.CYAN + "6. Сортировать по доменам (Email/URL)")
        print(Fore.GREEN + "7. Удалить домены в строках Email")
        print(Fore.YELLOW + "8. Склеить строки с паролями")
        print(Fore.MAGENTA + "9. Объединить базы")
        print(Fore.BLUE + "10. Разбить базу на части")
        print(Fore.RED + "11. Рандомизировать базу")
        print(Fore.CYAN + "12. Скопировать строки в буфер обмена")
        print(Fore.GREEN + "13. Удалить строки с не латинскими символами")
        print(Fore.YELLOW + "14. Изменить кодировку файла")
        print(Fore.MAGENTA + "15. Нормализовать базу")
        print(Fore.RED + Style.BRIGHT + "0. Выйти\n")

    def get_file():
        """Позволяет выбрать файл через диалоговое окно или ввод пути вручную."""
        print("\nВыберите способ ввода файла:")
        print(Fore.CYAN + "1. Ввести путь к файлу вручную")
        print(Fore.MAGENTA + "2. Выбрать файл через окно (пока в разработке)")

        while True:
            try:
                choice = int(input("Ваш выбор: "))
                if choice == 1:
                    file_path = input("Введите полный путь к файлу: ").strip()
                    if os.path.isfile(file_path):
                        return file_path
                    else:
                        print(Fore.RED + "Файл не найден. Попробуйте снова.")
                elif choice == 2:
                    print(Fore.RED + Style.BRIGHT + "Функция в разработке, ожидайте в новой версии!\n")
                    continue
                else:
                    print(Fore.YELLOW + "Введите 1 или 2.")
            except ValueError:
                print(Fore.RED + "Введите число.")

    def get_input(prompt):
        try:
            return int(input(prompt))
        except ValueError:
            print(Fore.RED + "Введите число.")
            return None

    # ASCII-заставка
    print(Fore.LIGHTGREEN_EX + """
 #    #           ##       #       #       #                      ##             #####   #   #   #####
 ##  ##            #       #               #                       #               #     #   #     #  
 # ## #  #    #    #     #####   ###     #####    ####    ####     #               #      # #      #  
 #    #  #    #    #       #       #       #     #    #  #    #    #     ######    #       #       #  
 #    #  #    #    #       #       #       #     #    #  #    #    #               #      # #      #  
 #    #  #   ##    #       #       #       #     #    #  #    #    #               #     #   #     #  
 #    #   ### #   ###       ###  #####      ###   ####    ####    ###              #     #   #     #  
                                                                                                       
 #                                                    #            ###                           #   #    ####    ####             #          
 #                                                    #           #                              #   #   #    #  #    #                        
 #####   #    #          # ###    #####  # ###    #####          ####     ####   # ###            # #    #       #               ###      #####
 #    #  #    #          ##      #    #  ##   #  #    #           #      #    #  ##                #      ####    ####             #     #    
 #    #  #    #          #       #    #  #    #  #    #           #      #    #  #                # #         #       #            #      ####
 #    #   #####          #       #   ##  #    #  #    #           #      #    #  #               #   #   #    #  #    #   ##       #          #
 #####        #          #        ### #  #    #   #####           #       ####   #               #   #    ####    ####    ##     #####   #####
          ####                                                                                                                                
    """)

    while True:
        display_menu()
        choice = get_input(Fore.CYAN + "Ваш выбор: ")

        if choice is None:
            continue

        if choice == 0:
            print(Fore.RED + "Выход из программы.")
            break

        # Выбор файла
        if not choice == 2 and not choice == 9:
            file_name = get_file()
            print(Fore.GREEN + f"Выбран файл: {file_name}")
        else:
            pass

        # Выполнение операций
        if choice == 1: # Удаление дублей
            remove_duplicates(file_name)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 2: # Сравнение файлов
            print(Fore.RED + "Предупреждение!!! Файлы при сравнении полностью загрузятся в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
            input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
            output_dir = input("Введите путь к директории для сохранения результатов: ").strip()
            compare_files(input_dir, output_dir)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 3: # Извлекаем Email по регулярке
            output_file = input("Введите путь и имя выходного файла с Email (с расширением): ").strip()
            extract_emails(file_name, output_file)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 4: # Извлекаем логины
            output_file = input("Введите путь и имя выходного файла с логинами (с расширением): ").strip()
            delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
            position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())
            extract_logins(file_name, output_file, delimiter, position)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 5: # Извлекаем пароли из строк
            output_file = input("Введите путь и имя выходного файла с паролями (с расширением): ").strip()
            delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
            position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())
            extract_passwords(file_name, output_file, delimiter, position)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 6: # Сортировка по доменам
            output_dir = input("Введите путь к директории для сохранения: ").strip()
            # Меню выбора режима
            while True:
                print(Fore.YELLOW + "\nВыберите режим обработки:")
                print(Fore.LIGHTMAGENTA_EX + "1. Сортировка всех строк в один файл (one_file)")
                print(Fore.LIGHTCYAN_EX + "2. Сортировка по доменам в отдельные файлы (by_domains)")
                print(Fore.LIGHTRED_EX + "3. Экспорт строк по указанным доменам (filter_domains)")
                print(Fore.LIGHTBLUE_EX + "0. Перезапуск меню.")

                choice = input("Введите номер режима: ").strip()

                if choice == "1":
                    output_mode = "one_file"
                    filter_domains = None
                    break
                elif choice == "2":
                    output_mode = "by_domains"
                    filter_domains = None
                    break
                elif choice == "3":
                    output_mode = "filter_domains"
                    domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
                    filter_domains = [domain.strip() for domain in domains_input.split(',')]
                    break
                elif choice == "0":
                    print("Выход из программы.")
                    time.sleep(3)
                    main_multitool()
                else:
                    print("Неверный выбор. Попробуйте снова.")
            sort_domains(file_name, output_dir, output_mode, filter_domains)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 7: # Удаление доменов из Email адресов
            output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
            remove_domains(file_name, output_file)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 8: # Склеить строки с паролями
            print(Fore.RED + "Предупреждение!!! Данная функция очень требовательна к системным ресурсам.")
            output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
            mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
            delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()

            if not delimiter:
                print(Fore.RED + "ОШИБКА: Разделитель не указан.")
                return

            if mode == 'manual':
                password = input("Введите пароль для добавления: ").strip()
                append_passwords(file_name, output_file, mode, password=password, delimiter=delimiter)

            elif mode == 'from_file':
                passwords_file = input("Введите путь к файлу с паролями: ").strip()
                append_passwords(file_name, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)

            else:
                print(Fore.RED + "ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 9: # Объединить базы
            directory = input("Введите путь к директории с файлами для слияния: ").strip()
            output_file = input("Введите имя выходного файла (с расширением): ").strip()
            merge_files_in_directory(directory, output_file)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 10: # Разделить один файл на несколько частей
            output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
            lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())
            split_file(file_name, output_directory, lines_per_file)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 11: # Рандомизация строк в файле
            print(Fore.RED + "Предупреждение!!! Файл полностью загрузится в ОЗУ, при большом размере файла возможно переполнение памяти и аварийное завершение работы!")
            output_file = input("Введите путь для сохранения перемешанного файла (с расширением): ").strip()
            buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())
            randomize_file(file_name, output_file, buffer_size)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 12: # Копирование строк в буфер.
            try:
                print(Fore.LIGHTGREEN_EX + "Запуск процесса")
                mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()

                if mode == '1':
                    copy_to_clipboard(file_name)
                elif mode == '2':
                    start_line = int(input("Введите номер начальной строки: ").strip())
                    end_line = int(input("Введите номер конечной строки: ").strip())
                    copy_to_clipboard(file_name, start_line, end_line)
                else:
                    print("Неверный выбор режима.")
                    print(Fore.LIGHTRED_EX + "Выбран неверный режим. Перезапуск меню...")
                    time.sleep(3)
                    main_multitool()
            except Exception as e:
                print(Fore.RED + f"Ошибка: {e}")
            finally:
                print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                time.sleep(3)
                main_multitool()
        elif choice == 13: # Удалить строки с не латинскими буквами, почистить лог от мусора
            output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
            print("Выберите режим обработки спецсимволов:")
            print(Fore.GREEN + "1 - Разрешить все спецсимволы")
            print(Fore.YELLOW + "2 - Разрешить только специальные символы (@, ., _, -)")
            print(Fore.RED + "3 - Не разрешать спецсимволы")
            special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()
            special_mode = ''
            if special_mode_choice == '1':
                special_mode = 'all'
            elif special_mode_choice == '2':
                special_mode = 'custom'
            elif special_mode_choice == '3':
                special_mode = 'none'
            else:
                print(Fore.RED + "Неверный выбор режима. Завершение операции.")
                return
            clean_file(file_name, output_file, special_mode)
            print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
            time.sleep(3)
            main_multitool()
        elif choice == 14: # Изменение кодировки и сортировка строк файла
            try:
                print(Fore.LIGHTGREEN_EX + "Запуск операции")

                # Ввод параметров пользователем
                output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
                print(Fore.LIGHTBLUE_EX + "Доступные кодировки:")
                for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
                    print(Fore.LIGHTGREEN_EX + f"{i}. {encoding}")
                while True:
                    try:
                        choice = int(input("Выберите номер целевой кодировки: "))
                        if 1 <= choice <= len(POPULAR_ENCODINGS):
                            target_encoding = POPULAR_ENCODINGS[choice - 1]
                            break
                        else:
                            print("Некорректный выбор. Попробуйте снова.")
                    except ValueError:
                        print("Введите корректный номер из списка.")

                convert_file_encoding(file_name, output_file, target_encoding)

            except KeyboardInterrupt:
                print(Fore.LIGHTRED_EX + "Операция прервана. Перезапуск меню...")
                time.sleep(3)
                main_multitool()
            except Exception as e:
                print(Fore.LIGHTRED_EX + f"Ошибка в основной функции: {e}, перезапуск меню")
                time.sleep(3)
                main_multitool()
            finally:
                print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                time.sleep(3)
                main_multitool()
        elif choice == 15: # Нормализатор баз
            while True:
                print(Fore.YELLOW + "\nВыберите режим обработки:")
                print(Fore.LIGHTMAGENTA_EX + "1. Форматирование строк (format)")
                print(Fore.LIGHTCYAN_EX + "2. Фильтрация строк по формату (filter)")
                print(Fore.LIGHTRED_EX + "3. Очистка строк от спецсимволов (clean)")
                print(Fore.LIGHTBLUE_EX + "4. Обрезка строк по длине (trim)")
                print(Fore.LIGHTGREEN_EX + "0. Выход.")

                choice = input("Введите номер режима: ").strip()

                if choice == "1":
                    mode = "format"
                    output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
                    delimiter = input("Введите разделитель: ").strip()
                    input_format = input("Введите ожидаемый формат входящей строки (например, log:pass:link): ").strip()
                    format_order = input("Введите новый порядок формата (например, link:log:pass): ").strip()
                    normalize(file_name, output_file, mode=mode, delimiter=delimiter, input_format=input_format, format_order=format_order)
                    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                    time.sleep(3)
                    main_multitool()

                elif choice == "2":
                    mode = "filter"
                    output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
                    delimiter = input("Введите разделитель: ").strip()
                    valid_format = input("Введите допустимый формат (например, mail:pass): ").strip()
                    normalize(file_name, output_file, mode=mode, delimiter=delimiter, valid_format=valid_format)
                    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                    time.sleep(3)
                    main_multitool()

                elif choice == "3":
                    mode = "clean"
                    output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
                    exclude_chars = input("Введите символы для исключения (например, !@#$%^&*): ").strip()
                    normalize(file_name, output_file, mode=mode, delimiter=None, exclude_chars=exclude_chars)
                    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                    time.sleep(3)
                    main_multitool()

                elif choice == "4":
                    mode = "trim"
                    output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
                    min_len = int(input("Введите минимальную длину строки: ").strip())
                    max_len = int(input("Введите максимальную длину строки: ").strip())
                    normalize(file_name, output_file, mode=mode, min_len=min_len, max_len=max_len, delimiter=None)
                    print(Fore.YELLOW + "Операция выполнена. Перезапуск меню...\n")
                    time.sleep(3)
                    main_multitool()

                elif choice == "0":
                    print("Выход из программы.")
                    break

                else:
                    print("Неверный выбор. Попробуйте снова.")
        else:
            print(Fore.RED + "Эта функция пока не реализована. Попробуйте другую опцию.")

if __name__ == "__main__":
    main_multitool()

Реализованные модули:

1. Удаление дубликатов (remove_duplicates.py), это просто копипаст мультипроцессорного клинера и сортировщика из моей предыдущей ветки.
Код:
Python: Скопировать в буфер обмена
Код:
# Импорт необходимых библиотек
import os  # Для работы с операционной системой и файловой системой
import heapq  # Для эффективного слияния отсортированных последовательностей
import time  # Для измерения времени выполнения скрипта
import logging  # Для ведения логов
import shutil  # Для удаления файлов и директории TEMP
import uuid  # Для генерации уникальных идентификаторов
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed  # Для параллельного выполнения задач
from colorlog import ColoredFormatter  # Для создания цветных логов

# Настройка цветного логирования
formatter = ColoredFormatter(
    "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
    datefmt=None,
    reset=True,
    log_colors={
        'DEBUG': 'cyan',
        'INFO': 'green',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
)

# Создание и настройка обработчика логов
handler = logging.StreamHandler()  # Создаем обработчик для вывода логов в консоль
handler.setFormatter(formatter)  # Устанавливаем форматтер для обработчика
logger = logging.getLogger()  # Получаем объект логгера
logger.addHandler(handler)  # Добавляем обработчик к логгеру
logger.setLevel(logging.INFO)  # Устанавливаем уровень логирования

# Создание временной директории
temp_dir = os.path.join(os.getcwd(), 'TEMP')  # Путь к временной директории в текущей рабочей директории
if not os.path.exists(temp_dir):  # Если директория не существует
    os.makedirs(temp_dir)  # Создаем её

def create_temp_merged_file(temp_dir):
    """
    Создает временный файл для слияния данных.

    :param temp_dir: Путь к временной директории
    """
    unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4().hex}.tmp")  # Генерируем уникальное имя файла
    temp_merged_file = open(unique_filename, 'w', encoding='utf-8')  # Открываем файл для записи
    return temp_merged_file, unique_filename  # Возвращаем объект файла и его имя

def heavy_computation(n):
    """
    Выполняет тяжелое вычисление (для симуляции нагрузки, процентов на 5 по моим замерам увеличивает скорость обработки лога).

    :param n: Число для вычислений
    :return: Результат вычисления
    """
    logger.debug(f"Запуск тяжелого вычисления с параметром: {n}")
    result = 0
    for i in range(n):
        result += i * i  # Выполняем сложение квадратов чисел
    logger.debug(f"Результат тяжелого вычисления: {result}")
    return result

def process_chunk_and_write_multiprocess(chunk, temp_dir):
    """
    Обрабатывает чанк данных и записывает результат во временный файл.

    :param chunk: Список строк для обработки
    :param temp_dir: Путь к временной директории
    :return: Имя созданного временного файла или None в случае ошибки
    """
    try:
        logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
        heavy_computation(10000000)  # Симуляция тяжелых вычислений
        unique_items = set(chunk)  # Убираем дубликаты на уровне чанка
        sorted_chunk = sorted(unique_items)  # Сортируем уникальные элементы
        unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp")  # Генерируем уникальное имя файла
        with open(unique_filename, 'w', encoding='utf-8') as temp_file:
            temp_file.write("\n".join(sorted_chunk) + "\n")  # Сохраняем только уникальные строки
        return unique_filename
    except Exception as e:
        logger.error(f"Ошибка при обработке чанка: {e}")
        return None

def merge_files_parallel(temp_files, output_file, num_threads=24):
    """
    Выполняет параллельное слияние временных файлов.

    :param temp_files: Список временных файлов для слияния
    :param output_file: Имя выходного файла
    :param num_threads: Количество потоков для использования
    :return: (количество уникальных строк, количество дубликатов, имя выходного файла)
    """
    try:
        logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
        unique_count = 0
        duplicate_count = 0

        file_iters = []
        try:
            for temp_file in temp_files:
                file_iters.append \
                    (open(temp_file, 'r', encoding='utf-8', errors='replace'))  # Открываем все временные файлы

            merged_iter = heapq.merge(*[iter(f) for f in file_iters])  # Создаем итератор для слияния

            with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
                prev_line = None
                for line in merged_iter:
                    line = line.strip()
                    if line != prev_line:  # Если текущая строка отличается от предыдущей
                        outfile.write(line + '\n')  # Записываем её в выходной файл
                        prev_line = line
                        unique_count += 1
                    else:
                        duplicate_count += 1  # Увеличиваем счетчик дубликатов

        except Exception as e:
            logger.error(f"Ошибка при слиянии файлов: {e}")
            return 0, 0

        finally:
            for f in file_iters:
                f.close()  # Закрываем все открытые файлы

        logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
        return unique_count, duplicate_count, output_file

    except Exception as e:
        logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
        return 0, 0

def batch_merge(temp_files, batch_size, temp_dir, num_merge_processes=24):
    """
    Выполняет пакетное слияние временных файлов.

    :param temp_files: Список временных файлов
    :param batch_size: Размер пакета для слияния
    :param temp_dir: Путь к временной директории
    :param num_merge_processes: Количество процессов для слияния
    :return: (список объединенных файлов, общее количество уникальных строк, общее количество дубликатов)
    """
    try:
        logger.info(f"Начало пакетного слияния с размером пакета {batch_size}")
        merged_files = []
        total_unique_count = 0
        total_duplicate_count = 0

        with ProcessPoolExecutor(max_workers=num_merge_processes) as merge_executor:
            futures = []

            for i in range(0, len(temp_files), batch_size):
                batch = temp_files[i:i + batch_size]  # Формируем пакет файлов
                logger.info(f"Слияние пакета с файлов {i + 1} по {min(i + batch_size, len(temp_files))}")

                temp_merged_file, unique_filename = create_temp_merged_file(temp_dir)
                temp_merged_file.close()
                futures.append(merge_executor.submit(merge_files_parallel, batch, unique_filename))

            for future in as_completed(futures):
                unique_count, duplicate_count, temp_file = future.result()  # Получаем результат слияния
                if unique_count or duplicate_count:
                    merged_files.append(temp_file)  # Добавляем временный файл в список
                total_unique_count += unique_count
                total_duplicate_count += duplicate_count

            for temp_file in temp_files:
                if os.path.exists(temp_file):
                    logger.info(f"Удаление временного файла: {temp_file}")
                    os.remove(temp_file)  # Удаляем обработанные временные файлы
                else:
                    logger.warning(f"Файл не найден для удаления: {temp_file}")

        return merged_files, total_unique_count, total_duplicate_count

    except Exception as e:
        logger.error(f"Ошибка при пакетном слиянии: {e}")
        return temp_files, 0, 0

def final_merge(temp_dir, output_file):
    """
    Выполняет финальное слияние всех оставшихся временных файлов.

    :param temp_dir: Путь к временной директории
    :param output_file: Имя выходного файла
    :return: (количество уникальных строк, количество удаленных дубликатов)
    """
    logger.info(f"Финальная стадия слияния временных файлов из папки {temp_dir}.")
    temp_files = [os.path.join(temp_dir, f) for f in os.listdir(temp_dir) if os.path.isfile(os.path.join(temp_dir, f))]  # Список временных файлов для финального слияния

    if len(temp_files) > 1:  # Если файлов больше одного, запускаем слияние
        unique_count, duplicate_count, output_file = merge_files_parallel(temp_files, output_file)  # Параллельно сливаем файлы
    elif len(temp_files) == 1:  # Если остался только один файл
        logger.info(f"Остался один файл. Переименование {temp_files[0]} в {output_file}")
        os.rename(temp_files[0], output_file)  # Переименовываем файл в выходной
        unique_count, duplicate_count = 0, 0  # Устанавливаем нулевые значения для счетчиков
    else:
        logger.error("Не осталось временных файлов для слияния!")
        return 0, 0  # Возвращаем нули в случае ошибки

    logger.info(f"Финальное слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")

    try:
        shutil.rmtree(temp_dir)  # Удаляем временную директорию
        logger.info(f"Временная папка {temp_dir} успешно удалена.")
    except Exception as e:
        logger.error(f"Ошибка при удалении временной папки {temp_dir}: {e}")

    return unique_count, duplicate_count  # Возвращаем количество уникальных строк и дубликатов

def read_and_process_chunks_multiprocess(input_file, chunk_size=2000000, num_processes=24):
    """
    Читает входной файл по чанкам и обрабатывает их в многопроцессорном режиме.

    :param input_file: Имя входного файла
    :param chunk_size: Размер чанка (количество строк)
    :param num_processes: Количество процессов для обработки
    :return: (список временных файлов, общее количество прочитанных строк)
    """
    temp_files = []  # Список для временных файлов
    chunk = []  # Буфер для хранения чанка строк
    original_count = 0  # Счетчик общего числа строк

    logger.info(f"Чтение и обработка файла {input_file} в {num_processes} процессах...")

    try:
        with open(input_file, 'r', encoding='utf-8', errors='ignore') as infile:  # Открываем входной файл для чтения
            with ProcessPoolExecutor \
                    (max_workers=num_processes) as executor:  # Создаем процессный пул для обработки чанков
                futures = []  # Список задач для выполнения

                for line in infile:  # Читаем файл построчно
                    chunk.append(line.strip())  # Добавляем строку в чанк
                    original_count += 1  # Увеличиваем счетчик строк
                    if len(chunk) >= chunk_size:  # Если чанк достиг нужного размера
                        futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir))  # Отправляем чанк на обработку в пул процессов
                        chunk = []  # Очищаем чанк

                if chunk:  # Если остались строки после завершения цикла
                    futures.append(executor.submit(process_chunk_and_write_multiprocess, chunk, temp_dir))  # Обрабатываем оставшийся чанк

                for future in as_completed(futures):  # Ожидаем завершения всех задач
                    temp_file = future.result()  # Получаем результат задачи
                    if temp_file:
                        temp_files.append(temp_file)  # Добавляем временный файл в список

        logger.info \
            (f"Чтение и обработка завершены (процессами). Всего строк: {original_count}")  # Логируем завершение обработки файла

    except Exception as e:
        logger.error(f"Ошибка при чтении файла (процессы): {e}")  # Логируем ошибку в случае сбоя

    return temp_files, original_count  # Возвращаем список временных файлов и общее количество строк

def sort_and_uniq_streaming_multiprocess(input_file, output_file, chunk_size=2000000, batch_size=10, num_processes=24, num_merge_processes=24):
    """
    Основная функция для сортировки и удаления дубликатов из большого файла с использованием многопроцессорной обработки.

    :param input_file: Имя входного файла
    :param output_file: Имя выходного файла
    :param chunk_size: Размер чанка для обработки
    :param batch_size: Размер пакета для слияния
    :param num_processes: Количество процессов для обработки чанков
    :param num_merge_processes: Количество процессов для слияния
    :return: (общее количество строк, количество уникальных строк)
    """
    logger.info(f"Старт обработки файла {input_file}...")  # Логируем начало процесса

    temp_files, original_count = read_and_process_chunks_multiprocess(input_file, chunk_size, num_processes)  # Читаем и обрабатываем файл по чанкам
    logger.info(f"Начинается пакетное слияние временных файлов...")  # Логируем начало пакетного слияния

    total_unique_count = 0  # Инициализируем счетчик всех уникальных строк
    total_duplicate_count = 0  # Инициализируем счетчик всех дубликатов

    while len(temp_files) > batch_size:  # Пока временных файлов больше, чем размер пакета
        temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes)  # Выполняем пакетное слияние
        total_unique_count += unique_count  # Обновляем общий счетчик уникальных строк
        total_duplicate_count += duplicate_count  # Обновляем общий счетчик дубликатов

    unique_count, duplicate_count = final_merge(temp_dir, output_file)  # Выполняем финальное слияние
    total_unique_count += unique_count  # Обновляем счетчик уникальных строк
    total_duplicate_count += duplicate_count  # Обновляем счетчик дубликатов

    return original_count, unique_count  # Возвращаем общее количество строк и количество уникальных строк

# Основная функция модуля
def remove_duplicates(data):
    tic = time.perf_counter()  # Начало отсчета времени
    input_file = data
    output_file = "output.txt"  # Указываем экспортируемый файл
    original_count, unique_count = sort_and_uniq_streaming_multiprocess(input_file, output_file, num_processes=24, num_merge_processes=24)  # Запускаем основную функцию обработки
    tac = time.perf_counter()  # Конец отсчета времени
    logging.info(f"Все временные файлы удалены. Уникальных строк: {unique_count}, Дублей удалено (на всех этапах процессов слияния): {original_count - unique_count}")  # Логируем результаты
    logging.info(f"Всего обработано строк: {original_count}")  # Логируем количество обработанных строк
    logging.info(f"Удаление дублей и сортировка заняли {tac - tic:0.2f} секунд")  # Логируем время выполнения
    logging.info(f"Файл успешно сохранен в {output_file}")

2. Сравнение текстовых баз (compare_databases.py). Работает следующим образом, указываете директорию где находятся текстовые файлы с базами, происходит их сравнение на дубли, дубли очищаются на выходе, получаете базы без дублей+файл с удаленными дублями (типа антипаблика).
Код:
Python: Скопировать в буфер обмена
Код:
import os
import logging
import time
from collections import defaultdict

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def read_file_lines(filepath):
    """
    Читает строки из файла построчно и возвращает их как множество.
    :param filepath: Путь к файлу.
    :return: Генератор строк.
    """
    logger.info(f"Начато чтение строк из файла: {filepath}")
    total_lines = 0
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            for line in file:
                total_lines += 1
                yield line.strip()
        logger.info(f"Успешно прочитано {total_lines} строк из файла: {filepath}")
    except Exception as e:
        logger.error(f"Ошибка при чтении файла {filepath}: {e}")
        raise


def compare_files(input_dir, output_dir):
    """
    Сравнивает строки в текстовых файлах из указанной директории.

    :param input_dir: Директория с текстовыми файлами.
    :param output_dir: Директория для сохранения результатов.
    """
    try:
        start_time = time.time()

        # Проверка входной и выходной директории
        logger.info(f"Проверка существования директории {input_dir}")
        if not os.path.exists(input_dir):
            raise FileNotFoundError(f"Директория {input_dir} не найдена.")

        logger.info(f"Проверка существования директории {output_dir}")
        if not os.path.exists(output_dir):
            logger.info(f"Директория не найдена, создаём: {output_dir}")
            os.makedirs(output_dir)

        # Получаем список текстовых файлов
        logger.info(f"Получение списка файлов в директории: {input_dir}")
        files = [os.path.join(input_dir, f) for f in os.listdir(input_dir) if os.path.isfile(os.path.join(input_dir, f))]
        if len(files) < 2:
            raise ValueError("Для сравнения необходимо минимум два файла.")

        logger.info(f"Найдено файлов для сравнения: {len(files)}. Список файлов: {files}")

        # Хранилище строк
        file_lines = defaultdict(set)
        all_lines = set()

        # Чтение строк из файлов
        for filepath in files:
            logger.info(f"Чтение строк из файла: {filepath}")
            for line in read_file_lines(filepath):
                file_lines[filepath].add(line)
                all_lines.add(line)

        # Определяем общие строки
        logger.info("Вычисление совпадающих строк...")
        common_lines = set.intersection(*(lines for lines in file_lines.values()))
        logger.info(f"Обнаружено {len(common_lines)} совпадающих строк.")

        # Уникальные строки для каждого файла
        for filepath, lines in file_lines.items():
            unique_lines = lines - common_lines
            unique_output_path = os.path.join(output_dir, f"unique_{os.path.basename(filepath)}")
            logger.info(f"Сохранение уникальных строк для {os.path.basename(filepath)}. Всего строк: {len(unique_lines)}.")
            with open(unique_output_path, 'w', encoding='utf-8', errors='replace') as file:
                file.writelines(f"{line}\n" for line in unique_lines)
            logger.info(f"Уникальные строки для {os.path.basename(filepath)} сохранены в {unique_output_path}")

        # Сохраняем совпадающие строки
        common_output_path = os.path.join(output_dir, "common_lines.txt")
        logger.info(f"Сохранение совпадающих строк. Всего строк: {len(common_lines)}.")
        with open(common_output_path, 'w', encoding='utf-8', errors='replace') as file:
            file.writelines(f"{line}\n" for line in common_lines)
        logger.info(f"Совпадающие строки сохранены в {common_output_path}")

        end_time = time.time()
        logger.info(f"Сравнение завершено за {end_time - start_time:.2f} секунд.")

    except Exception as e:
        logger.error(f"Ошибка при сравнении файлов: {e}")
        raise e


def main():
    input_dir = input("Введите путь к директории с файлами для сравнения: ").strip()
    output_dir = input("Введите путь к директории для сохранения результатов: ").strip()

    try:
        logger.info("Начало выполнения программы сравнения текстовых файлов.")
        compare_files(input_dir, output_dir)
        logger.info("Сравнение завершено успешно.")
    except Exception as e:
        logger.error(f"Ошибка: {e}")


if __name__ == "__main__":
    main()
3. Найти и извлечь Email из строк (extract_email.py), находит по регулярке в текстовом файле Email адреса и извлекает их.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import re
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("extract_email.log", encoding="utf-8")
    ]
)

# Регулярное выражение для извлечения email
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

def extract_emails(input_file: str, output_file: str):
    """
    Извлекает email-адреса из строк входного файла и сохраняет их в отдельный файл.

    :param input_file: Путь к входному файлу
    :param output_file: Имя выходного файла для сохранения email-адресов
    """
    start_time = time.time()
    logger.info(f"Начало извлечения email-адресов. Входной файл: {input_file}, Выходной файл: {output_file}")

    try:
        # Проверяем наличие директории для выходного файла
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
            os.makedirs(output_dir)

        total_emails_extracted = 0  # Счётчик извлечённых email-адресов

        # Открываем входной и выходной файлы
        with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
             open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

            # Используем heapq.merge для обработки строк
            for line in heapq.merge(iter(infile)):
                try:
                    # Извлекаем все email из строки
                    emails = EMAIL_REGEX.findall(line)
                    if emails:
                        for email in emails:
                            outfile.write(email + '\n')
                        total_emails_extracted += len(emails)
                except Exception as e:
                    logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

        logger.info(f"Извлечение завершено. Всего email-адресов извлечено: {total_emails_extracted}")

    except FileNotFoundError:
        logger.error(f"Файл {input_file} не найден.")
        return
    except Exception as e:
        logger.error(f"Ошибка в процессе извлечения: {e}")
        return

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    try:
        logger.info("Запуск программы")
        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

        extract_emails(input_file, output_file)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()
4. Извлечь логины (extract_logins.py). Работает по принципу, указываете разделитель в вашем логе и столбец с логином (начинается с 0), извлекаете логины.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("extract_logins.log", encoding="utf-8")
    ]
)

def extract_logins(input_file: str, output_file: str, delimiter: str, position: int):
    """
    Извлекает логины из строк файла и сохраняет их в отдельный файл.

    :param input_file: Путь к входному файлу
    :param output_file: Имя выходного файла для сохранения логинов
    :param delimiter: Разделитель, используемый для извлечения логина
    :param position: Позиция логина по разделителю (0 — первая колонка)
    """
    start_time = time.time()
    logger.info(f"Начало извлечения логинов. Входной файл: {input_file}, Выходной файл: {output_file}, "
                f"Разделитель: '{delimiter}', Позиция: {position}")

    try:
        # Проверяем наличие директории для выходного файла
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
            os.makedirs(output_dir)

        total_login_extracted = 0  # Счётчик извлечённых логинов

        # Открываем входной и выходной файлы
        with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
             open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

            # Использую heapq для обработки строк
            for line in heapq.merge(iter(infile)):
                try:
                    # Разбиваю строку по разделителю
                    parts = line.strip().split(delimiter)
                    if len(parts) > position:
                        login = parts[position].strip()
                        outfile.write(login + '\n')
                        total_login_extracted += 1
                    else:
                        logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
                except Exception as e:
                    logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

        logger.info(f"Извлечение завершено. Всего логинов извлечено: {total_login_extracted}")

    except FileNotFoundError:
        logger.error(f"Файл {input_file} не найден.")
        return
    except Exception as e:
        logger.error(f"Ошибка в процессе извлечения: {e}")
        return

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    try:
        logger.info("Запуск программы")
        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с раширением): ").strip()
        output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
        delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
        position = int(input("Введите позицию логина по разделителю (0 — первая колонка): ").strip())

        extract_logins(input_file, output_file, delimiter, position)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

5. Извлечь пароли (extract_passwords.py). Все тоже самое что и для логинов, просто дубль функционала.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("extract_password.log", encoding="utf-8")
    ]
)

def extract_passwords(input_file: str, output_file: str, delimiter: str, position: int):
    """
    Извлекает пароли из строк файла и сохраняет их в отдельный файл.

    :param input_file: Путь к входному файлу
    :param output_file: Имя выходного файла для сохранения паролей
    :param delimiter: Разделитель, используемый для извлечения пароля
    :param position: Позиция пароля по разделителю (0 — первая колонка)
    """
    start_time = time.time()
    logger.info(f"Начало извлечения паролей. Входной файл: {input_file}, Выходной файл: {output_file}, "
                f"Разделитель: '{delimiter}', Позиция: {position}")

    try:
        # Проверяем наличие директории для выходного файла
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
            os.makedirs(output_dir)

        total_passwords_extracted = 0  # Счётчик извлечённых паролей

        # Открываем входной и выходной файлы
        with open(input_file, 'r', encoding='utf-8', errors='replace') as infile, \
             open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:

            # Использую heapq для обработки строк
            for line in heapq.merge(iter(infile)):
                try:
                    # Разбиваю строку по разделителю
                    parts = line.strip().split(delimiter)
                    if len(parts) > position:
                        password = parts[position].strip()
                        outfile.write(password + '\n')
                        total_passwords_extracted += 1
                    else:
                        logger.warning(f"Пропуск строки: {line.strip()} (мало частей для позиции {position})")
                except Exception as e:
                    logger.warning(f"Ошибка обработки строки: {line.strip()}. Детали: {e}")

        logger.info(f"Извлечение завершено. Всего паролей извлечено: {total_passwords_extracted}")

    except FileNotFoundError:
        logger.error(f"Файл {input_file} не найден.")
        return
    except Exception as e:
        logger.error(f"Ошибка в процессе извлечения: {e}")
        return

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    try:
        logger.info("Запуск программы")
        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с раширением): ").strip()
        output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()
        delimiter = input("Введите разделитель (например, ':', '|', '/', ';'): ").strip()
        position = int(input("Введите позицию пароля по разделителю (0 — первая колонка): ").strip())

        extract_passwords(input_file, output_file, delimiter, position)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

6. Сортировать по доменам (Email/URL) - (sort_by_domains.py), работает в трех режимах (one_file/by_domains/filter_domains). Режим когда просто выполняется сортировка строк по доменам в 1 выходной файл, и когда файлы разбиваются по доменам, а также чтение лога и фильтрация в файл или по файлам по конкретно указанным доменам. Email или URL вычисляется динамически по регулярке.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import re
import heapq
import logging
import time
import tempfile
import shutil

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
    handlers=[logging.StreamHandler(),
              logging.FileHandler("sort_by_domains.log", encoding="utf-8", mode='a')]
)

# Регулярные выражения для извлечения email и URL
EMAIL_REGEX = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')
URL_REGEX = re.compile(r'(https?://)?([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})')


def extract_domain(string: str):
    """
    Извлекает домен из строки (email или URL).

    :param string: Строка, содержащая email или URL
    :return: Домен или None
    """
    # Проверка для email
    email_match = EMAIL_REGEX.search(string)
    if email_match:
        domain = email_match.group(0).split('@')[1]
        logger.debug(f"Извлечен домен электронной почты: {domain}")
        return domain

    # Проверка для URL
    url_match = URL_REGEX.search(string)
    if url_match:
        domain = url_match.group(2)
        # Извлекаем только зону первого уровня (например, '.ru', '.com')
        parts = domain.split('.')
        if len(parts) > 1:
            top_level_domain = parts[-1]
            # Собираем домен из первого уровня
            domain = f".{top_level_domain}"
        logger.debug(f"Извлечен домен URL (по зоне первого уровня): {domain}")
        return domain

    logger.debug(f"Не удалось извлечь домен из строки: {string}")
    return None


def sort_domains(input_file: str, output_dir: str, output_mode: str, filter_domains=None):
    """
    Обрабатывает строки из входного файла и сохраняет домены в файлы.

    :param input_file: Путь к входному файлу
    :param output_dir: Путь к директории для сохранения результатов
    :param output_mode: Режим обработки ('one_file', 'by_domains', 'filter_domains')
    :param filter_domains: Список доменов для фильтрации (только для режима 'filter_domains')
    """
    try:
        # Засекаем время начала обработки
        start_time = time.time()
        logger.info(f"Начало обработки файла: {input_file}")
        logger.info(f"Режим обработки: {output_mode}")
        if filter_domains:
            logger.info(f"Фильтрация по доменам: {', '.join(filter_domains)}")

        # Проверяем наличие директории для выходных файлов
        if not os.path.exists(output_dir):
            logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
            os.makedirs(output_dir)

        if output_mode == 'one_file':
            # Создаем временную папку для хранения промежуточных файлов
            temp_dir = tempfile.mkdtemp(dir=output_dir)
            logger.info(f"Создана временная папка для промежуточных файлов: {temp_dir}")

        # Статистика обработки
        total_lines = 0
        processed_lines = 0
        unique_domains = set()
        domain_line_counts = {}

        # Открываем входной файл
        with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
            # Словарь для хранения доменов по файлам
            domain_files = {}

            # Чтение строк из файла с использованием heapq.merge для эффективности и экономии ОЗУ
            for line in heapq.merge(infile):
                line = line.strip()
                total_lines += 1

                # Извлечение домена из строки
                domain = extract_domain(line)
                if domain:
                    processed_lines += 1
                    unique_domains.add(domain)

                    # Подсчет строк для каждого домена
                    domain_line_counts[domain] = domain_line_counts.get(domain, 0) + 1

                    if output_mode == "one_file":
                        # В режиме 'one_file' записываем все строки в файлы по доменам
                        temp_file_path = os.path.join(temp_dir, f"{domain}.tmp")
                        if domain not in domain_files:
                            domain_files[domain] = open(temp_file_path, 'w', encoding='utf-8')
                        domain_files[domain].write(line + '\n')
                    elif output_mode == "by_domains":
                        # В режиме 'by_domains' создаём файлы для каждого домена
                        if domain not in domain_files:
                            domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
                        domain_files[domain].write(line + '\n')
                    elif output_mode == "filter_domains" and filter_domains:
                        # В режиме 'filter_domains' сохраняем строки только для указанных доменов
                        if domain in filter_domains:
                            if domain not in domain_files:
                                domain_files[domain] = open(os.path.join(output_dir, f"{domain}.txt"), 'a', encoding='utf-8')
                            domain_files[domain].write(line + '\n')

            # Закрытие временных файлов для доменов
            for file in domain_files.values():
                file.close()

        # В режиме 'one_file' объединяем все временные файлы и сортируем их
        if output_mode == "one_file":
            logger.info("Начинаем слияние и сортировку временных файлов...")
            with open(os.path.join(output_dir, "output.txt"), 'w', encoding='utf-8') as output_file:
                # Создаём генератор для всех временных файлов
                all_temp_files = [open(os.path.join(temp_dir, f"{domain}.tmp"), 'r', encoding='utf-8')
                                  for domain in domain_files.keys()]

                # Слияние временных файлов с сортировкой по домену в один файл
                merged_lines = heapq.merge(*all_temp_files, key=lambda line: extract_domain(line))
                output_file.writelines(merged_lines)

                # Закрытие всех временных файлов
                for file in all_temp_files:
                    file.close()

                logger.info("Слияние и сортировка завершены. Все данные записаны в output.txt.")

            # Удаляем временную папку и все ее содержимое
            shutil.rmtree(temp_dir)
            logger.info(f"Временная папка {temp_dir} была удалена.")

        # Засекаем время окончания обработки
        end_time = time.time()
        elapsed_time = end_time - start_time

        # Логирование статистики обработки
        logger.info(f"Статистика обработки:")
        logger.info(f"- Всего строк в файле: {total_lines}")
        logger.info(f"- Обработано строк с доменами: {processed_lines}")
        logger.info(f"- Уникальных доменов найдено: {len(unique_domains)}")
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")

        # Логирование распределения доменов (топ-5)
        sorted_domains = sorted(domain_line_counts.items(), key=lambda x: x[1], reverse=True)
        logger.info("Топ-5 доменов по количеству строк:")
        for domain, count in sorted_domains[:5]:
            logger.info(f"  - {domain}: {count} строк")

    except FileNotFoundError:
        logger.error(f"ОШИБКА: Файл {input_file} не найден.")
    except PermissionError:
        logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или директории {output_dir}.")
    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в процессе обработки: {e}")


def main():
    try:
        logger.info("=" * 50)
        logger.info("СТАРТ СКРИПТА: Сортировка доменов")
        logger.info("=" * 50)

        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_dir = input("Введите путь к директории для сохранения: ").strip()

        # Меню выбора режима
        while True:
            print("\nВыберите режим обработки:")
            print("1. Сортировка всех строк в один файл (one_file)")
            print("2. Сортировка по доменам в отдельные файлы (by_domains)")
            print("3. Экспорт строк по указанным доменам (filter_domains)")
            print("0. Выход")

            choice = input("Введите номер режима: ").strip()

            if choice == "1":
                output_mode = "one_file"
                filter_domains = None
                break
            elif choice == "2":
                output_mode = "by_domains"
                filter_domains = None
                break
            elif choice == "3":
                output_mode = "filter_domains"
                domains_input = input("Введите домены для фильтрации (через запятую): ").strip()
                filter_domains = [domain.strip() for domain in domains_input.split(',')]
                break
            elif choice == "0":
                print("Выход из программы.")
                return
            else:
                print("Неверный выбор. Попробуйте снова.")

        # Обработка строк в зависимости от выбранного режима
        sort_domains(input_file, output_dir, output_mode, filter_domains)

        logger.info("=" * 50)
        logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
        logger.info("=" * 50)

    except KeyboardInterrupt:
        logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")



if __name__ == "__main__":
    main()

7. Удаление доменов из Email адресов. (remove_domains.py) Прописываете путь к файлу c мыльниками, на выходе получаете файл без @domain.com
Код:
Python: Скопировать в буфер обмена
Код:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("remove_domains.log", encoding="utf-8", mode='a')
    ]
)


def extract_name(email: str) -> str:
    """
    Извлекает имя пользователя из email (часть до @).

    :param email: Строка с email.
    :return: Имя пользователя или пустая строка.
    """
    if "@" in email:
        return email.split("@")[0]
    return email


def line_generator(file_path: str):
    """
    Генератор для построчного чтения файла.

    :param file_path: Путь к входному файлу.
    :yield: Очищенная строка из файла.
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            yield line.strip()


def remove_domains(input_file: str, output_file: str):
    """
    Удаляет домены из строк электронной почты и записывает результат в выходной файл.

    :param input_file: Путь к входному файлу.
    :param output_file: Путь к выходному файлу.
    """
    try:
        start_time = time.time()
        logger.info(f"Начало обработки файла: {input_file}")

        # Проверяем наличие директории для выходного файла и создаём её при необходимости
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
            os.makedirs(output_dir)

        # Проверяем наличие расширения у выходного файла
        if not os.path.splitext(output_file)[1]:
            logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
            output_file += ".txt"

        # Открыть файл для записи и обработать данные
        with open(output_file, 'w', encoding='utf-8') as outfile:
            processed_lines = (
                extract_name(email) for email in line_generator(input_file)
            )
            for name in heapq.merge(processed_lines):
                outfile.write(name + '\n')

        end_time = time.time()
        logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
        logger.info(f"Результат записан в файл: {output_file}")

    except FileNotFoundError:
        logger.error(f"ОШИБКА: Файл {input_file} не найден.")
    except PermissionError:
        logger.error(f"ОШИБКА: Нет прав доступа к файлу {input_file} или {output_file}.")
    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")


def main():
    try:
        logger.info("=" * 50)
        logger.info("СТАРТ СКРИПТА: Удаление доменов из email")
        logger.info("=" * 50)

        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_file = input("Введите путь к выходному файлу (с расширением): ").strip()

        if not os.path.exists(input_file):
            logger.error(f"Входной файл {input_file} не найден.")
            return

        remove_domains(input_file, output_file)

        logger.info("=" * 50)
        logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
        logger.info("=" * 50)

    except KeyboardInterrupt:
        logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")


if __name__ == "__main__":
    main()
8. Склеить строки с паролями(merge_with_passwords.py) - (допустим надо склеить строки для брута). Работает в двух режимах. ('manual' или 'from_file'). В первом режиме вы задаете 1 пароль на строку, во втором склеиваете 2 списка это логины и пароли (допустим).
Код:
Python: Скопировать в буфер обмена
Код:
import os
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("append_passwords.log", encoding='utf-8', mode='a')
    ]
)


def line_generator(file_path: str):
    """
    Генератор для построчного чтения файла.

    :param file_path: Путь к входному файлу.
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            yield line.strip()


def password_generator(file_path: str):
    """
    Генератор для построчного чтения файла с паролями.

    :param file_path: Путь к файлу с паролями.
    :yield: Пароль из файла.
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            yield line.strip()


def append_passwords_with_password_file(input_file: str, output_file: str, passwords_file: str, delimiter: str):
    """
    Режим добавления паролей из файла, добавляя все пароли на каждую строку.

    :param input_file: Путь к входному файлу.
    :param output_file: Путь к выходному файлу.
    :param passwords_file: Путь к файлу с паролями.
    :param delimiter: Разделитель для строки и пароля.
    """
    with open(output_file, 'w', encoding='utf-8') as outfile:
        input_lines = line_generator(input_file)
        password_lines = list(password_generator(passwords_file))  # Сохраняем все пароли в список

        for line in input_lines:
            for password in password_lines:
                outfile.write(f"{line}{delimiter}{password}\n")


def append_passwords_with_manual_password(input_file: str, output_file: str, password: str, delimiter: str):
    """
    Режим добавления указанного пользователем пароля к строкам.

    :param input_file: Путь к входному файлу.
    :param output_file: Путь к выходному файлу.
    :param password: Пароль для добавления.
    :param delimiter: Разделитель для строки и пароля.
    """
    with open(output_file, 'w', encoding='utf-8') as outfile:
        for line in line_generator(input_file):
            outfile.write(f"{line}{delimiter}{password}\n")


def append_passwords(input_file: str, output_file: str, mode: str, password: str = None, passwords_file: str = None, delimiter: str = ':'):
    """
    Добавляет пароли к строкам в зависимости от выбранного режима.

    :param input_file: Путь к входному файлу.
    :param output_file: Путь к выходному файлу.
    :param mode: Режим работы ('manual' или 'from_file').
    :param password: Пароль для режима 'manual'.
    :param passwords_file: Путь к файлу с паролями для режима 'from_file'.
    :param delimiter: Разделитель для строки и пароля.
    """
    try:
        start_time = time.time()
        logger.info(f"Начало обработки файла: {input_file}")
        logger.info(f"Выбранный режим: {mode}")

        # Проверяем наличие директории для выходного файла и создаём её при необходимости
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.warning(f"Директория {output_dir} не существует. Создаём новую.")
            os.makedirs(output_dir)

        # Проверяем наличие расширения у выходного файла
        if not os.path.splitext(output_file)[1]:
            logger.warning(f"У файла {output_file} отсутствует расширение. Добавляем '.txt'.")
            output_file += ".txt"

        if mode == 'manual':
            if password is None:
                raise ValueError("Для режима 'manual' необходимо указать пароль.")
            append_passwords_with_manual_password(input_file, output_file, password, delimiter)

        elif mode == 'from_file':
            if passwords_file is None or not os.path.exists(passwords_file):
                raise ValueError("Для режима 'from_file' необходимо указать существующий файл с паролями.")
            append_passwords_with_password_file(input_file, output_file, passwords_file, delimiter)

        else:
            raise ValueError("Неверный режим. Используйте 'manual' или 'from_file'.")

        end_time = time.time()
        logger.info(f"Обработка завершена. Время выполнения: {end_time - start_time:.2f} секунд.")
        logger.info(f"Результат записан в файл: {output_file}")

    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА: {e}")


def main():
    try:
        logger.info("=" * 50)
        logger.info("СТАРТ СКРИПТА: Добавление пароля к строкам")
        logger.info("=" * 50)

        # Запрос параметров у пользователя
        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_file = input("Введите путь к выходному файлу (с расширением): ").strip()
        mode = input("Выберите режим работы ('manual' или 'from_file'): ").strip().lower()
        delimiter = input("Введите разделитель для строки и пароля (например, ':', '|', '/', ';'): ").strip()

        if not delimiter:
            logger.error("ОШИБКА: Разделитель не указан.")
            return

        if mode == 'manual':
            password = input("Введите пароль для добавления: ").strip()
            append_passwords(input_file, output_file, mode, password=password, delimiter=delimiter)

        elif mode == 'from_file':
            passwords_file = input("Введите путь к файлу с паролями: ").strip()
            append_passwords(input_file, output_file, mode, passwords_file=passwords_file, delimiter=delimiter)

        else:
            logger.error("ОШИБКА: Неверный режим. Используйте 'manual' или 'from_file'.")

        logger.info("=" * 50)
        logger.info("ЗАВЕРШЕНИЕ СКРИПТА")
        logger.info("=" * 50)

    except KeyboardInterrupt:
        logger.warning("ВНИМАНИЕ: Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"КРИТИЧЕСКАЯ ОШИБКА в основной функции: {e}")


if __name__ == "__main__":
    main()

9. Объединение баз. (merge_databases.py) Выполнение слияния нескольких текстовиков в один файл. Указываете директорию с текстовиками, на выходе получаете один файл.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import heapq
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("merge_databases.log", encoding="utf-8")
    ]
)


def merge_files_in_directory(directory: str, output_file: str):
    """
    Сливает все файлы из указанной директории в один файл.

    :param directory: Путь к директории с файлами для слияния
    :param output_file: Имя выходного файла
    """
    start_time = time.time()
    logger.info(f"Начало слияния файлов. Директория: {directory}, Выходной файл: {output_file}")

    try:
        # Получаем список всех файлов в директории
        temp_files = [os.path.join(directory, f) for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]

        if not temp_files:
            logger.error(f"В указанной директории {directory} не найдено файлов для слияния.")
            return

        logger.info(f"Обнаружено {len(temp_files)} файлов для обработки.")

        file_iters = []
        total_lines_written = 0  # Счётчик строк

        try:
            # Открываем все файлы для чтения
            for temp_file in temp_files:
                logger.info(f"Открытие файла: {temp_file}")
                file_iters.append(open(temp_file, 'r', encoding='utf-8', errors='replace'))

            # Открываем выходной файл для записи
            with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
                logger.info(f"Запись в файл: {output_file}")

                # Записываем строки из всех файлов, используя merge
                for line in heapq.merge(*[iter(f) for f in file_iters]):
                    outfile.write(line)
                    total_lines_written += 1

            logger.info(f"Слияние завершено. Всего строк записано: {total_lines_written}")

        except Exception as e:
            logger.error(f"Ошибка при обработке файлов: {e}")
            return

        finally:
            # Закрываем все открытые файлы
            for f in file_iters:
                logger.info(f"Закрытие файла: {f.name}")
                f.close()

    except Exception as e:
        logger.error(f"Ошибка в процессе слияния: {e}")
        return

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    """
    Основная функция для выбора директории и вызова слияния.
    """
    try:
        logger.info("Запуск программы")
        # Запрос директории и имени выходного файла у пользователя
        directory = input("Введите путь к директории с файлами для слияния: ").strip()
        output_file = input("Введите имя выходного файла (с расширением): ").strip()

        if not os.path.isdir(directory):
            logger.error(f"Указанный путь {directory} не является директорией.")
            return

        merge_files_in_directory(directory, output_file)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

10. Разделить один файл на несколько частей (split_database.py). Ну тут уже идет реверс, отдаете один файл, указываете по какому количеству строк его разбить, получаете несколько сплит файлов.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("split_database.log", encoding="utf-8")
    ]
)


def split_file(input_file: str, output_directory: str, lines_per_file: int):
    """
    Разделяет файл на несколько частей с минимальным использованием памяти.

    :param input_file: Путь к входному файлу
    :param output_directory: Директория для сохранения разделённых файлов
    :param lines_per_file: Количество строк в каждом выходном файле
    """
    start_time = time.time()
    logger.info(f"Начало разделения файла. Входной файл: {input_file}, Директория: {output_directory}, Строк на файл: {lines_per_file}")

    try:
        # Проверка входного файла
        if not os.path.isfile(input_file):
            logger.error(f"Файл {input_file} не найден или не является файлом.")
            return

        # Проверка выходной директории
        if not os.path.exists(output_directory):
            logger.info(f"Выходная директория {output_directory} не найдена. Создаём её.")
            os.makedirs(output_directory)

        # Итеративное чтение и запись
        def file_iterator(file_path):
            with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
                for line in file:
                    yield line

        # Итератор для входного файла
        file_iter = file_iterator(input_file)
        file_index = 1
        current_file = None
        lines_written = 0

        for line in file_iter:
            if lines_written % lines_per_file == 0:  # Новый файл
                if current_file:
                    current_file.close()
                    logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")
                output_file = os.path.join(output_directory, f"split_part_{file_index}.txt")
                current_file = open(output_file, 'w', encoding='utf-8', errors='replace')
                logger.info(f"Создан файл: {output_file}")
                file_index += 1
                lines_written = 0

            current_file.write(line)
            lines_written += 1

        if current_file:
            current_file.close()
            logger.info(f"Закрыт файл: {current_file.name} (строк: {lines_written})")

    except Exception as e:
        logger.error(f"Ошибка в процессе разделения: {e}")
        return

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    """
    Основная функция для выбора входного файла, выходной директории и параметров разделения.
    """
    try:
        logger.info("Запуск программы")

        # Запрос данных у пользователя
        input_file = input("Введите путь к файлу для разделения: ").strip()
        output_directory = input("Введите путь для сохранения разделённых файлов: ").strip()
        lines_per_file = int(input("Введите количество строк в каждом выходном файле: ").strip())

        split_file(input_file, output_directory, lines_per_file)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

11. Рандомизация строк в файле (randomize.py). Тут выполняется перемешивание строк в файле который отдаете рандомным образом, на выходе получаете рандомизированные позиции строк в файле.
Код:
Python: Скопировать в буфер обмена
Код:
import os
import random
import logging
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("randomize.log", encoding="utf-8")
    ]
)


def randomize_file(input_file: str, output_file: str, buffer_size: int):
    """
    Перемешивает строки в файле случайным образом.

    :param input_file: Путь к входному файлу
    :param output_file: Путь к выходному файлу
    :param buffer_size: Количество строк, загружаемых в память за раз
    """
    start_time = time.time()
    logger.info(f"Начало рандомизации файла. Входной файл: {input_file}, Выходной файл: {output_file}, Размер буфера: {buffer_size}")

    try:
        # Проверка входного файла
        if not os.path.isfile(input_file):
            logger.error(f"Файл {input_file} не найден или не является файлом.")
            return

        # Проверка выхода
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
            os.makedirs(output_dir)

        # Чтение строк по частям
        all_lines = []
        with open(input_file, 'r', encoding='utf-8', errors='replace') as infile:
            logger.info(f"Чтение строк из {input_file} с использованием буфера")
            buffer = []
            for line in infile:
                buffer.append(line)
                if len(buffer) >= buffer_size:
                    logger.debug(f"Загружен буфер из {len(buffer)} строк")
                    all_lines.extend(buffer)
                    buffer = []
            if buffer:
                all_lines.extend(buffer)

        logger.info(f"Файл загружен в память. Всего строк: {len(all_lines)}")

        # Перемешивание строк
        random.shuffle(all_lines)
        logger.info(f"Строки успешно перемешаны")

        # Запись результата
        with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
            logger.info(f"Запись перемешанных строк в {output_file}")
            outfile.writelines(all_lines)

        logger.info(f"Успешное завершение. Перемешанный файл сохранён в {output_file}")

    except Exception as e:
        logger.error(f"Ошибка при обработке: {e}")

    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def main():
    """
    Основная функция для выбора входного и выходного файлов, а также настроек рандомизации.
    """
    try:
        logger.info("Запуск программы")

        # Запрос данных у пользователя
        input_file = input("Введите путь к файлу для рандомизации: ").strip()
        output_file = input("Введите путь для сохранения перемешанного файла: ").strip()
        buffer_size = int(input("Введите размер буфера (число строк, загружаемых за раз): ").strip())

        randomize_file(input_file, output_file, buffer_size)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except ValueError:
        logger.error("Ошибка: размер буфера должен быть числом.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

12. Копирование строк из текстового файла, работает с использованием библиотеки pyperclip, так что не забудьте сделать pip install pyperclip==1.9.0 , работает в двух режимах: 1. Копирование строк из файла полностью; 2. Копирование строк по диапазону из файла. Предупреждение: следите за объемом ОЗУ при работе этого модуля.
Код:

Python: Скопировать в буфер обмена
Код:
import logging
import pyperclip  # Для работы с буфером обмена

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("copy_to_clipboard.log", encoding="utf-8")
    ]
)


def read_file_lines(file_path, start_line=None, end_line=None):
    """
    Генератор для чтения строк из файла с поддержкой диапазона.

    :param file_path: Путь к файлу
    :param start_line: Номер начальной строки (1-based, включительно)
    :param end_line: Номер конечной строки (1-based, включительно)
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
        for current_line_num, line in enumerate(f, start=1):
            if start_line and current_line_num < start_line:
                continue
            if end_line and current_line_num > end_line:
                break
            yield line.strip()


def copy_to_clipboard(file_path, start_line=None, end_line=None):
    """
    Копирует содержимое файла или его части в буфер обмена.

    :param file_path: Путь к файлу
    :param start_line: Номер начальной строки (1-based, включительно)
    :param end_line: Номер конечной строки (1-based, включительно)
    """
    try:
        logger.info(f"Начало копирования. Файл: {file_path}, Диапазон: {start_line}-{end_line}")
        lines = list(read_file_lines(file_path, start_line, end_line))

        if not lines:
            logger.warning("Выбранный диапазон пуст.")
            return

        # Отображаем первую и последнюю строки диапазона
        print(f"Первая строка: {lines[0]}")
        print(f"Последняя строка: {lines[-1]}")
        confirm = input("Скопировать в буфер обмена? (y/n): ").strip().lower()

        if confirm != 'y':
            print("Операция отменена пользователем.")
            return

        # Копируем в буфер обмена
        pyperclip.copy('\n'.join(lines))
        print("Данные скопированы в буфер обмена.")
        logger.info("Копирование завершено успешно.")

    except FileNotFoundError:
        logger.error(f"Файл {file_path} не найден.")
    except Exception as e:
        logger.error(f"Ошибка в процессе копирования: {e}")


def main():
    try:
        logger.info("Запуск программы")
        file_path = input("Введите путь к файлу: ").strip()
        mode = input("Выберите режим (1 — весь файл, 2 — диапазон строк): ").strip()

        if mode == '1':
            copy_to_clipboard(file_path)
        elif mode == '2':
            start_line = int(input("Введите номер начальной строки: ").strip())
            end_line = int(input("Введите номер конечной строки: ").strip())
            copy_to_clipboard(file_path, start_line, end_line)
        else:
            print("Неверный выбор режима.")
            logger.warning("Выбран неверный режим.")

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

13. Чистка текстового файла от мусора. Работает в 3х режимах по выбору (1. Разрешить все спецсимволы, 2. Разрешить только специальные символы (@, ., _, -), 3. Не разрешать спецсимволы)
Python: Скопировать в буфер обмена
Код:
import os
import logging
import time
import string

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("remove_non_latin.log", encoding="utf-8")
    ]
)

def is_latin_line(line: str, special_mode: str) -> bool:
    """
    Проверяет, состоит ли строка из латинских символов и спецсимволов в зависимости от режима.

    :param line: Входная строка
    :param special_mode: Режим обработки спецсимволов:
                         - 'all' - разрешить все спецсимволы.
                         - 'custom' - разрешить только определенные спецсимволы.
                         - 'none' - не разрешать спецсимволы.
    :return: True, если строка соответствует требованиям.
    """
    # Основной набор символов: латинские буквы, цифры и пробелы
    allowed_chars = string.ascii_letters + string.digits + ' '

    if special_mode == 'all':
        # Разрешаем все печатаемые символы
        allowed_chars += string.punctuation
    elif special_mode == 'custom':
        # Разрешаем только указанные символы
        allowed_chars += "@._-"
    # special_mode == 'none': никаких дополнительных символов

    return all(char in allowed_chars for char in line)

def line_generator(file_path: str):
    """
    Генератор для построчного чтения файла.

    :param file_path: Путь к файлу
    """
    with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            yield line.strip()

def clean_file(input_file: str, output_file: str, special_mode: str):
    """
    Очищает файл, удаляя строки с нелатинскими символами или нежелательными символами.

    :param input_file: Путь к входному файлу
    :param output_file: Имя выходного файла
    :param special_mode: Режим обработки спецсимволов ('all', 'custom', 'none')
    """
    start_time = time.time()
    logger.info(f"Начало очистки файла. Входной файл: {input_file}, Выходной файл: {output_file}, "
                f"Режим спецсимволов: {special_mode}")

    try:
        # Проверяем наличие директории для выходного файла
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            logger.info(f"Выходная директория {output_dir} не найдена. Создаём её.")
            os.makedirs(output_dir)

        total_lines = 0  # Общее количество строк
        kept_lines = 0   # Количество оставленных строк
        removed_lines = 0  # Количество удаленных строк

        # Открываем файл для записи
        with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
            for line in line_generator(input_file):
                total_lines += 1
                if is_latin_line(line, special_mode):
                    outfile.write(line + '\n')
                    kept_lines += 1
                else:
                    removed_lines += 1

        logger.info(f"Очистка завершена. Обработано строк: {total_lines}, Сохранено строк: {kept_lines}, "
                    f"Удалено строк: {removed_lines}")

    except FileNotFoundError:
        logger.error(f"Файл {input_file} не найден.")
        return
    except Exception as e:
        logger.error(f"Ошибка в процессе очистки: {e}")
        return
    finally:
        # Рассчитываем время выполнения
        end_time = time.time()
        elapsed_time = end_time - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")

def main():
    try:
        logger.info("Запуск программы")
        # Запрос параметров у юзера
        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

        print("Выберите режим обработки спецсимволов:")
        print("1 - Разрешить все спецсимволы")
        print("2 - Разрешить только специальные символы (@, ., _, -)")
        print("3 - Не разрешать спецсимволы")
        special_mode_choice = input("Введите номер режима (1, 2 или 3): ").strip()

        special_mode = ''
        if special_mode_choice == '1':
            special_mode = 'all'
        elif special_mode_choice == '2':
            special_mode = 'custom'
        elif special_mode_choice == '3':
            special_mode = 'none'
        else:
            logger.error("Неверный выбор режима. Завершение программы.")
            return

        clean_file(input_file, output_file, special_mode)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")

if __name__ == "__main__":
    main()

14. Изменение кодировки файла. Дисклеймер: Модуль может работать кривовато, на моих тестах вроде нормально преобразует, на ваших боевых файлах как работать будет не знаю. Юзать на свой страх и риск. По найденным ошибкам просьба отписаться в ПМ или в ветке. Поддерживает преобразование в следующие кодировки (исходная кодировка в теории у файла может быть любой):
  1. UTF-8
  2. UTF-16
  3. UTF-16LE
  4. UTF-16BE
  5. ASCII
  6. Windows-1251
  7. ISO-8859-1
  8. KOI8-R
  9. GB18030
  10. Big5
Код:

Python: Скопировать в буфер обмена
Код:
import os
import logging
import chardet
import time

# Настройка логгера
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("format_database.log", encoding="utf-8")
    ]
)

POPULAR_ENCODINGS = [
    "UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE", "ASCII",
    "Windows-1251", "ISO-8859-1", "KOI8-R", "GB18030", "Big5"
]


def detect_file_encoding(file_path, sample_size=262144):
    """
    Определяет кодировку текстового файла.

    :param file_path: Путь к файлу
    :param sample_size: Количество байт для анализа
    :return: Предполагаемая кодировка файла
    """
    try:
        with open(file_path, 'rb') as file:
            raw_data = file.read(sample_size)
        result = chardet.detect(raw_data)
        detected_encoding = result.get('encoding', None)
        confidence = result.get('confidence', 0)

        logger.warning(f"Результат анализа кодировки: {result}")

        if detected_encoding is None or detected_encoding.lower() == 'ascii':
            logger.warning(f"Кодировка файла '{file_path}' не определена или определена как ASCII.")
            print("Укажите предполагаемую кодировку (например, UTF-8, Windows-1251): ")
            detected_encoding = input("Введите кодировку: ").strip()

        elif confidence < 0.8:
            logger.warning(f"Кодировка '{detected_encoding}' определена с низкой уверенностью ({confidence:.2f}).")
            print("Обнаружена кодировка с низкой уверенностью. Хотите указать свою? (y/n): ")
            if input().strip().lower() == 'y':
                detected_encoding = input("Введите кодировку: ").strip()

        logger.info(f"Обнаружена кодировка файла '{file_path}': {detected_encoding}")
        return detected_encoding

    except Exception as e:
        logger.error(f"Ошибка определения кодировки файла {file_path}: {e}")
        return 'utf-8'


def add_bom_if_needed(file_path, encoding):
    """
    Добавляет BOM (маркер порядка байт) в файл, если кодировка поддерживает его.

    :param file_path: Путь к файлу
    :param encoding: Кодировка файла
    """
    bom_map = {
        "utf-8": b'\xef\xbb\xbf',
        "utf-16": b'\xff\xfe',  # UTF-16 LE по умолчанию
        "utf-16le": b'\xff\xfe',
        "utf-16be": b'\xfe\xff'
    }

    if encoding.lower() in bom_map:
        logger.info(f"Добавление BOM для кодировки {encoding}")
        bom = bom_map[encoding.lower()]
        with open(file_path, 'rb') as f:
            content = f.read()

        with open(file_path, 'wb') as f:
            f.write(bom + content)
        logger.info(f"BOM успешно добавлен для файла: {file_path}")


def convert_file_encoding(input_file, output_file, target_encoding):
    """
    Преобразует кодировку текстового файла построчно и добавляет BOM, если это необходимо.

    :param input_file: Путь к входному файлу
    :param output_file: Путь к выходному файлу
    :param target_encoding: Целевая кодировка
    """
    start_time = time.time()
    logger.info(f"Начало преобразования кодировки. Входной файл: {input_file}, Целевая кодировка: {target_encoding}")

    try:
        source_encoding = detect_file_encoding(input_file)
        output_dir = os.path.dirname(output_file)
        if output_dir and not os.path.exists(output_dir):
            os.makedirs(output_dir)

        with open(input_file, 'r', encoding=source_encoding, errors='replace') as infile, \
                open(output_file, 'w', encoding=target_encoding, errors='replace') as outfile:
            for line in infile:
                outfile.write(line.rstrip('\r\n') + '\n')

        # Добавление BOM, если требуется
        add_bom_if_needed(output_file, target_encoding)

        logger.info(f"Преобразование завершено. Файл сохранён: {output_file}")

        # Проверка записи
        verify_result = verify_encoding(output_file)
        logger.info(f"Кодировка выходного файла определена как: {verify_result.get('encoding')}")

    except FileNotFoundError:
        logger.error(f"Файл {input_file} не найден.")
    except Exception as e:
        logger.error(f"Ошибка в процессе преобразования: {e}")
    finally:
        elapsed_time = time.time() - start_time
        logger.info(f"Время выполнения: {elapsed_time:.2f} секунд.")


def verify_encoding(file_path):
    """
    Проверяет кодировку файла.

    :param file_path: Путь к файлу
    :return: Результаты анализа кодировки
    """
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    return result


def main():
    """
    Основная функция для выполнения программы.
    """
    try:
        logger.info("Запуск программы")

        input_file = input("Введите путь к входному файлу (с расширением): ").strip()
        output_file = input("Введите путь и имя выходного файла (с расширением): ").strip()

        print("Доступные кодировки:")
        for i, encoding in enumerate(POPULAR_ENCODINGS, 1):
            print(f"{i}. {encoding}")

        while True:
            try:
                choice = int(input("Выберите номер целевой кодировки: "))
                if 1 <= choice <= len(POPULAR_ENCODINGS):
                    target_encoding = POPULAR_ENCODINGS[choice - 1]
                    break
                else:
                    print("Некорректный выбор. Попробуйте снова.")
            except ValueError:
                print("Введите корректный номер из списка.")

        convert_file_encoding(input_file, output_file, target_encoding)

    except KeyboardInterrupt:
        logger.warning("Операция прервана пользователем.")
    except Exception as e:
        logger.error(f"Ошибка в основной функции: {e}")


if __name__ == "__main__":
    main()

15. Нормализатор строк в файлах. Работает в 4-х режимах:
  1. Форматирование строк (format) - выполняет форматирование строк в экспортируемый файл, указываете разделитель строки входящего файла и их формат к примеру "ip:mail:pass", указываете нужный формат "mail:pass:ip" на выходе волучаете отформатированные строки.
  2. Фильтрация строк по формату (filter) - указываете разделитель, и формат тех строк которые нужно вытащить из файла, например только mail:pass, на выходе получаете отфильтрованные строки и зфайла.
  3. Очистка строк от спецсимволов (clean) - указываете спецсимволы которые вы хотите почистить из строк, на выходе получаете файл со строками без спецсимволов указанных алгоритму.
  4. Обрезка строк по длине (trim) - указываете диапазон по какой длине отсеивать строки в экспортируемый файл, получаете файл со строками по указанному количеству символов диапазона.
Код: Не влез в размер этого сообщения.

P.S. Модули можно использовать как по отдельности утилитами, так и через основной файл из консольного меню.
P.S.S. Первую версию на публику прикрепил в атаче. По мере свободного времени буду обновлять и исправлять найденные ошибки.
 
Сверху Снизу