mmpSearch/scripts/handler/main.py

370 lines
12 KiB
Python
Executable File

# main.py
import os
import json
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
from datetime import datetime
# Importando seus módulos locais
from generate_manifest import generate_manifests
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
create_folders_if_not_exist,
BASE_PATH,
DATA_FOLDER,
METADATA_FOLDER,
WAV_FOLDER,
MMPZ_FOLDER,
MMP_FOLDER,
SRC_MMPSEARCH,
LOG_FOLDER,
)
# === Sanitization helper (slugify) ===
def slugify(value):
    """Return an ASCII slug of *value*.

    Strips accents, lowercases, removes non-alphanumeric characters and
    collapses runs of whitespace/underscores/hyphens into single hyphens.
    Ex: 'Ação & Reação!' -> 'acao-reacao'
    """
    # Decompose unicode and drop the non-ASCII parts (e.g. ã -> a).
    text = unicodedata.normalize('NFKD', str(value))
    text = text.encode('ascii', 'ignore').decode('ascii').lower()
    # Keep only letters, digits, underscores, whitespace and hyphens.
    text = re.sub(r'[^\w\s-]', '', text)
    # Collapse separators (spaces, underscores, hyphens) into one hyphen.
    text = re.sub(r'[-\s_]+', '-', text)
    return text.strip('-_')
def check_system_dependencies():
    """Abort the program if any required external tool is not installed.

    Logs the missing tools and exits with status 1 when at least one of
    them cannot be found on PATH.
    """
    required_tools = ["lmms"]
    # A tool is missing when shutil.which cannot resolve it on PATH.
    missing = [tool for tool in required_tools if shutil.which(tool) is None]
    if missing:
        logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}")
        logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing))
        sys.exit(1)
def get_linux_mem_info():
    """Return the total system memory in GiB, read from /proc/meminfo.

    Returns 0 when the file is unreadable, malformed, or has no
    "MemTotal" entry (e.g. on non-Linux systems).
    """
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if "MemTotal" in line:
                    # Line format: "MemTotal:  16338508 kB" -> field [1] is kB.
                    kb_value = int(line.split()[1])
                    return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        return 0
    except Exception as e:
        logging.warning(f"Erro ao ler memória: {e}")
        return 0
    # BUG FIX: the original fell through and implicitly returned None when
    # no "MemTotal" line was present; callers format this with ":.2f".
    return 0
def get_cpu_model_name():
    """Return the CPU model string from /proc/cpuinfo.

    Falls back to platform.processor() when the file cannot be read or
    contains no "model name" line (some architectures omit it).
    """
    try:
        with open("/proc/cpuinfo", "r") as f:
            for line in f:
                if "model name" in line:
                    return line.split(":")[1].strip()
    except Exception:
        return platform.processor()
    # BUG FIX: the original implicitly returned None when the file was
    # readable but had no "model name" entry; use the same fallback.
    return platform.processor()
def log_system_info():
    """Log an audit of the host environment: OS, CPU, memory and disk.

    Best-effort only: any failure while collecting the data is logged
    as a warning instead of propagating.
    """
    try:
        divider = "=" * 30
        logging.info(divider)
        logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
        logging.info(divider)
        uname = platform.uname()
        logging.info(f"Sistema: {uname.system} {uname.release}")
        logging.info(f"Node: {uname.node}")
        cpu_model = get_cpu_model_name()
        cores_logical = multiprocessing.cpu_count()
        mem_total_gb = get_linux_mem_info()
        logging.info(f"CPU Modelo: {cpu_model}")
        logging.info(f"Núcleos: {cores_logical}")
        logging.info(f"Memória Total: {mem_total_gb:.2f} GB")
        # Only the free-space figure is reported; total/used are ignored.
        _total, _used, free = shutil.disk_usage(".")
        logging.info(f"Disco (Livre): {free // (2**30)} GB")
        logging.info(divider)
    except Exception as e:
        logging.warning(f"Falha ao coletar info do sistema: {e}")
def setup_logger():
    """Configure the root logger with a timestamped file plus console output.

    Creates LOG_FOLDER if needed and returns (logger, log_file_path).
    """
    os.makedirs(LOG_FOLDER, exist_ok=True)
    stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_path = os.path.join(LOG_FOLDER, f"execucao_{stamp}.log")

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    fmt = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
    )
    # File first, then console — both share the same formatter.
    for handler in (
        logging.FileHandler(log_path, encoding="utf-8"),
        logging.StreamHandler(),
    ):
        handler.setFormatter(fmt)
        root.addHandler(handler)
    return root, log_path
