D2
Администратор
- Регистрация
- 19 Фев 2025
- Сообщения
- 4,380
- Реакции
- 0
Т.к. много работы с клаудами, делал для себя такой скрипт
Массовая многопоточная распаковка текстовых файлов из архивов RAR/ZIP
- Отслеживание прогресса работы
- friendly gui (для удобства)
- копирование запароленных архивов в отдельную директорию
- присвоение уникальных имен файлам, для избежания дальнейших проблем с кривыми именами при работе ( в моем случае сид серчера )
Код: Скопировать в буфер обмена
soure:
Код: Скопировать в буфер обмена
Массовая многопоточная распаковка текстовых файлов из архивов RAR/ZIP
- Отслеживание прогресса работы
- friendly gui (для удобства)
- копирование запароленных архивов в отдельную директорию
- присвоение уникальных имен файлам, для избежания дальнейших проблем с кривыми именами при работе ( в моем случае сид серчера )
Код: Скопировать в буфер обмена
pip install PyQt5==5.15.11 rarfile==4.2 tqdm==4.66.4 qt_material==2.14
soure:
Код: Скопировать в буфер обмена
Код:
import os
import zipfile
import rarfile
import uuid
import shutil
import multiprocessing
from tqdm import tqdm
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QLabel, QLineEdit, QPushButton, QCheckBox
from qt_material import apply_stylesheet
class ArchiveExtractorGUI(QWidget):
def __init__(self):
super().__init__()
self.setWindowTitle("Archive Extractor")
self.initUI()
def initUI(self):
layout = QVBoxLayout()
self.target_directory_label = QLabel("Введите путь к исходной директории:")
self.target_directory_input = QLineEdit()
layout.addWidget(self.target_directory_label)
layout.addWidget(self.target_directory_input)
self.output_directory_label = QLabel("Введите путь к директории для сохранения:")
self.output_directory_input = QLineEdit()
layout.addWidget(self.output_directory_label)
layout.addWidget(self.output_directory_input)
self.password_protected_directory_label = QLabel("Введите путь к директории для архивов с паролем:")
self.password_protected_directory_input = QLineEdit()
layout.addWidget(self.password_protected_directory_label)
layout.addWidget(self.password_protected_directory_input)
self.copy_password_protected_checkbox = QCheckBox("Копировать архивы с паролем")
self.copy_password_protected_checkbox.setChecked(True) # По умолчанию включено
layout.addWidget(self.copy_password_protected_checkbox)
self.extract_button = QPushButton("Извлечь файлы")
self.extract_button.clicked.connect(self.extractFiles)
layout.addWidget(self.extract_button)
self.setLayout(layout)
def extractFiles(self):
target_directory = self.target_directory_input.text()
output_directory = self.output_directory_input.text()
password_protected_directory = self.password_protected_directory_input.text()
copy_password_protected = self.copy_password_protected_checkbox.isChecked()
if not target_directory or not output_directory or (copy_password_protected and not password_protected_directory):
print("Пожалуйста, заполните все поля.")
return
password_file = "protected_archives.log" # Файл для сохранения имен архивов с паролями
unpack_log_files(target_directory, output_directory, password_file, password_protected_directory, copy_password_protected)
def is_text_file(file_name):
#text_formats = {'.txt', '.sql', '.log', '.csv', '.json', '.xml', '.html', '.htm', '.ini', '.cfg', '.md'}
#text_formats = {'.py', '.ts'}
text_formats = {'.doc', '.docx', '.php', '.xlsx', '.py', '.ts', '.txt', '.sql', '.csv', '.json', '.xml', '.cfg', '.md'}
return os.path.splitext(file_name)[1].lower() in text_formats
def extract_files_from_archive(archive, output_directory):
try:
for item in archive.namelist():
if is_text_file(item):
unique_id = str(uuid.uuid4().hex)
ext = os.path.splitext(item)[1]
new_file_name = f"{unique_id}{ext}"
with archive.open(item, "r") as source_file:
try:
with open(os.path.join(output_directory, new_file_name), "w", encoding="utf-8", errors="ignore") as target_file:
target_file.write(source_file.read().decode("utf-8", errors="ignore"))
except Exception:
pass #игнорируем ошибки сохранения
except Exception:
pass #Игнорируем ошибки при работе с архивом
def extract_files_from_rar(archive_path, output_directory, password_file="protected_archives.log", password_protected_directory=None, copy_password_protected=True):
try:
with rarfile.RarFile(archive_path, "r") as archive:
extract_files_from_archive(archive, output_directory)
except rarfile.PasswordRequired:
try:
with open(password_file, "a", encoding="utf-8") as protected_file:
protected_file.write(f"{archive_path}\n")
if copy_password_protected and password_protected_directory:
shutil.copy(archive_path, password_protected_directory)
except Exception:
pass #Игнорируем ошибки копирования или записи в лог
except Exception:
pass #Игнорируем другие ошибки работы с архивом
def process_archive(file_path, output_directory, password_file="protected_archives.log", password_protected_directory=None, copy_password_protected=True):
try:
if file_path.lower().endswith(".zip"):
try:
with zipfile.ZipFile(file_path, "r") as archive:
archive.testzip() # Проверяем корректность ZIP архива
extract_files_from_archive(archive, output_directory)
except zipfile.BadZipFile:
print(f"Ошибка: {file_path} не является корректным ZIP архивом. Пропуск...")
except RuntimeError:
try:
with open(password_file, "a", encoding="utf-8") as protected_file:
protected_file.write(f"{file_path}\n")
if copy_password_protected and password_protected_directory:
shutil.copy(file_path, password_protected_directory)
except Exception:
pass # Игнорируем ошибки копирования или записи в лог
elif file_path.lower().endswith(".rar"):
try:
extract_files_from_rar(file_path, output_directory, password_file, password_protected_directory, copy_password_protected)
except Exception:
pass # Игнорируем ошибки обработки RAR архива
except Exception:
pass # Игнорируем любые другие ошибки
def process_wrapper(args):
try:
process_archive(*args)
except Exception:
pass # Игнорируем ошибки в процессе обработки архива
return 1
def unpack_log_files(target_directory, output_directory, password_file="protected_archives.log", password_protected_directory=None, copy_password_protected=True):
archives_to_process = []
for root, _, files in os.walk(target_directory):
for file_name in files:
file_path = os.path.join(root, file_name)
# Добавляем только архивы .rar и .zip
if file_name.lower().endswith((".rar", ".zip")):
archives_to_process.append(file_path)
max_processes = min(multiprocessing.cpu_count(), 20)
processed_count = 0
with multiprocessing.Pool(processes=max_processes) as pool:
args_list = [(file_path, output_directory, password_file, password_protected_directory, copy_password_protected) for file_path in archives_to_process]
for _ in tqdm(pool.imap_unordered(process_wrapper, args_list), total=len(args_list)):
processed_count += 1
print(f"Обработано архивов: {processed_count}")
if __name__ == "__main__":
import sys
app = QApplication(sys.argv)
apply_stylesheet(app, theme='dark_purple.xml')
window = ArchiveExtractorGUI()
window.resize(800, 600)
window.setWindowTitle("Archive Extractor")
window.show()
sys.exit(app.exec_())