318 lines
11 KiB
Python
318 lines
11 KiB
Python
# main.py
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import subprocess
|
|
import multiprocessing
|
|
import logging
|
|
import time
|
|
import platform
|
|
import sys # Adicionado para sair do script se faltar dependência
|
|
from datetime import datetime
|
|
|
|
# Importando seus módulos
|
|
from generate_manifest import generate_manifests
|
|
from file_parser import parse_mmp_file
|
|
from file_saver import save_to_json, save_to_yaml
|
|
from dependency_checker import check_dependencies
|
|
from utils import (
|
|
create_folders_if_not_exist,
|
|
BASE_PATH,
|
|
DATA_FOLDER,
|
|
METADATA_FOLDER,
|
|
WAV_FOLDER,
|
|
MMPZ_FOLDER,
|
|
MMP_FOLDER,
|
|
)
|
|
|
|
# Log directory: a "logs" folder located next to this script.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
LOG_FOLDER = os.path.join(CURRENT_DIR, "logs")
|
|
|
|
|
|
def check_system_dependencies():
    """Abort the run early when a required external tool is not on PATH.

    Each required executable is looked up with ``shutil.which``. When any of
    them is missing, the missing names and an ``apt-get`` install hint are
    logged at CRITICAL level and the process exits with status 1. When
    everything is present the function simply returns ``None``.
    """
    required_tools = ["lmms"]
    missing = [tool for tool in required_tools if shutil.which(tool) is None]

    if not missing:
        return

    logging.critical(f"FERRAMENTAS FALTANDO: {', '.join(missing)}")
    logging.critical("Por favor instale: sudo apt-get install " + " ".join(missing))
    sys.exit(1)  # Terminate immediately; nothing downstream works without the tools.
|
|
|
|
|
|
def get_linux_mem_info(meminfo_path="/proc/meminfo"):
    """Return the total system memory, read from a Linux meminfo file.

    Args:
        meminfo_path: Path of the file to scan for the ``MemTotal`` line.
            Defaults to ``/proc/meminfo``.

    Returns:
        The ``MemTotal`` value divided by ``1024 * 1024`` (kB to GiB) as a
        float, or ``0`` when the file cannot be read, the value cannot be
        parsed, or the file contains no ``MemTotal`` line.
    """
    try:
        with open(meminfo_path, "r") as f:
            for line in f:
                if "MemTotal" in line:
                    kb_value = int(line.split()[1])
                    return kb_value / (1024 * 1024)
    except (IOError, ValueError):
        # Missing/unreadable file or a non-numeric value: report "unknown".
        return 0
    except Exception as e:
        logging.warning(f"Erro ao ler memória: {e}")
        return 0

    # No MemTotal line at all. Return a number (not None) so callers that
    # format the result with ":.2f" keep working.
    return 0
|
|
|
|
|
|
def get_cpu_model_name(cpuinfo_path="/proc/cpuinfo"):
    """Return the processor model name, read from a Linux cpuinfo file.

    Args:
        cpuinfo_path: Path of the file to scan for a ``model name`` line.
            Defaults to ``/proc/cpuinfo``.

    Returns:
        The text after the first ``:`` of the first ``model name`` line,
        stripped of surrounding whitespace. Falls back to
        ``platform.processor()`` when the file cannot be read or yields no
        non-empty model name.
    """
    model = None
    try:
        with open(cpuinfo_path, "r") as f:
            for line in f:
                if "model name" in line:
                    # Split only on the first colon so names that themselves
                    # contain ":" are not truncated.
                    _, sep, value = line.partition(":")
                    if sep:
                        model = value.strip()
                        break
    except (OSError, ValueError):
        # Unreadable or undecodable file: use the platform fallback below.
        model = None

    return model or platform.processor()
