# mmpSearch/scripts/handler/upload_server.py
#
# 423 lines
# 16 KiB
# Python
# Executable File

import os
import subprocess
import xml.etree.ElementTree as ET
import gzip
import zipfile
import zlib
import json
import shutil
import time
import io
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename
# --- NOVAS IMPORTAÇÕES DE AUTH ---
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, CERT_PATH, KEY_PATH, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST
app = Flask(__name__)

# --- SECURITY AND DATABASE CONFIGURATION ---
# The session-signing key is taken from the SECRET_KEY environment variable
# when it is set, so deployments do not have to rely on a value committed to
# source control. The historical hard-coded value is kept only as a fallback
# so existing setups keep working. IMPORTANT: set SECRET_KEY in production!
app.config['SECRET_KEY'] = (
    os.environ.get('SECRET_KEY')
    or 'chave_secreta_super_segura_mmp_ecosystem_2025'
)
# The SQLite user database is stored as users.db inside BASE_DATA.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(BASE_DATA, 'users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# CORS must support credentials for the login cookie to work.
CORS(app, supports_credentials=True)

db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# --- USER MODEL ---
class User(UserMixin, db.Model):
    """Account record persisted through SQLAlchemy and used by Flask-Login."""

    # Integer primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Login name; must be unique and present.
    username = db.Column(db.String(150), nullable=False, unique=True)
    # Stores the value produced by bcrypt.generate_password_hash in register().
    password = db.Column(db.String(150), nullable=False)
# Create the database tables at startup if they do not exist yet.
with app.app_context():
    db.create_all()
