# main.py
import os
import json
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
from datetime import datetime

# Local project modules
from generate_manifest import generate_manifests
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
    create_folders_if_not_exist,
    BASE_PATH,
    DATA_FOLDER,
    METADATA_FOLDER,
    WAV_FOLDER,
    MMPZ_FOLDER,
    MMP_FOLDER,
    SRC_MMPSEARCH,
    LOG_FOLDER,
)


# === Sanitization helper (slugify) ===
def slugify(value):
    """Normalize a string into a filesystem-safe slug.

    Strips accents, lowercases, removes non-alphanumeric characters and
    collapses spaces/underscores/hyphens into single hyphens.
    Ex: 'Ação & Reação!' -> 'acao-reacao'

    Returns an empty string when nothing survives sanitization (the
    caller is expected to handle that case).
    """
    value = str(value)
    # Unicode normalization drops accents (e.g. ã -> a)
    value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')
    value = value.lower()
    # Drop everything that is not a word char, whitespace or hyphen
    value = re.sub(r'[^\w\s-]', '', value)
    # Collapse runs of hyphens/whitespace/underscores into one hyphen
    # (note: \w keeps underscores alive through the previous sub)
    value = re.sub(r'[-\s_]+', '-', value)
    return value.strip('-_')


def check_system_dependencies():
    """Abort early (exit code 1) if required external tools are missing."""
    required_tools = ["lmms"]
    missing = [tool for tool in required_tools if shutil.which(tool) is None]
    if missing:
        logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}")
        logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing))
        sys.exit(1)


