mmpSearch/scripts/handler/main.py

606 lines
20 KiB
Python
Executable File

# main.py
import os
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
import csv
import psutil
import glob
import json
from datetime import datetime, timedelta
# Importando seus módulos locais
from generate_manifest import generate_manifests
# from src_instruments import find_audio_files
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from dependency_checker import check_dependencies
from utils import (
create_folders_if_not_exist,
DATA_FOLDER,
METADATA_FOLDER,
WAV_FOLDER,
MMPZ_FOLDER,
MMP_FOLDER,
SRC_MMPSEARCH,
LOG_FOLDER,
SAIDA_ANALISES,
)
# === SAFETY AND BATCH SETTINGS ===
TIMEOUT_RENDER_SECONDS = 300  # hard cap: 5 minutes per rendered file
MIN_RAM_FREE_MB = 500  # minimum free RAM required before a worker starts a file
BATCH_SIZE = 250  # number of projects per YAML batch file
LOTES_FOLDER = os.path.join(SRC_MMPSEARCH, "lotes_yaml")  # YAML batches folder
LIMIT_FILES = 300  # 0 = process everything | >0 = cap the file count (e.g. 50)
# === GLOBAL VARIABLE FOR THE WORKERS (MULTIPROCESSING FIX) ===
# Needed to avoid a pickle error when passing a Lock through task arguments;
# each worker receives the shared counter via init_worker instead.
global_counter = None
def create_lightweight_dataset(data_list):
    """Build a slimmed-down copy of *data_list* for the Jekyll frontend.

    Keeps the project header fields, the track-identifier keys used by the
    frontend, and the full instrument objects (the JS InstrumentFactory /
    Web Audio API needs their synth parameters) — but strips the heavy MIDI
    note data from patterns, retaining only 'name' and 'steps' (the 1/0 grid).

    Args:
        data_list: list of parsed project dicts.

    Returns:
        A new list of lightweight project dicts; the input is not mutated.
    """
    track_id_keys = ("bassline_name", "instrument_name", "sample_name", "sample")
    slim_projects = []
    for project in data_list:
        # 1. Header fields.
        slim = {
            "file": project.get("file"),
            "original_title": project.get("original_title"),
            "bpm": project.get("bpm"),
            "tags": project.get("tags", {}),
            "tracks": [],
        }
        # 2. Track filtering: keep only identification keys.
        for track in project.get("tracks", []):
            slim_track = {k: track[k] for k in track_id_keys if k in track}
            # 3. Preserve instruments, but shrink their patterns.
            if "instruments" in track:
                slim_instruments = []
                for instrument in track["instruments"]:
                    # Shallow-copy the whole instrument object, then replace
                    # its patterns with name+steps-only versions.
                    slim_inst = instrument.copy()
                    if "patterns" in instrument:
                        slim_inst["patterns"] = [
                            {"name": p.get("name", ""), "steps": p.get("steps", [])}
                            for p in instrument["patterns"]
                        ]
                    slim_instruments.append(slim_inst)
                slim_track["instruments"] = slim_instruments
            slim["tracks"].append(slim_track)
        slim_projects.append(slim)
    return slim_projects
def init_worker(counter):
    """Pool initializer: bind the shared progress counter to this worker's globals.

    Required because a multiprocessing.Value wraps a Lock and cannot be
    pickled through normal task arguments.
    """
    global global_counter
    global_counter = counter
# === Sanitization helper (slugify) ===
def slugify(value):
    """Turn *value* into a URL-safe slug.

    Strips accents (NFKD + ASCII transliteration), lowercases, removes
    characters that are not word chars / whitespace / hyphens, collapses
    runs of hyphens, underscores and whitespace into a single hyphen, and
    trims leading/trailing hyphens and underscores.
    """
    text = (
        unicodedata.normalize("NFKD", str(value))
        .encode("ascii", "ignore")
        .decode("ascii")
        .lower()
    )
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[-\s_]+", "-", text)
    return text.strip("-_")
def check_system_dependencies():
    """Abort the program (exit code 1) if any required external CLI tool is
    missing from PATH. Currently only 'lmms' is required."""
    required_tools = ["lmms"]
    missing = [tool for tool in required_tools if shutil.which(tool) is None]
    if not missing:
        return
    logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}")
    logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing))
    sys.exit(1)
def get_cpu_safe_count():
    """Return the CPU count minus one (minimum 1) so the OS stays responsive.

    Falls back to 1 when the CPU count cannot be determined.
    """
    try:
        count = multiprocessing.cpu_count()
    except NotImplementedError:
        # FIX: was a bare `except:` that also swallowed KeyboardInterrupt etc.;
        # cpu_count() only documents NotImplementedError.
        return 1
    return max(1, count - 1)
# --- System info helpers ---
def get_linux_mem_info():
    """Return the total system RAM in GB, read from /proc/meminfo (Linux only).

    Returns 0 when the file is unavailable (non-Linux), cannot be parsed,
    or contains no "MemTotal" line.
    """
    try:
        with open("/proc/meminfo", "r") as f:
            for line in f:
                if "MemTotal" in line:
                    kb_value = int(line.split()[1])
                    # /proc/meminfo reports kB; convert to GB.
                    return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        return 0
    except Exception as e:
        logging.warning(f"Erro ao ler memória: {e}")
        return 0
    # BUG FIX: previously fell off the loop and implicitly returned None when
    # the file had no "MemTotal" line; callers format the result with ":.2f",
    # which would crash on None.
    return 0
def get_cpu_model_name():
    """Return the CPU model string from /proc/cpuinfo, falling back to
    platform.processor() when the file is unreadable (non-Linux) or contains
    no "model name" field (e.g. some ARM systems / VMs)."""
    try:
        with open("/proc/cpuinfo", "r") as f:
            for line in f:
                if "model name" in line:
                    return line.split(":")[1].strip()
    except Exception:
        return platform.processor()
    # BUG FIX: previously fell off the loop and implicitly returned None when
    # /proc/cpuinfo existed but had no "model name" line.
    return platform.processor()
def log_system_info():
    """Log a hardware/environment audit block: OS, host, CPU model, core and
    worker counts, total RAM and free disk space. Any failure is downgraded
    to a warning so the pipeline never stops here."""
    try:
        separator = "=" * 30
        logging.info(separator)
        logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
        logging.info(separator)
        uname = platform.uname()
        logging.info(f"Sistema: {uname.system} {uname.release}")
        logging.info(f"Node: {uname.node}")
        # Gather the hardware facts before emitting the remaining lines.
        cpu_model = get_cpu_model_name()
        logical_cores = multiprocessing.cpu_count()
        worker_count = get_cpu_safe_count()
        total_ram_gb = get_linux_mem_info()
        logging.info(f"CPU Modelo: {cpu_model}")
        logging.info(
            f"Núcleos (Total): {logical_cores} | Workers Seguros: {worker_count}"
        )
        logging.info(f"Memória Total: {total_ram_gb:.2f} GB")
        _, _, free_bytes = shutil.disk_usage(".")
        logging.info(f"Disco (Livre): {free_bytes // (2**30)} GB")
        logging.info(separator)
    except Exception as e:
        logging.warning(f"Falha ao coletar info do sistema: {e}")
def setup_logger():
    """Configure the root logger with a timestamped file handler plus a
    console handler, creating LOG_FOLDER if needed.

    Returns:
        (logger, log_filename): the root logger and the log file's path.
    """
    os.makedirs(LOG_FOLDER, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_filename = os.path.join(LOG_FOLDER, f"execucao_{timestamp}.log")
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    # BUG FIX: drop handlers left over from a previous call so re-running
    # setup (e.g. in the same interpreter) does not duplicate every log line.
    for stale_handler in logger.handlers[:]:
        logger.removeHandler(stale_handler)
        stale_handler.close()
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
    )
    file_handler = logging.FileHandler(log_filename, encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger, log_filename
def process_single_file(args):
    """
    Worker that processes one .mmp/.mmpz project end-to-end, with a
    process-shared progress counter (thread/process-safe).

    Steps: archive+extract .mmpz via `lmms --dump`, render the project to
    OGG via `lmms -r`, parse the resulting .mmp, and save per-project
    JSON/YAML metadata.

    Args:
        args: tuple (file_name, clean_slug, total_files). The shared counter
            does NOT travel here — it is injected via init_worker to avoid
            pickling a Lock.

    Returns:
        dict with keys: success, file, slug, data, error, duration_s,
        ram_mb, file_size_mb — consumed by the audit CSV writer.
    """
    # Unpack static arguments only (the counter no longer comes in here).
    file_name, clean_slug, total_files = args
    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()
    # === Counter logic (accessed via module global) ===
    # global_counter was injected into this process by init_worker.
    global global_counter
    current_idx = 0
    if global_counter:
        with global_counter.get_lock():
            global_counter.value += 1
            current_idx = global_counter.value
    remaining = total_files - current_idx
    # Progress status log line.
    logging.info(
        f"[PID {pid}] [Progresso: {current_idx}/{total_files} | Faltam: {remaining}] Iniciando: {file_name}"
    )
    # Metrics for the audit report.
    start_time = time.time()
    ram_usage_mb = 0.0
    result = {
        "success": False,
        "file": file_name,
        "slug": clean_slug,
        "data": None,
        "error": None,
        "duration_s": 0.0,
        "ram_mb": 0.0,
        "file_size_mb": 0.0,
    }
    try:
        # Size metric (MB) for the audit CSV.
        if os.path.exists(file_path):
            result["file_size_mb"] = os.path.getsize(file_path) / (1024 * 1024)
        # RAM safety check: skip this file when available memory is critical.
        mem = psutil.virtual_memory()
        if (mem.available / (1024 * 1024)) < MIN_RAM_FREE_MB:
            result["error"] = "RAM Insuficiente para iniciar worker"
            logging.warning(f"[PID {pid}] Pulei {file_name}: RAM Crítica")
            return result
        original_base_name = os.path.splitext(file_name)[0]
        ogg_name = clean_slug + ".ogg"
        json_name = clean_slug + ".json"
        yml_name = clean_slug + ".yml"
        target_mmp_path = ""
        # 1. Handling and extraction: archive .mmpz into MMPZ_FOLDER, then
        # dump it back to plain-XML .mmp via the LMMS CLI.
        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)
            if not os.path.exists(destination_path):
                shutil.move(file_path, destination_path)
            elif os.path.exists(file_path):
                # Already archived: drop the duplicate source copy.
                os.remove(file_path)
            mmp_temp_name = clean_slug + ".mmp"
            output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)
            with open(abs_mmp_out, "w") as outfile:
                # `lmms --dump` writes the decompressed project XML to stdout;
                # QT_QPA_PLATFORM=offscreen keeps Qt from needing a display.
                subprocess.run(
                    ["lmms", "--dump", abs_dest],
                    stdout=outfile,
                    stderr=subprocess.PIPE,
                    check=True,
                    timeout=60,
                    env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
                )
            target_mmp_path = output_mmp_path
        elif file_name.endswith(".mmp"):
            target_mmp_path = file_path
        # 2. AUDIO GENERATION (.mmp -> .ogg).
        if os.path.exists(target_mmp_path):
            abs_target_mmp = os.path.abspath(target_mmp_path)
            abs_ogg_out = os.path.abspath(os.path.join(WAV_FOLDER, ogg_name))
            render_process = subprocess.run(
                ["lmms", "-r", abs_target_mmp, "-o", abs_ogg_out, "-f", "ogg"],
                check=False,
                capture_output=True,
                text=True,
                timeout=TIMEOUT_RENDER_SECONDS,
                env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
            )
            if render_process.returncode != 0:
                err_msg = (
                    render_process.stderr
                    if render_process.stderr
                    else "Erro desconhecido"
                )
                # returncode -11 == terminated by SIGSEGV (LMMS crashed).
                if render_process.returncode == -11:
                    err_msg = f"LMMS CRASH (SIGSEGV): {err_msg}"
                raise subprocess.CalledProcessError(
                    render_process.returncode,
                    "lmms render",
                    output=render_process.stdout,
                    stderr=err_msg,
                )
            logging.info(f"[PID {pid}] Áudio ogg gerado: {ogg_name}")
        else:
            raise FileNotFoundError(
                "Arquivo MMP alvo não encontrado para renderização."
            )
        # 3. Parsing and INDIVIDUAL metadata saving.
        if os.path.exists(target_mmp_path):
            mmp_data = parse_mmp_file(target_mmp_path)
            if mmp_data:
                mmp_data["file"] = clean_slug
                mmp_data["original_title"] = original_base_name
                # Save per-project metadata files.
                save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name))
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))
                result["success"] = True
                result["data"] = mmp_data
            else:
                result["error"] = "Dados vazios após parsing."
        else:
            result["error"] = "Arquivo MMP não encontrado para parsing."
        # Record this worker's resident memory for the audit report
        # (best-effort; 0.0 on failure).
        try:
            process = psutil.Process(pid)
            ram_usage_mb = process.memory_info().rss / (1024 * 1024)
        except:
            ram_usage_mb = 0.0
    except subprocess.TimeoutExpired:
        result["error"] = f"Timeout ({TIMEOUT_RENDER_SECONDS}s) excedido."
        logging.error(f"[PID {pid}] TIMEOUT em {file_name}")
    except subprocess.CalledProcessError as e:
        # stderr may be bytes (from --dump with PIPE) or str (render, text=True).
        if isinstance(e.stderr, bytes):
            err_msg = e.stderr.decode("utf-8", errors="ignore")
        else:
            err_msg = str(e.stderr) if e.stderr else str(e)
        result["error"] = f"Erro LMMS (Code {e.returncode}): {err_msg}"
        logging.error(f"[PID {pid}] Falha LMMS: {result['error']}")
    except Exception as e:
        result["error"] = f"Erro geral: {str(e)}"
        logging.error(f"[PID {pid}] {file_name}: {result['error']}")
    result["duration_s"] = time.time() - start_time
    result["ram_mb"] = ram_usage_mb
    return result