@login_manager.user_loader
def load_user(user_id):
    """Resolve the user id stored in the session to a ``User`` row.

    Args:
        user_id: The identifier Flask-Login read from the session.

    Returns:
        The matching ``User``, or ``None`` when the id is not a valid
        integer or no such row exists.
    """
    # A tampered or stale session may carry a non-numeric id; the loader must
    # answer None for those instead of raising and failing the whole request.
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
# --- SUAS FUNÇÕES UTILITÁRIAS MANTIDAS ---
def allowed_file(filename):
    """Return True when *filename* has an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def allowed_sample(filename):
    """Return True when *filename* has an extension listed in ALLOWED_SAMPLE_EXTENSIONS.

    The extension is the text after the last dot, compared case-insensitively.
    A name without any dot is rejected.
    """
    _, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_SAMPLE_EXTENSIONS
def run_jekyll_build():
    """Run ``bundle exec jekyll build`` from BASE_DATA on a best-effort basis.

    Failures are reported on stdout and never raised: a non-zero exit prints
    the captured stderr, and a failure to start the process at all (missing
    ``bundle`` executable, missing working directory) prints the OS error.
    """
    print("Iniciando build do Jekyll...")
    command = ["bundle", "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
    try:
        subprocess.run(command, check=True, cwd=BASE_DATA, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print(f"ERRO no Jekyll Build: {e.stderr}")
    except OSError as e:
        # The process could not even be started; keep the helper best-effort
        # so the calling route is not turned into an unhandled error.
        print(f"ERRO no Jekyll Build: {e}")
    else:
        print("Jekyll Build Sucesso!")
def load_manifest_keys():
    """Load the factory folder names (the top-level keys of the sample manifest).

    Returns:
        A built-in default list when the manifest file does not exist, the
        manifest's keys when it can be read, or an empty list when reading or
        parsing it fails for any reason.
    """
    manifest_missing = not os.path.exists(SAMPLE_MANIFEST)
    if manifest_missing:
        return ["drums", "instruments", "effects", "presets", "samples"]

    try:
        with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as manifest_file:
            manifest = json.load(manifest_file)
        return list(manifest.keys())
    except Exception:
        return []
def _decompress_mmpz_bytes(mmpz_path, content):
    """Decode an .mmpz payload in pure Python; return ``(label, data)`` or ``None``."""
    # 1.1 ZIP archive: the first member is taken as the project file.
    if zipfile.is_zipfile(mmpz_path):
        try:
            with zipfile.ZipFile(mmpz_path, 'r') as archive:
                return "ZIP", archive.read(archive.namelist()[0])
        except Exception:
            # Corrupt or empty archive: keep trying the remaining decoders.
            pass

    decoders = (
        # 1.2 GZIP stream.
        ("GZIP", lambda: gzip.decompress(content)),
        # 1.3 Raw ZLIB stream.
        ("ZLIB", lambda: zlib.decompress(content)),
        # 1.4 Qt qCompress layout: a 4-byte big-endian length prefix followed
        # by a ZLIB stream (the layout LMMS is understood to use for .mmpz).
        ("ZLIB", lambda: zlib.decompress(content[4:])),
    )
    for label, decode in decoders:
        try:
            return label, decode()
        except (OSError, EOFError, zlib.error):
            continue
    return None


def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
    """Decompress a .mmpz project into a plain .mmp file.

    Pure-Python decoders are tried first (ZIP, GZIP, ZLIB, then ZLIB behind a
    4-byte qCompress-style length prefix). If none of them works, the
    ``lmms --dump`` command line is used as a fallback.

    Args:
        mmpz_path: Path of the compressed project to read.
        mmp_target_path: Path where the decompressed project is written.

    Returns:
        True when the target file was written, False otherwise. Errors are
        printed, never raised.
    """
    print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")

    content = None
    try:
        with open(mmpz_path, "rb") as f:
            content = f.read()
    except OSError as e:
        # Unreadable input: report it and still give the LMMS CLI a chance.
        print(f"Erro ao ler {mmpz_path}: {e}")

    if content:
        decoded = _decompress_mmpz_bytes(mmpz_path, content)
        if decoded is not None:
            label, payload = decoded
            # The target is opened only after decoding succeeded, so a failed
            # decoder never leaves a truncated output file behind.
            try:
                with open(mmp_target_path, "wb") as f_out:
                    f_out.write(payload)
            except OSError as e:
                print(f"Erro ao gravar {mmp_target_path}: {e}")
            else:
                print(f"Sucesso: Descompactado via {label}.")
                return True

    # --- Fallback to the LMMS CLI ---
    print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
    cmd = ["lmms", "--dump", mmpz_path]
    env_vars = os.environ.copy()
    # Run Qt without a display server.
    env_vars["QT_QPA_PLATFORM"] = "offscreen"
    try:
        with open(mmp_target_path, "w") as f_out:
            subprocess.run(cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars)
        if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
            print("Sucesso: Descompactado via LMMS CLI (--dump).")
            return True
        print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
        return False
    except subprocess.CalledProcessError as e:
        print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
        return False
    except Exception as e:
        print(f"Erro inesperado ao chamar LMMS: {e}")
        return False
def analyze_mmp_dependencies(mmp_filename):
    """Inspect a plain-XML .mmp file in MMP_FOLDER for external samples.

    An ``audiofileprocessor`` node counts as external when its ``src`` is
    non-empty, does not start with one of the factory folders returned by
    ``load_manifest_keys()`` and does not start with
    ``src_mmpSearch/samples/imported/``.

    Returns:
        ``(is_clean, missing)``: ``is_clean`` is True when no external sample
        was found; ``missing`` lists the distinct base names of the external
        samples. A missing or unparsable file yields ``(False, [])``.
    """
    project_path = os.path.join(MMP_FOLDER, mmp_filename)
    if not os.path.exists(project_path):
        print(f"Erro: Arquivo não encontrado: {project_path}")
        return False, []

    try:
        document = ET.parse(project_path)
    except ET.ParseError as e:
        print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
        return False, []

    factory_prefixes = tuple(folder + "/" for folder in load_manifest_keys())
    sources = (
        node.get('src', '')
        for node in document.getroot().findall(".//audiofileprocessor")
    )
    external = {
        os.path.basename(src)
        for src in sources
        if src
        and not src.startswith(factory_prefixes)
        and not src.startswith("src_mmpSearch/samples/imported/")
    }
    return not external, list(external)
def update_xml_paths_exact(mmp_filename, replacements):
    """Rewrite sample paths in a project XML from an exact name mapping.

    Args:
        mmp_filename: Project file name inside MMP_FOLDER.
        replacements: Mapping of original sample base name to the stored file
            name; original names are matched case-insensitively.

    Returns:
        True when the file was processed (whether or not anything changed),
        False when any error occurred.
    """
    project_path = os.path.join(MMP_FOLDER, mmp_filename)
    try:
        document = ET.parse(project_path)
        nodes = document.getroot().findall(".//audiofileprocessor")
        by_lower_name = {original.lower(): stored for original, stored in replacements.items()}

        dirty = False
        for node in nodes:
            src = node.get('src', '')
            if not src:
                continue
            current_basename = os.path.basename(src)
            lookup_key = current_basename.lower()
            if lookup_key not in by_lower_name:
                continue
            new_src = f"{XML_IMPORTED_PATH_PREFIX}/{by_lower_name[lookup_key]}"
            node.set('src', new_src)
            # Make the 'vol' attribute explicit, defaulting to '1' when absent.
            node.set('vol', node.get('vol', '1'))
            print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
            dirty = True

        # Only touch the file on disk when at least one path was rewritten.
        if dirty:
            document.write(project_path, encoding='UTF-8', xml_declaration=True)
        return True
    except Exception as e:
        print(f"Erro crítico ao editar XML: {e}")
        return False
def process_and_build(filename):
    """Process one project through main.py and refresh the generated site.

    On success the indexes are rebuilt, the Jekyll build is run and the
    manifests are regenerated, then a 200 JSON response is returned. On
    failure a 500 JSON response carrying the processing error is returned.
    """
    outcome = process_single_file(filename)
    if not outcome["success"]:
        return jsonify({"error": f"Erro no processamento: {outcome['error']}"}), 500

    rebuild_indexes()
    run_jekyll_build()
    generate_manifests(SRC_MMPSEARCH)

    body = {
        "message": "Sucesso! Projeto processado.",
        "data": outcome["data"],
    }
    return jsonify(body), 200
# ==============================================================================
# ROTAS DE AUTENTICAÇÃO
# ==============================================================================
@app.route('/api/register', methods=['POST'])
def register():
    """Create a new user from a JSON body with ``username`` and ``password``.

    Returns:
        400 when the body is not a JSON object, a field is missing or empty,
        or the username is already taken; 201 when the user was created;
        500 when the database write fails.
    """
    data = request.json
    # A JSON body that is not an object (e.g. a list) has no .get().
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Dados incompletos"}), 400

    if User.query.filter_by(username=data['username']).first():
        return jsonify({"message": "Usuário já existe"}), 400

    hashed_password = bcrypt.generate_password_hash(data['password']).decode('utf-8')
    new_user = User(username=data['username'], password=hashed_password)
    try:
        db.session.add(new_user)
        db.session.commit()
    except Exception as e:
        # Discard the failed transaction so the session stays usable for
        # subsequent requests.
        db.session.rollback()
        return jsonify({"message": f"Erro: {str(e)}"}), 500
    return jsonify({"message": "Usuário criado com sucesso!"}), 201
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user from a JSON body with ``username`` and ``password``.

    Returns:
        400 when the body is not a JSON object or a field is missing or empty
        (same contract as ``register``); 200 with the username when the
        credentials match; 401 otherwise.
    """
    data = request.json
    # Validate before indexing: a missing key or non-object body would
    # otherwise raise and surface as a 500.
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Dados incompletos"}), 400

    user = User.query.filter_by(username=data['username']).first()
    if user and bcrypt.check_password_hash(user.password, data['password']):
        login_user(user)
        return jsonify({"message": "Login realizado", "user": user.username}), 200
    return jsonify({"message": "Credenciais inválidas"}), 401
