mmpSearch/scripts/handler/main.py

371 lines
12 KiB
Python

# main.py
import os
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
from datetime import datetime
# Importando seus módulos
from generate_manifest import generate_manifests
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
create_folders_if_not_exist,
BASE_PATH,
DATA_FOLDER,
METADATA_FOLDER,
WAV_FOLDER,
MMPZ_FOLDER,
MMP_FOLDER,
)
# Caminho para os Logs
# Assumindo que este script roda dentro de scripts/handler/
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FOLDER = os.path.join(CURRENT_DIR, "logs")
def get_linux_mem_info():
"""Lê /proc/meminfo para obter memória total de forma nativa no Linux."""
try:
with open("/proc/meminfo", "r") as f:
for line in f:
if "MemTotal" in line:
# Ex: MemTotal: 16303032 kB
kb_value = int(line.split()[1])
return kb_value / (1024 * 1024) # Retorna em GB
except:
return 0
def get_cpu_model_name():
"""Lê /proc/cpuinfo para pegar o nome real do processador."""
try:
with open("/proc/cpuinfo", "r") as f:
for line in f:
if "model name" in line:
return line.split(":")[1].strip()
except:
return platform.processor()
def log_system_info():
"""
Registra informações do hardware usando apenas bibliotecas nativas do Python.
"""
try:
logging.info("=" * 30)
logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
logging.info("=" * 30)
# 1. Sistema Operacional
uname = platform.uname()
logging.info(f"Sistema: {uname.system} {uname.release}")
logging.info(f"Kernel: {uname.version}")
logging.info(
f"Node: {uname.node}"
) # Nome do servidor (provavelmente 'alice')
# 2. CPU (Processamento)
cpu_model = get_cpu_model_name()
cores_logical = multiprocessing.cpu_count()
# os.sched_getaffinity pega os cores que o processo PODE usar (útil em cluster compartilhado)
try:
cores_available = len(os.sched_getaffinity(0))
except AttributeError:
cores_available = cores_logical
logging.info(f"CPU Modelo: {cpu_model}")
logging.info(f"Núcleos (Log): {cores_logical}")
logging.info(f"Núcleos (Disp): {cores_available} (Alocados para este script)")
# 3. Memória RAM
mem_total_gb = get_linux_mem_info()
logging.info(f"Memória Total: {mem_total_gb:.2f} GB")
# 4. Armazenamento (Disco onde o script está rodando)
# Pega o uso do disco no diretório atual (.)
total, used, free = shutil.disk_usage(".")
logging.info(f"Disco (Total): {total // (2**30)} GB")
logging.info(f"Disco (Livre): {free // (2**30)} GB")
logging.info("=" * 30)
except Exception as e:
logging.warning(f"Falha ao coletar info do sistema: {e}")
def setup_logger():
"""
Configura o logger para escrever no terminal e em arquivo simultaneamente.
"""
# Cria pasta de logs se não existir
os.makedirs(LOG_FOLDER, exist_ok=True)
# Nome do arquivo com Timestamp
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log")
# Configuração do Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Formato da mensagem
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
)
# Handler Arquivo
file_handler = logging.FileHandler(log_filename, encoding="utf-8")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Handler Terminal (Console)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger, log_filename
def process_single_file(file_name):
"""
Worker: Processa um arquivo. Retorna um dict com status de sucesso/erro.
"""
file_path = os.path.join(MMP_FOLDER, file_name)
pid = os.getpid()
# Resultado padrão
result = {"success": False, "file": file_name, "data": None, "error": None}
try:
logging.info(f"[PID {pid}] Processando: {file_name}")
mmp_data = None
if file_name.endswith(".mmpz"):
destination_path = os.path.join(MMPZ_FOLDER, file_name)
# Movimentação de arquivo
if not os.path.exists(destination_path):
shutil.move(file_path, destination_path)
logging.info(f"[PID {pid}] Movido para: {destination_path}")
else:
logging.info(
f"[PID {pid}] Arquivo já existe em {destination_path}, usando existente."
)
if os.path.exists(file_path):
os.remove(file_path)
mmp_name = os.path.splitext(file_name)[0] + ".mmp"
output_mmp_path = os.path.join(MMP_FOLDER, mmp_name)
# Ambiente para LMMS Headless
env = os.environ.copy()
env["QT_QPA_PLATFORM"] = "offscreen"
# Conversão MMPZ -> MMP
convert_cmd = f'lmms --dump "{destination_path}" > "{output_mmp_path}"'
try:
# capture_output=True permite jogar o log do LMMS para o nosso arquivo de log
subprocess.run(
convert_cmd,
shell=True,
check=True,
env=env,
capture_output=True,
text=True,
)
logging.info(f"[PID {pid}] {file_name}: Convertido para .mmp")
except subprocess.CalledProcessError as e:
error_msg = f"Erro conversão .mmp (LMMS): {e.stderr}"
logging.error(f"[PID {pid}] {file_name}: {error_msg}")
result["error"] = error_msg
return result
# Conversão -> WAV
wav_output_path = os.path.join(WAV_FOLDER, os.path.splitext(file_name)[0])
wav_convert_cmd = (
f'lmms -r "{destination_path}" -o "{wav_output_path}" -f wav'
)
try:
subprocess.run(
wav_convert_cmd,
shell=True,
check=True,
env=env,
capture_output=True,
text=True,
)
logging.info(f"[PID {pid}] {file_name}: Convertido para .wav")
except subprocess.CalledProcessError as e:
# Erro no WAV não para o processo de indexação, apenas loga
logging.warning(
f"[PID {pid}] {file_name}: Falha no render WAV: {e.stderr}"
)
if os.path.exists(output_mmp_path):
mmp_data = parse_mmp_file(output_mmp_path)
else:
result["error"] = "Arquivo .mmp não foi gerado após dump."
return result
elif file_name.endswith(".mmp"):
mmp_data = parse_mmp_file(file_path)
# Salvamento Individual
if mmp_data:
json_file_path = os.path.join(
METADATA_FOLDER, os.path.splitext(file_name)[0] + ".json"
)
yaml_file_path = os.path.join(
DATA_FOLDER, os.path.splitext(file_name)[0] + ".yml"
)
save_to_yaml(mmp_data, yaml_file_path)
save_to_json(mmp_data, json_file_path)
result["success"] = True
result["data"] = mmp_data
return result
else:
result["error"] = "Parser retornou dados vazios."
return result
except Exception as e:
error_msg = f"Exceção não tratada: {str(e)}"
logging.error(f"[PID {pid}] {file_name}: {error_msg}")
result["error"] = error_msg
return result
def main_parallel():
# 1. Configura Logger e Timer
logger, log_file_path = setup_logger()
start_time = time.time()
# Informações do PC/S.O
log_system_info()
logging.info("=== Iniciando Pipeline de Indexação Paralela ===")
logging.info(f"Log sendo salvo em: {log_file_path}")
# 2. Criação de Pastas
create_folders_if_not_exist([MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER])
# 3. Listagem
if not os.path.exists(MMP_FOLDER):
logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
return
all_files = [
f for f in os.listdir(MMP_FOLDER) if f.endswith(".mmp") or f.endswith(".mmpz")
]
total_files = len(all_files)
if total_files == 0:
logging.warning("Nenhum arquivo .mmp ou .mmpz encontrado para processar.")
return
logging.info(f"Iniciando processamento de {total_files} arquivos.")
# 4. Pool Paralelo (ETAPA 1: Processamento Pesado)
num_cores = multiprocessing.cpu_count()
logging.info(f"Utilizando {num_cores} núcleos de CPU.")
with multiprocessing.Pool(processes=num_cores) as pool:
results = pool.map(process_single_file, all_files)
# 5. Processamento dos Resultados
successful_data = []
failed_files = []
for res in results:
if res["success"]:
successful_data.append(res["data"])
else:
failed_files.append(res)
# 6. Salvamento Consolidado
if successful_data:
save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))
logging.info("Arquivos 'all.json' e 'all.yml' gerados com sucesso.")
# >>> ETAPA 2: Geração de Manifestos <<<
manifest_report = {"generated": [], "failed": []}
try:
# Capturamos o relatório retornado pela função
manifest_report = generate_manifests(BASE_PATH)
except Exception as e:
logging.error(f"Falha crítica no gerador de manifestos: {e}")
# 7. Relatório Final Completo
end_time = time.time()
duration = end_time - start_time
logging.info("=" * 60)
logging.info(
f"RELATÓRIO DE EXECUÇÃO - {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}"
)
logging.info("=" * 60)
logging.info(f"Tempo Total do Pipeline: {duration:.2f} segundos")
# Seção 1: Processamento Paralelo (Arquivos MMP)
logging.info("-" * 20)
logging.info("[1] Indexação Paralela (Arquivos .mmp/.mmpz)")
logging.info(f" Total Encontrado: {total_files}")
logging.info(f" Sucessos: {len(successful_data)}")
logging.info(f" Falhas: {len(failed_files)}")
if failed_files:
for fail in failed_files:
logging.error(f" -> FALHA: {fail['file']} | Erro: {fail['error']}")
# Seção 2: Geração de Manifestos
logging.info("-" * 20)
logging.info("[2] Geração de Manifestos (Frontend)")
if manifest_report["generated"]:
for item in manifest_report["generated"]:
logging.info(f" [OK] {item}")
if manifest_report["failed"]:
for item in manifest_report["failed"]:
logging.error(f" [X] {item}")
if not manifest_report["generated"] and not manifest_report["failed"]:
logging.warning(" [!] Nenhum manifesto foi processado.")
logging.info("=" * 60)
logging.info("Pipeline finalizado.")
# >>> ETAPA 3: Análise de Integridade (NOVO) <<<
try:
# Caminhos (ajuste conforme sua estrutura real)
PROJECTS_DB = os.path.join(METADATA_FOLDER, "all.json")
SAMPLES_MANIFEST = os.path.join(METADATA_FOLDER, "samples-manifest.json")
REPORT_FILE = os.path.join(METADATA_FOLDER, "dependency_report.json")
if os.path.exists(PROJECTS_DB) and os.path.exists(SAMPLES_MANIFEST):
check_dependencies(PROJECTS_DB, SAMPLES_MANIFEST, REPORT_FILE)
else:
logging.warning(
"Arquivos necessários para verificação de dependência não encontrados."
)
except Exception as e:
logging.error(f"Erro na verificação de dependências: {e}")
if __name__ == "__main__":
main_parallel()