|
|
|
|
|
|
def log_system_info():
    """Log a short audit of the host environment at INFO level.

    Reports the OS name and release, node name, CPU model, logical core
    count, total memory and the free disk space of the current directory.
    Any exception raised while collecting or logging these values is reduced
    to a single WARNING so the caller keeps running.
    """
    separator = "=" * 30
    try:
        logging.info(separator)
        logging.info("AUDITORIA DE AMBIENTE (HARDWARE)")
        logging.info(separator)

        host = platform.uname()
        logging.info(f"Sistema: {host.system} {host.release}")
        logging.info(f"Node: {host.node}")

        # Collect every CPU/memory value before logging any of them.
        model = get_cpu_model_name()
        logical_cores = multiprocessing.cpu_count()
        total_mem_gb = get_linux_mem_info()
        logging.info(f"CPU Modelo: {model}")
        logging.info(f"Núcleos: {logical_cores}")
        logging.info(f"Memória Total: {total_mem_gb:.2f} GB")

        free_bytes = shutil.disk_usage(".").free
        logging.info(f"Disco (Livre): {free_bytes // (2**30)} GB")
        logging.info(separator)
    except Exception as e:
        logging.warning(f"Falha ao coletar info do sistema: {e}")
|
|
|
|
|
|
def setup_logger():
    """Attach a timestamped file handler and a console handler to the root logger.

    The log directory ``LOG_FOLDER`` is created if needed, the root logger
    level is set to INFO, and both handlers share one message format.

    Returns:
        tuple: ``(logger, log_filename)`` - the root logger and the path of
        the log file created for this call.
    """
    os.makedirs(LOG_FOLDER, exist_ok=True)

    run_stamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_filename = os.path.join(LOG_FOLDER, f"execucao_{run_stamp}.log")

    root = logging.getLogger()
    root.setLevel(logging.INFO)

    line_format = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s", datefmt="%d/%m/%Y %H:%M:%S"
    )

    # File handler first, then console: same registration order as before.
    handlers = (
        logging.FileHandler(log_filename, encoding="utf-8"),
        logging.StreamHandler(),
    )
    for handler in handlers:
        handler.setFormatter(line_format)
        root.addHandler(handler)

    return root, log_filename
