# main.py
import os
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
import csv
import psutil
from datetime import datetime, timedelta

# Local modules
from generate_manifest import generate_manifests
# from src_instruments import find_audio_files
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
    create_folders_if_not_exist,
    DATA_FOLDER,
    METADATA_FOLDER,
    WAV_FOLDER,
    MMPZ_FOLDER,
    MMP_FOLDER,
    SRC_MMPSEARCH,
    LOG_FOLDER,
    SAIDA_ANALISES,
)

# === SAFETY AND BATCHING SETTINGS ===
TIMEOUT_RENDER_SECONDS = 300  # Maximum of 5 minutes per file
MIN_RAM_FREE_MB = 500  # Minimum free RAM required to start a worker
BATCH_SIZE = 250  # Batch size for the YAML output
LOTES_FOLDER = os.path.join(SRC_MMPSEARCH, "lotes_yaml")  # Batch folder
LIMIT_FILES = 1000  # <--- 0 = PROCESS EVERYTHING | >0 = CAP THE FILE COUNT (E.G. 50)

# === GLOBAL VARIABLE FOR THE WORKERS (MULTIPROCESSING FIX) ===
# Needed to avoid a pickle error when passing a Lock through task arguments
global_counter = None


def init_worker(counter):
    """Initializes the global counter inside each worker process."""
    global global_counter
    global_counter = counter


# === Sanitization helper (slugify) ===
def slugify(value):
    """
    Normalizes the string: strips accents, lowercases, removes
    non-alphanumeric characters, and replaces whitespace with hyphens.
    """
    value = str(value)
    value = (
        unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
    )
    value = value.lower()
    value = re.sub(r"[^\w\s-]", "", value)
    value = re.sub(r"[-\s_]+", "-", value)
    return value.strip("-_")
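
# Example (hypothetical input): slugify("Canção do Mar (Remix #2)") returns
# "cancao-do-mar-remix-2". The NFKD/ASCII round-trip strips the accents,
# punctuation is removed, and runs of spaces, underscores, and hyphens
# collapse into a single hyphen, trimmed at both ends.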

def check_system_dependencies():
    """Checks that the required external tools are installed."""
    required_tools = ["lmms"]
    missing = []
    for tool in required_tools:
        if shutil.which(tool) is None:
            missing.append(tool)
    if missing:
        logging.critical(f"MISSING TOOLS: {', '.join(missing)}")
        logging.critical("Please install: sudo apt-get install " + " ".join(missing))
        sys.exit(1)


def get_cpu_safe_count():
    """Returns the CPU count minus 1 so the OS does not lock up."""
    try:
        count = multiprocessing.cpu_count()
        return max(1, count - 1)
    except Exception:
        return 1


# --- System info helpers ---
def get_linux_mem_info():
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if "MemTotal" in line:
                    kb_value = int(line.split()[1])
                    return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        return 0
    except Exception as e:
        logging.warning(f"Error reading memory info: {e}")
    return 0


def get_cpu_model_name():
    try:
        with open("/proc/cpuinfo", "r") as f:
            for line in f:
                if "model name" in line:
                    return line.split(":")[1].strip()
    except Exception:
        pass
    # Fallback when /proc/cpuinfo is unavailable or has no "model name" entry
    return platform.processor()


def log_system_info():
    try:
        logging.info("=" * 30)
        logging.info("ENVIRONMENT AUDIT (HARDWARE)")
        logging.info("=" * 30)
        uname = platform.uname()
        logging.info(f"System: {uname.system} {uname.release}")
        logging.info(f"Node: {uname.node}")
        cpu_model = get_cpu_model_name()
        cores_logical = multiprocessing.cpu_count()
        safe_cores = get_cpu_safe_count()
        mem_total_gb = get_linux_mem_info()
        logging.info(f"CPU model: {cpu_model}")
        logging.info(
            f"Cores (total): {cores_logical} | Safe workers: {safe_cores}"
        )
        logging.info(f"Total memory: {mem_total_gb:.2f} GB")
        total, used, free = shutil.disk_usage(".")
        logging.info(f"Disk (free): {free // (2**30)} GB")
        logging.info("=" * 30)
    except Exception as e:
        logging.warning(f"Failed to collect system info: {e}")


def setup_logger():
    os.makedirs(LOG_FOLDER, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
    )
    file_handler = logging.FileHandler(log_filename, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger, log_filename


def process_single_file(args):
    """
    Robust worker with a SHARED, process-safe progress counter.
    """
    # Unpack the arguments (the counter is no longer passed here)
    file_name, clean_slug, total_files = args
    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()

    # === Counter logic (accessed via the module global) ===
    # global_counter was injected by init_worker
    global global_counter
    current_idx = 0
    if global_counter:
        with global_counter.get_lock():
            global_counter.value += 1
            current_idx = global_counter.value
    remaining = total_files - current_idx

    # Progress log
    logging.info(
        f"[PID {pid}] [Progress: {current_idx}/{total_files} | Remaining: {remaining}] Starting: {file_name}"
    )

    # Audit metrics
    start_time = time.time()
    ram_usage_mb = 0.0
    result = {
        "success": False,
        "file": file_name,
        "slug": clean_slug,
        "data": None,
        "error": None,
        "duration_s": 0.0,
        "ram_mb": 0.0,
        "file_size_mb": 0.0,
    }

    try:
        # Size check
        if os.path.exists(file_path):
            result["file_size_mb"] = os.path.getsize(file_path) / (1024 * 1024)

        # RAM safety check
        mem = psutil.virtual_memory()
        if (mem.available / (1024 * 1024)) < MIN_RAM_FREE_MB:
            result["error"] = "Insufficient RAM to start worker"
            logging.warning(f"[PID {pid}] Skipped {file_name}: RAM critically low")
            return result

        original_base_name = os.path.splitext(file_name)[0]
        wav_name = clean_slug + ".wav"
        json_name = clean_slug + ".json"
        yml_name = clean_slug + ".yml"
        target_mmp_path = ""

        # 1. Handling and extraction
        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)
            if not os.path.exists(destination_path):
                shutil.move(file_path, destination_path)
            elif os.path.exists(file_path):
                os.remove(file_path)

            mmp_temp_name = clean_slug + ".mmp"
            output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)

            # `lmms --dump` writes the project XML to stdout
            with open(abs_mmp_out, "w") as outfile:
                subprocess.run(
                    ["lmms", "--dump", abs_dest],
                    stdout=outfile,
                    stderr=subprocess.PIPE,
                    check=True,
                    timeout=60,
                    # os.environ first, so the headless setting always wins
                    env={**os.environ, "QT_QPA_PLATFORM": "offscreen"},
                )
            target_mmp_path = output_mmp_path
        elif file_name.endswith(".mmp"):
            target_mmp_path = file_path

        # 2. AUDIO RENDERING
        if os.path.exists(target_mmp_path):
            abs_target_mmp = os.path.abspath(target_mmp_path)
            abs_wav_out = os.path.abspath(os.path.join(WAV_FOLDER, wav_name))
            render_process = subprocess.run(
                ["lmms", "-r", abs_target_mmp, "-o", abs_wav_out, "-f", "wav"],
                check=False,
                capture_output=True,
                text=True,
                timeout=TIMEOUT_RENDER_SECONDS,
                env={**os.environ, "QT_QPA_PLATFORM": "offscreen"},
            )
            if render_process.returncode != 0:
                err_msg = (
                    render_process.stderr
                    if render_process.stderr
                    else "Unknown error"
                )
                if render_process.returncode == -11:
                    err_msg = f"LMMS CRASH (SIGSEGV): {err_msg}"
                raise subprocess.CalledProcessError(
                    render_process.returncode,
                    "lmms render",
                    output=render_process.stdout,
                    stderr=err_msg,
                )
            logging.info(f"[PID {pid}] WAV audio rendered: {wav_name}")
        else:
            raise FileNotFoundError(
                "Target MMP file not found for rendering."
            )

        # 3. Parsing and INDIVIDUAL saving
        if os.path.exists(target_mmp_path):
            mmp_data = parse_mmp_file(target_mmp_path)
            if mmp_data:
                mmp_data["file"] = clean_slug
                mmp_data["original_title"] = original_base_name
                # Save per-project metadata
                save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name))
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))
                result["success"] = True
                result["data"] = mmp_data
            else:
                result["error"] = "Empty data after parsing."
        else:
            result["error"] = "MMP file not found for parsing."

        try:
            process = psutil.Process(pid)
            ram_usage_mb = process.memory_info().rss / (1024 * 1024)
        except Exception:
            ram_usage_mb = 0.0

    except subprocess.TimeoutExpired:
        result["error"] = f"Timeout ({TIMEOUT_RENDER_SECONDS}s) exceeded."
        logging.error(f"[PID {pid}] TIMEOUT on {file_name}")
    except subprocess.CalledProcessError as e:
        if isinstance(e.stderr, bytes):
            err_msg = e.stderr.decode("utf-8", errors="ignore")
        else:
            err_msg = str(e.stderr) if e.stderr else str(e)
        result["error"] = f"LMMS error (code {e.returncode}): {err_msg}"
        logging.error(f"[PID {pid}] LMMS failure: {result['error']}")
    except Exception as e:
        result["error"] = f"General error: {str(e)}"
        logging.error(f"[PID {pid}] {file_name}: {result['error']}")

    result["duration_s"] = time.time() - start_time
    result["ram_mb"] = ram_usage_mb
    return result
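
# Minimal standalone sketch of the shared-counter pattern used above
# (hypothetical task list, two workers):
#
#     counter = multiprocessing.Value("i", 0)
#     with multiprocessing.Pool(
#         processes=2, initializer=init_worker, initargs=(counter,)
#     ) as pool:
#         pool.map(process_single_file, tasks)
#
# The Value must travel through `initargs` (delivered to each child at
# process startup), not inside the task tuples: Pool.map pickles task
# arguments, and synchronized objects can only be shared with children
# at process creation, which is the pickle error the initializer avoids.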

def main_parallel():
    logger, log_file_path = setup_logger()
    start_time_global = time.time()
    audit_csv_path = os.path.join(SAIDA_ANALISES, "audit_pipeline.csv")

    check_system_dependencies()
    log_system_info()

    # Create folders (including the batch folder)
    create_folders_if_not_exist(
        [MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER, LOTES_FOLDER]
    )

    logging.info(
        "=== Starting optimized pipeline (multi-batch + parallel + counter) ==="
    )

    if not os.path.exists(MMP_FOLDER):
        logging.critical(f"Folder {MMP_FOLDER} not found.")
        return

    # Sorted, so "the first N files" is deterministic when LIMIT_FILES is set
    all_files_raw = sorted(
        f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))
    )
    if not all_files_raw:
        logging.warning("No files found.")
        return

    # === APPLY THE FILE LIMIT ===
    if LIMIT_FILES and LIMIT_FILES > 0:
        logging.info(
            f"LIMITED MODE: processing only the first {LIMIT_FILES} files."
        )
        all_files_raw = all_files_raw[:LIMIT_FILES]

    total_files = len(all_files_raw)
    logging.info(f"Total files to process: {total_files}")

    # === SHARED COUNTER SETUP ===
    # multiprocessing.Value lives in shared memory
    shared_counter = multiprocessing.Value("i", 0)

    # === SLUG PRE-PROCESSING ===
    tasks = []
    invalid_count = 0
    logging.info("Preparing tasks...")
    for file_name in all_files_raw:
        original_base = os.path.splitext(file_name)[0]
        slug = slugify(original_base)
        if not slug:
            invalid_count += 1
            slug = f"titulo-incorreto-{invalid_count}"
            logging.warning(
                f"Invalid name detected: '{file_name}'. Renaming to '{slug}'"
            )
        # Only static data goes into the task; the counter travels via the initializer.
        tasks.append((file_name, slug, total_files))

    # === PARALLEL EXECUTION ===
    num_cores = get_cpu_safe_count()
    logging.info(f"Processing with {num_cores} workers (safe mode).")

    with open(audit_csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "Timestamp",
                "Arquivo_Original",
                "Slug",
                "Tamanho_MB",
                "Duracao_Sec",
                "RAM_Worker_MB",
                "Status",
                "Erro",
            ]
        )
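
    # Example audit row (hypothetical values):
    #   14:02:31,minha-musica.mmpz,minha-musica,0.85,42.17,183.50,SUCESSO,
    # A failed file instead carries FALHA plus the error text in the last column.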
    successful_data = []
    failed_files = []

    # === THE POOL NOW USES AN INITIALIZER ===
    # This safely injects shared_counter into each worker before any task runs
    with multiprocessing.Pool(
        processes=num_cores, initializer=init_worker, initargs=(shared_counter,)
    ) as pool:
        results = pool.map(process_single_file, tasks)

    # Collect results
    with open(audit_csv_path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        for res in results:
            ts = datetime.now().strftime("%H:%M:%S")
            status = "SUCESSO" if res["success"] else "FALHA"
            writer.writerow(
                [
                    ts,
                    res["file"],
                    res["slug"],
                    f"{res['file_size_mb']:.2f}",
                    f"{res['duration_s']:.2f}",
                    f"{res['ram_mb']:.2f}",
                    status,
                    res["error"] or "",
                ]
            )
            if res["success"]:
                successful_data.append(res["data"])
            else:
                failed_files.append(res)

    # === BATCHED SAVING ===
    if successful_data:
        logging.info(f"Successfully processed: {len(successful_data)}")
        logging.info(f"Writing batches (batch size: {BATCH_SIZE})...")

        # 1. Save the complete JSON
        logging.info("Saving the complete all.json...")
        save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))

        # --- ADDITION: save the complete YAML (all.yml) ---
        logging.info("Saving the complete all.yml...")
        save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))
        # --------------------------------------------------

        # 2. Save the YAML in slices (to avoid memory overflow)
        total_items = len(successful_data)
        for i in range(0, total_items, BATCH_SIZE):
            batch = successful_data[i : i + BATCH_SIZE]
            batch_num = (i // BATCH_SIZE) + 1
            batch_filename = os.path.join(
                LOTES_FOLDER, f"projetos_lote_{batch_num:03d}.yml"
            )
            logging.info(
                f"Writing batch {batch_num} ({len(batch)} items) to {batch_filename}..."
            )
            save_to_yaml(batch, batch_filename)
        logging.info("Batch writing finished.")
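
    # Worked example (hypothetical): with 600 successful projects and
    # BATCH_SIZE = 250, the loop above emits projetos_lote_001.yml (250 items),
    # projetos_lote_002.yml (250 items) and projetos_lote_003.yml (100 items).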

    # Manifest generation
    try:
        manifest_report = generate_manifests(SRC_MMPSEARCH)
        # base_dir_instruments = "/nethome/jotachina/projetos/mmpSearch/mmp/instruments"
        # manifest_instruments = find_audio_files(base_dir_instruments)
    except Exception as e:
        manifest_report = {"generated": [], "failed": [str(e)]}

    duration = time.time() - start_time_global
    logging.info("=" * 60)
    logging.info(
        f"DONE - Total time: {str(timedelta(seconds=int(duration)))} | Successes: {len(successful_data)} | Failures: {len(failed_files)}"
    )
    logging.info(f"Audit report: {audit_csv_path}")

    if failed_files:
        logging.info("--- Failure details ---")
        for f in failed_files:
            logging.error(f"{f['file']}: {f['error']}")

    try:
        check_dependencies(
            os.path.join(METADATA_FOLDER, "all.json"),
            os.path.join(METADATA_FOLDER, "samples-manifest.json"),
            os.path.join(METADATA_FOLDER, "dependency_report.json"),
        )
    except Exception as e:
        # Log instead of swallowing silently, so a broken report is visible
        logging.warning(f"check_dependencies failed: {e}")


if __name__ == "__main__":
    main_parallel()
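
# Usage sketch (assumptions: an `lmms` binary on PATH that supports the
# --dump and -r command-line options, the psutil package, and the local
# modules imported at the top of this file):
#
#     $ python main.py
#
# .mmp/.mmpz projects are read from MMP_FOLDER; rendered WAVs, per-project
# JSON/YAML metadata, batched YAML files, the audit CSV, and the logs land
# in the folders defined in utils.py.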