D2
Администратор
- Регистрация
- 19 Фев 2025
- Сообщения
- 4,380
- Реакции
- 0
Всем привет, вариант отработки этого (https://xss.is/threads/123192/) алгоритма на C#, при компиляции в Visual Studio 2022 не забудьте поставить Nuget Packet: Microsoft.Extensions.Logging
Написал: rand
Эксклюзивно для: XSS.is
C#: Скопировать в буфер обмена
Тестирование 100 миллионов строк на файле размером 2GB (Ryzen 5600 4700mhz, 32GB DDR4 3600Mhz):
P.S. Есть баги со счетчиками расчета дублей строк (мне лень пока это фиксить, буду признателен если кто пофиксит), давно не кодил на шарпе, но по образцовому файлу в 2 гигабайта, сортирует и чистит правильно.
По Virus Total один детект после компиляции:
Написал: rand
Эксклюзивно для: XSS.is
C#: Скопировать в буфер обмена
Код:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Text;
using Microsoft.Extensions.Logging;
using System.Threading;
class Program
{
// Инициализация логгера для вывода информации в консоль с уровнем сообщений от Information и выше
private static readonly ILogger logger = LoggerFactory.Create(builder =>
{
builder.AddConsole();
builder.AddFilter(level => level >= LogLevel.Information);
}).CreateLogger<Program>();
// Определение временной директории для хранения промежуточных файлов
private static string tempDir = Path.Combine(Directory.GetCurrentDirectory(), "TEMP");
// Объекты блокировки для синхронизации потоков при работе с общими ресурсами
private static readonly object _finalTempFilesLock = new object();
private static readonly object _totalCountLock = new object();
// Основной метод программы
static async Task Main(string[] args)
{
// Запуск таймера для измерения времени выполнения программы
var stopwatch = Stopwatch.StartNew();
string inputFile = "large_random_emails.txt"; // Имя импортируемого файла
string outputFile = "output-sorted-unique.txt"; // Имя экспортируемого файла
logger.LogInformation($"Запуск программы для обработки файла {inputFile} и сохранения в {outputFile}");
// Запуск асинхронной функции для сортировки и удаления дубликатов
int originalCount = await SortAndUniqStreaming(inputFile, outputFile);
// Остановка таймера и вывод итоговой информации
stopwatch.Stop();
logger.LogInformation($"Всего обработано строк: {originalCount}");
logger.LogInformation($"Удаление дублей и сортировка заняли {stopwatch.Elapsed.TotalSeconds:F4} секунд");
}
// Асинхронная функция, которая реализует чтение файла, сортировку и удаление дубликатов
static async Task<int> SortAndUniqStreaming(string inputFile, string outputFile, int chunkSize = 2000000, int batchSize = 10)
{
// Потокобезопасная коллекция для хранения задач по обработке чанков
ConcurrentBag<Task<string>> tempFileTasks = new ConcurrentBag<Task<string>>();
List<string> chunk = new List<string>(); // Список для хранения текущего чанка данных
int originalCount = 0; // Счётчик общего количества строк
// Проверка наличия временной директории и создание её при необходимости
if (!Directory.Exists(tempDir))
{
Directory.CreateDirectory(tempDir);
}
logger.LogInformation($"Чтение файла {inputFile}...");
// Чтение файла построчно
using (StreamReader reader = new StreamReader(inputFile, Encoding.UTF8))
{
while (!reader.EndOfStream)
{
string line = await reader.ReadLineAsync(); // Чтение строки
chunk.Add(line.Trim()); // Добавление строки в чанк
originalCount++; // Увеличение счётчика строк
// Если чанк заполнен (достигнут лимит chunkSize), отправляем его на обработку
if (chunk.Count >= chunkSize)
{
var chunkCopy = chunk.ToArray(); // Копирование чанка для передачи в задачу
logger.LogInformation($"Чанк заполнен ({chunk.Count} строк). Начинаем обработку.");
tempFileTasks.Add(Task.Run(() => ProcessChunk(chunkCopy))); // Запуск задачи на обработку чанка
chunk.Clear(); // Очистка чанка для новых данных
}
}
// Обработка последнего неполного чанка, если он остался
if (chunk.Count > 0)
{
var chunkCopy = chunk.ToArray();
logger.LogInformation($"Обработка последнего неполного чанка размером {chunk.Count} строк.");
tempFileTasks.Add(Task.Run(() => ProcessChunk(chunkCopy)));
}
}
// Ожидание завершения всех задач по обработке чанков
logger.LogInformation("Ожидание завершения обработки всех чанков...");
string[] tempFiles = await Task.WhenAll(tempFileTasks);
logger.LogInformation("Все чанки обработаны. Начинается пакетное слияние временных файлов батчами...");
// Удаление пустых файлов и подготовка к слиянию
tempFiles = tempFiles.Where(file => file != null).ToArray();
ConcurrentBag<string> finalTempFiles = new ConcurrentBag<string>(); // Хранилище финальных временных файлов
int totalUniqueCount = 0; // Общее количество уникальных строк
int totalDuplicateCount = 0; // Общее количество дубликатов
// Разбиение файлов на батчи для пакетной обработки
var mergeTasks = new List<Task>();
for (int i = 0; i < tempFiles.Length; i += batchSize)
{
var batch = tempFiles.Skip(i).Take(batchSize).ToArray();
// Создание задачи для слияния батча
mergeTasks.Add(Task.Run(async () =>
{
if (batch.Length > 1)
{
string tempOutputFile = Path.Combine(tempDir, Path.GetRandomFileName()); // Имя временного файла для слияния
logger.LogInformation($"Слияние батча из {batch.Length} файлов в {tempOutputFile}.");
var mergeResult = await MergeFiles(batch, tempOutputFile); // Слияние файлов в батче
finalTempFiles.Add(tempOutputFile); // Добавление итогового файла в общий список
// Увеличение глобальных счётчиков уникальных строк и дубликатов
Interlocked.Add(ref totalUniqueCount, mergeResult.Item1);
Interlocked.Add(ref totalDuplicateCount, mergeResult.Item2);
// Удаление временных файлов после их слияния
foreach (var tempFile in batch)
{
if (File.Exists(tempFile))
{
File.Delete(tempFile);
}
}
}
else
{
// Если в батче только один файл, добавляем его без изменений
finalTempFiles.Add(batch[0]);
}
}));
}
// Ожидание завершения всех задач по слиянию файлов
await Task.WhenAll(mergeTasks);
// Финальное слияние временных файлов в выходной файл
if (finalTempFiles.Count > 0)
{
logger.LogInformation($"Финальное слияние {finalTempFiles.Count} временных файлов в {outputFile}.");
var mergeFinalResult = await MergeFiles(finalTempFiles.ToArray(), outputFile); // Окончательное слияние
totalUniqueCount += mergeFinalResult.Item1; // Добавление уникальных строк
totalDuplicateCount += mergeFinalResult.Item2; // Добавление дубликатов
}
// Вывод информации о количестве уникальных строк и дубликатов
logger.LogInformation($"Уникальных строк: {totalUniqueCount}, дублей удалено: {totalDuplicateCount}");
return originalCount; // Возврат общего количества строк
}
// Функция для обработки чанка: удаление дубликатов и запись уникальных строк во временный файл
static async Task<string> ProcessChunk(string[] chunk)
{
try
{
logger.LogInformation($"Начало обработки чанка размером {chunk.Length} строк");
if (chunk.Length == 0)
{
logger.LogWarning("Пустой чанк, пропуск записи во временный файл.");
return null; // Пропуск обработки пустого чанка
}
int originalCount = chunk.Length; // Количество строк до удаления дублей
var uniqueItems = chunk.AsParallel().Distinct().OrderBy(x => x).ToList(); // Удаление дублей и сортировка
int duplicateCount = originalCount - uniqueItems.Count; // Вычисление количества дубликатов
string tempFilePath = Path.Combine(tempDir, Path.GetRandomFileName()); // Создание имени временного файла
if (uniqueItems.Count > 0)
{
// Запись уникальных строк во временный файл
logger.LogInformation($"Чанк перед записью: {uniqueItems.Count} уникальных строк, дублей удалено: {duplicateCount}. Запись в файл {tempFilePath}");
await File.WriteAllLinesAsync(tempFilePath, uniqueItems);
logger.LogInformation($"Чанк успешно записан во временный файл {tempFilePath}");
return tempFilePath; // Возвращаем путь к временному файлу
}
logger.LogWarning($"Чанк пуст после удаления дублей, файл {tempFilePath} не создан.");
return null; // Если после удаления дублей чанк пуст, не создаём файл
}
catch (Exception ex)
{
// Логируем ошибку и возвращаем null
logger.LogError($"Ошибка при обработке чанка: {ex}");
return null;
}
}
// Функция для слияния временных файлов и удаления дублей
static async Task<(int, int)> MergeFiles(string[] tempFiles, string outputFile)
{
try
{
logger.LogInformation($"Окончательное слияние {tempFiles.Length} временных файлов в {outputFile}");
int uniqueCount = 0; // Счетчик уникальных строк
int duplicateCount = 0; // Счетчик дубликатов
// Создаем StreamWriter для записи результата в выходной файл
using (StreamWriter writer = new StreamWriter(outputFile, false, Encoding.UTF8))
{
// Открываем каждый временный файл с помощью StreamReader
var readers = tempFiles.Select(file => new StreamReader(file)).ToList();
// Используем итератор для объединения данных из всех файлов
var mergedIter = Merge(readers);
string prevLine = null; // Переменная для отслеживания предыдущей строки
// Проход по всем строкам, полученным из слияния
await foreach (var line in mergedIter)
{
// Если текущая строка не совпадает с предыдущей (т.е. строка уникальна)
if (line != prevLine)
{
// Записываем строку в файл
await writer.WriteLineAsync(line);
prevLine = line; // Обновляем предыдущую строку
uniqueCount++; // Увеличиваем счетчик уникальных строк
}
else
{
duplicateCount++; // Если строка повторяется, увеличиваем счетчик дубликатов
}
}
// Закрываем все StreamReader после завершения обработки
foreach (var reader in readers)
{
reader.Close();
}
}
// Логируем результаты слияния: сколько уникальных строк и сколько дубликатов было удалено
logger.LogInformation($"Слияние завершено. Уникальных строк: {uniqueCount}, дублей удалено: {duplicateCount}");
return (uniqueCount, duplicateCount); // Возвращаем количество уникальных строк и дубликатов
}
catch (Exception ex)
{
// Логируем ошибки, если произошел сбой при слиянии
logger.LogError($"Ошибка при слиянии файлов: {ex}");
return (0, 0); // Возвращаем 0 в случае ошибки
}
}
// Итератор для слияния строк из всех временных файлов
static async IAsyncEnumerable<string> Merge(IEnumerable<StreamReader> readers)
{
// Используем SortedDictionary для сортировки строк при слиянии
var pq = new SortedDictionary<string, List<StreamReader>>();
// Проходим по каждому StreamReader и добавляем первую строку в словарь
foreach (var reader in readers)
{
if (!reader.EndOfStream)
{
string line = await reader.ReadLineAsync();
if (!pq.ContainsKey(line))
pq[line] = new List<StreamReader>();
pq[line].Add(reader);
}
}
// Пока есть строки для обработки
while (pq.Count > 0)
{
var first = pq.First(); // Получаем первую (наименьшую) строку из словаря
yield return first.Key; // Возвращаем строку
// Читаем следующие строки из соответствующих StreamReader
foreach (var reader in first.Value)
{
if (!reader.EndOfStream)
{
string line = await reader.ReadLineAsync();
if (!pq.ContainsKey(line))
pq[line] = new List<StreamReader>();
pq[line].Add(reader); // Добавляем строку в словарь для дальнейшей обработки
}
}
// Удаляем обработанную строку из словаря
pq.Remove(first.Key);
}
}
}
Тестирование 100 миллионов строк на файле размером 2GB (Ryzen 5600 4700mhz, 32GB DDR4 3600Mhz):
P.S. Есть баги со счетчиками расчета дублей строк (мне лень пока это фиксить, буду признателен если кто пофиксит), давно не кодил на шарпе, но по образцовому файлу в 2 гигабайта, сортирует и чистит правильно.
По Virus Total один детект после компиляции: