# Source file: mmpSearch/scripts/handler/benchmark.py

# benchmark.py
import os
import shutil
import subprocess
import multiprocessing
import logging
import time
import platform
import sys
import re
import unicodedata
import csv
import psutil
from datetime import datetime, timedelta
# Importando apenas o necessário (sem sobrescrever as pastas do utils nativo no processamento global)
from file_parser import parse_mmp_file
from file_saver import save_to_json, save_to_yaml
from utils import SRC_MMPSEARCH, MMP_FOLDER
# === DIRECTORY ISOLATION FOR THE BENCHMARK ===
# Everything is generated inside "benchmark_output" so the Jekyll site
# folders (managed by the native utils module) are never touched.
BENCHMARK_OUT = os.path.join(SRC_MMPSEARCH, "benchmark_output")
DATA_FOLDER = os.path.join(BENCHMARK_OUT, "data")          # parsed project YAML
METADATA_FOLDER = os.path.join(BENCHMARK_OUT, "metadata")  # parsed project JSON
WAV_FOLDER = os.path.join(BENCHMARK_OUT, "wav")            # rendered .ogg audio
MMPZ_FOLDER = os.path.join(BENCHMARK_OUT, "mmpz")          # copies of compressed projects
LOG_FOLDER = os.path.join(BENCHMARK_OUT, "logs")
SAIDA_ANALISES = os.path.join(BENCHMARK_OUT, "analises")
LOTES_FOLDER = os.path.join(BENCHMARK_OUT, "lotes_yaml")
# === GENERAL SETTINGS ===
TIMEOUT_RENDER_SECONDS = 300   # hard cap per LMMS render, in seconds
MIN_RAM_FREE_MB = 500          # refuse to start a file below this free RAM
# Shared progress counter; set per worker process by init_worker().
global_counter = None
def slugify(value):
    """Convert *value* to a lowercase ASCII slug separated by hyphens.

    Non-ASCII characters are dropped via NFKD normalization, punctuation
    is removed, and any run of spaces, underscores or hyphens collapses
    into a single "-". Leading/trailing "-" and "_" are stripped.
    """
    text = unicodedata.normalize("NFKD", str(value))
    text = text.encode("ascii", "ignore").decode("ascii").lower()
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[-\s_]+", "-", text)
    return text.strip("-_")
def get_cpu_safe_count():
    """Return a worker count that leaves one core free for the OS.

    Returns
    -------
    int
        ``cpu_count() - 1`` (minimum 1), or 1 when the core count
        cannot be determined on this platform.
    """
    try:
        count = multiprocessing.cpu_count()
    except NotImplementedError:
        # FIX: the original bare "except:" also swallowed
        # KeyboardInterrupt/SystemExit; cpu_count() only raises
        # NotImplementedError when the platform cannot report cores.
        return 1
    return max(1, count - 1)
def init_worker(counter):
    # Pool initializer: stash the shared multiprocessing.Value in a
    # module-level global so each worker process can bump the shared
    # progress counter from process_single_file().
    global global_counter
    global_counter = counter