def get_linux_mem_info():
    """Return total system memory in GB read from /proc/meminfo, or 0 on failure.

    Linux-only: relies on the /proc filesystem.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if "MemTotal" in line:
                    kb_value = int(line.split()[1])  # value is reported in kB
                    return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        return 0
    except Exception as e:
        logging.warning(f"Erro ao ler memória: {e}")
        return 0
    # FIX: previously fell through to an implicit None when "MemTotal" was
    # absent, which later crashed the f-string formatting (":.2f" on None).
    return 0


def get_cpu_model_name():
    """Return the CPU model string from /proc/cpuinfo, falling back to platform.processor()."""
    try:
        with open("/proc/cpuinfo", "r") as f:
            for line in f:
                if "model name" in line:
                    return line.split(":")[1].strip()
    except Exception:
        return platform.processor()
    # FIX: some architectures (e.g. ARM) have no "model name" line; previously
    # this returned None implicitly.
    return platform.processor()


def log_system_info():
    """Log a best-effort hardware/environment audit; never raises."""
    try:
        logging.info("=" * 30)
        logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
        logging.info("=" * 30)
        uname = platform.uname()
        logging.info(f"Sistema: {uname.system} {uname.release}")
        logging.info(f"Node: {uname.node}")
        cpu_model = get_cpu_model_name()
        cores_logical = multiprocessing.cpu_count()
        mem_total_gb = get_linux_mem_info()
        logging.info(f"CPU Modelo: {cpu_model}")
        logging.info(f"Núcleos: {cores_logical}")
        logging.info(f"Memória Total: {mem_total_gb:.2f} GB")
        total, used, free = shutil.disk_usage(".")
        logging.info(f"Disco (Livre): {free // (2**30)} GB")
        logging.info("=" * 30)
    except Exception as e:
        logging.warning(f"Falha ao coletar info do sistema: {e}")


def setup_logger():
    """Configure the root logger with a timestamped file handler plus console output.

    Returns:
        (logger, log_filename) tuple.
    """
    os.makedirs(LOG_FOLDER, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log")

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    # FIX: drop any pre-existing handlers so repeated calls (or library-added
    # handlers) do not duplicate every log line.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
    )

    file_handler = logging.FileHandler(log_filename, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    return logger, log_filename


def process_single_file(args):
    """Worker executed in a multiprocessing pool.

    Receives a pre-computed (file_name, clean_slug) tuple so that invalid /
    colliding names were already resolved sequentially by the dispatcher.

    For .mmpz files: moves the original to MMPZ_FOLDER, dumps a slug-named
    .mmp via `lmms --dump`, and renders a WAV. For both .mmp and .mmpz, the
    resulting .mmp is parsed and its metadata saved as JSON and YAML.

    Returns:
        dict with keys: success (bool), file (str), data (dict | None),
        error (str | None).
    """
    file_name, clean_slug = args
    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()
    result = {"success": False, "file": file_name, "data": None, "error": None}

    # Original (pre-slug) base name kept for display/metadata purposes
    original_base_name = os.path.splitext(file_name)[0]

    # Output names are derived from the pre-computed slug
    wav_name = clean_slug + ".wav"
    json_name = clean_slug + ".json"
    yml_name = clean_slug + ".yml"

    # FIX: force Qt into headless mode; the override must come AFTER the
    # os.environ spread, otherwise an inherited QT_QPA_PLATFORM wins.
    headless_env = {**os.environ, "QT_QPA_PLATFORM": "offscreen"}

    # FIX: initialized defensively so the parsing guard below cannot hit an
    # unbound name (or os.path.exists(None)) for unexpected extensions.
    file_to_parse = None

    try:
        logging.info(f"[PID {pid}] Processando: {file_name} -> Slug: {clean_slug}")

        # 1. Handle compressed .mmpz projects
        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)

            # Move the original mmpz into the backup folder
            if not os.path.exists(destination_path):
                shutil.move(file_path, destination_path)
            elif os.path.exists(file_path):
                # Already archived: remove the source copy to avoid duplicates
                os.remove(file_path)

            # The extracted MMP is named after the slug (standardized)
            mmp_temp_name = clean_slug + ".mmp"
            output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)

            # Decompress (dump) the project XML to the .mmp file
            dump_cmd = ["lmms", "--dump", abs_dest]
            try:
                with open(abs_mmp_out, "w") as outfile:
                    subprocess.run(
                        dump_cmd,
                        stdout=outfile,
                        stderr=subprocess.PIPE,
                        check=True,
                        env=headless_env,
                    )
            except subprocess.CalledProcessError as e:
                result["error"] = f"Erro no dump MMP: {e.stderr.decode('utf-8')}"
                logging.error(f"[PID {pid}] {result['error']}")
                return result

            # Render the WAV (failure here is non-fatal: metadata still saved)
            abs_wav_out = os.path.abspath(os.path.join(WAV_FOLDER, wav_name))
            wav_cmd = [
                "lmms",
                "-r",
                abs_dest,
                "-o",
                abs_wav_out,
                "-f",
                "wav",
            ]
            try:
                subprocess.run(
                    wav_cmd, check=True, capture_output=True, text=True,
                    env=headless_env,
                )
                logging.info(f"[PID {pid}] Áudio WAV gerado: {wav_name}")
            except subprocess.CalledProcessError as e:
                logging.warning(f"[PID {pid}] Falha no WAV: {e.stderr}")

            file_to_parse = output_mmp_path

        elif file_name.endswith(".mmp"):
            # Plain MMP: parse it directly
            file_to_parse = file_path
            # (Optional) WAV generation for plain .mmp would mirror the above

        # 2. Parsing and persistence
        if file_to_parse is not None and os.path.exists(file_to_parse):
            mmp_data = parse_mmp_file(file_to_parse)
            if mmp_data:
                # Inject standardization metadata
                mmp_data["file"] = clean_slug
                mmp_data["original_title"] = original_base_name

                # Save metadata under the sanitized names
                save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name))
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))

                result["success"] = True
                result["data"] = mmp_data
                # IMPORTANT: the .mmp file (now slug.mmp) is intentionally
                # kept on disk — it is no longer deleted after parsing.
            else:
                result["error"] = "Dados vazios após parsing."
        else:
            result["error"] = "Arquivo MMP não encontrado para parsing."

    except Exception as e:
        result["error"] = f"Erro geral: {str(e)}"
        logging.error(f"[PID {pid}] {file_name}: {result['error']}")

    return result


def main_parallel():
    """Entry point: sanitize names, fan work out to a process pool, aggregate results."""
    logger, log_file_path = setup_logger()
    start_time = time.time()

    check_system_dependencies()
    log_system_info()

    logging.info("=== Iniciando Pipeline Otimizado (Com Sanitização e Persistência MMP) ===")
    create_folders_if_not_exist([MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER])

    if not os.path.exists(MMP_FOLDER):
        logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
        return

    all_files_raw = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))]
    if not all_files_raw:
        logging.warning("Nenhum arquivo encontrado.")
        return

    # === SLUG PRE-COMPUTATION ===
    # Done sequentially so the invalid-name counter and collision handling
    # are deterministic before work is dispatched to the pool.
    tasks = []
    invalid_count = 0
    seen_slugs = {}
    logging.info("Calculando nomes sanitizados e preparando tarefas...")
    for file_name in all_files_raw:
        original_base = os.path.splitext(file_name)[0]
        slug = slugify(original_base)

        # Fully-sanitized-away names (e.g. "!!!.mmp" -> "") get a default name
        if not slug:
            invalid_count += 1
            slug = f"titulo-incorreto-{invalid_count}"
            logging.warning(f"Nome inválido detectado: '{file_name}'. Renomeando para '{slug}'")

        # FIX: distinct files can sanitize to the same slug (e.g. 'Song!' and
        # 'Song?' both -> 'song'), which made parallel workers overwrite each
        # other's outputs. Suffix duplicates with a sequence number.
        occurrence = seen_slugs.get(slug, 0)
        seen_slugs[slug] = occurrence + 1
        if occurrence:
            slug = f"{slug}-{occurrence + 1}"

        tasks.append((file_name, slug))

    num_cores = multiprocessing.cpu_count()
    logging.info(f"Processando {len(tasks)} arquivos com {num_cores} workers.")

    # Each task is the (file_name, slug) tuple consumed by the worker
    with multiprocessing.Pool(processes=num_cores) as pool:
        results = pool.map(process_single_file, tasks)

    successful_data = [r["data"] for r in results if r["success"]]
    failed_files = [r for r in results if not r["success"]]

    if successful_data:
        save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))

    # Best-effort manifest generation; failures are captured, not raised
    try:
        manifest_report = generate_manifests(SRC_MMPSEARCH)
    except Exception as e:
        manifest_report = {"generated": [], "failed": [str(e)]}

    duration = time.time() - start_time
    logging.info("=" * 60)
    logging.info(
        f"FIM - Tempo: {duration:.2f}s | Sucessos: {len(successful_data)} | Falhas: {len(failed_files)}"
    )

    if failed_files:
        logging.info("--- Detalhe das Falhas ---")
        for f in failed_files:
            logging.error(f"{f['file']}: {f['error']}")

    # Best-effort dependency audit; FIX: failures are now logged instead of
    # silently swallowed (behavior otherwise unchanged).
    try:
        check_dependencies(
            os.path.join(METADATA_FOLDER, "all.json"),
            os.path.join(METADATA_FOLDER, "samples-manifest.json"),
            os.path.join(METADATA_FOLDER, "dependency_report.json"),
        )
    except Exception as e:
        logging.warning(f"check_dependencies falhou: {e}")


def rebuild_indexes():
    """Rebuild the global all.json / all.yml indexes from per-file metadata.

    Returns:
        int: number of metadata entries aggregated (0 when none found).
    """
    logging.info("Regerando índices globais (all.json / all.yml)...")
    all_data = []
    # Aggregate/report files that must not be re-ingested into the index
    ignored_files = [
        "all.json",
        "samples-manifest.json",
        "mmp-manifest.json",
        "dependency_report.json",
    ]
    if os.path.exists(METADATA_FOLDER):
        for f in os.listdir(METADATA_FOLDER):
            if f.endswith(".json") and f not in ignored_files:
                try:
                    with open(
                        os.path.join(METADATA_FOLDER, f), "r", encoding="utf-8"
                    ) as json_file:
                        data = json.load(json_file)
                        all_data.append(data)
                except Exception as e:
                    logging.error(f"Erro ao ler {f} para índice global: {e}")

    if all_data:
        save_to_json(all_data, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(all_data, os.path.join(DATA_FOLDER, "all.yml"))
        logging.info("Índices globais atualizados com sucesso.")
    # FIX: the count is now returned unconditionally (0 when empty) instead of
    # an implicit None when no data was found.
    return len(all_data)


if __name__ == "__main__":
    main_parallel()