def process_single_file(args):
    """Worker that processes one LMMS project file (.mmp or .mmpz).

    Accepts either a bare file name (the slug is computed here as a
    fallback) or a (file_name, slug) tuple pre-computed by the dispatcher.
    A .mmpz archive is moved to MMPZ_FOLDER and dumped to a plain .mmp;
    the resulting .mmp is rendered to WAV and parsed into JSON/YAML
    metadata named after the sanitized slug.

    Returns a dict: {"success": bool, "file": str, "data": dict|None,
    "error": str|None}.
    """
    # --- Input normalization (tuple or plain string) ---
    if isinstance(args, str):
        file_name = args
        original_base = os.path.splitext(file_name)[0]
        clean_slug = slugify(original_base)
        if not clean_slug:
            clean_slug = f"upload-incorreto-{int(time.time())}"
    else:
        file_name, clean_slug = args

    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()
    result = {"success": False, "file": file_name, "data": None, "error": None}

    # Original base name, preserved for display in the metadata.
    original_base_name = os.path.splitext(file_name)[0]

    # All output artifacts derive their names from the slug.
    wav_name = clean_slug + ".wav"
    json_name = clean_slug + ".json"
    yml_name = clean_slug + ".yml"

    # Path of the .mmp file that will be rendered and parsed.
    target_mmp_path = ""

    # Force the offscreen Qt platform so LMMS runs headless.
    # BUG FIX: the original built {"QT_QPA_PLATFORM": "offscreen", **os.environ},
    # which let an inherited QT_QPA_PLATFORM value override the forced one.
    headless_env = {**os.environ, "QT_QPA_PLATFORM": "offscreen"}

    try:
        logging.info(f"[PID {pid}] Processando: {file_name} -> Slug: {clean_slug}")

        # 1. Extraction: archive the .mmpz and dump it to a plain .mmp.
        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)
            # Move the original mmpz to the backup folder; on reruns the
            # already-archived copy wins and the duplicate is removed.
            if not os.path.exists(destination_path):
                shutil.move(file_path, destination_path)
            elif os.path.exists(file_path):
                os.remove(file_path)
            # The extracted MMP file is named after the slug.
            mmp_temp_name = clean_slug + ".mmp"
            output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)
            # `lmms --dump` writes the decompressed project XML to stdout.
            dump_cmd = ["lmms", "--dump", abs_dest]
            try:
                with open(abs_mmp_out, "w") as outfile:
                    subprocess.run(
                        dump_cmd,
                        stdout=outfile,
                        stderr=subprocess.PIPE,
                        check=True,
                        env=headless_env,
                    )
                # The extracted file becomes the render/parse target.
                target_mmp_path = output_mmp_path
            except subprocess.CalledProcessError as e:
                result["error"] = f"Erro no dump MMP: {e.stderr.decode('utf-8')}"
                logging.error(f"[PID {pid}] {result['error']}")
                return result
        elif file_name.endswith(".mmp"):
            # A plain .mmp is its own target.
            target_mmp_path = file_path

        # 2. Audio rendering — outside the if/else on purpose, so it runs
        #    both for extracted .mmpz projects and for plain .mmp inputs.
        if os.path.exists(target_mmp_path):
            abs_target_mmp = os.path.abspath(target_mmp_path)
            abs_wav_out = os.path.abspath(os.path.join(WAV_FOLDER, wav_name))
            wav_cmd = [
                "lmms",
                "-r",
                abs_target_mmp,  # always render the decompressed .mmp
                "-o",
                abs_wav_out,
                "-f",
                "wav",
            ]
            try:
                # FIX: the render now also gets the headless env, matching
                # the dump step. A render failure is non-fatal: metadata
                # extraction still proceeds below.
                subprocess.run(
                    wav_cmd,
                    check=True,
                    capture_output=True,
                    text=True,
                    env=headless_env,
                )
                logging.info(f"[PID {pid}] Áudio WAV gerado com sucesso: {wav_name}")
            except subprocess.CalledProcessError as e:
                logging.warning(f"[PID {pid}] Falha na geração do WAV: {e.stderr}")
        else:
            result["error"] = "Arquivo alvo MMP não encontrado para renderização."
            logging.error(f"[PID {pid}] {result['error']}")
            return result

        # 3. Parsing and metadata persistence.
        if os.path.exists(target_mmp_path):
            mmp_data = parse_mmp_file(target_mmp_path)
            if mmp_data:
                # Inject standardized identifiers into the metadata.
                mmp_data["file"] = clean_slug
                mmp_data["original_title"] = original_base_name
                save_to_json(
                    mmp_data, os.path.join(METADATA_FOLDER, json_name)
                )
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))
                result["success"] = True
                result["data"] = mmp_data
                # The .mmp file is deliberately kept for future reference.
            else:
                result["error"] = "Dados vazios após parsing."
        else:
            result["error"] = "Arquivo MMP não encontrado para parsing."
    except Exception as e:
        result["error"] = f"Erro geral: {str(e)}"
        logging.error(f"[PID {pid}] {file_name}: {result['error']}")
    return result