# === Original worker function (with file-safety tweaks for the benchmark) ===
def process_single_file(args):
    """Convert, render and parse a single LMMS project file.

    Parameters
    ----------
    args : tuple
        ``(file_name, clean_slug, total_files)`` — file name inside
        MMP_FOLDER, its slugified base name, and the batch size (kept
        for interface compatibility; not used in the body).

    Returns
    -------
    dict
        Keys: success, file, slug, data, error, duration_s, ram_mb,
        file_size_mb. ``error`` carries the failure message on any
        exception; the function itself never raises.
    """
    file_name, clean_slug, total_files = args
    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()

    # Bump the shared progress counter installed by init_worker().
    global global_counter
    current_idx = 0
    if global_counter is not None:
        with global_counter.get_lock():
            global_counter.value += 1
            current_idx = global_counter.value

    start_time = time.time()
    result = {
        "success": False,
        "file": file_name,
        "slug": clean_slug,
        "data": None,
        "error": None,
        "duration_s": 0.0,
        "ram_mb": 0.0,
        "file_size_mb": 0.0,
    }
    try:
        if os.path.exists(file_path):
            result["file_size_mb"] = os.path.getsize(file_path) / (1024 * 1024)

        # Refuse to start when the host is low on free memory.
        mem = psutil.virtual_memory()
        if (mem.available / (1024 * 1024)) < MIN_RAM_FREE_MB:
            result["error"] = "RAM Insuficiente"
            return result

        original_base_name = os.path.splitext(file_name)[0]
        ogg_name = clean_slug + ".ogg"
        json_name = clean_slug + ".json"
        yml_name = clean_slug + ".yml"
        target_mmp_path = ""

        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)
            # BENCHMARK NOTE: copy2 instead of move so the test dataset
            # is never destroyed between runs.
            if not os.path.exists(destination_path):
                shutil.copy2(file_path, destination_path)
            mmp_temp_name = clean_slug + ".mmp"
            output_mmp_path = os.path.join(MMP_FOLDER, mmp_temp_name)
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)
            # "lmms --dump" writes the decompressed project XML to stdout.
            with open(abs_mmp_out, "w") as outfile:
                subprocess.run(
                    ["lmms", "--dump", abs_dest],
                    stdout=outfile,
                    stderr=subprocess.PIPE,
                    check=True,
                    timeout=60,
                    env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
                )
            target_mmp_path = output_mmp_path
        elif file_name.endswith(".mmp"):
            target_mmp_path = file_path

        # Audio rendering (headless LMMS render to .ogg).
        if os.path.exists(target_mmp_path):
            abs_target_mmp = os.path.abspath(target_mmp_path)
            abs_ogg_out = os.path.abspath(os.path.join(WAV_FOLDER, ogg_name))
            ogg_cmd = ["lmms", "-r", abs_target_mmp, "-o", abs_ogg_out, "-f", "ogg"]
            render_process = subprocess.run(
                ogg_cmd,
                check=False,
                capture_output=True,
                text=True,
                timeout=TIMEOUT_RENDER_SECONDS,
                env={"QT_QPA_PLATFORM": "offscreen", **os.environ},
            )
            if render_process.returncode != 0:
                # FIX: propagate the captured stderr instead of the fixed
                # placeholder "Erro LMMS", so failures are diagnosable in
                # the audit CSV (placeholder kept as fallback).
                raise subprocess.CalledProcessError(
                    render_process.returncode,
                    "lmms render",
                    stderr=render_process.stderr or "Erro LMMS",
                )

        # XML parsing + metadata export (JSON + YAML).
        if os.path.exists(target_mmp_path):
            mmp_data = parse_mmp_file(target_mmp_path)
            if mmp_data:
                mmp_data["file"] = clean_slug
                mmp_data["original_title"] = original_base_name
                save_to_json(mmp_data, os.path.join(METADATA_FOLDER, json_name))
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, yml_name))
                result["success"] = True
                result["data"] = mmp_data

        try:
            process = psutil.Process(pid)
            # rss = Resident Set Size (actual RAM held by this process).
            result["ram_mb"] = process.memory_info().rss / (1024 * 1024)
        except psutil.Error:
            # FIX: narrowed from a bare "except:" so KeyboardInterrupt /
            # SystemExit are no longer swallowed here.
            result["ram_mb"] = 0.0
    except Exception as e:
        result["error"] = str(e)

    result["duration_s"] = time.time() - start_time
    return result
