# main.py import os import shutil import subprocess import multiprocessing import logging import time import platform import sys import re import unicodedata import csv import psutil import glob import json from datetime import datetime, timedelta # Importando seus módulos locais from generate_manifest import generate_manifests # from src_instruments import find_audio_files from file_parser import parse_mmp_file from file_saver import save_to_json, save_to_yaml from dependency_checker import check_dependencies from utils import ( create_folders_if_not_exist, DATA_FOLDER, METADATA_FOLDER, WAV_FOLDER, MMPZ_FOLDER, MMP_FOLDER, SRC_MMPSEARCH, LOG_FOLDER, SAIDA_ANALISES, ) # === CONFIGURAÇÕES DE SEGURANÇA E LOTES === TIMEOUT_RENDER_SECONDS = 300 # 5 minutos máximo por arquivo MIN_RAM_FREE_MB = 500 # Mínimo de RAM livre para iniciar worker BATCH_SIZE = 250 # Tamanho do lote para o YAML LOTES_FOLDER = os.path.join(SRC_MMPSEARCH, "lotes_yaml") # Pasta dos lotes LIMIT_FILES = 300 # <--- 0 = PROCESSA TUDO | >0 = LIMITA A QUANTIDADE (EX: 50) # === VARIÁVEL GLOBAL PARA O WORKER (CORREÇÃO MULTIPROCESSING) === # Necessário para evitar erro de pickle ao passar Lock via argumentos global_counter = None def create_lightweight_dataset(data_list): """ Filtra apenas os dados cruciais para o frontend (Jekyll), removendo dados inúteis de automação e notas MIDI pesadas que inflam o YAML, mas preservando os parâmetros dos synths para a Web Audio API. """ light_data = [] for proj in data_list: # 1. Dados de cabeçalho p_light = { "file": proj.get("file"), "original_title": proj.get("original_title"), "bpm": proj.get("bpm"), "tags": proj.get("tags", {}), } # 2. Filtragem das Tracks p_light["tracks"] = [] for track in proj.get("tracks", []): t_light = {} # Extrai apenas chaves de identificação da track usadas no frontend for key in ["bassline_name", "instrument_name", "sample_name", "sample"]: if key in track: t_light[key] = track[key] # 3. Preservar Instrumentos e "Enxugar" os Patterns if "instruments" in track: t_light["instruments"] = [] for inst in track["instruments"]: # Copiamos o objeto inteiro do instrumento (O JS InstrumentFactory precisa disso) inst_light = inst.copy() # Limpeza pesada: Removemos notas MIDI, mantemos apenas 'name' e 'steps' (Grid 1/0) if "patterns" in inst: pats_light = [] for pat in inst["patterns"]: pats_light.append( { "name": pat.get("name", ""), "steps": pat.get("steps", []), } ) inst_light["patterns"] = pats_light t_light["instruments"].append(inst_light) p_light["tracks"].append(t_light) light_data.append(p_light) return light_data def init_worker(counter): """Inicializa a variável global dentro de cada processo worker""" global global_counter global_counter = counter # === Função de Sanitização (Slugify) === def slugify(value): """ Normaliza a string: remove acentos, converte para minúsculas, remove caracteres não alfanuméricos e substitui espaços por hifens. """ value = str(value) value = ( unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii") ) value = value.lower() value = re.sub(r"[^\w\s-]", "", value) value = re.sub(r"[-\s_]+", "-", value) return value.strip("-_") def check_system_dependencies(): """Verifica se as ferramentas necessárias estão instaladas.""" required_tools = ["lmms"] missing = [] for tool in required_tools: if shutil.which(tool) is None: missing.append(tool) if missing: logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}") logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing)) sys.exit(1) def get_cpu_safe_count(): """Retorna contagem de CPU menos 1 para não travar o SO.""" try: count = multiprocessing.cpu_count() return max(1, count - 1) except: return 1 # --- Funções de Info de Sistema --- def get_linux_mem_info(): try: with open("/proc/meminfo", "r") as f: for line in f: if "MemTotal" in line: kb_value = int(line.split()[1]) return kb_value / (1024 * 1024) except (IOError, ValueError): return 0 except Exception as e: logging.warning(f"Erro ao ler memória: {e}") return 0 def get_cpu_model_name(): try: with open("/proc/cpuinfo", "r") as f: for line in f: if "model name" in line: return line.split(":")[1].strip() except Exception: return platform.processor() def log_system_info(): try: logging.info("=" * 30) logging.info("AUDITORIA DE AMBIENTE (HARDWARE)") logging.info("=" * 30) uname = platform.uname() logging.info(f"Sistema: {uname.system} {uname.release}") logging.info(f"Node: {uname.node}") cpu_model = get_cpu_model_name() cores_logical = multiprocessing.cpu_count() safe_cores = get_cpu_safe_count() mem_total_gb = get_linux_mem_info() logging.info(f"CPU Modelo: {cpu_model}") logging.info( f"Núcleos (Total): {cores_logical} | Workers Seguros: {safe_cores}" ) logging.info(f"Memória Total: {mem_total_gb:.2f} GB") total, used, free = shutil.disk_usage(".") logging.info(f"Disco (Livre): {free // (2**30)} GB") logging.info("=" * 30) except Exception as e: logging.warning(f"Falha ao coletar info do sistema: {e}") def setup_logger(): os.makedirs(LOG_FOLDER, exist_ok=True) timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log") logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S" ) file_handler = logging.FileHandler(log_filename, encoding="utf-8") file_handler.setFormatter(formatter) logger.addHandler(file_handler) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(console_handler) return logger, log_filename def process_single_file(args): """ Worker robusto com CONTADOR COMPARTILHADO (THREAD-SAFE). """ # Desempacota os argumentos (O CONTADOR NÃO VEM MAIS AQUI) file_name, clean_slug, total_files = args file_path = os.path.join(MMP_FOLDER, file_name) pid = os.getpid() # === Lógica do Contador (Acessando via Global) === # O global_counter foi injetado pelo init_worker global global_counter current_idx = 0 if global_counter: with global_counter.get_lock(): global_counter.value += 1 current_idx = global_counter.value remaining = total_files - current_idx # Log com status de progresso logging.info( f"[PID {pid}] [Progresso: {current_idx}/{total_files} | Faltam: {remaining}] Iniciando: {file_name}" ) # Métricas para Auditoria start_time = time.time() ram_usage_mb = 0.0 result = { "success": False, "file": file_name, "slug": clean_slug, "data": None, "error": None, "duration_s": 0.0, "ram_mb": 0.0, "file_size_mb": 0.0, } try: # Checagem de Tamanho if os.path.exists(file_path): result["file_size_mb"] = os.path.getsize(file_path) / (1024 * 1024) # Checagem de Segurança de RAM mem = psutil.virtual_memory() if (mem.available / (1024 * 1024)) < MIN_RAM_FREE_MB: result["error"] = "RAM Insuficiente para iniciar worker" logging.warning(f"[PID {pid}] Pulei {file_name}: RAM Crítica") return result original_base_name = os.path.splitext(file_name)[0] ogg_name = clean_slug + ".ogg" json_name = clean_slug + ".json" yml_name = clean_slug + ".yml" target_mmp_path = "" # 1. Tratamento e Extração if file_name.endswith(".mmpz"): destination_path = os.path.join(MMPZ_FOLDER, file_name) if not os.path.exists(destination_path): shutil.move(file_path, destination_path) elif os.path.exists(file_path): os.remove(file_path) mmp_temp_name = clean_slug + ".mmp" output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name) abs_dest = os.path.abspath(destination_path) abs_mmp_out = os.path.abspath(output_mmp_path) with open(abs_mmp_out, "w") as outfile: subprocess.run( ["lmms", "--dump", abs_dest], stdout=outfile, stderr=subprocess.PIPE, check=True, timeout=60, env={"QT_QPA_PLATFORM": "offscreen", **os.environ}, ) target_mmp_path = output_mmp_path elif file_name.endswith(".mmp"): target_mmp_path = file_path # 2. GERAÇÃO DE ÁUDIO if os.path.exists(target_mmp_path): abs_target_mmp = os.path.abspath(target_mmp_path) abs_ogg_out = os.path.abspath(os.path.join(WAV_FOLDER, ogg_name)) render_process = subprocess.run( ["lmms", "-r", abs_target_mmp, "-o", abs_ogg_out, "-f", "ogg"], check=False, capture_output=True, text=True, timeout=TIMEOUT_RENDER_SECONDS, env={"QT_QPA_PLATFORM": "offscreen", **os.environ}, ) if render_process.returncode != 0: err_msg = ( render_process.stderr if render_process.stderr else "Erro desconhecido" ) if render_process.returncode == -11: err_msg = f"LMMS CRASH (SIGSEGV): {err_msg}" raise subprocess.CalledProcessError( render_process.returncode, "lmms render", output=render_process.stdout, stderr=err_msg, ) logging.info(f"[PID {pid}] Áudio ogg gerado: {ogg_name}") else: raise FileNotFoundError( "Arquivo MMP alvo não encontrado para renderização." ) # 3. Parsing e Salvamento INDIVIDUAL if os.path.exists(target_mmp_path): mmp_data = parse_mmp_file(target_mmp_path) if mmp_data: mmp_data["file"] = clean_slug mmp_data["original_title"] = original_base_name # Salva metadados individuais save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name)) save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name)) result["success"] = True result["data"] = mmp_data else: result["error"] = "Dados vazios após parsing." else: result["error"] = "Arquivo MMP não encontrado para parsing." try: process = psutil.Process(pid) ram_usage_mb = process.memory_info().rss / (1024 * 1024) except: ram_usage_mb = 0.0 except subprocess.TimeoutExpired: result["error"] = f"Timeout ({TIMEOUT_RENDER_SECONDS}s) excedido." logging.error(f"[PID {pid}] TIMEOUT em {file_name}") except subprocess.CalledProcessError as e: if isinstance(e.stderr, bytes): err_msg = e.stderr.decode("utf-8", errors="ignore") else: err_msg = str(e.stderr) if e.stderr else str(e) result["error"] = f"Erro LMMS (Code {e.returncode}): {err_msg}" logging.error(f"[PID {pid}] Falha LMMS: {result['error']}") except Exception as e: result["error"] = f"Erro geral: {str(e)}" logging.error(f"[PID {pid}] {file_name}: {result['error']}") result["duration_s"] = time.time() - start_time result["ram_mb"] = ram_usage_mb return result def rebuild_all_yamls(): """Lê todos os JSONs individuais e reconstrói APENAS os arquivos necessários (JSON e YAML leve).""" logging.info("Reconstruindo all.json e all_leve.yml a partir dos projetos individuais...") all_data = [] # Busca todos os arquivos .json na pasta de metadados json_files = glob.glob(os.path.join(METADATA_FOLDER, "*.json")) for j_file in json_files: filename = os.path.basename(j_file) if filename in ["all.json", "samples-manifest.json", "mmp-manifest.json", "dependency_report.json"]: continue try: with open(j_file, 'r', encoding='utf-8') as f: data = json.load(f) all_data.append(data) except Exception as e: logging.error(f"Erro ao ler {j_file} para agregar: {e}") if all_data: # 1. Salva o JSON completo (A biblioteca JSON é nativa e MUITO mais eficiente em RAM) save_to_json(all_data, os.path.join(METADATA_FOLDER, "all.json")) # 2. Transforma o 'elefante' num 'camundongo' ANTES de passar pro YAML dados_leves = create_lightweight_dataset(all_data) # 3. Salva APENAS a versão leve para o Jekyll (Isso não vai estourar a memória) save_to_yaml(dados_leves, os.path.join(DATA_FOLDER, "all_leve.yml")) logging.info(f"Arquivos agregados (all.json / all_leve.yml) atualizados com {len(all_data)} projetos!") def main_parallel(): logger, log_file_path = setup_logger() start_time_global = time.time() audit_csv_path = os.path.join(SAIDA_ANALISES, "audit_pipeline.csv") check_system_dependencies() log_system_info() # Cria pastas (incluindo a de Lotes) create_folders_if_not_exist( [MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER, LOTES_FOLDER] ) logging.info( "=== Iniciando Pipeline Otimizado (Multi-Batch + Paralelo + Contador) ===" ) if not os.path.exists(MMP_FOLDER): logging.critical(f"Pasta {MMP_FOLDER} não encontrada.") return all_files_raw = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))] if not all_files_raw: logging.warning("Nenhum arquivo encontrado.") return # === APLICAÇÃO DO LIMITE DE ARQUIVOS (ALTERAÇÃO AQUI) === if LIMIT_FILES and LIMIT_FILES > 0: logging.info( f"MODO LIMITADO: Processando apenas os primeiros {LIMIT_FILES} arquivos." ) all_files_raw = all_files_raw[:LIMIT_FILES] total_files = len(all_files_raw) logging.info(f"Total de arquivos a processar: {total_files}") # === CONFIGURAÇÃO DO CONTADOR COMPARTILHADO === # Usamos multiprocessing.Value diretamente (Shared Memory) shared_counter = multiprocessing.Value("i", 0) # === PRÉ-PROCESSAMENTO DOS SLUGS === tasks = [] invalid_count = 0 logging.info("Preparando tarefas...") for file_name in all_files_raw: original_base = os.path.splitext(file_name)[0] slug = slugify(original_base) if not slug: invalid_count += 1 slug = f"titulo-incorreto-{invalid_count}" logging.warning( f"Nome inválido detectado: '{file_name}'. Renomeando para '{slug}'" ) # Passamos APENAS os dados estáticos. O contador vai pelo initializer. tasks.append((file_name, slug, total_files)) # === EXECUÇÃO PARALELA === num_cores = get_cpu_safe_count() logging.info(f"Processando com {num_cores} workers (Safe Mode).") with open(audit_csv_path, "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow( [ "Timestamp", "Arquivo_Original", "Slug", "Tamanho_MB", "Duracao_Sec", "RAM_Worker_MB", "Status", "Erro", ] ) successful_data = [] failed_files = [] # === O POOL AGORA USA INITIALIZER === # Isso injeta o shared_counter em cada worker de forma segura antes das tarefas começarem with multiprocessing.Pool( processes=num_cores, initializer=init_worker, initargs=(shared_counter,) ) as pool: results = pool.map(process_single_file, tasks) # Coleta de resultados with open(audit_csv_path, "a", newline="", encoding="utf-8") as f: writer = csv.writer(f) for res in results: ts = datetime.now().strftime("%H:%M:%S") status = "SUCESSO" if res["success"] else "FALHA" writer.writerow( [ ts, res["file"], res["slug"], f"{res['file_size_mb']:.2f}", f"{res['duration_s']:.2f}", f"{res['ram_mb']:.2f}", status, res["error"] or "", ] ) if res["success"]: successful_data.append(res["data"]) else: failed_files.append(res) # === SALVAMENTO EM LOTES (BATCHING) === if successful_data: logging.info(f"Total processado com sucesso: {len(successful_data)}") logging.info(f"Iniciando gravação em lotes (Batch Size: {BATCH_SIZE})...") # 1. Salvar JSON Completo logging.info("Salvando all.json completo...") save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json")) # --- Salvar YAML Completo (all.yml) --- logging.info("Salvando all.yml completo...") save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml")) # ---------------------------------------------- # --- SALVAR VERSÃO LEVE PARA O JEKYLL --- logging.info("Gerando e salvando all_leve.yml para o Jekyll...") dados_leves = create_lightweight_dataset(successful_data) save_to_yaml(dados_leves, os.path.join(DATA_FOLDER, "all_leve.yml")) # 2. Salvar YAML em Fatias (Para evitar memory overflow) total_items = len(successful_data) for i in range(0, total_items, BATCH_SIZE): batch = successful_data[i : i + BATCH_SIZE] batch_num = (i // BATCH_SIZE) + 1 batch_filename = os.path.join( LOTES_FOLDER, f"projetos_lote_{batch_num:03d}.yml" ) logging.info( f"Gravando Lote {batch_num} ({len(batch)} itens) em {batch_filename}..." ) save_to_yaml(batch, batch_filename) logging.info("Gravação de lotes concluída.") # Geração dos manifestos try: manifest_report = generate_manifests(SRC_MMPSEARCH) # base_dir_instruments = "/nethome/jotachina/projetos/mmpSearch/mmp/instruments" # manifest_instruments = find_audio_files(base_dir_instruments) except Exception as e: manifest_report = {"generated": [], "failed": [str(e)]} duration = time.time() - start_time_global logging.info("=" * 60) logging.info( f"FIM - Tempo Total: {str(timedelta(seconds=int(duration)))} | Sucessos: {len(successful_data)} | Falhas: {len(failed_files)}" ) logging.info(f"Relatório de Auditoria: {audit_csv_path}") if failed_files: logging.info("--- Detalhe das Falhas ---") for f in failed_files: logging.error(f"{f['file']}: {f['error']}") try: check_dependencies( os.path.join(METADATA_FOLDER, "all.json"), os.path.join(METADATA_FOLDER, "samples-manifest.json"), os.path.join(METADATA_FOLDER, "dependency_report.json"), ) except Exception: pass if __name__ == "__main__": main_parallel()