461 lines
17 KiB
Python
Executable File
461 lines
17 KiB
Python
Executable File
import os
|
|
import subprocess
|
|
import xml.etree.ElementTree as ET
|
|
import gzip
|
|
import zipfile
|
|
import zlib
|
|
import json
|
|
import shutil
|
|
import time
|
|
import io
|
|
|
|
from flask import Flask, request, jsonify, send_file
|
|
from flask_cors import CORS
|
|
from werkzeug.utils import secure_filename
|
|
from main import process_single_file, rebuild_indexes, generate_manifests, slugify
|
|
from utils import ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, CERT_PATH, KEY_PATH, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST
|
|
|
|
app = Flask(__name__)
|
|
CORS(app)
|
|
|
|
def allowed_file(filename):
|
|
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
def allowed_sample(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS
|
|
|
|
def run_jekyll_build():
|
|
print("Iniciando build do Jekyll...")
|
|
command = ["bundle", "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
|
|
try:
|
|
subprocess.run(command, check=True, cwd=BASE_DATA, capture_output=True, text=True)
|
|
print("Jekyll Build Sucesso!")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"ERRO no Jekyll Build: {e.stderr}")
|
|
|
|
def load_manifest_keys():
|
|
"""Carrega as pastas de fábrica (keys do manifesto)."""
|
|
if not os.path.exists(SAMPLE_MANIFEST):
|
|
return ["drums", "instruments", "effects", "presets", "samples"]
|
|
try:
|
|
with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
return list(data.keys())
|
|
except Exception:
|
|
return []
|
|
|
|
def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
|
|
"""
|
|
Tenta descompactar .mmpz usando métodos Python (rápido) e falha para o CLI do LMMS (robusto).
|
|
Salva o resultado em mmp_target_path.
|
|
"""
|
|
print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")
|
|
|
|
# --- TENTATIVA 1: Métodos Python (Memória/Rápido) ---
|
|
content = None
|
|
try:
|
|
with open(mmpz_path, "rb") as f:
|
|
content = f.read()
|
|
except Exception:
|
|
pass
|
|
|
|
if content:
|
|
# 1.1 Tenta ZIP
|
|
if zipfile.is_zipfile(mmpz_path):
|
|
try:
|
|
with zipfile.ZipFile(mmpz_path, 'r') as z:
|
|
with open(mmp_target_path, 'wb') as f_out:
|
|
f_out.write(z.read(z.namelist()[0]))
|
|
print("Sucesso: Descompactado via ZIP.")
|
|
return True
|
|
except: pass
|
|
|
|
# 1.2 Tenta GZIP
|
|
try:
|
|
decompressed = gzip.decompress(content)
|
|
with open(mmp_target_path, "wb") as f_out:
|
|
f_out.write(decompressed)
|
|
print("Sucesso: Descompactado via GZIP.")
|
|
return True
|
|
except: pass
|
|
|
|
# 1.3 Tenta ZLIB (Qt Default)
|
|
try:
|
|
decompressed = zlib.decompress(content)
|
|
with open(mmp_target_path, "wb") as f_out:
|
|
f_out.write(decompressed)
|
|
print("Sucesso: Descompactado via ZLIB.")
|
|
return True
|
|
except: pass
|
|
|
|
# --- TENTATIVA 2: Fallback para LMMS CLI (Lento/Robusto) ---
|
|
# Se o Python falhar, pedimos ao próprio LMMS para converter
|
|
print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
|
|
|
|
cmd = ["lmms", "--dump", mmpz_path]
|
|
|
|
# Define ambiente sem interface gráfica para evitar erros no servidor
|
|
env_vars = os.environ.copy()
|
|
env_vars["QT_QPA_PLATFORM"] = "offscreen"
|
|
|
|
try:
|
|
# O LMMS joga o XML no stdout, então capturamos e salvamos no arquivo
|
|
with open(mmp_target_path, "w") as f_out:
|
|
subprocess.run(
|
|
cmd,
|
|
stdout=f_out,
|
|
stderr=subprocess.PIPE,
|
|
check=True,
|
|
env=env_vars
|
|
)
|
|
|
|
# Verifica se o arquivo foi criado e tem conteúdo
|
|
if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
|
|
print("Sucesso: Descompactado via LMMS CLI (--dump).")
|
|
return True
|
|
else:
|
|
print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
|
|
return False
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Erro inesperado ao chamar LMMS: {e}")
|
|
return False
|
|
|
|
def analyze_mmp_dependencies(mmp_filename):
|
|
"""
|
|
Analisa um arquivo .mmp (que já deve ser texto XML puro).
|
|
Retorna: (is_clean, missing_files_list)
|
|
"""
|
|
filepath = os.path.join(MMP_FOLDER, mmp_filename)
|
|
|
|
if not os.path.exists(filepath):
|
|
print(f"Erro: Arquivo não encontrado: {filepath}")
|
|
return False, []
|
|
|
|
try:
|
|
tree = ET.parse(filepath)
|
|
except ET.ParseError as e:
|
|
print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
|
|
return False, []
|
|
|
|
root = tree.getroot()
|
|
factory_folders = load_manifest_keys()
|
|
|
|
missing_samples = set()
|
|
|
|
# Varre todos os audiofileprocessor
|
|
for audio_node in root.findall(".//audiofileprocessor"):
|
|
src = audio_node.get('src', '')
|
|
if not src:
|
|
continue
|
|
|
|
# 1. É arquivo de fábrica?
|
|
is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
|
|
|
|
# 2. Já foi importado anteriormente?
|
|
is_already_imported = src.startswith("src_mmpSearch/samples/imported/")
|
|
|
|
# Se não é nativo e não está na pasta de importados, é externo
|
|
if not is_factory and not is_already_imported:
|
|
file_name = os.path.basename(src)
|
|
missing_samples.add(file_name)
|
|
|
|
return len(missing_samples) == 0, list(missing_samples)
|
|
|
|
def update_xml_paths_exact(mmp_filename, replacements):
|
|
"""
|
|
Substitui caminhos no XML baseando-se no mapeamento exato:
|
|
replacements = { 'nome_original_no_xml.wav': 'novo_nome_seguro.wav' }
|
|
"""
|
|
filepath = os.path.join(MMP_FOLDER, mmp_filename)
|
|
|
|
try:
|
|
tree = ET.parse(filepath)
|
|
root = tree.getroot()
|
|
changes_made = False
|
|
|
|
# Normaliza as chaves para minúsculo para garantir o match
|
|
# Ex: {'Kick.wav': 'kick.wav'} -> {'kick.wav': 'kick.wav'}
|
|
replacements_lower = {k.lower(): v for k, v in replacements.items()}
|
|
|
|
for audio_node in root.findall(".//audiofileprocessor"):
|
|
src = audio_node.get('src', '')
|
|
if not src: continue
|
|
|
|
# Pega o nome do arquivo que está atualmente no XML
|
|
current_basename = os.path.basename(src)
|
|
current_basename_lower = current_basename.lower()
|
|
|
|
# Verifica se esse nome está na lista de substituições
|
|
if current_basename_lower in replacements_lower:
|
|
new_filename = replacements_lower[current_basename_lower]
|
|
|
|
# Constrói o novo caminho completo
|
|
new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}"
|
|
|
|
# Aplica a alteração
|
|
audio_node.set('src', new_src)
|
|
audio_node.set('vol', audio_node.get('vol', '1')) # Garante volume
|
|
|
|
print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
|
|
changes_made = True
|
|
|
|
if changes_made:
|
|
tree.write(filepath, encoding='UTF-8', xml_declaration=True)
|
|
print(f"Projeto {mmp_filename} salvo com novos caminhos.")
|
|
return True
|
|
else:
|
|
print("Aviso: Nenhum caminho foi alterado no XML (match não encontrado).")
|
|
return True # Retorna True para não travar o processo, mas avisa no log
|
|
|
|
except Exception as e:
|
|
print(f"Erro crítico ao editar XML: {e}")
|
|
return False
|
|
|
|
# --- ROTAS ---
|
|
|
|
@app.route("/api/download/<project_name>", methods=["GET"])
|
|
def download_project_package(project_name):
|
|
"""
|
|
Gera um ZIP contendo o arquivo .mmp e apenas os samples importados necessários.
|
|
Estrutura do ZIP:
|
|
- projeto.mmp
|
|
- src_mmpSearch/
|
|
- samples/
|
|
- imported/
|
|
- sample1.wav
|
|
- sample2.wav
|
|
"""
|
|
# Garante que estamos pegando o .mmp (mesmo que peçam sem extensão)
|
|
if not project_name.endswith('.mmp'):
|
|
project_name += '.mmp'
|
|
|
|
clean_name = secure_filename(project_name)
|
|
mmp_path = os.path.join(MMP_FOLDER, clean_name)
|
|
|
|
if not os.path.exists(mmp_path):
|
|
return jsonify({"error": "Projeto não encontrado"}), 404
|
|
|
|
# Prepara o buffer do ZIP na memória (não salva no disco para economizar I/O)
|
|
memory_file = io.BytesIO()
|
|
|
|
try:
|
|
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
|
|
|
|
# 1. Adiciona o arquivo .mmp na RAIZ do ZIP
|
|
# arcname é o nome que o arquivo terá dentro do zip
|
|
zf.write(mmp_path, arcname=clean_name)
|
|
|
|
# 2. Lê o XML para descobrir quais samples incluir
|
|
tree = ET.parse(mmp_path)
|
|
root = tree.getroot()
|
|
|
|
# Conjunto para evitar adicionar o mesmo sample 2x
|
|
samples_to_pack = set()
|
|
|
|
for audio_node in root.findall(".//audiofileprocessor"):
|
|
src = audio_node.get('src', '')
|
|
|
|
# Verifica se é um sample importado (nosso padrão)
|
|
# O src no XML já deve estar como: src_mmpSearch/samples/imported/nome.wav
|
|
if src.startswith(XML_IMPORTED_PATH_PREFIX):
|
|
# Extrai apenas o nome do arquivo (ex: kick.wav)
|
|
sample_filename = os.path.basename(src)
|
|
samples_to_pack.add(sample_filename)
|
|
|
|
# 3. Adiciona os samples encontrados ao ZIP mantendo a estrutura de pasta
|
|
for sample_name in samples_to_pack:
|
|
# Caminho físico no servidor (onde o arquivo realmente está)
|
|
physical_path = os.path.join(XML_IMPORTED_PATH_PREFIX, sample_name)
|
|
|
|
# Caminho DENTRO do ZIP (para o LMMS ler relativo ao mmp)
|
|
# Deve ser: src_mmpSearch/samples/imported/sample.wav
|
|
zip_internal_path = f"{XML_IMPORTED_PATH_PREFIX}/{sample_name}"
|
|
|
|
if os.path.exists(physical_path):
|
|
zf.write(physical_path, arcname=zip_internal_path)
|
|
print(f"[ZIP] Adicionado: {sample_name}")
|
|
else:
|
|
print(f"[ZIP] AVISO: Sample listado no XML mas não encontrado no disco: {sample_name}")
|
|
|
|
# Finaliza o ponteiro do arquivo
|
|
memory_file.seek(0)
|
|
|
|
return send_file(
|
|
memory_file,
|
|
mimetype='application/zip',
|
|
as_attachment=True,
|
|
download_name=f"{os.path.splitext(clean_name)[0]}_pack.zip"
|
|
)
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500
|
|
|
|
@app.route("/api/upload", methods=["POST"])
|
|
def upload_file():
|
|
if "project_file" not in request.files:
|
|
return jsonify({"error": "Nenhum arquivo enviado"}), 400
|
|
|
|
file = request.files["project_file"]
|
|
if file.filename == "":
|
|
return jsonify({"error": "Nome do arquivo vazio"}), 400
|
|
|
|
if file and allowed_file(file.filename):
|
|
# 1. Sanitização do nome
|
|
original_name = file.filename
|
|
name_without_ext = os.path.splitext(original_name)[0]
|
|
ext = os.path.splitext(original_name)[1].lower()
|
|
clean_name = slugify(name_without_ext)
|
|
if not clean_name:
|
|
clean_name = f"upload-{int(time.time())}"
|
|
|
|
# O nome final no sistema será sempre .mmp
|
|
final_mmp_filename = f"{clean_name}.mmp"
|
|
final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)
|
|
|
|
# Caminho temporário para o upload original
|
|
upload_filename = f"{clean_name}{ext}"
|
|
if ext == ".mmpz":
|
|
upload_path = os.path.join(MMPZ_FOLDER, upload_filename)
|
|
else:
|
|
upload_path = os.path.join(MMP_FOLDER, upload_filename)
|
|
|
|
try:
|
|
# Salva o arquivo original
|
|
file.save(upload_path)
|
|
print(f"Upload salvo em: {upload_path}")
|
|
|
|
# 2. SE FOR MMPZ, CONVERTE IMEDIATAMENTE PARA MMP
|
|
if ext == ".mmpz":
|
|
print("Detectado arquivo compactado. Iniciando extração...")
|
|
success = convert_mmpz_to_mmp(upload_path, final_mmp_path)
|
|
|
|
if not success:
|
|
# Se falhou zlib/gzip, tenta o fallback do sistema (lmms --dump) se disponível
|
|
# Mas por segurança, retornamos erro aqui para não travar
|
|
return jsonify({"error": "Falha crítica: O arquivo .mmpz não pode ser descompactado. Tente enviar o .mmp descomprimido."}), 400
|
|
|
|
# Opcional: Remover o .mmpz original já que já extraímos
|
|
# os.remove(upload_path)
|
|
|
|
# Se já for .mmp, apenas garante que está no lugar certo
|
|
elif ext == ".mmp" and upload_path != final_mmp_path:
|
|
shutil.move(upload_path, final_mmp_path)
|
|
|
|
# A partir daqui, TRABALHAMOS APENAS COM O ARQUIVO .MMP (final_mmp_filename)
|
|
|
|
# 3. Analisa dependências no arquivo .mmp limpo
|
|
is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
|
|
|
|
if not is_clean:
|
|
return jsonify({
|
|
"status": "missing_samples",
|
|
"message": "Samples externos detectados.",
|
|
"project_file": final_mmp_filename, # Retornamos o nome do MMP, pois é ele que vamos editar
|
|
"missing_files": missing_files
|
|
}), 202
|
|
|
|
# 4. Se limpo, processa
|
|
return process_and_build(final_mmp_filename)
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return jsonify({"error": f"Erro interno: {str(e)}"}), 500
|
|
|
|
return jsonify({"error": "Tipo de arquivo não permitido"}), 400
|
|
|
|
@app.route("/api/upload/resolve", methods=["POST"])
|
|
def resolve_samples():
|
|
"""
|
|
Recebe inputs onde:
|
|
name="nome_original.wav" (chave) -> value=Arquivo (conteúdo)
|
|
"""
|
|
project_filename = request.form.get('project_file') # Nome do .mmp
|
|
|
|
if not project_filename:
|
|
return jsonify({"error": "Nome do projeto não informado"}), 400
|
|
|
|
# Cria pasta imported
|
|
os.makedirs(XML_IMPORTED_PATH_PREFIX, exist_ok=True)
|
|
try:
|
|
os.chmod(XML_IMPORTED_PATH_PREFIX, 0o775)
|
|
except: pass
|
|
|
|
# Dicionário de substituição: { 'Original.wav': 'Novo_Seguro.wav' }
|
|
replacements = {}
|
|
|
|
# request.files é um dicionário imutável, iteramos sobre ele
|
|
for original_name, file_storage in request.files.items():
|
|
if file_storage and allowed_sample(file_storage.filename):
|
|
|
|
# 1. Salva o arquivo fisicamente
|
|
# Usamos secure_filename no arquivo NOVO para evitar problemas no disco
|
|
safe_new_name = secure_filename(file_storage.filename)
|
|
|
|
# Se secure_filename deixou vazio (ex: "???"), gera um nome
|
|
if not safe_new_name:
|
|
safe_new_name = f"sample_{int(time.time())}.wav"
|
|
|
|
save_path = os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name)
|
|
file_storage.save(save_path)
|
|
|
|
try:
|
|
os.chmod(save_path, 0o664)
|
|
except: pass
|
|
|
|
# 2. Mapeia para substituição no XML
|
|
# A chave 'original_name' vem do `name` do input HTML, que definimos como o nome original
|
|
replacements[original_name] = safe_new_name
|
|
|
|
if not replacements:
|
|
return jsonify({"error": "Nenhum arquivo válido enviado."}), 400
|
|
|
|
# 3. Atualiza o XML
|
|
if update_xml_paths_exact(project_filename, replacements):
|
|
# 4. Processa (Gera WAV na pasta correta e JSON)
|
|
return process_and_build(project_filename)
|
|
else:
|
|
return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500
|
|
|
|
|
|
def process_and_build(filename):
|
|
"""Encapsula a chamada do main.py"""
|
|
result = process_single_file(filename)
|
|
|
|
if result["success"]:
|
|
rebuild_indexes()
|
|
run_jekyll_build()
|
|
generate_manifests(SRC_MMPSEARCH)
|
|
return jsonify({
|
|
"message": "Sucesso! Projeto processado.",
|
|
"data": result["data"]
|
|
}), 200
|
|
else:
|
|
return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500
|
|
|
|
# Rota avulsa de sample mantida igual...
|
|
@app.route('/api/upload/sample', methods=['POST'])
|
|
def upload_sample_standalone():
|
|
if 'sample_file' not in request.files: return jsonify({'error': 'Nenhum arquivo'}), 400
|
|
file = request.files['sample_file']
|
|
subfolder = request.form.get('subfolder', '').strip()
|
|
if file and allowed_sample(file.filename):
|
|
filename = secure_filename(file.filename)
|
|
safe_subfolder = subfolder.replace('..', '').strip('/')
|
|
target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
|
|
os.makedirs(target_dir, exist_ok=True)
|
|
file.save(os.path.join(target_dir, filename))
|
|
generate_manifests(SRC_MMPSEARCH)
|
|
run_jekyll_build()
|
|
return jsonify({'message': 'Sample enviado!'}), 200
|
|
return jsonify({'error': 'Erro no arquivo'}), 400
|
|
|
|
if __name__ == "__main__":
|
|
context = (CERT_PATH, KEY_PATH) if os.path.exists(CERT_PATH) else "adhoc"
|
|
app.run(host="0.0.0.0", port=33002, ssl_context=context, debug=True) |