# === BENCHMARK ENGINE ===
def execute_batch(limit_files, mode):
    """Run one batch of project files and return aggregate metrics.

    Parameters
    ----------
    limit_files : int
        Maximum number of files to process; 0 or negative means all.
    mode : str
        ``'sequencial'`` uses 1 core; any other value (``'paralelo'``)
        uses the safe core count.

    Returns
    -------
    tuple
        ``(duration_total, qtd_sucessos, avg_time, avg_size, avg_ram)``,
        all zeros when no input files are found.
    """
    start_time_global = time.time()

    # Make sure every output folder exists for this run.
    for folder in (
        MMPZ_FOLDER,
        WAV_FOLDER,
        METADATA_FOLDER,
        DATA_FOLDER,
        LOTES_FOLDER,
        SAIDA_ANALISES,
    ):
        os.makedirs(folder, exist_ok=True)

    # FIX: sort the listing — os.listdir order is arbitrary, so without
    # sorting the sequential and parallel runs (and repeated batches)
    # could process different file subsets, skewing the speedup numbers.
    all_files_raw = sorted(
        f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))
    )
    if limit_files > 0:
        all_files_raw = all_files_raw[:limit_files]
    total_files = len(all_files_raw)
    if total_files == 0:
        return 0, 0, 0, 0, 0

    shared_counter = multiprocessing.Value("i", 0)
    tasks = []
    for idx, file_name in enumerate(all_files_raw):
        # Fall back to a positional slug when slugify() strips everything.
        slug = slugify(os.path.splitext(file_name)[0]) or f"proj-{idx}"
        tasks.append((file_name, slug, total_files))

    num_cores = 1 if mode == "sequencial" else get_cpu_safe_count()
    print(
        f"\n[{mode.upper()}] Processando {total_files} arquivos com {num_cores} núcleo(s)..."
    )

    # Run the whole batch through a worker pool (1 worker == sequential).
    with multiprocessing.Pool(
        processes=num_cores, initializer=init_worker, initargs=(shared_counter,)
    ) as pool:
        results = pool.map(process_single_file, tasks)

    # --- Averages computed over successful runs only ---
    sucessos = [r for r in results if r["success"]]
    qtd_sucessos = len(sucessos)

    def _mean(key):
        # Average of one numeric result field; 0 when nothing succeeded.
        return sum(r[key] for r in sucessos) / qtd_sucessos if qtd_sucessos else 0

    avg_time = _mean("duration_s")
    avg_size = _mean("file_size_mb")
    avg_ram = _mean("ram_mb")
    duration_total = time.time() - start_time_global

    # --- Detailed per-file audit CSV for this batch ---
    detalhado_csv = os.path.join(
        BENCHMARK_OUT, f"auditoria_detalhada_{mode}_{limit_files}.csv"
    )
    with open(detalhado_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(
            ["Arquivo", "Tamanho_MB", "Tempo_s", "RAM_MB", "Status", "Erro"]
        )
        for r in results:
            status_str = "SUCESSO" if r["success"] else "FALHA"
            writer.writerow(
                [
                    r["file"],
                    f"{r['file_size_mb']:.2f}",
                    f"{r['duration_s']:.2f}",
                    f"{r['ram_mb']:.2f}",
                    status_str,
                    r["error"] or "",
                ]
            )

    print(
        f"[{mode.upper()}] Concluído em {duration_total:.2f}s (Sucessos: {qtd_sucessos}/{total_files})"
    )
    print(
        f"[{mode.upper()}] Médias -> Tempo: {avg_time:.2f}s | Tamanho: {avg_size:.2f}MB | RAM: {avg_ram:.2f}MB"
    )
    return duration_total, qtd_sucessos, avg_time, avg_size, avg_ram
def run_benchmark():
    """Run the sequential-vs-parallel speedup benchmark over fixed batch
    sizes (1, 10, 100, 1000 files) and save a consolidated CSV.

    For every batch size, both modes are executed and the speedup
    (sequential total time / parallel total time) is computed; results
    land in ``resultado_grafico_speedup.csv`` under BENCHMARK_OUT.
    """
    print("==================================================")
    print(" INICIANDO TESTE DE SPEEDUP (SEQUENCIAL VS PARALELO)")
    print("==================================================")

    test_sizes = [1, 10, 100, 1000]
    modes = ["sequencial", "paralelo"]
    results_table = []

    for size in test_sizes:
        print(f"\n>>> INICIANDO BATERIA PARA {size} ARQUIVO(S) <<<")
        row = {"Tamanho": size}
        for mode in modes:
            duration, success_count, avg_time, avg_size, avg_ram = execute_batch(
                size, mode
            )
            row[f"Tempo_Total_{mode}_(s)"] = round(duration, 2)
            row[f"Tempo_Medio_Proj_{mode}_(s)"] = round(avg_time, 2)
            # FIX: plain string, not a placeholder-less f-string. The mean
            # file size is the same for both modes, so it is kept once.
            row["Tamanho_Medio_Proj_(MB)"] = round(avg_size, 2)
            row[f"RAM_Media_{mode}_(MB)"] = round(avg_ram, 2)
            row[f"Sucesso_{mode}"] = success_count

        # Speedup = sequential total time / parallel total time.
        speedup = (
            row["Tempo_Total_sequencial_(s)"] / row["Tempo_Total_paralelo_(s)"]
            if row["Tempo_Total_paralelo_(s)"] > 0
            else 0
        )
        row["Speedup"] = round(speedup, 2)
        results_table.append(row)
        print(f"--- Fim da bateria de {size}. Speedup Alcançado: {speedup:.2f}x ---")

    # Persist the consolidated result as one CSV for charting.
    os.makedirs(BENCHMARK_OUT, exist_ok=True)
    csv_file = os.path.join(BENCHMARK_OUT, "resultado_grafico_speedup.csv")

    # Final CSV column order.
    colunas = [
        "Tamanho",
        "Speedup",
        "Tempo_Total_sequencial_(s)",
        "Tempo_Total_paralelo_(s)",
        "Tempo_Medio_Proj_sequencial_(s)",
        "Tempo_Medio_Proj_paralelo_(s)",
        "RAM_Media_sequencial_(MB)",
        "RAM_Media_paralelo_(MB)",
        "Tamanho_Medio_Proj_(MB)",
        "Sucesso_sequencial",
        "Sucesso_paralelo",
    ]
    with open(csv_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=colunas)
        writer.writeheader()
        writer.writerows(results_table)

    print("\n==================================================")
    print(f"BENCHMARK FINALIZADO! Resultados salvos em:\n{BENCHMARK_OUT}")
    print("==================================================")
if __name__ == "__main__":
    # Entry point: run the full sequential-vs-parallel speedup benchmark.
    run_benchmark()