mmpSearch/scripts/handler/upload_server.py

364 lines
13 KiB
Python
Executable File

import os
import subprocess
import xml.etree.ElementTree as ET
import gzip
import zipfile
import zlib # <--- IMPORTANTE: Adicionado para suporte a mmpz raw
import json
import shutil
import time
from flask import Flask, request, jsonify
from flask_cors import CORS
from werkzeug.utils import secure_filename
# Importa suas funções e configurações existentes
from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import MMP_FOLDER, MMPZ_FOLDER, CERT_PATH, KEY_PATH, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER
app = Flask(__name__)
CORS(app)
ALLOWED_EXTENSIONS = {"mmp", "mmpz"}
ALLOWED_SAMPLE_EXTENSIONS = {'wav', 'mp3', 'ogg', 'flac', 'ds'}
# --- CONFIGURAÇÕES ---
MANIFEST_PATH = os.path.join(METADATA_FOLDER, "samples-manifest.json")
XML_IMPORTED_PATH_PREFIX = "src_mmpSearch/samples/imported"
PHYSICAL_IMPORTED_FOLDER = os.path.join(SAMPLE_SRC, "imported")
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def allowed_sample(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS
def run_jekyll_build():
print("Iniciando build do Jekyll...")
command = ["bundle", "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
try:
subprocess.run(command, check=True, cwd=BASE_DATA, capture_output=True, text=True)
print("Jekyll Build Sucesso!")
except subprocess.CalledProcessError as e:
print(f"ERRO no Jekyll Build: {e.stderr}")
def load_manifest_keys():
"""Carrega as pastas de fábrica (keys do manifesto)."""
if not os.path.exists(MANIFEST_PATH):
return ["drums", "instruments", "effects", "presets", "samples"]
try:
with open(MANIFEST_PATH, 'r', encoding='utf-8') as f:
data = json.load(f)
return list(data.keys())
except Exception:
return []
def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
"""
Tenta descompactar .mmpz usando métodos Python (rápido) e falha para o CLI do LMMS (robusto).
Salva o resultado em mmp_target_path.
"""
print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")
# --- TENTATIVA 1: Métodos Python (Memória/Rápido) ---
content = None
try:
with open(mmpz_path, "rb") as f:
content = f.read()
except Exception:
pass
if content:
# 1.1 Tenta ZIP
if zipfile.is_zipfile(mmpz_path):
try:
with zipfile.ZipFile(mmpz_path, 'r') as z:
with open(mmp_target_path, 'wb') as f_out:
f_out.write(z.read(z.namelist()[0]))
print("Sucesso: Descompactado via ZIP.")
return True
except: pass
# 1.2 Tenta GZIP
try:
decompressed = gzip.decompress(content)
with open(mmp_target_path, "wb") as f_out:
f_out.write(decompressed)
print("Sucesso: Descompactado via GZIP.")
return True
except: pass
# 1.3 Tenta ZLIB (Qt Default)
try:
decompressed = zlib.decompress(content)
with open(mmp_target_path, "wb") as f_out:
f_out.write(decompressed)
print("Sucesso: Descompactado via ZLIB.")
return True
except: pass
# --- TENTATIVA 2: Fallback para LMMS CLI (Lento/Robusto) ---
# Se o Python falhar, pedimos ao próprio LMMS para converter
print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
cmd = ["lmms", "--dump", mmpz_path]
# Define ambiente sem interface gráfica para evitar erros no servidor
env_vars = os.environ.copy()
env_vars["QT_QPA_PLATFORM"] = "offscreen"
try:
# O LMMS joga o XML no stdout, então capturamos e salvamos no arquivo
with open(mmp_target_path, "w") as f_out:
subprocess.run(
cmd,
stdout=f_out,
stderr=subprocess.PIPE,
check=True,
env=env_vars
)
# Verifica se o arquivo foi criado e tem conteúdo
if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
print("Sucesso: Descompactado via LMMS CLI (--dump).")
return True
else:
print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
return False
except subprocess.CalledProcessError as e:
print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
return False
except Exception as e:
print(f"Erro inesperado ao chamar LMMS: {e}")
return False
def analyze_mmp_dependencies(mmp_filename):
"""
Analisa um arquivo .mmp (que já deve ser texto XML puro).
Retorna: (is_clean, missing_files_list)
"""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
if not os.path.exists(filepath):
print(f"Erro: Arquivo não encontrado: {filepath}")
return False, []
try:
tree = ET.parse(filepath)
except ET.ParseError as e:
print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
return False, []
root = tree.getroot()
factory_folders = load_manifest_keys()
missing_samples = set()
# Varre todos os audiofileprocessor
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src:
continue
# 1. É arquivo de fábrica?
is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
# 2. Já foi importado anteriormente?
is_already_imported = src.startswith("src_mmpSearch/samples/imported/")
# Se não é nativo e não está na pasta de importados, é externo
if not is_factory and not is_already_imported:
file_name = os.path.basename(src)
missing_samples.add(file_name)
return len(missing_samples) == 0, list(missing_samples)
def update_xml_paths(mmp_filename, file_map):
"""
Atualiza os caminhos no XML para apontar para a pasta imported.
"""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
try:
tree = ET.parse(filepath)
root = tree.getroot()
factory_folders = load_manifest_keys()
changes_made = False
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src: continue
is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
is_already_imported = src.startswith(XML_IMPORTED_PATH_PREFIX)
if not is_factory and not is_already_imported:
base_name = os.path.basename(src)
# Verifica se temos esse arquivo mapeado (upload do usuário)
if base_name in file_map:
safe_name = file_map[base_name]
new_src = f"{XML_IMPORTED_PATH_PREFIX}/{safe_name}"
audio_node.set('src', new_src)
audio_node.set('vol', audio_node.get('vol', '1'))
changes_made = True
print(f"Path corrigido: {base_name} -> {new_src}")
if changes_made:
tree.write(filepath, encoding='UTF-8', xml_declaration=True)
print(f"Projeto {mmp_filename} atualizado com novos caminhos.")
return True
except Exception as e:
print(f"Erro ao atualizar XML: {e}")
return False
# --- ROTAS ---
@app.route("/api/upload", methods=["POST"])
def upload_file():
if "project_file" not in request.files:
return jsonify({"error": "Nenhum arquivo enviado"}), 400
file = request.files["project_file"]
if file.filename == "":
return jsonify({"error": "Nome do arquivo vazio"}), 400
if file and allowed_file(file.filename):
# 1. Sanitização do nome
original_name = file.filename
name_without_ext = os.path.splitext(original_name)[0]
ext = os.path.splitext(original_name)[1].lower()
clean_name = slugify(name_without_ext)
if not clean_name:
clean_name = f"upload-{int(time.time())}"
# O nome final no sistema será sempre .mmp
final_mmp_filename = f"{clean_name}.mmp"
final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)
# Caminho temporário para o upload original
upload_filename = f"{clean_name}{ext}"
if ext == ".mmpz":
upload_path = os.path.join(MMPZ_FOLDER, upload_filename)
else:
upload_path = os.path.join(MMP_FOLDER, upload_filename)
try:
# Salva o arquivo original
file.save(upload_path)
print(f"Upload salvo em: {upload_path}")
# 2. SE FOR MMPZ, CONVERTE IMEDIATAMENTE PARA MMP
if ext == ".mmpz":
print("Detectado arquivo compactado. Iniciando extração...")
success = convert_mmpz_to_mmp(upload_path, final_mmp_path)
if not success:
# Se falhou zlib/gzip, tenta o fallback do sistema (lmms --dump) se disponível
# Mas por segurança, retornamos erro aqui para não travar
return jsonify({"error": "Falha crítica: O arquivo .mmpz não pode ser descompactado. Tente enviar o .mmp descomprimido."}), 400
# Opcional: Remover o .mmpz original já que já extraímos
# os.remove(upload_path)
# Se já for .mmp, apenas garante que está no lugar certo
elif ext == ".mmp" and upload_path != final_mmp_path:
shutil.move(upload_path, final_mmp_path)
# A partir daqui, TRABALHAMOS APENAS COM O ARQUIVO .MMP (final_mmp_filename)
# 3. Analisa dependências no arquivo .mmp limpo
is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
if not is_clean:
return jsonify({
"status": "missing_samples",
"message": "Samples externos detectados.",
"project_file": final_mmp_filename, # Retornamos o nome do MMP, pois é ele que vamos editar
"missing_files": missing_files
}), 202
# 4. Se limpo, processa
return process_and_build(final_mmp_filename)
except Exception as e:
import traceback
traceback.print_exc()
return jsonify({"error": f"Erro interno: {str(e)}"}), 500
return jsonify({"error": "Tipo de arquivo não permitido"}), 400
@app.route("/api/upload/resolve", methods=["POST"])
def resolve_samples():
"""
Recebe o nome do arquivo .mmp (já existente no servidor) e os samples.
"""
project_filename = request.form.get('project_file') # Espera o nome .mmp
uploaded_files = request.files.getlist("sample_files")
if not project_filename or not uploaded_files:
return jsonify({"error": "Dados incompletos"}), 400
# Garante pasta
os.makedirs(PHYSICAL_IMPORTED_FOLDER, exist_ok=True)
os.chmod(PHYSICAL_IMPORTED_FOLDER, 0o775)
file_map = {}
# 1. Salva os samples
for file in uploaded_files:
if file and allowed_sample(file.filename):
clean_sample_name = secure_filename(file.filename)
save_path = os.path.join(PHYSICAL_IMPORTED_FOLDER, clean_sample_name)
file.save(save_path)
os.chmod(save_path, 0o664)
# Mapeia nome original -> nome salvo
file_map[file.filename] = clean_sample_name
file_map[clean_sample_name] = clean_sample_name # Fallback
# 2. Atualiza o XML (.mmp)
if update_xml_paths(project_filename, file_map):
# 3. Processa e Builda
return process_and_build(project_filename)
else:
return jsonify({"error": "Falha ao atualizar referências no projeto."}), 500
def process_and_build(filename):
"""Encapsula a chamada do main.py"""
result = process_single_file(filename)
if result["success"]:
rebuild_indexes()
run_jekyll_build()
generate_manifests(SRC_MMPSEARCH)
return jsonify({
"message": "Sucesso! Projeto processado.",
"data": result["data"]
}), 200
else:
return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500
# Rota avulsa de sample mantida igual...
@app.route('/api/upload/sample', methods=['POST'])
def upload_sample_standalone():
if 'sample_file' not in request.files: return jsonify({'error': 'Nenhum arquivo'}), 400
file = request.files['sample_file']
subfolder = request.form.get('subfolder', '').strip()
if file and allowed_sample(file.filename):
filename = secure_filename(file.filename)
safe_subfolder = subfolder.replace('..', '').strip('/')
target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
os.makedirs(target_dir, exist_ok=True)
file.save(os.path.join(target_dir, filename))
generate_manifests(SRC_MMPSEARCH)
run_jekyll_build()
return jsonify({'message': 'Sample enviado!'}), 200
return jsonify({'error': 'Erro no arquivo'}), 400
if __name__ == "__main__":
context = (CERT_PATH, KEY_PATH) if os.path.exists(CERT_PATH) else "adhoc"
app.run(host="0.0.0.0", port=33002, ssl_context=context, debug=True)