import os import sys import multiprocessing import concurrent.futures # NOVO: Necessário para usar o ProcessPoolExecutor import csv import time import glob from datetime import datetime, timedelta # ================= 1. LIMITADOR DE CPU DINÂMICO ================= def configurar_limites_cpu(): try: total_cores = multiprocessing.cpu_count() print("--- Configuração de Hardware ---") print(f"Núcleos Totais: {total_cores}") print("Núcleos Alocados: 1 thread por processo (Multiprocessing Real)") print("Núcleos Reservados para o Sistema: 1") # IMPORTANTE: Em processamento paralelo real (ProcessPoolExecutor), # as libs em C++ devem usar apenas "1" thread para evitar colisão na CPU (oversubscription) os.environ["OMP_NUM_THREADS"] = "1" os.environ["MKL_NUM_THREADS"] = "1" os.environ["TF_NUM_INTRAOP_THREADS"] = "1" os.environ["TF_NUM_INTEROP_THREADS"] = "1" os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" except Exception as e: print(f"Aviso: Não foi possível limitar CPU automaticamente: {e}") configurar_limites_cpu() # ================= IMPORTS ================= import json import yaml import logging import gc import numpy as np import psutil import essentia import essentia.standard as es from tqdm import tqdm import unicodedata # Desativa logs internos do Essentia essentia.log.info.active = False essentia.log.warning.active = False # ================= CONFIGURAÇÕES ================= BASE_DIR = "/var/www/html/trens/src_mmpSearch" PASTA_WAVS = os.path.join(BASE_DIR, "wav") PASTA_LOTES_YAML = os.path.join(BASE_DIR, "lotes_yaml") # --- SISTEMA DE DOIS MODELOS --- MODELO_EMBEDDING = "discogs-effnet-bs64-1.pb" MODELO_CLASSIFIER = "genre_discogs400.pb" MODELO_CLASSES = "genre_discogs400.json" # Saídas DIR_SAIDA = os.path.join(BASE_DIR, "saida_analises") os.makedirs(DIR_SAIDA, exist_ok=True) ARQUIVO_CHECKPOINT = os.path.join(DIR_SAIDA, "checkpoint_analises.jsonl") ARQUIVO_FINAL_JSON = os.path.join(DIR_SAIDA, "db_final_completo.json") ARQUIVO_AUDITORIA = os.path.join(DIR_SAIDA, "audit_performance.csv") # Limites de Segurança LIMITE_RAM_MINIMA_MB = 500 MAX_DURACAO_ANALISE = 20 * 60 # Logs LOG_DIR = os.path.join(BASE_DIR, "logs/classificacao") os.makedirs(LOG_DIR, exist_ok=True) TIMESTAMP = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") logging.basicConfig( filename=os.path.join(LOG_DIR, f"log_{TIMESTAMP}.log"), level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", ) DATABASE_YAML = {} # Variáveis globais para os Workers conseguirem enxergar os modelos em memória GLOBAL_EMBEDDING = None GLOBAL_CLASSIFIER = None GLOBAL_CLASSES = None # ================= CLASSE DE AUDITORIA OTIMIZADA ================= class Auditoria: def __init__(self, arquivo_csv): self.arquivo_csv = arquivo_csv self.inicio_global = time.time() self.process = psutil.Process(os.getpid()) # Mantém o arquivo aberto em modo 'append' na memória (Resolve Gargalo HD) cabecalho = not os.path.exists(self.arquivo_csv) self.file = open(self.arquivo_csv, "a", newline="", encoding="utf-8") self.writer = csv.writer(self.file) if cabecalho: self.writer.writerow( [ "timestamp", "arquivo", "tamanho_mb", "duracao_audio_s", "tempo_proc_s", "ram_uso_mb", "cpu_percent", ] ) self.file.flush() def registrar_processamento(self, nome_arquivo, tamanho_bytes, duracao_audio, tempo_gasto): try: ram_mb = self.process.memory_info().rss / (1024 * 1024) cpu_pct = self.process.cpu_percent(interval=None) tamanho_mb = tamanho_bytes / (1024 * 1024) agora = datetime.now().strftime("%H:%M:%S") # Escreve na mesma conexão de arquivo aberta self.writer.writerow( [ agora, nome_arquivo, round(tamanho_mb, 2), round(duracao_audio, 2), round(tempo_gasto, 4), round(ram_mb, 2), cpu_pct, ] ) self.file.flush() except Exception as e: logging.error(f"Erro ao auditar arquivo: {e}") def verificar_marco(self, contagem): pass # Você pode customizar logs de progresso aqui depois se quiser def fechar(self): try: self.file.close() except: pass # ================= FUNÇÕES AUXILIARES ================= def verificar_memoria_segura(tamanho_arquivo_bytes): mem = psutil.virtual_memory() livre_mb = mem.available / (1024 * 1024) estimativa_uso_ram = tamanho_arquivo_bytes * 15 uso_mb = estimativa_uso_ram / (1024 * 1024) if livre_mb < LIMITE_RAM_MINIMA_MB: print(f" [!] RAM Crítica ({livre_mb:.1f}MB livres). Forçando limpeza...") gc.collect() time.sleep(2) mem = psutil.virtual_memory() if mem.available / (1024 * 1024) < LIMITE_RAM_MINIMA_MB: return False, "RAM insuficiente no sistema" if uso_mb > (livre_mb * 0.8): return ( False, f"Arquivo muito grande para RAM atual (Est: {uso_mb:.1f}MB, Livre: {livre_mb:.1f}MB)", ) return True, "OK" def normalizar_chave(texto): if not texto: return "" s = str(texto) if "/" in s: s = os.path.basename(s) s = os.path.splitext(s)[0] s = unicodedata.normalize("NFKD", s).encode("ASCII", "ignore").decode("ASCII") return s.lower().replace(" ", "").replace("-", "").replace("_", "").strip() def carregar_database_yaml(): print(f"--- Carregando Metadados de Lotes YAML em: {PASTA_LOTES_YAML} ---") padrao = os.path.join(PASTA_LOTES_YAML, "*.yml") arquivos_lote = sorted(glob.glob(padrao)) db = {} if not arquivos_lote: logging.warning(f"Nenhum lote encontrado em {PASTA_LOTES_YAML}") return {} total_carregados = 0 for arquivo in arquivos_lote: try: with open(arquivo, "r", encoding="utf-8") as f: dados_lote = yaml.safe_load(f) if not dados_lote: continue lista = dados_lote if isinstance(dados_lote, list) else [dados_lote] for proj in lista: if not isinstance(proj, dict): continue candidatos = [ proj.get("file"), proj.get("original_title"), proj.get("src"), ] for k in candidatos: if k: chave = normalizar_chave(k) if chave: db[chave] = proj total_carregados += len(lista) print(f"-> Carregado: {os.path.basename(arquivo)} ({len(lista)} projetos)") except Exception as e: logging.error(f"Erro ao ler lote {arquivo}: {e}") print(f"X Erro ao ler {os.path.basename(arquivo)}") print(f"--- Total de Projetos no DB (Memória): {len(db)} (de {total_carregados} lidos) ---") return db def carregar_checkpoint(): processados = set() dados_existentes = [] if os.path.exists(ARQUIVO_CHECKPOINT): with open(ARQUIVO_CHECKPOINT, "r", encoding="utf-8") as f: for linha in f: try: obj = json.loads(linha) reprocessar = False if "analise_ia" in obj and "erro" in obj["analise_ia"]: reprocessar = True if "analise_tecnica" in obj: est = obj["analise_tecnica"].get("estrutura_segundos", []) if len(est) == 2 and est[1] == 12.0: reprocessar = True if reprocessar: continue processados.add(obj["arquivo"]) dados_existentes.append(obj) except: pass return processados, dados_existentes def salvar_progresso(resultado): with open(ARQUIVO_CHECKPOINT, "a", encoding="utf-8") as f: json.dump(resultado, f, ensure_ascii=False) f.write("\n") def calcular_complexidade(projeto_yaml): score = 0 W_INST_SYNTH, W_INST_SAMPLE = 3.0, 1.0 W_AUTOMATION, W_PATTERN, W_FX = 2.0, 0.5, 1.5 stats = {"num_automations": 0, "num_patterns": 0, "num_effects": 0, "num_synths": 0} tracks = projeto_yaml.get("tracks", []) for track in tracks: instruments = track.get("instruments", []) for inst in instruments: nome_plugin = inst.get("plugin_name", "").lower() nome_inst = inst.get("instrument_name", "").lower() if "automation" in nome_inst or "automation" in nome_plugin: score += W_AUTOMATION stats["num_automations"] += 1 elif nome_plugin in [ "zynaddsubfx", "monstro", "lb302", "watsyn", "opyd", "sfxr", ]: score += W_INST_SYNTH stats["num_synths"] += 1 else: score += W_INST_SAMPLE patterns = inst.get("patterns", []) qtd_patterns = len(patterns) score += qtd_patterns * W_PATTERN stats["num_patterns"] += qtd_patterns fxchain = inst.get("fxchain", {}) if str(fxchain.get("enabled")) == "1": num_fx = int(fxchain.get("numofeffects", 0)) score += num_fx * W_FX stats["num_effects"] += num_fx if score <= 15: estrelas = 1 elif score <= 40: estrelas = 2 elif score <= 80: estrelas = 3 elif score <= 120: estrelas = 4 else: estrelas = 5 return {"score_total": round(score, 2), "estrelas": estrelas, "detalhes": stats} # ================= NÚCLEO DE ANÁLISE ================= def detectar_estrutura(audio_vec, sample_rate, duration): try: if duration < 30: return [0.0, round(duration, 2)] frame_size = 2048 hop_size = 512 rms_algo = es.RMS() flux_algo = es.Flux() w = es.Windowing(type="hann") spec = es.Spectrum() rms_curve = [] flux_curve = [] for frame in es.FrameGenerator( audio_vec, frameSize=frame_size, hopSize=hop_size ): s = spec(w(frame)) rms_curve.append(rms_algo(frame)) flux_curve.append(flux_algo(s)) min_len = min(len(rms_curve), len(flux_curve)) rms_curve = np.array(rms_curve[:min_len]) flux_curve = np.array(flux_curve[:min_len]) if np.max(rms_curve) > 0: rms_curve = rms_curve / np.max(rms_curve) if np.max(flux_curve) > 0: flux_curve = flux_curve / np.max(flux_curve) novelty = rms_curve * 0.5 + flux_curve * 0.5 segundos_por_frame = hop_size / sample_rate pontos_mudanca = [0.0] window_frames = int(10 / segundos_por_frame) step_frames = int(5 / segundos_por_frame) total_frames = len(novelty) for i in range(0, total_frames - window_frames, step_frames): janela = novelty[i : i + window_frames] if np.std(janela) > 0.1: pico_idx = i + np.argmax(janela) tempo = round(pico_idx * segundos_por_frame, 2) if tempo - pontos_mudanca[-1] > 15: pontos_mudanca.append(tempo) pontos_mudanca.append(round(duration, 2)) return pontos_mudanca except Exception as e: logging.error(f"Erro estrutura nova: {e}") return [0.0, round(duration, 2)] def analisar_faixa(caminho_arquivo, embedding_model, classifier_model, classes_raw): # --- AUDITORIA: INÍCIO DO TIMER --- inicio_timer = time.perf_counter() tamanho_arquivo = os.path.getsize(caminho_arquivo) pode_processar, motivo = verificar_memoria_segura(tamanho_arquivo) if not pode_processar: logging.warning(f"SKIPPED {os.path.basename(caminho_arquivo)}: {motivo}") return None, 0, 0 nome_arquivo = os.path.basename(caminho_arquivo) chave_busca = normalizar_chave(nome_arquivo) dados_humano = DATABASE_YAML.get(chave_busca) metadata = { "arquivo": nome_arquivo, "caminho": caminho_arquivo, "dados_projeto": {}, "analise_tecnica": {}, "analise_ia": {}, } if dados_humano: metadata["dados_projeto"] = { "titulo": dados_humano.get("original_title") or dados_humano.get("file"), "bpm_original": dados_humano.get("bpm"), "tags_yaml": dados_humano.get("tags"), "autor": dados_humano.get("author", "Desconhecido"), } else: metadata["analise_tecnica"]["complexidade"] = { "score_total": 0, "estrelas": 0, "detalhes": "YAML não encontrado", } duracao_real = 0.0 # 1. ANÁLISE TÉCNICA E EXTRAÇÃO ÚNICA (HQ 44.1kHz) try: # Lê do HD APENAS UMA VEZ loader_hq = es.MonoLoader(filename=caminho_arquivo, sampleRate=44100) audio_hq = loader_hq() if len(audio_hq) > (MAX_DURACAO_ANALISE * 44100): audio_hq = audio_hq[: int(MAX_DURACAO_ANALISE * 44100)] duracao_real = len(audio_hq) / 44100.0 audio_hq_vec = essentia.array(audio_hq) rhythm = es.RhythmExtractor2013(method="multifeature") bpm, _, conf, _, _ = rhythm(audio_hq_vec) key_ex = es.KeyExtractor() key, scale, _ = key_ex(audio_hq_vec) loudness = es.Loudness()(audio_hq_vec) estrutura = detectar_estrutura(audio_hq_vec, 44100, duracao_real) bpm_final = round(bpm, 1) origem = "algoritmo" if dados_humano and dados_humano.get("bpm"): try: bpm_str = str(dados_humano.get("bpm")).replace("'", "").replace('"', "") b = float(bpm_str) if b > 0: bpm_final = b origem = "yaml (humano)" except: pass metadata["analise_tecnica"].update( { "bpm": bpm_final, "origem_bpm": origem, "tom": key, "escala": scale, "intensidade_db": round(loudness, 2), "estrutura_segundos": estrutura, "duracao_detectada": round(duracao_real, 2), } ) metadata["analise_tecnica"]["complexidade"] = ( calcular_complexidade(dados_humano) if dados_humano else None ) except Exception as e: logging.error(f"Erro math {nome_arquivo}: {e}") metadata["analise_tecnica"]["erro"] = str(e) return metadata, 0, 0 # 2. ANÁLISE IA (LQ 16kHz via Resample da RAM - Resolve Leitura Dupla de HD) if embedding_model and classifier_model and classes_raw: try: resample_algo = es.Resample(inputSampleRate=44100, outputSampleRate=16000) audio_lq_vec = resample_algo(audio_hq_vec) embeddings = embedding_model(audio_lq_vec) predictions = classifier_model(embeddings) avg_preds = np.mean(predictions, axis=0) top_indices = np.argsort(avg_preds)[::-1][:10] tags_detectadas = [] genero_pai = "Unknown" for i, idx in enumerate(top_indices): score = float(avg_preds[idx]) if score < 0.05: continue nome_full = classes_raw[idx] partes = nome_full.split("---") tag_limpa = partes[-1] if i == 0: genero_pai = partes[0] if len(partes) > 0 else tag_limpa tags_detectadas.append({"tag": tag_limpa, "score": round(score, 3)}) metadata["analise_ia"] = { "genero_macro": genero_pai, "estilo_principal": tags_detectadas[0]["tag"] if tags_detectadas else "Unknown", "nuvem_tags": tags_detectadas, } del audio_lq_vec del embeddings except Exception as e: msg = f"Erro IA Pipeline: {str(e)}" logging.error(msg) metadata["analise_ia"] = {"erro": msg} else: metadata["analise_ia"] = {"status": "Modelos nao carregados"} # Limpeza Limpa del audio_hq del audio_hq_vec gc.collect() tempo_gasto = time.perf_counter() - inicio_timer return metadata, duracao_real, tempo_gasto # ================= MAIN (Multiprocessing) ================= def worker_analise(caminho): # Processa usando as instâncias dos modelos carregadas na thread mestre global GLOBAL_EMBEDDING, GLOBAL_CLASSIFIER, GLOBAL_CLASSES return analisar_faixa(caminho, GLOBAL_EMBEDDING, GLOBAL_CLASSIFIER, GLOBAL_CLASSES) def main(): print("\n--- INICIANDO PROCESSADOR + AUDITORIA ---") # Referencia variávies globais do script para alimentar os childs process do pool global DATABASE_YAML, GLOBAL_EMBEDDING, GLOBAL_CLASSIFIER, GLOBAL_CLASSES DATABASE_YAML = carregar_database_yaml() ja_processados, lista_resultados = carregar_checkpoint() auditor = Auditoria(ARQUIVO_AUDITORIA) if not os.path.exists(PASTA_WAVS): print("CRÍTICO: Pasta WAV não encontrada!") return todos = [f for f in os.listdir(PASTA_WAVS) if f.lower().endswith((".wav", ".ogg"))] a_fazer = [os.path.join(PASTA_WAVS, f) for f in todos if f not in ja_processados] total = len(a_fazer) print(f"Total Arquivos: {len(todos)} | Novos a Fazer: {total}") print(f"Auditoria será salva em: {ARQUIVO_AUDITORIA}") # === CARREGAMENTO MODELOS GLOBAIS === path_embed = MODELO_EMBEDDING if os.path.exists(MODELO_EMBEDDING) else os.path.join(BASE_DIR, MODELO_EMBEDDING) path_class = MODELO_CLASSIFIER if os.path.exists(MODELO_CLASSIFIER) else os.path.join(BASE_DIR, MODELO_CLASSIFIER) path_json = MODELO_CLASSES if os.path.exists(MODELO_CLASSES) else os.path.join(BASE_DIR, MODELO_CLASSES) if os.path.exists(path_embed) and os.path.exists(path_class) and os.path.exists(path_json): print("--- Carregando Modelos de IA ---") try: with open(path_json, "r") as f: GLOBAL_CLASSES = json.load(f)["classes"] print("[1/2] Carregando Extrator de Embeddings...") if hasattr(es, "TensorflowPredictEffnetDiscogs"): GLOBAL_EMBEDDING = es.TensorflowPredictEffnetDiscogs(graphFilename=path_embed, output="PartitionedCall:1") else: print(" -> Usando TensorflowPredict genérico.") GLOBAL_EMBEDDING = es.TensorflowPredict(graphFilename=path_embed, input="serving_default_model_Placeholder", output="PartitionedCall:1") print(" -> Embeddings carregados com sucesso.") GLOBAL_CLASSIFIER = es.TensorflowPredict2D(graphFilename=path_class, input="serving_default_model_Placeholder", output="PartitionedCall:0") print("[2/2] Classificador de Gênero carregado.") except Exception as e: print(f"ERRO CRÍTICO AO CARREGAR IA: {e}") return else: print("Arquivos de modelo incompletos.") return if total == 0: print("Tudo atualizado.") return # ProcessPoolExecutor lidará com N filas ao mesmo tempo max_workers = max(1, multiprocessing.cpu_count() - 1) print(f"-> Iniciando processamento paralelo com {max_workers} workers.") contagem_sessao = 0 try: with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor: resultados_futuros = {executor.submit(worker_analise, caminho): caminho for caminho in a_fazer} # O as_completed processa os que terminam mais rápido em vez de travar na ordem for future in tqdm(concurrent.futures.as_completed(resultados_futuros), total=len(a_fazer), unit="track"): try: res, duracao, tempo_gasto = future.result() caminho = resultados_futuros[future] if res: salvar_progresso(res) lista_resultados.append(res) tamanho = os.path.getsize(caminho) auditor.registrar_processamento(res['arquivo'], tamanho, duracao, tempo_gasto) contagem_sessao += 1 auditor.verificar_marco(contagem_sessao) except Exception as e: logging.error(f"Erro ao processar um dos arquivos no worker: {e}") except KeyboardInterrupt: print("\nParando paralelismo graciosamente...") finally: auditor.fechar() # Fecha o CSV com segurança tempo_total_sessao = time.time() - auditor.inicio_global print("\n--- FIM DO PROCESSAMENTO ---") print(f"Tempo Total: {str(timedelta(seconds=int(tempo_total_sessao)))}") print("Gerando JSON final consolidado...") with open(ARQUIVO_FINAL_JSON, "w", encoding="utf-8") as f: json.dump(lista_resultados, f, indent=4, ensure_ascii=False) print(f"Dados JSON: {ARQUIVO_FINAL_JSON}") print(f"Auditoria CSV: {ARQUIVO_AUDITORIA}") if __name__ == "__main__": main()