def main_parallel():
    """Run the full processing pipeline in parallel.

    Sets up logging, audits the environment, sanitizes file names, fans
    the per-file work out over a multiprocessing pool, writes the
    aggregated indexes, regenerates manifests and runs the dependency
    check. Returns None.
    """
    logger, log_file_path = setup_logger()
    start_time = time.time()
    check_system_dependencies()
    log_system_info()
    logging.info("=== Iniciando Pipeline Otimizado (Com Sanitização e Persistência MMP) ===")
    create_folders_if_not_exist([MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER])
    if not os.path.exists(MMP_FOLDER):
        logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
        return
    all_files_raw = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))]
    if not all_files_raw:
        logging.warning("Nenhum arquivo encontrado.")
        return
    # === Slug pre-computation: workers receive deterministic names ===
    tasks = []
    invalid_count = 0
    logging.info("Calculando nomes sanitizados e preparando tarefas...")
    for file_name in all_files_raw:
        original_base = os.path.splitext(file_name)[0]
        slug = slugify(original_base)
        # A name that sanitizes to nothing (e.g. "!!!.mmp") gets a fallback.
        if not slug:
            invalid_count += 1
            slug = f"titulo-incorreto-{invalid_count}"
            logging.warning(f"Nome inválido detectado: '{file_name}'. Renomeando para '{slug}'")
        tasks.append((file_name, slug))
    num_cores = multiprocessing.cpu_count()
    logging.info(f"Processando {len(tasks)} arquivos com {num_cores} workers.")
    # pool.map sends each worker a (file_name, slug) tuple.
    with multiprocessing.Pool(processes=num_cores) as pool:
        results = pool.map(process_single_file, tasks)
    successful_data = [r["data"] for r in results if r["success"]]
    failed_files = [r for r in results if not r["success"]]
    if successful_data:
        save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))
    try:
        manifest_report = generate_manifests(SRC_MMPSEARCH)
    except Exception as e:
        manifest_report = {"generated": [], "failed": [str(e)]}
    # FIX: the report was previously computed and silently discarded;
    # surface any manifest failures in the log.
    if isinstance(manifest_report, dict) and manifest_report.get("failed"):
        logging.warning(f"Falhas na geração de manifests: {manifest_report['failed']}")
    duration = time.time() - start_time
    logging.info("=" * 60)
    logging.info(
        f"FIM - Tempo: {duration:.2f}s | Sucessos: {len(successful_data)} | Falhas: {len(failed_files)}"
    )
    if failed_files:
        logging.info("--- Detalhe das Falhas ---")
        for f in failed_files:
            logging.error(f"{f['file']}: {f['error']}")
    try:
        check_dependencies(
            os.path.join(METADATA_FOLDER, "all.json"),
            os.path.join(METADATA_FOLDER, "samples-manifest.json"),
            os.path.join(METADATA_FOLDER, "dependency_report.json"),
        )
    except Exception as e:
        # FIX: was a silent `except: pass` — the dependency check stays
        # best-effort, but the failure is now visible in the log.
        logging.warning(f"check_dependencies falhou: {e}")
def rebuild_indexes():
    """Rebuild the global indexes (all.json / all.yml) from per-file metadata.

    Scans METADATA_FOLDER for individual *.json files, skipping the
    aggregate/report files, merges their contents and rewrites the global
    index files. Returns the number of entries indexed.
    """
    logging.info("Regerando índices globais (all.json / all.yml)...")
    # Aggregate/report files must not be re-ingested into the index.
    skip_names = {
        "all.json",
        "samples-manifest.json",
        "mmp-manifest.json",
        "dependency_report.json",
    }
    all_data = []
    if os.path.exists(METADATA_FOLDER):
        for name in os.listdir(METADATA_FOLDER):
            if not name.endswith(".json") or name in skip_names:
                continue
            json_path = os.path.join(METADATA_FOLDER, name)
            try:
                with open(json_path, "r", encoding="utf-8") as json_file:
                    all_data.append(json.load(json_file))
            except Exception as e:
                logging.error(f"Erro ao ler {name} para índice global: {e}")
    if all_data:
        save_to_json(all_data, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(all_data, os.path.join(DATA_FOLDER, "all.yml"))
        logging.info("Índices globais atualizados com sucesso.")
    return len(all_data)
# Script entry point: run the parallel pipeline when executed directly.
if __name__ == "__main__":
    main_parallel()