# main.py import os import shutil import subprocess import multiprocessing import logging import time import platform from datetime import datetime # Importando seus módulos from generate_manifest import generate_manifests from file_parser import parse_mmp_file from file_saver import save_to_json, save_to_yaml from dependency_checker import check_dependencies from utils import ( create_folders_if_not_exist, BASE_PATH, DATA_FOLDER, METADATA_FOLDER, WAV_FOLDER, MMPZ_FOLDER, MMP_FOLDER, ) # Caminho para os Logs # Assumindo que este script roda dentro de scripts/handler/ CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) LOG_FOLDER = os.path.join(CURRENT_DIR, "logs") def get_linux_mem_info(): """Lê /proc/meminfo para obter memória total de forma nativa no Linux.""" try: with open("/proc/meminfo", "r") as f: for line in f: if "MemTotal" in line: # Ex: MemTotal: 16303032 kB kb_value = int(line.split()[1]) return kb_value / (1024 * 1024) # Retorna em GB except: return 0 def get_cpu_model_name(): """Lê /proc/cpuinfo para pegar o nome real do processador.""" try: with open("/proc/cpuinfo", "r") as f: for line in f: if "model name" in line: return line.split(":")[1].strip() except: return platform.processor() def log_system_info(): """ Registra informações do hardware usando apenas bibliotecas nativas do Python. """ try: logging.info("=" * 30) logging.info("AUDITORIA DE AMBIENTE (HARDWARE)") logging.info("=" * 30) # 1. Sistema Operacional uname = platform.uname() logging.info(f"Sistema: {uname.system} {uname.release}") logging.info(f"Kernel: {uname.version}") logging.info( f"Node: {uname.node}" ) # Nome do servidor (provavelmente 'alice') # 2. CPU (Processamento) cpu_model = get_cpu_model_name() cores_logical = multiprocessing.cpu_count() # os.sched_getaffinity pega os cores que o processo PODE usar (útil em cluster compartilhado) try: cores_available = len(os.sched_getaffinity(0)) except AttributeError: cores_available = cores_logical logging.info(f"CPU Modelo: {cpu_model}") logging.info(f"Núcleos (Log): {cores_logical}") logging.info(f"Núcleos (Disp): {cores_available} (Alocados para este script)") # 3. Memória RAM mem_total_gb = get_linux_mem_info() logging.info(f"Memória Total: {mem_total_gb:.2f} GB") # 4. Armazenamento (Disco onde o script está rodando) # Pega o uso do disco no diretório atual (.) total, used, free = shutil.disk_usage(".") logging.info(f"Disco (Total): {total // (2**30)} GB") logging.info(f"Disco (Livre): {free // (2**30)} GB") logging.info("=" * 30) except Exception as e: logging.warning(f"Falha ao coletar info do sistema: {e}") def setup_logger(): """ Configura o logger para escrever no terminal e em arquivo simultaneamente. """ # Cria pasta de logs se não existir os.makedirs(LOG_FOLDER, exist_ok=True) # Nome do arquivo com Timestamp timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log") # Configuração do Logger logger = logging.getLogger() logger.setLevel(logging.INFO) # Formato da mensagem formatter = logging.Formatter( "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S" ) # Handler Arquivo file_handler = logging.FileHandler(log_filename, encoding="utf-8") file_handler.setFormatter(formatter) logger.addHandler(file_handler) # Handler Terminal (Console) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(console_handler) return logger, log_filename def process_single_file(file_name): """ Worker: Processa um arquivo. Retorna um dict com status de sucesso/erro. """ file_path = os.path.join(MMP_FOLDER, file_name) pid = os.getpid() # Resultado padrão result = {"success": False, "file": file_name, "data": None, "error": None} try: logging.info(f"[PID {pid}] Processando: {file_name}") mmp_data = None if file_name.endswith(".mmpz"): destination_path = os.path.join(MMPZ_FOLDER, file_name) # Movimentação de arquivo if not os.path.exists(destination_path): shutil.move(file_path, destination_path) logging.info(f"[PID {pid}] Movido para: {destination_path}") else: logging.info( f"[PID {pid}] Arquivo já existe em {destination_path}, usando existente." ) if os.path.exists(file_path): os.remove(file_path) mmp_name = os.path.splitext(file_name)[0] + ".mmp" output_mmp_path = os.path.join(MMP_FOLDER, mmp_name) # Ambiente para LMMS Headless env = os.environ.copy() env["QT_QPA_PLATFORM"] = "offscreen" # Conversão MMPZ -> MMP convert_cmd = f'lmms --dump "{destination_path}" > "{output_mmp_path}"' try: # capture_output=True permite jogar o log do LMMS para o nosso arquivo de log subprocess.run( convert_cmd, shell=True, check=True, env=env, capture_output=True, text=True, ) logging.info(f"[PID {pid}] {file_name}: Convertido para .mmp") except subprocess.CalledProcessError as e: error_msg = f"Erro conversão .mmp (LMMS): {e.stderr}" logging.error(f"[PID {pid}] {file_name}: {error_msg}") result["error"] = error_msg return result # Conversão -> WAV wav_output_path = os.path.join(WAV_FOLDER, os.path.splitext(file_name)[0]) wav_convert_cmd = ( f'lmms -r "{destination_path}" -o "{wav_output_path}" -f wav' ) try: subprocess.run( wav_convert_cmd, shell=True, check=True, env=env, capture_output=True, text=True, ) logging.info(f"[PID {pid}] {file_name}: Convertido para .wav") except subprocess.CalledProcessError as e: # Erro no WAV não para o processo de indexação, apenas loga logging.warning( f"[PID {pid}] {file_name}: Falha no render WAV: {e.stderr}" ) if os.path.exists(output_mmp_path): mmp_data = parse_mmp_file(output_mmp_path) else: result["error"] = "Arquivo .mmp não foi gerado após dump." return result elif file_name.endswith(".mmp"): mmp_data = parse_mmp_file(file_path) # Salvamento Individual if mmp_data: json_file_path = os.path.join( METADATA_FOLDER, os.path.splitext(file_name)[0] + ".json" ) yaml_file_path = os.path.join( DATA_FOLDER, os.path.splitext(file_name)[0] + ".yml" ) save_to_yaml(mmp_data, yaml_file_path) save_to_json(mmp_data, json_file_path) result["success"] = True result["data"] = mmp_data return result else: result["error"] = "Parser retornou dados vazios." return result except Exception as e: error_msg = f"Exceção não tratada: {str(e)}" logging.error(f"[PID {pid}] {file_name}: {error_msg}") result["error"] = error_msg return result def main_parallel(): # 1. Configura Logger e Timer logger, log_file_path = setup_logger() start_time = time.time() # Informações do PC/S.O log_system_info() logging.info("=== Iniciando Pipeline de Indexação Paralela ===") logging.info(f"Log sendo salvo em: {log_file_path}") # 2. Criação de Pastas create_folders_if_not_exist([MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER]) # 3. Listagem if not os.path.exists(MMP_FOLDER): logging.critical(f"Pasta {MMP_FOLDER} não encontrada.") return all_files = [ f for f in os.listdir(MMP_FOLDER) if f.endswith(".mmp") or f.endswith(".mmpz") ] total_files = len(all_files) if total_files == 0: logging.warning("Nenhum arquivo .mmp ou .mmpz encontrado para processar.") return logging.info(f"Iniciando processamento de {total_files} arquivos.") # 4. Pool Paralelo (ETAPA 1: Processamento Pesado) num_cores = multiprocessing.cpu_count() logging.info(f"Utilizando {num_cores} núcleos de CPU.") with multiprocessing.Pool(processes=num_cores) as pool: results = pool.map(process_single_file, all_files) # 5. Processamento dos Resultados successful_data = [] failed_files = [] for res in results: if res["success"]: successful_data.append(res["data"]) else: failed_files.append(res) # 6. Salvamento Consolidado if successful_data: save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json")) save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml")) logging.info("Arquivos 'all.json' e 'all.yml' gerados com sucesso.") # >>> ETAPA 2: Geração de Manifestos <<< manifest_report = {"generated": [], "failed": []} try: # Capturamos o relatório retornado pela função manifest_report = generate_manifests(BASE_PATH) except Exception as e: logging.error(f"Falha crítica no gerador de manifestos: {e}") # 7. Relatório Final Completo end_time = time.time() duration = end_time - start_time logging.info("=" * 60) logging.info( f"RELATÓRIO DE EXECUÇÃO - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}" ) logging.info("=" * 60) logging.info(f"Tempo Total do Pipeline: {duration:.2f} segundos") # Seção 1: Processamento Paralelo (Arquivos MMP) logging.info("-" * 20) logging.info("[1] Indexação Paralela (Arquivos .mmp/.mmpz)") logging.info(f" Total Encontrado: {total_files}") logging.info(f" Sucessos: {len(successful_data)}") logging.info(f" Falhas: {len(failed_files)}") if failed_files: for fail in failed_files: logging.error(f" -> FALHA: {fail['file']} | Erro: {fail['error']}") # Seção 2: Geração de Manifestos logging.info("-" * 20) logging.info("[2] Geração de Manifestos (Frontend)") if manifest_report["generated"]: for item in manifest_report["generated"]: logging.info(f" [OK] {item}") if manifest_report["failed"]: for item in manifest_report["failed"]: logging.error(f" [X] {item}") if not manifest_report["generated"] and not manifest_report["failed"]: logging.warning(" [!] Nenhum manifesto foi processado.") logging.info("=" * 60) logging.info("Pipeline finalizado.") # >>> ETAPA 3: Análise de Integridade (NOVO) <<< try: # Caminhos (ajuste conforme sua estrutura real) PROJECTS_DB = os.path.join(METADATA_FOLDER, "all.json") SAMPLES_MANIFEST = os.path.join(METADATA_FOLDER, "samples-manifest.json") REPORT_FILE = os.path.join(METADATA_FOLDER, "dependency_report.json") if os.path.exists(PROJECTS_DB) and os.path.exists(SAMPLES_MANIFEST): check_dependencies(PROJECTS_DB, SAMPLES_MANIFEST, REPORT_FILE) else: logging.warning( "Arquivos necessários para verificação de dependência não encontrados." ) except Exception as e: logging.error(f"Erro na verificação de dependências: {e}") if __name__ == "__main__": main_parallel()