mmpSearch/scripts/handler/upload_server.py

469 lines
18 KiB
Python
Executable File

import os
import subprocess
import xml.etree.ElementTree as ET
import gzip
import zipfile
import zlib
import json
import shutil
import time
import io
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, CERT_PATH, KEY_PATH, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST
app = Flask(__name__)
CORS(app)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def allowed_sample(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS
def run_jekyll_build():
print("Iniciando build do Jekyll...")
command = ["bundle", "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
try:
subprocess.run(command, check=True, cwd=BASE_DATA, capture_output=True, text=True)
print("Jekyll Build Sucesso!")
except subprocess.CalledProcessError as e:
print(f"ERRO no Jekyll Build: {e.stderr}")
def load_manifest_keys():
"""Carrega as pastas de fábrica (keys do manifesto)."""
if not os.path.exists(SAMPLE_MANIFEST):
return ["drums", "instruments", "effects", "presets", "samples"]
try:
with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f:
data = json.load(f)
return list(data.keys())
except Exception:
return []
def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
"""
Tenta descompactar .mmpz usando métodos Python (rápido) e falha para o CLI do LMMS (robusto).
Salva o resultado em mmp_target_path.
"""
print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")
# --- TENTATIVA 1: Métodos Python (Memória/Rápido) ---
content = None
try:
with open(mmpz_path, "rb") as f:
content = f.read()
except Exception:
pass
if content:
# 1.1 Tenta ZIP
if zipfile.is_zipfile(mmpz_path):
try:
with zipfile.ZipFile(mmpz_path, 'r') as z:
with open(mmp_target_path, 'wb') as f_out:
f_out.write(z.read(z.namelist()[0]))
print("Sucesso: Descompactado via ZIP.")
return True
except: pass
# 1.2 Tenta GZIP
try:
decompressed = gzip.decompress(content)
with open(mmp_target_path, "wb") as f_out:
f_out.write(decompressed)
print("Sucesso: Descompactado via GZIP.")
return True
except: pass
# 1.3 Tenta ZLIB (Qt Default)
try:
decompressed = zlib.decompress(content)
with open(mmp_target_path, "wb") as f_out:
f_out.write(decompressed)
print("Sucesso: Descompactado via ZLIB.")
return True
except: pass
# --- TENTATIVA 2: Fallback para LMMS CLI (Lento/Robusto) ---
# Se o Python falhar, pedimos ao próprio LMMS para converter
print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
cmd = ["lmms", "--dump", mmpz_path]
# Define ambiente sem interface gráfica para evitar erros no servidor
env_vars = os.environ.copy()
env_vars["QT_QPA_PLATFORM"] = "offscreen"
try:
# O LMMS joga o XML no stdout, então capturamos e salvamos no arquivo
with open(mmp_target_path, "w") as f_out:
subprocess.run(
cmd,
stdout=f_out,
stderr=subprocess.PIPE,
check=True,
env=env_vars
)
# Verifica se o arquivo foi criado e tem conteúdo
if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
print("Sucesso: Descompactado via LMMS CLI (--dump).")
return True
else:
print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
return False
except subprocess.CalledProcessError as e:
print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
return False
except Exception as e:
print(f"Erro inesperado ao chamar LMMS: {e}")
return False
def analyze_mmp_dependencies(mmp_filename):
"""
Analisa um arquivo .mmp (que já deve ser texto XML puro).
Retorna: (is_clean, missing_files_list)
"""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
if not os.path.exists(filepath):
print(f"Erro: Arquivo não encontrado: {filepath}")
return False, []
try:
tree = ET.parse(filepath)
except ET.ParseError as e:
print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
return False, []
root = tree.getroot()
factory_folders = load_manifest_keys()
missing_samples = set()
# Varre todos os audiofileprocessor
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src:
continue
# 1. É arquivo de fábrica?
is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
# 2. Já foi importado anteriormente?
is_already_imported = src.startswith("src_mmpSearch/samples/imported/")
# Se não é nativo e não está na pasta de importados, é externo
if not is_factory and not is_already_imported:
file_name = os.path.basename(src)
missing_samples.add(file_name)
return len(missing_samples) == 0, list(missing_samples)
def update_xml_paths_exact(mmp_filename, replacements):
"""
Substitui caminhos no XML baseando-se no mapeamento exato:
replacements = { 'nome_original_no_xml.wav': 'novo_nome_seguro.wav' }
"""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
try:
tree = ET.parse(filepath)
root = tree.getroot()
changes_made = False
# Normaliza as chaves para minúsculo para garantir o match
# Ex: {'Kick.wav': 'kick.wav'} -> {'kick.wav': 'kick.wav'}
replacements_lower = {k.lower(): v for k, v in replacements.items()}
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src: continue
# Pega o nome do arquivo que está atualmente no XML
current_basename = os.path.basename(src)
current_basename_lower = current_basename.lower()
# Verifica se esse nome está na lista de substituições
if current_basename_lower in replacements_lower:
new_filename = replacements_lower[current_basename_lower]
# Constrói o novo caminho completo
new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}"
# Aplica a alteração
audio_node.set('src', new_src)
audio_node.set('vol', audio_node.get('vol', '1')) # Garante volume
print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
changes_made = True
if changes_made:
tree.write(filepath, encoding='UTF-8', xml_declaration=True)
print(f"Projeto {mmp_filename} salvo com novos caminhos.")
return True
else:
print("Aviso: Nenhum caminho foi alterado no XML (match não encontrado).")
return True # Retorna True para não travar o processo, mas avisa no log
except Exception as e:
print(f"Erro crítico ao editar XML: {e}")
return False
# --- ROTAS ---
@app.route("/api/download/<project_name>", methods=["GET"])
def download_project_package(project_name):
"""
Gera um ZIP onde:
1. O .mmp é modificado NA VOLÁTIL (sem salvar no disco) para ter caminhos curtos.
2. A estrutura do ZIP fica limpa:
- projeto.mmp
- imported/
- sample1.wav
- sample2.wav
"""
# Garante extensão .mmp
if not project_name.lower().endswith('.mmp'):
project_name += '.mmp'
clean_name = secure_filename(project_name)
mmp_path = os.path.join(MMP_FOLDER, clean_name)
if not os.path.exists(mmp_path):
return jsonify({"error": "Projeto não encontrado"}), 404
# Prepara o buffer do ZIP na memória
memory_file = io.BytesIO()
try:
# 1. Parseia o XML original do disco
tree = ET.parse(mmp_path)
root = tree.getroot()
# Conjunto para rastrear quais arquivos físicos precisamos colocar no ZIP
samples_to_pack = set()
# 2. Modifica o XML na MEMÓRIA (não afeta o arquivo original no servidor)
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
# Verifica se é um sample importado do nosso sistema
if src.startswith(XML_IMPORTED_PATH_PREFIX):
# Pega apenas o nome do arquivo (ex: 'kick_deep.wav')
filename = os.path.basename(src)
# Define o NOVO caminho curto relativo para dentro do ZIP
# Antes: src_mmpSearch/samples/imported/kick_deep.wav
# Agora: imported/kick_deep.wav
short_path = f"imported/{filename}"
# Atualiza o XML na memória
audio_node.set('src', short_path)
# Adiciona à lista de arquivos para copiar
samples_to_pack.add(filename)
# 3. Cria o ZIP
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
# A) Escreve o .mmp MODIFICADO no ZIP
# Convertemos a árvore XML modificada para string binária
xml_str = ET.tostring(root, encoding='utf-8', method='xml')
zf.writestr(clean_name, xml_str)
# B) Adiciona os samples físicos na pasta 'imported/' do ZIP
for sample_name in samples_to_pack:
physical_path = os.path.join(XML_IMPORTED_PATH_PREFIX, sample_name)
# Caminho curto dentro do ZIP
zip_entry_name = f"imported/{sample_name}"
if os.path.exists(physical_path):
zf.write(physical_path, arcname=zip_entry_name)
print(f"[ZIP CLEAN] Adicionado: {zip_entry_name}")
else:
print(f"[ZIP CLEAN] AVISO: Arquivo faltante no disco: {sample_name}")
# Finaliza e envia
memory_file.seek(0)
return send_file(
memory_file,
mimetype='application/zip',
as_attachment=True,
download_name=f"{os.path.splitext(clean_name)[0]}.zip"
)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500
@app.route("/api/upload", methods=["POST"])
def upload_file():
if "project_file" not in request.files:
return jsonify({"error": "Nenhum arquivo enviado"}), 400
file = request.files["project_file"]
if file.filename == "":
return jsonify({"error": "Nome do arquivo vazio"}), 400
if file and allowed_file(file.filename):
# 1. Sanitização do nome
original_name = file.filename
name_without_ext = os.path.splitext(original_name)[0]
ext = os.path.splitext(original_name)[1].lower()
clean_name = slugify(name_without_ext)
if not clean_name:
clean_name = f"upload-{int(time.time())}"
# O nome final no sistema será sempre .mmp
final_mmp_filename = f"{clean_name}.mmp"
final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)
# Caminho temporário para o upload original
upload_filename = f"{clean_name}{ext}"
if ext == ".mmpz":
upload_path = os.path.join(MMPZ_FOLDER, upload_filename)
else:
upload_path = os.path.join(MMP_FOLDER, upload_filename)
try:
# Salva o arquivo original
file.save(upload_path)
print(f"Upload salvo em: {upload_path}")
# 2. SE FOR MMPZ, CONVERTE IMEDIATAMENTE PARA MMP
if ext == ".mmpz":
print("Detectado arquivo compactado. Iniciando extração...")
success = convert_mmpz_to_mmp(upload_path, final_mmp_path)
if not success:
# Se falhou zlib/gzip, tenta o fallback do sistema (lmms --dump) se disponível
# Mas por segurança, retornamos erro aqui para não travar
return jsonify({"error": "Falha crítica: O arquivo .mmpz não pode ser descompactado. Tente enviar o .mmp descomprimido."}), 400
# Opcional: Remover o .mmpz original já que já extraímos
# os.remove(upload_path)
# Se já for .mmp, apenas garante que está no lugar certo
elif ext == ".mmp" and upload_path != final_mmp_path:
shutil.move(upload_path, final_mmp_path)
# A partir daqui, TRABALHAMOS APENAS COM O ARQUIVO .MMP (final_mmp_filename)
# 3. Analisa dependências no arquivo .mmp limpo
is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
if not is_clean:
return jsonify({
"status": "missing_samples",
"message": "Samples externos detectados.",
"project_file": final_mmp_filename, # Retornamos o nome do MMP, pois é ele que vamos editar
"missing_files": missing_files
}), 202
# 4. Se limpo, processa
return process_and_build(final_mmp_filename)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"error": f"Erro interno: {str(e)}"}), 500
return jsonify({"error": "Tipo de arquivo não permitido"}), 400
@app.route("/api/upload/resolve", methods=["POST"])
def resolve_samples():
"""
Recebe inputs onde:
name="nome_original.wav" (chave) -> value=Arquivo (conteúdo)
"""
project_filename = request.form.get('project_file') # Nome do .mmp
if not project_filename:
return jsonify({"error": "Nome do projeto não informado"}), 400
# Cria pasta imported
os.makedirs(XML_IMPORTED_PATH_PREFIX, exist_ok=True)
try:
os.chmod(XML_IMPORTED_PATH_PREFIX, 0o775)
except: pass
# Dicionário de substituição: { 'Original.wav': 'Novo_Seguro.wav' }
replacements = {}
# request.files é um dicionário imutável, iteramos sobre ele
for original_name, file_storage in request.files.items():
if file_storage and allowed_sample(file_storage.filename):
# 1. Salva o arquivo fisicamente
# Usamos secure_filename no arquivo NOVO para evitar problemas no disco
safe_new_name = secure_filename(file_storage.filename)
# Se secure_filename deixou vazio (ex: "???"), gera um nome
if not safe_new_name:
safe_new_name = f"sample_{int(time.time())}.wav"
save_path = os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name)
file_storage.save(save_path)
try:
os.chmod(save_path, 0o664)
except: pass
# 2. Mapeia para substituição no XML
# A chave 'original_name' vem do `name` do input HTML, que definimos como o nome original
replacements[original_name] = safe_new_name
if not replacements:
return jsonify({"error": "Nenhum arquivo válido enviado."}), 400
# 3. Atualiza o XML
if update_xml_paths_exact(project_filename, replacements):
# 4. Processa (Gera WAV na pasta correta e JSON)
return process_and_build(project_filename)
else:
return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500
def process_and_build(filename):
"""Encapsula a chamada do main.py"""
result = process_single_file(filename)
if result["success"]:
rebuild_indexes()
run_jekyll_build()
generate_manifests(SRC_MMPSEARCH)
return jsonify({
"message": "Sucesso! Projeto processado.",
"data": result["data"]
}), 200
else:
return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500
# Rota avulsa de sample mantida igual...
@app.route('/api/upload/sample', methods=['POST'])
def upload_sample_standalone():
if 'sample_file' not in request.files: return jsonify({'error': 'Nenhum arquivo'}), 400
file = request.files['sample_file']
subfolder = request.form.get('subfolder', '').strip()
if file and allowed_sample(file.filename):
filename = secure_filename(file.filename)
safe_subfolder = subfolder.replace('..', '').strip('/')
target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
os.makedirs(target_dir, exist_ok=True)
file.save(os.path.join(target_dir, filename))
generate_manifests(SRC_MMPSEARCH)
run_jekyll_build()
return jsonify({'message': 'Sample enviado!'}), 200
return jsonify({'error': 'Erro no arquivo'}), 400
if __name__ == "__main__":
context = (CERT_PATH, KEY_PATH) if os.path.exists(CERT_PATH) else "adhoc"
app.run(host="0.0.0.0", port=33002, ssl_context=context, debug=True)