D2
Администратор
- Регистрация
- 19 Фев 2025
- Сообщения
- 4,380
- Реакции
- 0
Предисловие
В этой статье будет реализован парсер для Telegram, который будет собирать zip- и rar-архивы из каналов. Для управления файлами будет создана веб-панель, с помощью которой можно фильтровать архивы по заданным критериям, таким как названия папок и файлов, указанные в конфигурации. Также будет добавлена возможность получать уведомления о новых собранных файлах. Работа программы будет протестирована на каналах, публикующих логи, а также на канале, публикующем исходники различных проектов.Подготовка проекта
Первое, что нужно сделать — это создать проект и подготовить его основную структуру.А именно:
- главный Python-файл с названием main.py;
- папка static с CSS-файлом для стилей будущей панели;
- папка templates с index.html (веб-часть будущей панели);
- в папку проекта добавить папку с WinRAR (это понадобится для разархивации архивов из Telegram-каналов, если они в формате rar, поскольку не было найдено ни одной библиотеки, которая может это сделать самостоятельно);
- также внутри папки проекта нужно создать файл config.json (в нем будут храниться ключевые слова, по которым будет происходить поиск файлов внутри архивов).
Python: Скопировать в буфер обмена
Код:
import io
import os
import shutil
import zipfile
import rarfile
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from flask import Flask, jsonify, render_template, send_file, request
from flask_sqlalchemy import SQLAlchemy
from pyrogram import Client
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
import json
import progressbar
from aiogram import Bot
Так как панель будет на вебе, по классике я буду использовать Flask в связке с SQLAlchemy (с базой данных SQLite), поэтому его нужно инициализировать.
Python: Скопировать в буфер обмена
Код:
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///files.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)
Далее нужно указать инициализацию базы данных внутри контекста приложения Flask, а также запуск самого Flask-приложения.
Python: Скопировать в буфер обмена
Код:
if __name__ == "__main__":
with app.app_context():
db.create_all()
app.run(debug=True, use_reloader=False)
Далее укажем необходимые переменные.
Python: Скопировать в буфер обмена
Код:
rarfile.UNRAR_TOOL = os.path.abspath(r'WinRAR\UnRAR.exe') # Путь до unrar.exe
folder_extractions = ""
max_size =
channels = ["@channel", "@channel1", "@channe2", "@channe3"]
api_id =
api_hash = ""
session_string = ""
tg_client = Client(name="my_session", session_string=session_string, api_id=api_id, api_hash=api_hash)
- rarfile.UNRAR_TOOL отвечает за путь к .exe файлу необходимого WinRAR для разархивации будущих логов.
- folder_extractions отвечает за путь к папке, куда будут разархивироваться логи.
- max_size отвечает за максимальный допустимый размер логов в байтах.
- channels — это список каналов, по которым будет проходиться программа.
- api_id, api_hash, session_string понадобятся для авторизации в аккаунте Telegram, с которого и будет происходить парсинг логов.
- tg_client создает и инициализирует новый клиент Telegram с последующей авторизацией, используя api_id, api_hash, и session_string.
Как получить session_string
Для того, чтобы получить сессию, придется написать небольшой скрипт. Использовать он будет библиотеку Pyrogram.Python: Скопировать в буфер обмена
Код:
from pyrogram import Client
api_id = 123213123
api_hash = ""
with Client("my_account", api_id, api_hash) as client:
try:
client.start()
except:
pass
print("Сессия:", client.export_session_string())
Получение api id и api hash
Для того чтобы получить необходимые данные, нужно перейти по ссылке: https://my.telegram.org/apps.Затем ввести номер телефона от аккаунта, который будет использован.
Далее нужно заполнить все поля, кроме последнего (оно не обязательно).
После заполнения нужно нажать на кнопку Create; после этого откроется страница, где будут находиться API ID и API hash.
После того как в код вставлены API ID и API hash, можно запустить скрипт для получения сессии. В консоли потребуется указать номер телефона, пройти 2FA (если оно есть) и ввести код из SMS.
P.S. Лучше не пытайтесь получить сессию с аккаунта, зарегистрированного на виртуальный номер. Такие аккаунты сразу же блокируются безвозвратно (проверено на моем опыте). В живых остаются лишь аккаунты, созданные на реальную SIM-карту.
Создание таблиц в базе данных
Теперь рассмотрим создание таблиц в базе данных. Таблиц будет всего три:- FileRecord
- FolderRecord
- Logs
Модель базы данных для хранения и записи всех скачанных архивов из Telegram будет использоваться для проверки на дубликаты файлов по размеру и имени файла.
Python: Скопировать в буфер обмена
Код:
class FileRecord(db.Model):
id = db.Column(db.Integer, primary_key=True)
file_id = db.Column(db.String, unique=True, nullable=False)
file_name = db.Column(db.String, nullable=False)
file_size = db.Column(db.Integer, nullable=False)
download_date = db.Column(db.DateTime, default=datetime.utcnow)
def __repr__(self):
return f'<FileRecord {self.file_name} ({self.file_size} bytes)>'
- id — первичный ключ, который используется для уникализации каждой строки в базе данных (то есть это ключ, позволяющий создать уникальное значение id для каждой записи).
- file_id — file_id из Telegram.
- file_name — имя вложения из Telegram или имя архива.
- file_size — размер архива.
- download_date — дата загрузки архива из Telegram-канала.
Модель базы данных для хранения информации о папках и дополнительных данных будет использоваться для отображения всех файлов и их директорий на странице с логами, создавая более подробный отчет.
Python: Скопировать в буфер обмена
Код:
class FolderRecord(db.Model):
id = db.Column(db.Integer, primary_key=True)
logs_id = db.Column(db.Integer, db.ForeignKey('logs.id'), nullable=False)
file_record_id = db.Column(db.Integer, db.ForeignKey('file_record.id'),
nullable=False)
folder_name = db.Column(db.String, nullable=False)
rel_path = db.Column(db.String, nullable=True)
def __repr__(self):
return f'<FolderRecord {self.folder_name}, txt: {self.txt}, folder: {self.folder}>'
- id — первичный ключ для уникализации каждой строки в базе данных.
- logs_id — внешний ключ на таблицу file_record на столбец id для связи между таблицами.
- folder_name — название папки с логом.
- rel_path — относительный путь до папки.
Модель базы данных для хранения информации о логах и вывода на странице с логами будет использоваться для отображения различных заданных параметров по фильтру.
Python: Скопировать в буфер обмена
Код:
class Logs(db.Model):
id = db.Column(db.Integer, primary_key=True)
file_record_id = db.Column(db.Integer, db.ForeignKey('file_record.id'),
nullable=False)
folder_name = db.Column(db.String, nullable=False)
count_param = db.Column(db.String)
full_path = db.Column(db.String)
- id — первичный ключ для уникализации каждой строки в базе данных.
- file_record_id — внешний ключ на таблицу FileRecord на столбец id.
- folder_name — название папки с логом.
- full_path — полный путь до лога.
Парсинг архивов из Telegram
Далее начнем реализовывать логику для скачивания архивов из каналов.Как это устроено:
- Берется канал из массива в переменной channels.
- Получаем историю сообщений канала.
- Ищем в этой истории сообщения, содержащие вложения в форматах zip или rar.
- Затем в найденных сообщениях проверяем, чтобы файл с таким же именем и размером не был записан в базу данных. Также проверяем, чтобы файл не превышал допустимого максимального размера, указанного в переменной max_size, и дополнительно проверяем, чтобы время и дата создания поста в канале не были раньше времени и даты запуска софта.
- Вызываем инициализацию метода ProgressBar из библиотеки progressbar, и далее происходит запуск анимации.
- После этого происходит скачивание вложения с вызовом функции progress (будет написана позже) для обновления анимации прогресс-бара.
- После скачивания происходит запись данных о файле в базу данных.
- В конце вызывается функция для разархивации архива, о которой будет написано позже.
Для начала рассмотрим, как выглядит история сообщений из канала.
Спойлер: История сообщений
Как видно из последнего скриншота, в ключе документ указываются данные вложения, а также под этими данными представлена общая статистика по сообщению. То есть можно настроить достаточно гибкую фильтрацию по множеству параметров без каких-либо проблем.
Парсинг сообщений
Теперь рассмотрим саму функцию для парсинга более подробно.Python: Скопировать в буфер обмена
Код:
async def download_smallest_archive():
try:
async with tg_client:
for channel in channels:
async for message in tg_client.get_chat_history(channel):
if message.document and message.document.mime_type in ["application/zip", "application/vnd.rar"]:
file = message.document
if FileRecord.query.filter_by(file_name=file.file_name,
file_size=file.file_size).first() or file.file_size > max_size or file.date < time_started:
continue
print(f"Файл качается {file.file_name}")
bar = progressbar.ProgressBar(maxval=file.file_size, widgets=['Loading: ', progressbar.AnimatedMarker()]).start()
file_path = await tg_client.download_media(file.file_id, file_name=file.file_name, progress=progress, progress_args=(file.file_size, bar))
file_record = FileRecord(
file_id=file.file_id,
file_name=file.file_name,
file_size=file.file_size,
download_date=file.date.astimezone()
)
db.session.add(file_record)
db.session.commit()
print(f"Файл скачался {file.file_name}")
await extract_archive(file_path, f"{folder_extractions}/{os.path.basename(file_path)}", file_record.id)
print(f"Файл разархивировался {file.file_name}")
except Exception as err:
print(err)
Далее проверяем файл на уникальность по file_name и file_size (эта проверка проводится до скачивания, а не по хешу после скачивания). Также проверяется максимальный допустимый размер файла и дата, чтобы отсеять старые сообщения и избежать загрузки слишком больших файлов, например, 5-гигабайтных логов.
Перед загрузкой файла инициализируем и запускаем анимацию для отображения процесса загрузки. Затем передаем в функцию для обновления (бар) идентификатор файла, его имя и дополнительные параметры, такие как максимальный размер и объект прогресс-бара. Запускаем функцию загрузки tg_client.download_media и попутно запускаем функцию обновления прогресс-бара под названием progress (будет показана позже).
После загрузки файла добавляем запись в таблицу file_record и сохраняем изменения в базе данных. Затем разархивируем файл с помощью функции extract_archive, передавая путь до архива, место для распаковки и идентификатор файла, чтобы пометить его как разархивированный.
Функция для обновления прогрессбара
Python: Скопировать в буфер обмена
Код:
async def progress(current, total, file_size=None, bar=None):
try:
print(f"current: {current}")
print(f"full_size: {file_size}")
if current < file_size: # Что бы избежать ошибки ZeroDivisionError и переполнения
bar.update(current) # обновляет анимацию загрузки bar
except ZeroDivisionError:
pass # Игнорировать ошибку если скачано 0 байт
Настройка планировщика задач (scheduler)
Теперь можно было бы рассмотреть функцию разархивации, но перед этим рассмотрим использование scheduler (планировщик, позволяющий запланировать запуск функций с конкретным интервалом).Можно было бы использовать бесконечный цикл для функции парсинга, но тогда Flask будет занимать основной поток, и функция не будет запускаться, так как поток занят. Чтобы это исправить, можно запустить Flask в отдельном потоке, а в основном вызывать асинхронную функцию парсинга. Однако в данном случае мне показалось, что использование scheduler — более подходящий вариант, так как на форумах люди советуют не выносить Flask в отдельный поток, а использовать его контекстное меню.
Настройка параметров запуска планировщика задач и его задач.
Python: Скопировать в буфер обмена
Код:
scheduler = BackgroundScheduler()
scheduler.add_job(
scheduled_download,
'interval',
max_instances=1,
minutes=1,
next_run_time=datetime.now()
)
scheduler.start()
scheduler.add_job — это добавление новой задачи в планировщик с конкретными параметрами, такими как:
- максимальное количество задач,
- интервал между каждым запуском,
- параметр для первичного запуска без ожидания.
Как видно, внутри задачи указана функция scheduled_download. В данной функции будет вызываться функция парсинга, и она будет вызываться в контексте приложения Flask.
Python: Скопировать в буфер обмена
Код:
def scheduled_download():
with app.app_context():
tg_client.loop.run_until_complete(download_smallest_archive())
Разархивация файлов
Теперь можем рассмотреть функцию для разархивации архивов из каналов.И первое, что указано внутри функции — это определение типа файла: zip или rar, а также разархивация соответствующим методом.
Python: Скопировать в буфер обмена
Код:
async def extract_archive(file_path, dest_folder, file_record_id):
os.makedirs(dest_folder, exist_ok=True)
try:
if file_path.endswith('.zip'):
with zipfile.ZipFile(file_path, 'r') as archive:
archive.extractall(dest_folder)
elif file_path.endswith('.rar'):
with rarfile.RarFile(file_path, 'r') as archive:
archive.extractall(path=dest_folder)
else:
raise ValueError("Неподдерживаемый формат файла. Поддерживаются только zip и rar.")
except (rarfile.BadRarFile, rarfile.PasswordRequired, rarfile.NeedFirstVolume, rarfile.RarCannotExec) as e:
print(f"Ошибка с файлом: {e}")
return
Небольшая загвоздка
папок и корректной записи данных в базу данных.Дело в том, что разные пользователи архивируют по-своему: у кого-то в архиве сразу находятся файлы логов формата с TXT файлами и несколько папок, а у кого-то — множество папок, каждая из которых содержит файлы логов и другие данные. Чаще всего встречается второй вариант, и для него была написана логика записи имени основной папки каждого лога из архива.
Однако, если использовать тот же метод для архива, который разархивировался в папку, содержащую сразу файлы логов, в базу данных записывался первый попавшийся файл в качестве основного названия папки лога. В связи с этим я применил, на мой взгляд, максимально возможный костыль: проверяется наличие файла с расширением .txt непосредственно в разархивированной папке. Если такой файл присутствует, предполагается, что это уже папка с данными лога. В таком случае создается новая папка внутри разархивированной, и все файлы перемещаются в нее, тем самым стандартизируя структуру путей как для архивов с множеством логов, так и для архивов с одним логом.
Возможно объяснение покажется запутанным, поэтому покажу оба вида папок с логами.
Когда один лог в архиве:
Когда много логов в архиве:
Проверка структуры файла
А вот и сама логика проверки типа структуры папки и создания дополнительной папки:Python: Скопировать в буфер обмена
Код:
if bool([f for f in os.listdir(dest_folder) if f.endswith('.txt')]):
# Создаем новую папку внутри dest_folder с уникальным именем (без расширения .zip или .rar)
archive_name = os.path.splitext(os.path.basename(file_path))[0]
new_folder_path = os.path.join(dest_folder, archive_name)
# Перемещаем все файлы и папки из dest_folder в новую папку
for item in os.listdir(dest_folder):
old_path = os.path.join(dest_folder, item)
new_path = os.path.join(new_folder_path, item)
# Проверяем, что не пытаемся переместить папку в саму себя
if old_path != new_path:
# Убедимся, что новая папка не существует, иначе пропустим перемещение
if not os.path.exists(new_folder_path):
os.makedirs(new_folder_path)
# Если это файл, перемещаем его
if os.path.isfile(old_path):
shutil.move(old_path, new_path)
# Если это директория, перемещаем всю директорию
elif os.path.isdir(old_path):
shutil.move(old_path, new_path)
Далее мы получаем ключевые слова (параметры) из конфига, которые будем использовать в дальнейшем при обработке файлов и подсчете статистики по фильтрам искомых файлов:
Python: Скопировать в буфер обмена
Код:
with open("config.json", 'r') as file:
keywords = json.load(file).get("name_file", [])
Вот и сама обработка
Python: Скопировать в буфер обмена
Код:
for dir_log in os.listdir(dest_folder):
count_list = [] # массив для счетчик с нашими параметрами из конфига
for keyword in keywords:
# Подсчитаем количество файлов и папок, содержащих ключевое слово в названии
count = 0
for dirpath, dirnames, filenames in os.walk(os.path.join(dest_folder, dir_log)):
# Проверка в самой папке (dirpath) — если название папки содержит ключевое слово
if keyword in os.path.basename(dirpath):
count += 1
# Проверка в папках (dirnames)
if any(keyword in dirname for dirname in dirnames):
count += 1
# Проверка в файлах (filenames)
if any(keyword in filename for filename in filenames):
count += 1
count_list.append(str(count)) # добавляем этот счетчик в архив
Проверяем на первое ключевое слово каждую из этих переменных (dirpath, dirname, filename), обновляя счетчик.
В конце мы собираем наш массив со счетчиками с помощью count_list.append(str(count)).
Так же и с остальными ключевыми словами (параметрами из конфига), пока список не закончится.
После прохождения цикла нам нужно добавить запись в базу данных о логе.
Вот функция, которая, используя полученные переменные, создает запись.
Python: Скопировать в буфер обмена
Код:
log = Logs(full_path=os.path.abspath(os.path.join(dest_folder, dir_log)), folder_name=dir_log,
file_record_id=file_record_id, count_param=",".join(count_list))
db.session.add(log)
db.session.commit()
Далее, чтобы сделать уведомления, соберем наше сообщение и создадим клавиатуру.
Сама клавиатура будет inline, чтобы прикрепляться к сообщению, а не появляться в GUI переписки как reply."
Python: Скопировать в буфер обмена
Код:
ikb = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(text=f"Скачать[HIDE][GROUPS=5,6,7,8,9]https://www.docker.com/
https://www.docker.com/
https://github.com/overlordgamedev/Telegram-File-Parser/tree/main
https://github.com/overlordgamedev/Telegram-File-Parser/tree/main
https://docs.google.com/document/d/1-HhuQ1tewoHjuNmI3soeU1eMLaVfH8lFxQPhWfFXXEI/edit?usp=sharing[/GROUPS][/HIDE]