def rebuild_all_yamls():
    """Aggregate every per-project JSON in METADATA_FOLDER and regenerate
    ONLY the required combined files: all.json plus the lightweight
    all_leve.yml consumed by Jekyll."""
    logging.info("Reconstruindo all.json e all_leve.yml a partir dos projetos individuais...")
    # Aggregate outputs/manifests that must not be re-ingested as projects.
    skip_names = {
        "all.json",
        "samples-manifest.json",
        "mmp-manifest.json",
        "dependency_report.json",
    }
    all_data = []
    for json_path in glob.glob(os.path.join(METADATA_FOLDER, "*.json")):
        if os.path.basename(json_path) in skip_names:
            continue
        try:
            with open(json_path, 'r', encoding='utf-8') as handle:
                all_data.append(json.load(handle))
        except Exception as e:
            logging.error(f"Erro ao ler {json_path} para agregar: {e}")
    if not all_data:
        return
    # 1. Full JSON first (the native json library is far lighter on RAM).
    save_to_json(all_data, os.path.join(METADATA_FOLDER, "all.json"))
    # 2. Shrink the dataset before handing it to the YAML writer, then
    # 3. save only the lightweight version for Jekyll (won't blow up memory).
    lightweight = create_lightweight_dataset(all_data)
    save_to_yaml(lightweight, os.path.join(DATA_FOLDER, "all_leve.yml"))
    logging.info(f"Arquivos agregados (all.json / all_leve.yml) atualizados com {len(all_data)} projetos!")
