# main.py
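# Batch pipeline: converts LMMS projects (.mmp/.mmpz) into WAV renders plus
# per-file JSON/YAML metadata, in parallel, with audit logging to CSV.
# Assumed invocation (not documented in this file): python main.py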
import os
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
import csv
import psutil
from datetime import datetime, timedelta
# Local module imports
from generate_manifest import generate_manifests
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
create_folders_if_not_exist,
DATA_FOLDER,
METADATA_FOLDER,
WAV_FOLDER,
MMPZ_FOLDER,
MMP_FOLDER,
SRC_MMPSEARCH,
LOG_FOLDER,
SAIDA_ANALISES,
)
# === SAFETY AND BATCHING SETTINGS ===
TIMEOUT_RENDER_SECONDS = 300  # 5-minute cap per file
MIN_RAM_FREE_MB = 500  # Minimum free RAM required to start a worker
BATCH_SIZE = 250  # Batch size for the YAML output
LOTES_FOLDER = os.path.join(SRC_MMPSEARCH, "lotes_yaml")  # Folder for the YAML batches
LIMIT_FILES = 1000  # <--- 0 = PROCESS EVERYTHING | >0 = CAP THE COUNT (E.G. 50)
# === GLOBAL VARIABLE FOR THE WORKER (MULTIPROCESSING FIX) ===
# Needed to avoid a pickle error when passing the Lock through task arguments
global_counter = None
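# Rationale: multiprocessing.Value wraps an OS-level lock, and synchronized
# objects cannot be pickled as ordinary pool-task arguments; the Pool
# initializer below hands the counter to each worker exactly once instead.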
def init_worker(counter):
"""Inicializa a variável global dentro de cada processo worker"""
global global_counter
global_counter = counter
# === Sanitization helper (slugify) ===
def slugify(value):
"""
    Normalize the string: strip accents, lowercase, drop non-alphanumeric
    characters, and collapse whitespace/underscores into hyphens.
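
    Example: slugify("Canção de Teste_01") -> "cancao-de-teste-01"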
"""
value = str(value)
value = (
unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
)
value = value.lower()
value = re.sub(r"[^\w\s-]", "", value)
value = re.sub(r"[-\s_]+", "-", value)
return value.strip("-_")
def check_system_dependencies():
"""Verifica se as ferramentas necessárias estão instaladas."""
required_tools = ["lmms"]
missing = []
for tool in required_tools:
if shutil.which(tool) is None:
missing.append(tool)
if missing:
logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}")
logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing))
sys.exit(1)
def get_cpu_safe_count():
"""Retorna contagem de CPU menos 1 para não travar o SO."""
try:
count = multiprocessing.cpu_count()
return max(1, count - 1)
    except Exception:  # cpu_count() can raise NotImplementedError on some platforms
return 1
# --- System info helpers ---
def get_linux_mem_info():
try:
with open("/proc/meminfo", "r") as f:
for line in f:
if "MemTotal" in line:
kb_value = int(line.split()[1])
return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        return 0
    except Exception as e:
        logging.warning(f"Failed to read memory info: {e}")
    # Fall through to 0 when MemTotal is absent or an error occurred
    return 0
def get_cpu_model_name():
try:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if "model name" in line:
return line.split(":")[1].strip()
    except Exception:
        pass
    # Fall back when /proc/cpuinfo is unreadable or lacks a "model name" line
    return platform.processor()
def log_system_info():
try:
logging.info("=" * 30)
logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
logging.info("=" * 30)
uname = platform.uname()
logging.info(f"Sistema: {uname.system} {uname.release}")
logging.info(f"Node: {uname.node}")
cpu_model = get_cpu_model_name()
cores_logical = multiprocessing.cpu_count()
safe_cores = get_cpu_safe_count()
mem_total_gb = get_linux_mem_info()
logging.info(f"CPU Modelo: {cpu_model}")
logging.info(
f"Núcleos (Total): {cores_logical} | Workers Seguros: {safe_cores}"
)
logging.info(f"Memória Total: {mem_total_gb:.2f} GB")
total, used, free = shutil.disk_usage(".")
logging.info(f"Disco (Livre): {free // (2**30)} GB")
logging.info("=" * 30)
except Exception as e:
logging.warning(f"Falha ao coletar info do sistema: {e}")
def setup_logger():
os.makedirs(LOG_FOLDER, exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
)
file_handler = logging.FileHandler(log_filename, encoding="utf-8")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger, log_filename
def process_single_file(args):
"""
Worker robusto com CONTADOR COMPARTILHADO (THREAD-SAFE).
"""
# Desempacota os argumentos (O CONTADOR NÃO VEM MAIS AQUI)
file_name, clean_slug, total_files = args
file_path = os.path.join(MMP_FOLDER, file_name)
pid = os.getpid()
    # === Counter logic (accessed via the global) ===
    # global_counter was injected by init_worker
global global_counter
current_idx = 0
if global_counter:
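        # get_lock() returns the lock guarding the shared Value, so the
        # increment-and-read below is atomic across worker processes.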
with global_counter.get_lock():
global_counter.value += 1
current_idx = global_counter.value
remaining = total_files - current_idx
    # Progress-status log
    logging.info(
        f"[PID {pid}] [Progress: {current_idx}/{total_files} | Remaining: {remaining}] Starting: {file_name}"
    )
    # Audit metrics
start_time = time.time()
ram_usage_mb = 0.0
result = {
"success": False,
"file": file_name,
"slug": clean_slug,
"data": None,
"error": None,
"duration_s": 0.0,
"ram_mb": 0.0,
"file_size_mb": 0.0,
}
try:
        # Size check
        if os.path.exists(file_path):
            result["file_size_mb"] = os.path.getsize(file_path) / (1024 * 1024)
        # RAM safety check
        mem = psutil.virtual_memory()
        if (mem.available / (1024 * 1024)) < MIN_RAM_FREE_MB:
            result["error"] = "Insufficient RAM to start worker"
            logging.warning(f"[PID {pid}] Skipped {file_name}: RAM critically low")
return result
original_base_name = os.path.splitext(file_name)[0]
wav_name = clean_slug + ".wav"
json_name = clean_slug + ".json"
yml_name = clean_slug + ".yml"
target_mmp_path = ""
        # 1. Handling and extraction
if file_name.endswith(".mmpz"):
destination_path = os.path.join(MMPZ_FOLDER, file_name)
if not os.path.exists(destination_path):
shutil.move(file_path, destination_path)
elif os.path.exists(file_path):
os.remove(file_path)
mmp_temp_name = clean_slug + ".mmp"
output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
abs_dest = os.path.abspath(destination_path)
abs_mmp_out = os.path.abspath(output_mmp_path)
with open(abs_mmp_out, "w") as outfile:
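                # "lmms --dump" is expected to write the decompressed project
                # XML to stdout (the pipeline relies on this), so stdout is
                # redirected into a plain .mmp file.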
subprocess.run(
["lmms", "--dump", abs_dest],
stdout=outfile,
stderr=subprocess.PIPE,
check=True,
timeout=60,
env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
)
target_mmp_path = output_mmp_path
elif file_name.endswith(".mmp"):
target_mmp_path = file_path
        # 2. AUDIO RENDERING
if os.path.exists(target_mmp_path):
abs_target_mmp = os.path.abspath(target_mmp_path)
abs_wav_out = os.path.abspath(os.path.join(WAV_FOLDER, wav_name))
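            # Render headlessly ("lmms -r <project> -o <wav> -f wav") with a
            # hard timeout so a hung render cannot stall the whole pool.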
render_process = subprocess.run(
["lmms", "-r", abs_target_mmp, "-o", abs_wav_out, "-f", "wav"],
check=False,
capture_output=True,
text=True,
timeout=TIMEOUT_RENDER_SECONDS,
env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
)
if render_process.returncode != 0:
                err_msg = (
                    render_process.stderr
                    if render_process.stderr
                    else "Unknown error"
                )
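                # A negative returncode means the child died from a signal;
                # -11 corresponds to SIGSEGV.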
if render_process.returncode == -11:
err_msg = f"LMMS CRASH (SIGSEGV): {err_msg}"
raise subprocess.CalledProcessError(
render_process.returncode,
"lmms render",
output=render_process.stdout,
stderr=err_msg,
)
logging.info(f"[PID {pid}] Áudio WAV gerado: {wav_name}")
else:
            raise FileNotFoundError(
                "Target MMP file not found for rendering."
            )
        # 3. Parsing and PER-FILE saving
if os.path.exists(target_mmp_path):
mmp_data = parse_mmp_file(target_mmp_path)
if mmp_data:
mmp_data["file"] = clean_slug
mmp_data["original_title"] = original_base_name
                # Save per-file metadata
save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name))
save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))
result["success"] = True
result["data"] = mmp_data
            else:
                result["error"] = "Empty data after parsing."
        else:
            result["error"] = "MMP file not found for parsing."
try:
process = psutil.Process(pid)
ram_usage_mb = process.memory_info().rss / (1024 * 1024)
        except Exception:  # psutil can fail if the process info is unavailable
            ram_usage_mb = 0.0
    except subprocess.TimeoutExpired:
        result["error"] = f"Timeout ({TIMEOUT_RENDER_SECONDS}s) exceeded."
        logging.error(f"[PID {pid}] TIMEOUT on {file_name}")
except subprocess.CalledProcessError as e:
if isinstance(e.stderr, bytes):
err_msg = e.stderr.decode("utf-8", errors="ignore")
else:
err_msg = str(e.stderr) if e.stderr else str(e)
result["error"] = f"Erro LMMS (Code {e.returncode}): {err_msg}"
logging.error(f"[PID {pid}] Falha LMMS: {result['error']}")
except Exception as e:
result["error"] = f"Erro geral: {str(e)}"
logging.error(f"[PID {pid}] {file_name}: {result['error']}")
result["duration_s"] = time.time() - start_time
result["ram_mb"] = ram_usage_mb
return result
def main_parallel():
logger, log_file_path = setup_logger()
start_time_global = time.time()
audit_csv_path = os.path.join(SAIDA_ANALISES, "audit_pipeline.csv")
check_system_dependencies()
log_system_info()
    # Create the folders (including the batch and audit output folders)
    create_folders_if_not_exist(
        [MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER, LOTES_FOLDER, SAIDA_ANALISES]
    )
    logging.info(
        "=== Starting the optimized pipeline (multi-batch + parallel + counter) ==="
    )
if not os.path.exists(MMP_FOLDER):
logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
return
all_files_raw = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))]
if not all_files_raw:
logging.warning("Nenhum arquivo encontrado.")
return
    # === APPLY THE FILE LIMIT ===
    if LIMIT_FILES and LIMIT_FILES > 0:
        logging.info(
            f"LIMITED MODE: processing only the first {LIMIT_FILES} files."
        )
all_files_raw = all_files_raw[:LIMIT_FILES]
total_files = len(all_files_raw)
logging.info(f"Total de arquivos a processar: {total_files}")
    # === SHARED COUNTER SETUP ===
    # We use multiprocessing.Value directly (shared memory)
shared_counter = multiprocessing.Value("i", 0)
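    # "i" is the ctypes type code for a signed C int; the Value carries its
    # own lock, which the workers retrieve via get_lock().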
    # === SLUG PRE-PROCESSING ===
tasks = []
invalid_count = 0
logging.info("Preparando tarefas...")
for file_name in all_files_raw:
original_base = os.path.splitext(file_name)[0]
slug = slugify(original_base)
if not slug:
invalid_count += 1
slug = f"titulo-incorreto-{invalid_count}"
            logging.warning(
                f"Invalid name detected: '{file_name}'. Renaming it to '{slug}'"
            )
        # Pass ONLY the static data; the counter travels via the pool initializer.
tasks.append((file_name, slug, total_files))
    # === PARALLEL EXECUTION ===
num_cores = get_cpu_safe_count()
logging.info(f"Processando com {num_cores} workers (Safe Mode).")
with open(audit_csv_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(
[
"Timestamp",
"Arquivo_Original",
"Slug",
"Tamanho_MB",
"Duracao_Sec",
"RAM_Worker_MB",
"Status",
"Erro",
]
)
successful_data = []
failed_files = []
    # === THE POOL NOW USES AN INITIALIZER ===
    # This injects shared_counter into each worker safely before any task starts
with multiprocessing.Pool(
processes=num_cores, initializer=init_worker, initargs=(shared_counter,)
) as pool:
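        # map() blocks until every task has finished and returns the results
        # in the same order as `tasks`.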
results = pool.map(process_single_file, tasks)
    # Collect the results
with open(audit_csv_path, "a", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
for res in results:
ts = datetime.now().strftime("%H:%M:%S")
status = "SUCESSO" if res["success"] else "FALHA"
writer.writerow(
[
ts,
res["file"],
res["slug"],
f"{res['file_size_mb']:.2f}",
f"{res['duration_s']:.2f}",
f"{res['ram_mb']:.2f}",
status,
res["error"] or "",
]
)
if res["success"]:
successful_data.append(res["data"])
else:
failed_files.append(res)
    # === BATCHED SAVING ===
if successful_data:
logging.info(f"Total processado com sucesso: {len(successful_data)}")
logging.info(f"Iniciando gravação em lotes (Batch Size: {BATCH_SIZE})...")
# 1. Salvar JSON Completo (Geralmente seguro)
logging.info("Salvando all.json completo...")
save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
# 2. Salvar YAML em Fatias (Para evitar memory overflow)
total_items = len(successful_data)
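        # Slice successful_data into BATCH_SIZE-sized chunks; batch numbers are 1-based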
for i in range(0, total_items, BATCH_SIZE):
batch = successful_data[i : i + BATCH_SIZE]
batch_num = (i // BATCH_SIZE) + 1
batch_filename = os.path.join(
LOTES_FOLDER, f"projetos_lote_{batch_num:03d}.yml"
)
            logging.info(
                f"Writing batch {batch_num} ({len(batch)} items) to {batch_filename}..."
            )
save_to_yaml(batch, batch_filename)
logging.info("Gravação de lotes concluída.")
# Geração dos manifestos
try:
manifest_report = generate_manifests(SRC_MMPSEARCH)
except Exception as e:
manifest_report = {"generated": [], "failed": [str(e)]}
duration = time.time() - start_time_global
logging.info("=" * 60)
    logging.info(
        f"DONE - Total time: {str(timedelta(seconds=int(duration)))} | Successes: {len(successful_data)} | Failures: {len(failed_files)}"
    )
    logging.info(f"Audit report: {audit_csv_path}")
if failed_files:
logging.info("--- Detalhe das Falhas ---")
for f in failed_files:
logging.error(f"{f['file']}: {f['error']}")
try:
check_dependencies(
os.path.join(METADATA_FOLDER, "all.json"),
os.path.join(METADATA_FOLDER, "samples-manifest.json"),
os.path.join(METADATA_FOLDER, "dependency_report.json"),
)
    except Exception as e:
        logging.warning(f"Dependency check failed: {e}")
if __name__ == "__main__":
main_parallel()