|
|
|
|
|
|
def process_single_file(file_name):
    """Convert (when needed), parse and export one LMMS project file.

    For a ``.mmpz`` file the archive is moved to ``MMPZ_FOLDER`` (or, when it
    is already there, the copy left in ``MMP_FOLDER`` is deleted), dumped to a
    ``.mmp`` file in ``MMP_FOLDER`` with ``lmms --dump`` and rendered to WAV
    in ``WAV_FOLDER`` with ``lmms -r``. A ``.mmp`` file is parsed directly.
    The parsed data is then saved as JSON in ``METADATA_FOLDER`` and as YAML
    in ``DATA_FOLDER``.

    Args:
        file_name: Base name (not a path) of a ``.mmp`` or ``.mmpz`` file
            expected inside ``MMP_FOLDER``.

    Returns:
        dict: ``{"success": bool, "file": file_name, "data": parsed data or
        None, "error": message or None}``. Exceptions raised while
        processing are caught, logged and reported through ``"error"``.
    """
    file_path = os.path.join(MMP_FOLDER, file_name)
    pid = os.getpid()
    result = {"success": False, "file": file_name, "data": None, "error": None}
    base_name = os.path.splitext(file_name)[0]

    try:
        logging.info(f"[PID {pid}] Processando: {file_name}")

        # 1. MMPZ handling: archive the source, dump it to MMP, render WAV.
        if file_name.endswith(".mmpz"):
            destination_path = os.path.join(MMPZ_FOLDER, file_name)

            # Move the archive unless it is already stored; in that case drop
            # the duplicate that is still sitting in the input folder.
            if not os.path.exists(destination_path):
                shutil.move(file_path, destination_path)
            elif os.path.exists(file_path):
                os.remove(file_path)

            output_mmp_path = os.path.join(MMP_FOLDER, base_name + ".mmp")

            # Absolute paths avoid path-resolution errors inside LMMS.
            abs_dest = os.path.abspath(destination_path)
            abs_mmp_out = os.path.abspath(output_mmp_path)

            # Default Qt to the offscreen platform unless the caller's
            # environment already chose one; shared by both LMMS invocations.
            lmms_env = {"QT_QPA_PLATFORM": "offscreen", **os.environ}

            # "lmms --dump" writes the project XML to stdout, so stdout is
            # redirected straight into the target file.
            dump_cmd = ["lmms", "--dump", abs_dest]
            try:
                with open(abs_mmp_out, "w") as outfile:
                    subprocess.run(
                        dump_cmd,
                        stdout=outfile,
                        stderr=subprocess.PIPE,
                        check=True,
                        env=lmms_env,
                    )
                logging.info(f"[PID {pid}] Dump MMP criado.")
            except subprocess.CalledProcessError as e:
                # Do not leave a truncated .mmp behind: the next run would
                # list it as a regular input file.
                try:
                    os.remove(abs_mmp_out)
                except OSError:
                    pass  # Best-effort cleanup; the dump error is what matters.
                stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
                result["error"] = f"Erro no dump MMP: {stderr_text}"
                logging.error(f"[PID {pid}] {result['error']}")
                return result

            # WAV rendering. A failure here is logged but does not stop the
            # metadata extraction below.
            abs_wav_out = os.path.abspath(os.path.join(WAV_FOLDER, base_name + ".wav"))
            wav_cmd = ["lmms", "-r", abs_dest, "-o", abs_wav_out, "-f", "wav"]
            try:
                subprocess.run(
                    wav_cmd,
                    check=True,
                    capture_output=True,
                    text=True,
                    errors="replace",
                    env=lmms_env,
                )
                logging.info(f"[PID {pid}] Áudio WAV gerado com sucesso.")
            except subprocess.CalledProcessError as e:
                logging.warning(f"[PID {pid}] Falha no WAV: {e.stderr}")

            file_to_parse = output_mmp_path

        elif file_name.endswith(".mmp"):
            file_to_parse = file_path

        else:
            # Previously this fell through and failed on an unbound variable.
            result["error"] = "Extensão de arquivo não suportada."
            logging.error(f"[PID {pid}] {file_name}: {result['error']}")
            return result

        # 2. Parsing and saving.
        if os.path.exists(file_to_parse):
            mmp_data = parse_mmp_file(file_to_parse)

            if mmp_data:
                save_to_json(
                    mmp_data, os.path.join(METADATA_FOLDER, base_name + ".json")
                )
                save_to_yaml(mmp_data, os.path.join(DATA_FOLDER, base_name + ".yml"))

                result["success"] = True
                result["data"] = mmp_data
            else:
                result["error"] = "Dados vazios após parsing."
        else:
            result["error"] = "Arquivo MMP não encontrado para parsing."

    except Exception as e:
        # Worker boundary: report the failure instead of crashing the pool.
        result["error"] = f"Erro geral: {str(e)}"
        logging.error(f"[PID {pid}] {file_name}: {result['error']}")

    return result