def main_parallel():
    """
    Pipeline entry point: configures logging, validates the environment,
    processes every .mmp/.mmpz file in parallel with a shared progress
    counter, writes an audit CSV, saves aggregated outputs (all.json,
    all.yml, all_leve.yml, batched YAML slices), generates manifests and
    runs the dependency cross-check.
    """
    logger, log_file_path = setup_logger()
    start_time_global = time.time()
    audit_csv_path = os.path.join(SAIDA_ANALISES, "audit_pipeline.csv")
    check_system_dependencies()
    log_system_info()
    # Create output folders (including the YAML batches folder).
    create_folders_if_not_exist(
        [MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER, LOTES_FOLDER]
    )
    logging.info(
        "=== Iniciando Pipeline Otimizado (Multi-Batch + Paralelo + Contador) ==="
    )
    if not os.path.exists(MMP_FOLDER):
        logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
        return
    all_files_raw = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))]
    if not all_files_raw:
        logging.warning("Nenhum arquivo encontrado.")
        return
    # === FILE-COUNT LIMIT (LIMIT_FILES > 0 truncates the work list) ===
    if LIMIT_FILES and LIMIT_FILES > 0:
        logging.info(
            f"MODO LIMITADO: Processando apenas os primeiros {LIMIT_FILES} arquivos."
        )
        all_files_raw = all_files_raw[:LIMIT_FILES]
    total_files = len(all_files_raw)
    logging.info(f"Total de arquivos a processar: {total_files}")
    # === SHARED PROGRESS COUNTER ===
    # multiprocessing.Value lives in shared memory; workers receive it via
    # the Pool initializer (it cannot be pickled through task arguments).
    shared_counter = multiprocessing.Value("i", 0)
    # === SLUG PRE-PROCESSING ===
    tasks = []
    invalid_count = 0
    logging.info("Preparando tarefas...")
    for file_name in all_files_raw:
        original_base = os.path.splitext(file_name)[0]
        slug = slugify(original_base)
        if not slug:
            # The name sanitized down to nothing: synthesize a unique slug.
            invalid_count += 1
            slug = f"titulo-incorreto-{invalid_count}"
            logging.warning(
                f"Nome inválido detectado: '{file_name}'. Renomeando para '{slug}'"
            )
        # Only static data goes in the task tuple; the counter travels via
        # the initializer.
        tasks.append((file_name, slug, total_files))
    # === PARALLEL EXECUTION ===
    num_cores = get_cpu_safe_count()
    logging.info(f"Processando com {num_cores} workers (Safe Mode).")
    # Write the audit CSV header (mode "w" truncates any previous run).
    with open(audit_csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "Timestamp",
                "Arquivo_Original",
                "Slug",
                "Tamanho_MB",
                "Duracao_Sec",
                "RAM_Worker_MB",
                "Status",
                "Erro",
            ]
        )
    successful_data = []
    failed_files = []
    # === POOL NOW USES AN INITIALIZER ===
    # This injects shared_counter into each worker safely before tasks start.
    with multiprocessing.Pool(
        processes=num_cores, initializer=init_worker, initargs=(shared_counter,)
    ) as pool:
        results = pool.map(process_single_file, tasks)
    # Result collection: one audit row per processed file (append mode).
    with open(audit_csv_path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        for res in results:
            ts = datetime.now().strftime("%H:%M:%S")
            status = "SUCESSO" if res["success"] else "FALHA"
            writer.writerow(
                [
                    ts,
                    res["file"],
                    res["slug"],
                    f"{res['file_size_mb']:.2f}",
                    f"{res['duration_s']:.2f}",
                    f"{res['ram_mb']:.2f}",
                    status,
                    res["error"] or "",
                ]
            )
            if res["success"]:
                successful_data.append(res["data"])
            else:
                failed_files.append(res)
    # === BATCHED SAVING ===
    if successful_data:
        logging.info(f"Total processado com sucesso: {len(successful_data)}")
        logging.info(f"Iniciando gravação em lotes (Batch Size: {BATCH_SIZE})...")
        # 1. Save the full JSON.
        logging.info("Salvando all.json completo...")
        save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
        # --- Save the full YAML (all.yml) ---
        logging.info("Salvando all.yml completo...")
        save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))
        # ----------------------------------------------
        # --- SAVE THE LIGHTWEIGHT VERSION FOR JEKYLL ---
        logging.info("Gerando e salvando all_leve.yml para o Jekyll...")
        dados_leves = create_lightweight_dataset(successful_data)
        save_to_yaml(dados_leves, os.path.join(DATA_FOLDER, "all_leve.yml"))
        # 2. Save YAML in slices (avoids a memory overflow on huge datasets).
        total_items = len(successful_data)
        for i in range(0, total_items, BATCH_SIZE):
            batch = successful_data[i : i + BATCH_SIZE]
            batch_num = (i // BATCH_SIZE) + 1
            batch_filename = os.path.join(
                LOTES_FOLDER, f"projetos_lote_{batch_num:03d}.yml"
            )
            logging.info(
                f"Gravando Lote {batch_num} ({len(batch)} itens) em {batch_filename}..."
            )
            save_to_yaml(batch, batch_filename)
        logging.info("Gravação de lotes concluída.")
    # Manifest generation (best-effort: failures are recorded, not raised).
    try:
        manifest_report = generate_manifests(SRC_MMPSEARCH)
        # base_dir_instruments = "/nethome/jotachina/projetos/mmpSearch/mmp/instruments"
        # manifest_instruments = find_audio_files(base_dir_instruments)
    except Exception as e:
        manifest_report = {"generated": [], "failed": [str(e)]}
    duration = time.time() - start_time_global
    logging.info("=" * 60)
    logging.info(
        f"FIM - Tempo Total: {str(timedelta(seconds=int(duration)))} | Sucessos: {len(successful_data)} | Falhas: {len(failed_files)}"
    )
    logging.info(f"Relatório de Auditoria: {audit_csv_path}")
    if failed_files:
        logging.info("--- Detalhe das Falhas ---")
        for f in failed_files:
            logging.error(f"{f['file']}: {f['error']}")
    # Dependency cross-check (best-effort; errors deliberately ignored).
    try:
        check_dependencies(
            os.path.join(METADATA_FOLDER, "all.json"),
            os.path.join(METADATA_FOLDER, "samples-manifest.json"),
            os.path.join(METADATA_FOLDER, "dependency_report.json"),
        )
    except Exception:
        pass
# Script entry point.
if __name__ == "__main__":
    main_parallel()