@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
    """End the current user's session and confirm with a 200 JSON response."""
    logout_user()
    confirmation = {"message": "Logout realizado"}
    return jsonify(confirmation), 200
@app.route('/api/check_auth', methods=['GET'])
def check_auth():
    """Report whether the caller is logged in, and as which user if so."""
    if not current_user.is_authenticated:
        return jsonify({"logged_in": False})
    return jsonify({"logged_in": True, "user": current_user.username})
# ==============================================================================
# ROTAS PRINCIPAIS
# ==============================================================================
@app.route("/api/download/<project_name>", methods=["GET"])
def download_project_package(project_name):
    """Build a ZIP of a project with shortened sample paths (no login required).

    The archive holds the project XML, with every ``audiofileprocessor`` source
    under XML_IMPORTED_PATH_PREFIX rewritten to ``imported/<file>``, plus each
    referenced sample that exists on disk, stored under ``imported/``.

    Returns:
        The ZIP as an attachment, 404 when the project does not exist, or 500
        when building the package fails.
    """
    requested = project_name
    if not requested.lower().endswith('.mmp'):
        requested += '.mmp'
    clean_name = secure_filename(requested)
    mmp_path = os.path.join(MMP_FOLDER, clean_name)
    if not os.path.exists(mmp_path):
        return jsonify({"error": "Projeto não encontrado"}), 404

    memory_file = io.BytesIO()
    try:
        root = ET.parse(mmp_path).getroot()

        # Rewrite imported sample paths in memory and remember what to pack.
        samples_to_pack = set()
        for node in root.findall(".//audiofileprocessor"):
            src = node.get('src', '')
            if not src.startswith(XML_IMPORTED_PATH_PREFIX):
                continue
            sample = os.path.basename(src)
            node.set('src', f"imported/{sample}")
            samples_to_pack.add(sample)

        with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.writestr(clean_name, ET.tostring(root, encoding='utf-8', method='xml'))
            for sample in samples_to_pack:
                on_disk = os.path.join(XML_IMPORTED_PATH_PREFIX, sample)
                # Samples missing on disk are left out of the archive.
                if os.path.exists(on_disk):
                    zf.write(on_disk, arcname=f"imported/{sample}")

        memory_file.seek(0)
        archive_name = f"{os.path.splitext(clean_name)[0]}.zip"
        return send_file(
            memory_file,
            mimetype='application/zip',
            as_attachment=True,
            download_name=archive_name,
        )
    except Exception as e:
        return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500