|
|
|
|
|
|
def main_parallel():
    """Run the whole pipeline over every project file in ``MMP_FOLDER``.

    Steps: configure logging, verify external tools (exits the process when
    one is missing), log host information, create the output folders, process
    all ``.mmp``/``.mmpz`` files in a process pool, write the aggregated
    ``all.json`` / ``all.yml`` indexes, generate manifests, log a final
    summary and run the project dependency check.

    Returns early (``None``) when ``MMP_FOLDER`` does not exist or contains
    no project files.
    """
    setup_logger()
    start_time = time.time()

    # Fail fast when required external tools are missing.
    check_system_dependencies()

    log_system_info()
    logging.info("=== Iniciando Pipeline Otimizado ===")

    create_folders_if_not_exist([MMPZ_FOLDER, WAV_FOLDER, METADATA_FOLDER, DATA_FOLDER])

    if not os.path.exists(MMP_FOLDER):
        logging.critical(f"Pasta {MMP_FOLDER} não encontrada.")
        return

    all_files = [f for f in os.listdir(MMP_FOLDER) if f.endswith((".mmp", ".mmpz"))]

    if not all_files:
        logging.warning("Nenhum arquivo encontrado.")
        return

    # Never start more workers than there are files to process.
    num_workers = min(multiprocessing.cpu_count(), len(all_files))
    logging.info(f"Processando {len(all_files)} arquivos com {num_workers} workers.")

    with multiprocessing.Pool(processes=num_workers) as pool:
        results = pool.map(process_single_file, all_files)

    successful_data = [r["data"] for r in results if r["success"]]
    failed_files = [r for r in results if not r["success"]]

    if successful_data:
        save_to_json(successful_data, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(successful_data, os.path.join(DATA_FOLDER, "all.yml"))

    # Manifest generation: a failure is reported but must not abort the run.
    try:
        generate_manifests(BASE_PATH)
    except Exception as e:
        logging.error(f"Falha ao gerar manifestos: {e}")

    # Final report.
    duration = time.time() - start_time
    logging.info("=" * 60)
    logging.info(
        f"FIM - Tempo: {duration:.2f}s | Sucessos: {len(successful_data)} | Falhas: {len(failed_files)}"
    )

    if failed_files:
        logging.info("--- Detalhe das Falhas ---")
        for f in failed_files:
            logging.error(f"{f['file']}: {f['error']}")

    # Project dependency check: best-effort, but failures are now visible in
    # the log instead of being silently discarded.
    try:
        check_dependencies(
            os.path.join(METADATA_FOLDER, "all.json"),
            os.path.join(METADATA_FOLDER, "samples-manifest.json"),
            os.path.join(METADATA_FOLDER, "dependency_report.json"),
        )
    except Exception as e:
        logging.warning(f"Verificação de dependências não concluída: {e}")
|
|
|
|
|
|
def rebuild_indexes():
    """Rebuild the global ``all.json`` / ``all.yml`` indexes from per-file JSON.

    Every ``.json`` file in ``METADATA_FOLDER`` other than ``all.json``, the
    two manifests and the dependency report is loaded and collected. When at
    least one file was loaded, the collection is written to ``all.json`` in
    ``METADATA_FOLDER`` and ``all.yml`` in ``DATA_FOLDER``. Files that fail
    to load are logged and skipped. Per the original author's note, this is
    meant to be callable by the API after an upload.

    Returns:
        int: Number of JSON files successfully loaded.
    """
    logging.info("Regerando índices globais (all.json / all.yml)...")

    # Aggregate/derived files that must not be folded back into the index.
    skipped_names = (
        "all.json",
        "samples-manifest.json",
        "mmp-manifest.json",
        "dependency_report.json",
    )
    collected = []

    candidates = os.listdir(METADATA_FOLDER) if os.path.exists(METADATA_FOLDER) else []
    for entry in candidates:
        if not entry.endswith(".json") or entry in skipped_names:
            continue
        try:
            with open(
                os.path.join(METADATA_FOLDER, entry), "r", encoding="utf-8"
            ) as handle:
                collected.append(json.load(handle))
        except Exception as e:
            logging.error(f"Erro ao ler {entry} para índice global: {e}")

    if collected:
        save_to_json(collected, os.path.join(METADATA_FOLDER, "all.json"))
        save_to_yaml(collected, os.path.join(DATA_FOLDER, "all.yml"))
        logging.info("Índices globais atualizados com sucesso.")

    return len(collected)
|
|
|
|
|
|
# Script entry point: run the full pipeline only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main_parallel()
|