Merge File Sort или быстрая сортировка строк и удаление дублей на C# в больших логах .txt

D2

Администратор
Регистрация
19 Фев 2025
Сообщения
4,380
Реакции
0
Всем привет, вариант отработки этого (https://xss.is/threads/123192/) алгоритма на C#, при компиляции в Visual Studio 2022 не забудьте поставить Nuget Packet: Microsoft.Extensions.Logging

Написал: rand
Эксклюзивно для: XSS.is

C#: Скопировать в буфер обмена
Код:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Logging;
using System.Threading;

class Program
{
    // Инициализация логгера для вывода информации в консоль с уровнем сообщений от Information и выше
    private static readonly ILogger logger = LoggerFactory.Create(builder =>
    {
        builder.AddConsole();
        builder.AddFilter(level => level >= LogLevel.Information);
    }).CreateLogger<Program>();

    // Определение временной директории для хранения промежуточных файлов
    private static string tempDir = Path.Combine(Directory.GetCurrentDirectory(), "TEMP");

    // Объекты блокировки для синхронизации потоков при работе с общими ресурсами
    private static readonly object _finalTempFilesLock = new object();
    private static readonly object _totalCountLock = new object();

    // Основной метод программы
    static async Task Main(string[] args)
    {
        // Запуск таймера для измерения времени выполнения программы
        var stopwatch = Stopwatch.StartNew();
        string inputFile = "large_random_emails.txt";   // Имя импортируемого файла
        string outputFile = "output-sorted-unique.txt"; // Имя экспортируемого файла

        logger.LogInformation($"Запуск программы для обработки файла {inputFile} и сохранения в {outputFile}");

        // Запуск асинхронной функции для сортировки и удаления дубликатов
        int originalCount = await SortAndUniqStreaming(inputFile, outputFile);

        // Остановка таймера и вывод итоговой информации
        stopwatch.Stop();
        logger.LogInformation($"Всего обработано строк: {originalCount}");
        logger.LogInformation($"Удаление дублей и сортировка заняли {stopwatch.Elapsed.TotalSeconds:F4} секунд");
    }

    // Асинхронная функция, которая реализует чтение файла, сортировку и удаление дубликатов
    static async Task<int> SortAndUniqStreaming(string inputFile, string outputFile, int chunkSize = 2000000, int batchSize = 10)
    {
        // Потокобезопасная коллекция для хранения задач по обработке чанков
        ConcurrentBag<Task<string>> tempFileTasks = new ConcurrentBag<Task<string>>();
        List<string> chunk = new List<string>();  // Список для хранения текущего чанка данных
        int originalCount = 0; // Счётчик общего количества строк

        // Проверка наличия временной директории и создание её при необходимости
        if (!Directory.Exists(tempDir))
        {
            Directory.CreateDirectory(tempDir);
        }

        logger.LogInformation($"Чтение файла {inputFile}...");

        // Чтение файла построчно
        using (StreamReader reader = new StreamReader(inputFile, Encoding.UTF8))
        {
            while (!reader.EndOfStream)
            {
                string line = await reader.ReadLineAsync();  // Чтение строки
                chunk.Add(line.Trim());                      // Добавление строки в чанк
                originalCount++;                             // Увеличение счётчика строк

                // Если чанк заполнен (достигнут лимит chunkSize), отправляем его на обработку
                if (chunk.Count >= chunkSize)
                {
                    var chunkCopy = chunk.ToArray();         // Копирование чанка для передачи в задачу
                    logger.LogInformation($"Чанк заполнен ({chunk.Count} строк). Начинаем обработку.");
                    tempFileTasks.Add(Task.Run(() => ProcessChunk(chunkCopy))); // Запуск задачи на обработку чанка
                    chunk.Clear();                          // Очистка чанка для новых данных
                }
            }

            // Обработка последнего неполного чанка, если он остался
            if (chunk.Count > 0)
            {
                var chunkCopy = chunk.ToArray();
                logger.LogInformation($"Обработка последнего неполного чанка размером {chunk.Count} строк.");
                tempFileTasks.Add(Task.Run(() => ProcessChunk(chunkCopy)));
            }
        }

        // Ожидание завершения всех задач по обработке чанков
        logger.LogInformation("Ожидание завершения обработки всех чанков...");
        string[] tempFiles = await Task.WhenAll(tempFileTasks);

        logger.LogInformation("Все чанки обработаны. Начинается пакетное слияние временных файлов батчами...");

        // Удаление пустых файлов и подготовка к слиянию
        tempFiles = tempFiles.Where(file => file != null).ToArray();
        ConcurrentBag<string> finalTempFiles = new ConcurrentBag<string>(); // Хранилище финальных временных файлов
        int totalUniqueCount = 0;  // Общее количество уникальных строк
        int totalDuplicateCount = 0; // Общее количество дубликатов

        // Разбиение файлов на батчи для пакетной обработки
        var mergeTasks = new List<Task>();
        for (int i = 0; i < tempFiles.Length; i += batchSize)
        {
            var batch = tempFiles.Skip(i).Take(batchSize).ToArray();

            // Создание задачи для слияния батча
            mergeTasks.Add(Task.Run(async () =>
            {
                if (batch.Length > 1)
                {
                    string tempOutputFile = Path.Combine(tempDir, Path.GetRandomFileName()); // Имя временного файла для слияния
                    logger.LogInformation($"Слияние батча из {batch.Length} файлов в {tempOutputFile}.");
                    var mergeResult = await MergeFiles(batch, tempOutputFile);  // Слияние файлов в батче

                    finalTempFiles.Add(tempOutputFile); // Добавление итогового файла в общий список

                    // Увеличение глобальных счётчиков уникальных строк и дубликатов
                    Interlocked.Add(ref totalUniqueCount, mergeResult.Item1);
                    Interlocked.Add(ref totalDuplicateCount, mergeResult.Item2);

                    // Удаление временных файлов после их слияния
                    foreach (var tempFile in batch)
                    {
                        if (File.Exists(tempFile))
                        {
                            File.Delete(tempFile);
                        }
                    }
                }
                else
                {
                    // Если в батче только один файл, добавляем его без изменений
                    finalTempFiles.Add(batch[0]);
                }
            }));
        }

        // Ожидание завершения всех задач по слиянию файлов
        await Task.WhenAll(mergeTasks);

        // Финальное слияние временных файлов в выходной файл
        if (finalTempFiles.Count > 0)
        {
            logger.LogInformation($"Финальное слияние {finalTempFiles.Count} временных файлов в {outputFile}.");
            var mergeFinalResult = await MergeFiles(finalTempFiles.ToArray(), outputFile); // Окончательное слияние
            totalUniqueCount += mergeFinalResult.Item1; // Добавление уникальных строк
            totalDuplicateCount += mergeFinalResult.Item2; // Добавление дубликатов
        }

        // Вывод информации о количестве уникальных строк и дубликатов
        logger.LogInformation($"Уникальных строк: {totalUniqueCount}, дублей удалено: {totalDuplicateCount}");
        return originalCount; // Возврат общего количества строк
    }

    // Функция для обработки чанка: удаление дубликатов и запись уникальных строк во временный файл
    static async Task<string> ProcessChunk(string[] chunk)
    {
        try
        {
            logger.LogInformation($"Начало обработки чанка размером {chunk.Length} строк");

            if (chunk.Length == 0)
            {
                logger.LogWarning("Пустой чанк, пропуск записи во временный файл.");
                return null; // Пропуск обработки пустого чанка
            }

            int originalCount = chunk.Length; // Количество строк до удаления дублей
            var uniqueItems = chunk.AsParallel().Distinct().OrderBy(x => x).ToList(); // Удаление дублей и сортировка
            int duplicateCount = originalCount - uniqueItems.Count; // Вычисление количества дубликатов

            string tempFilePath = Path.Combine(tempDir, Path.GetRandomFileName()); // Создание имени временного файла

            if (uniqueItems.Count > 0)
            {
                // Запись уникальных строк во временный файл
                logger.LogInformation($"Чанк перед записью: {uniqueItems.Count} уникальных строк, дублей удалено: {duplicateCount}. Запись в файл {tempFilePath}");
                await File.WriteAllLinesAsync(tempFilePath, uniqueItems);
                logger.LogInformation($"Чанк успешно записан во временный файл {tempFilePath}");
                return tempFilePath; // Возвращаем путь к временному файлу
            }

            logger.LogWarning($"Чанк пуст после удаления дублей, файл {tempFilePath} не создан.");
            return null; // Если после удаления дублей чанк пуст, не создаём файл
        }
        catch (Exception ex)
        {
            // Логируем ошибку и возвращаем null
            logger.LogError($"Ошибка при обработке чанка: {ex}");
            return null;
        }
    }

    // Функция для слияния временных файлов и удаления дублей
    static async Task<(int, int)> MergeFiles(string[] tempFiles, string outputFile)
    {
        try
        {
            logger.LogInformation($"Окончательное слияние {tempFiles.Length} временных файлов в {outputFile}");

            int uniqueCount = 0;    // Счетчик уникальных строк
            int duplicateCount = 0; // Счетчик дубликатов

            // Создаем StreamWriter для записи результата в выходной файл
            using (StreamWriter writer = new StreamWriter(outputFile, false, Encoding.UTF8))
            {
                // Открываем каждый временный файл с помощью StreamReader
                var readers = tempFiles.Select(file => new StreamReader(file)).ToList();
                // Используем итератор для объединения данных из всех файлов
                var mergedIter = Merge(readers);

                string prevLine = null;  // Переменная для отслеживания предыдущей строки

                // Проход по всем строкам, полученным из слияния
                await foreach (var line in mergedIter)
                {
                    // Если текущая строка не совпадает с предыдущей (т.е. строка уникальна)
                    if (line != prevLine)
                    {
                        // Записываем строку в файл
                        await writer.WriteLineAsync(line);
                        prevLine = line;  // Обновляем предыдущую строку
                        uniqueCount++;    // Увеличиваем счетчик уникальных строк
                    }
                    else
                    {
                        duplicateCount++; // Если строка повторяется, увеличиваем счетчик дубликатов
                    }
                }

                // Закрываем все StreamReader после завершения обработки
                foreach (var reader in readers)
                {
                    reader.Close();
                }
            }

            // Логируем результаты слияния: сколько уникальных строк и сколько дубликатов было удалено
            logger.LogInformation($"Слияние завершено. Уникальных строк: {uniqueCount}, дублей удалено: {duplicateCount}");
            return (uniqueCount, duplicateCount);  // Возвращаем количество уникальных строк и дубликатов
        }
        catch (Exception ex)
        {
            // Логируем ошибки, если произошел сбой при слиянии
            logger.LogError($"Ошибка при слиянии файлов: {ex}");
            return (0, 0);  // Возвращаем 0 в случае ошибки
        }
    }

    // Итератор для слияния строк из всех временных файлов
    static async IAsyncEnumerable<string> Merge(IEnumerable<StreamReader> readers)
    {
        // Используем SortedDictionary для сортировки строк при слиянии
        var pq = new SortedDictionary<string, List<StreamReader>>();

        // Проходим по каждому StreamReader и добавляем первую строку в словарь
        foreach (var reader in readers)
        {
            if (!reader.EndOfStream)
            {
                string line = await reader.ReadLineAsync();
                if (!pq.ContainsKey(line))
                    pq[line] = new List<StreamReader>();
                pq[line].Add(reader);
            }
        }

        // Пока есть строки для обработки
        while (pq.Count > 0)
        {
            var first = pq.First();  // Получаем первую (наименьшую) строку из словаря
            yield return first.Key;  // Возвращаем строку

            // Читаем следующие строки из соответствующих StreamReader
            foreach (var reader in first.Value)
            {
                if (!reader.EndOfStream)
                {
                    string line = await reader.ReadLineAsync();
                    if (!pq.ContainsKey(line))
                        pq[line] = new List<StreamReader>();
                    pq[line].Add(reader);  // Добавляем строку в словарь для дальнейшей обработки
                }
            }

            // Удаляем обработанную строку из словаря
            pq.Remove(first.Key);
        }
    }
}

Тестирование 100 миллионов строк на файле размером 2GB (Ryzen 5600 4700mhz, 32GB DDR4 3600Mhz):

1727102537685.png



P.S. Есть баги со счетчиками расчета дублей строк (мне лень пока это фиксить, буду признателен если кто пофиксит), давно не кодил на шарпе, но по образцовому файлу в 2 гигабайта, сортирует и чистит правильно.

1727106688483.png




По Virus Total один детект после компиляции:

1727107160784.png
 
Сверху Снизу