@app.route("/api/upload", methods=["POST"])
@login_required  # <--- PROTECTED
def upload_file():
    """Receive a project upload (``project_file``), normalise and process it.

    A ``.mmpz`` upload is saved to MMPZ_FOLDER and converted into MMP_FOLDER;
    other allowed files are saved straight to MMP_FOLDER. When the project
    references external samples a 202 response lists them; otherwise the
    project is processed and the site rebuilt.
    """
    if "project_file" not in request.files:
        return jsonify({"error": "Nenhum arquivo enviado"}), 400

    file = request.files["project_file"]
    if file.filename == "":
        return jsonify({"error": "Nome do arquivo vazio"}), 400

    if not (file and allowed_file(file.filename)):
        return jsonify({"error": "Tipo de arquivo não permitido"}), 400

    stem, raw_ext = os.path.splitext(file.filename)
    ext = raw_ext.lower()
    # Fall back to a timestamped name when slugify leaves nothing usable.
    clean_name = slugify(stem) or f"upload-{int(time.time())}"

    final_mmp_filename = f"{clean_name}.mmp"
    final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)
    destination_folder = MMPZ_FOLDER if ext == ".mmpz" else MMP_FOLDER
    upload_path = os.path.join(destination_folder, f"{clean_name}{ext}")

    try:
        file.save(upload_path)
        print(f"Upload salvo em: {upload_path} (User: {current_user.username})")

        if ext == ".mmpz":
            if not convert_mmpz_to_mmp(upload_path, final_mmp_path):
                return jsonify({"error": "Falha crítica na descompactação."}), 400
        elif ext == ".mmp" and upload_path != final_mmp_path:
            shutil.move(upload_path, final_mmp_path)

        is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
        if is_clean:
            return process_and_build(final_mmp_filename)

        pending = {
            "status": "missing_samples",
            "message": "Samples externos detectados.",
            "project_file": final_mmp_filename,
            "missing_files": missing_files,
        }
        return jsonify(pending), 202
    except Exception as e:
        return jsonify({"error": f"Erro interno: {str(e)}"}), 500
@app.route("/api/upload/resolve", methods=["POST"])
@login_required  # <--- PROTECTED
def resolve_samples():
    """Attach uploaded samples to a pending project and process it.

    Form field ``project_file`` names the project inside MMP_FOLDER. Each
    uploaded file is keyed by the original sample name it replaces; accepted
    files are stored in XML_IMPORTED_PATH_PREFIX and the project XML is
    rewritten to point at them.

    Returns:
        400 when the project name is missing or contains path components, or
        when no valid sample was sent; 500 when the project XML cannot be
        updated; otherwise the result of ``process_and_build``.
    """
    project_filename = request.form.get('project_file')
    if not project_filename:
        return jsonify({"error": "Nome do projeto não informado"}), 400

    # The name comes from the client and is later joined to MMP_FOLDER, then
    # parsed and rewritten: refuse anything that is not a bare file name so a
    # request cannot reach files outside the projects folder.
    if project_filename in (".", "..") or os.path.basename(project_filename) != project_filename:
        return jsonify({"error": "Nome do projeto inválido"}), 400

    os.makedirs(XML_IMPORTED_PATH_PREFIX, exist_ok=True)

    replacements = {}
    for original_name, file_storage in request.files.items():
        if not (file_storage and allowed_sample(file_storage.filename)):
            continue
        # Fall back to a timestamped name when sanitising leaves nothing.
        safe_new_name = secure_filename(file_storage.filename) or f"sample_{int(time.time())}.wav"
        file_storage.save(os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name))
        replacements[original_name] = safe_new_name

    if not replacements:
        return jsonify({"error": "Nenhum arquivo válido enviado."}), 400

    if update_xml_paths_exact(project_filename, replacements):
        return process_and_build(project_filename)
    return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500
@app.route('/api/upload/sample', methods=['POST'])
@login_required  # <--- PROTECTED
def upload_sample_standalone():
    """Store a single uploaded sample (``sample_file``) under SAMPLE_SRC.

    The optional ``subfolder`` form field selects a directory below
    SAMPLE_SRC; it is created when needed. After saving, the manifests are
    regenerated and the Jekyll build is run.
    """
    if 'sample_file' not in request.files:
        return jsonify({'error': 'Nenhum arquivo'}), 400

    file = request.files['sample_file']
    subfolder = request.form.get('subfolder', '').strip()

    if not (file and allowed_sample(file.filename)):
        return jsonify({'error': 'Erro no arquivo'}), 400

    filename = secure_filename(file.filename)
    # Drop '..' sequences and surrounding slashes from the requested subfolder.
    safe_subfolder = subfolder.replace('..', '').strip('/')
    target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
    os.makedirs(target_dir, exist_ok=True)
    file.save(os.path.join(target_dir, filename))

    generate_manifests(SRC_MMPSEARCH)
    run_jekyll_build()
    return jsonify({'message': 'Sample enviado!'}), 200
if __name__ == "__main__":
    # When running behind Apache, do NOT use ssl_context: Apache already
    # terminates SSL (https://alice.ufsj.edu.br) and Flask must serve plain
    # HTTP locally. The ssl_context argument was removed to make the reverse
    # proxy setup simpler.
    #
    # Debug mode enables the interactive Werkzeug debugger, which lets anyone
    # who can reach the port run arbitrary code. Because the server binds to
    # all interfaces, debug is opt-in: export FLASK_DEBUG=1 for development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    print("Iniciando servidor na porta 33002 (HTTP Mode)...")
    app.run(host="0.0.0.0", port=33002, debug=debug_enabled)