mmpSearch/scripts/handler/upload_server.py

575 lines
22 KiB
Python
Executable File

import os
import subprocess
import xml.etree.ElementTree as ET
import gzip
import zipfile
import zlib
import json
import shutil
import time
import io
import threading
from flask import Flask, request, jsonify, send_file, redirect
from flask_cors import CORS
from werkzeug.utils import secure_filename
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, DATA_FOLDER, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST
app = Flask(__name__)
# --- CONFIGURAÇÃO DE SEGURANÇA E BANCO ---
app.config['SECRET_KEY'] = '25de5592bf94c2ca18e27baa0ae2d4ee22a63012f32e1be719d31f530c215a387b9ec0c9d96be38e80a7ccdd859e04408facefff8fd9119e7f5a2d987d85abb7'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(DATA_FOLDER, 'users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# CORS precisa suportar credenciais para o cookie de login funcionar
CORS(app, supports_credentials=True)
db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
# --- MODELO DE USUÁRIO ---
class User(UserMixin, db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(150), unique=True, nullable=False)
password = db.Column(db.String(150), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
# Detalhes do usuário
bio = db.Column(db.String(240), default="")
tags = db.Column(db.String(500), default="") # Vamos salvar separado por vírgula
avatar_url = db.Column(db.String(255), default="/assets/img/default_avatar.png")
cover_url = db.Column(db.String(255), default="/assets/img/default_cover.jpg")
projects = db.relationship('Project', backref='owner', lazy=True)
class SecureModelView(ModelView):
def is_accessible(self):
return current_user.is_authenticated and current_user.is_admin
def inaccessible_callback(self, name, **kwargs):
# Retorna erro JSON ou redireciona (como é painel, melhor redirecionar ou negar)
return jsonify({"error": "Acesso restrito a administradores."}), 403
class SecureIndexView(AdminIndexView):
@expose('/')
def index(self):
if not current_user.is_authenticated or not current_user.is_admin:
# Em vez de retornar JSON, redireciona para a home
# O usuário verá o botão de login lá.
return redirect('/')
# OU, se quiser ser mais explícito, retorna 403 mas em HTML (opcional)
# return "<h1>Acesso Negado</h1><p>Você precisa ser admin.</p>", 403
return super(SecureIndexView, self).index()
# Tabela de Projetos
class Project(db.Model):
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(255), nullable=False) # Nome do arquivo físico .mmp
display_name = db.Column(db.String(255), nullable=False) # Nome "bonito" para exibir
created_at = db.Column(db.DateTime, default=datetime.utcnow)
# Chave estrangeira ligando ao usuário
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
# Cria o banco na inicialização se não existir
with app.app_context():
db.create_all()
@login_manager.user_loader
def load_user(user_id):
return User.query.get(int(user_id))
# --- FUNÇÕES UTILITÁRIAS ---
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def allowed_sample(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS
def run_jekyll_build():
RUBY_BIN_PATH = "/usr/bin/ruby3.2"
BUNDLE_PATH = "/nethome/jotachina/projetos/mmpSearch/vendor/bundle/ruby/3.2.0/bin/bundle"
# Prepara o ambiente para o subprocesso
env_vars = os.environ.copy()
# Adiciona o caminho do Ruby ao PATH do usuário www-data temporariamente
env_vars["PATH"] = f"{RUBY_BIN_PATH}:{env_vars.get('PATH', '')}"
print("Iniciando build do Jekyll...")
command = [BUNDLE_PATH, "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
try:
# Redirecionamos a saída para DEVNULL para não encher o buffer e travar
subprocess.Popen(
command,
cwd=BASE_DATA,
env=env_vars,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
print("Jekyll Build iniciado em segundo plano (background).")
except Exception as e:
print(f"Erro ao iniciar Jekyll: {e}")
def load_manifest_keys():
"""Carrega as pastas de fábrica (keys do manifesto)."""
if not os.path.exists(SAMPLE_MANIFEST):
return ["drums", "instruments", "effects", "presets", "samples"]
try:
with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f:
data = json.load(f)
return list(data.keys())
except Exception:
return []
def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
LMMS_PATH = "/usr/bin/lmms"
"""Tenta descompactar .mmpz usando Python ou LMMS CLI."""
print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")
content = None
try:
with open(mmpz_path, "rb") as f:
content = f.read()
except Exception:
pass
if content:
# --- Fallback para LMMS CLI ---
# VERIFICAÇÃO DE SEGURANÇA: O binário existe?
if not os.path.exists(LMMS_PATH):
print(f"ERRO CRÍTICO: Executável do LMMS não encontrado em {LMMS_PATH}")
return False
print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
cmd = [LMMS_PATH, "--dump", mmpz_path]
env_vars = os.environ.copy()
env_vars["QT_QPA_PLATFORM"] = "offscreen"
try:
with open(mmp_target_path, "w") as f_out:
subprocess.run(cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars)
if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
print("Sucesso: Descompactado via LMMS CLI (--dump).")
return True
else:
print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
return False
except subprocess.CalledProcessError as e:
print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
return False
except Exception as e:
print(f"Erro inesperado ao chamar LMMS: {e}")
return False
def analyze_mmp_dependencies(mmp_filename):
"""Analisa um arquivo .mmp XML puro."""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
if not os.path.exists(filepath):
print(f"Erro: Arquivo não encontrado: {filepath}")
return False, []
try:
tree = ET.parse(filepath)
except ET.ParseError as e:
print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
return False, []
root = tree.getroot()
factory_folders = load_manifest_keys()
missing_samples = set()
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src: continue
is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
is_already_imported = src.startswith("src_mmpSearch/samples/imported/")
if not is_factory and not is_already_imported:
file_name = os.path.basename(src)
missing_samples.add(file_name)
return len(missing_samples) == 0, list(missing_samples)
def update_xml_paths_exact(mmp_filename, replacements):
"""Substitui caminhos no XML baseando-se no mapeamento exato."""
filepath = os.path.join(MMP_FOLDER, mmp_filename)
try:
tree = ET.parse(filepath)
root = tree.getroot()
changes_made = False
replacements_lower = {k.lower(): v for k, v in replacements.items()}
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if not src: continue
current_basename = os.path.basename(src)
current_basename_lower = current_basename.lower()
if current_basename_lower in replacements_lower:
new_filename = replacements_lower[current_basename_lower]
new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}"
audio_node.set('src', new_src)
audio_node.set('vol', audio_node.get('vol', '1'))
print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
changes_made = True
if changes_made:
tree.write(filepath, encoding='UTF-8', xml_declaration=True)
return True
return True
except Exception as e:
print(f"Erro crítico ao editar XML: {e}")
return False
def run_heavy_tasks_in_background():
"""Esta função roda isolada sem travar o usuário"""
print("--- [BACKGROUND] Iniciando reconstrução de índices ---")
try:
# 1. Isso é Python puro e PESADO (aqui é onde estava travando)
rebuild_indexes()
# 2. Isso gera os manifestos (Python puro)
generate_manifests(SRC_MMPSEARCH)
# 3. Isso chama o subprocesso do Jekyll (Externo)
# Mantém sua função original que usa subprocess
run_jekyll_build()
print("--- [BACKGROUND] Tarefas concluídas com sucesso ---")
except Exception as e:
print(f"--- [BACKGROUND] Erro: {e} ---")
def process_and_build(filename):
"""Função chamada pela rota de upload"""
# Processamento inicial do arquivo (rápido)
result = process_single_file(filename)
if result["success"]:
# Em vez de chamar rebuild_indexes() direto, criamos a Thread
# O Flask vai responder o return abaixo imediatamente,
# enquanto a thread continua rodando no servidor.
task_thread = threading.Thread(target=run_heavy_tasks_in_background)
task_thread.start()
return jsonify({
"message": "Sucesso! O projeto foi recebido. O site será atualizado em instantes.",
"data": result["data"]
}), 200
else:
return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500
# ==============================================================================
# ROTAS DE AUTENTICAÇÃO
# ==============================================================================
@app.route('/api/register', methods=['POST'])
def register():
data = request.json
if not data or not data.get('username') or not data.get('password'):
return jsonify({"message": "Dados incompletos"}), 400
if User.query.filter_by(username=data['username']).first():
return jsonify({"message": "Usuário já existe"}), 400
hashed_password = bcrypt.generate_password_hash(data['password']).decode('utf-8')
new_user = User(username=data['username'], password=hashed_password)
try:
db.session.add(new_user)
db.session.commit()
return jsonify({"message": "Usuário criado com sucesso!"}), 201
except Exception as e:
return jsonify({"message": f"Erro: {str(e)}"}), 500
@app.route('/api/login', methods=['POST'])
def login():
data = request.json
user = User.query.filter_by(username=data['username']).first()
if user and bcrypt.check_password_hash(user.password, data['password']):
login_user(user)
return jsonify({"message": "Login realizado", "user": user.username}), 200
return jsonify({"message": "Credenciais inválidas"}), 401
@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
logout_user()
return jsonify({"message": "Logout realizado"}), 200
@app.route('/api/check_auth', methods=['GET'])
def check_auth():
if current_user.is_authenticated:
return jsonify({"logged_in": True, "user": current_user.username})
return jsonify({"logged_in": False})
# No upload_server.py, adicione esta rota
@app.route('/api/user/update', methods=['POST'])
@login_required
def update_profile():
data = request.json
new_username = data.get('username')
# Verificar disponibilidade do username se ele mudou
if new_username and new_username != current_user.username:
existing = User.query.filter_by(username=new_username).first()
if existing:
return jsonify({"error": "Nome de usuário já está em uso."}), 400
current_user.username = new_username
if 'bio' in data:
current_user.bio = data['bio'][:240] # Corta em 240 chars
if 'tags' in data:
# Limpa e formata as tags
current_user.tags = data['tags']
try:
db.session.commit()
return jsonify({"message": "Perfil atualizado!"}), 200
except Exception as e:
return jsonify({"error": "Erro ao salvar."}), 500
# Atualize a rota user_profile antiga para retornar os novos dados
@app.route('/api/user/profile', methods=['GET'])
@login_required
def user_profile():
user_projects = []
for p in current_user.projects:
user_projects.append({
"id": p.id,
"display_name": p.display_name,
"filename": p.filename,
"created_at": p.created_at.strftime("%d/%m/%Y"),
"download_link": f"/api/download/{p.filename}"
})
return jsonify({
"username": current_user.username,
"bio": current_user.bio,
"tags": current_user.tags,
"avatar": current_user.avatar_url,
"cover": current_user.cover_url,
"projects": user_projects,
"samples": [], # Placeholder enquanto não cria a tabela Samples
"recordings": [] # Placeholder enquanto não cria a tabela Recordings
})
@app.route('/api/project/<int:project_id>', methods=['DELETE'])
@login_required
def delete_project(project_id):
project = Project.query.get_or_404(project_id)
# Segurança: Só o dono ou admin pode apagar
if project.owner != current_user and not current_user.is_admin:
return jsonify({"error": "Permissão negada"}), 403
try:
# 1. Remove do Banco
db.session.delete(project)
db.session.commit()
# 2. Remove o arquivo físico (Opcional, mas recomendado para não lotar o disco)
file_path = os.path.join(MMP_FOLDER, project.filename)
if os.path.exists(file_path):
os.remove(file_path)
# Nota: Você pode querer rodar o rebuild_indexes() aqui também se apagar o arquivo
return jsonify({"message": "Projeto removido com sucesso"}), 200
except Exception as e:
return jsonify({"error": str(e)}), 500
# ==============================================================================
# ROTAS PRINCIPAIS
# ==============================================================================
@app.route("/api/download/<project_name>", methods=["GET"])
def download_project_package(project_name):
"""Gera um ZIP com caminhos limpos (Não exige login para baixar)."""
if not project_name.lower().endswith('.mmp'):
project_name += '.mmp'
clean_name = secure_filename(project_name)
mmp_path = os.path.join(MMP_FOLDER, clean_name)
if not os.path.exists(mmp_path):
return jsonify({"error": "Projeto não encontrado"}), 404
memory_file = io.BytesIO()
try:
tree = ET.parse(mmp_path)
root = tree.getroot()
samples_to_pack = set()
for audio_node in root.findall(".//audiofileprocessor"):
src = audio_node.get('src', '')
if src.startswith(XML_IMPORTED_PATH_PREFIX):
filename = os.path.basename(src)
short_path = f"imported/{filename}"
audio_node.set('src', short_path)
samples_to_pack.add(filename)
with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
xml_str = ET.tostring(root, encoding='utf-8', method='xml')
zf.writestr(clean_name, xml_str)
for sample_name in samples_to_pack:
physical_path = os.path.join(XML_IMPORTED_PATH_PREFIX, sample_name)
zip_entry_name = f"imported/{sample_name}"
if os.path.exists(physical_path):
zf.write(physical_path, arcname=zip_entry_name)
memory_file.seek(0)
return send_file(
memory_file,
mimetype='application/zip',
as_attachment=True,
download_name=f"{os.path.splitext(clean_name)[0]}.zip"
)
except Exception as e:
return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500
@app.route("/api/upload", methods=["POST"])
@login_required # <--- PROTEGIDO
def upload_file():
if "project_file" not in request.files:
return jsonify({"error": "Nenhum arquivo enviado"}), 400
file = request.files["project_file"]
if file.filename == "":
return jsonify({"error": "Nome do arquivo vazio"}), 400
if file and allowed_file(file.filename):
original_name = file.filename
name_without_ext = os.path.splitext(original_name)[0]
ext = os.path.splitext(original_name)[1].lower()
clean_name = slugify(name_without_ext)
if not clean_name:
clean_name = f"upload-{int(time.time())}"
final_mmp_filename = f"{clean_name}.mmp"
final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)
upload_filename = f"{clean_name}{ext}"
if ext == ".mmpz":
upload_path = os.path.join(MMPZ_FOLDER, upload_filename)
else:
upload_path = os.path.join(MMP_FOLDER, upload_filename)
try:
file.save(upload_path)
print(f"Upload salvo em: {upload_path} (User: {current_user.username})")
if ext == ".mmpz":
success = convert_mmpz_to_mmp(upload_path, final_mmp_path)
if not success:
return jsonify({"error": "Falha crítica na descompactação."}), 400
elif ext == ".mmp" and upload_path != final_mmp_path:
shutil.move(upload_path, final_mmp_path)
# Registra no banco de dados vinculado ao usuário logado
new_project = Project(
filename=final_mmp_filename,
display_name=clean_name, # Ou name_without_ext se preferir o original
owner=current_user # Flask-Login pega o user atual
)
db.session.add(new_project)
db.session.commit()
print(f"Projeto {clean_name} associado ao usuário {current_user.username}")
# -----------------------------
#
is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
if not is_clean:
return jsonify({
"status": "missing_samples",
"message": "Samples externos detectados.",
"project_file": final_mmp_filename,
"missing_files": missing_files
}), 202
return process_and_build(final_mmp_filename)
except Exception as e:
return jsonify({"error": f"Erro interno: {str(e)}"}), 500
return jsonify({"error": "Tipo de arquivo não permitido"}), 400
@app.route("/api/upload/resolve", methods=["POST"])
@login_required # <--- PROTEGIDO
def resolve_samples():
project_filename = request.form.get('project_file')
if not project_filename:
return jsonify({"error": "Nome do projeto não informado"}), 400
os.makedirs(XML_IMPORTED_PATH_PREFIX, exist_ok=True)
replacements = {}
for original_name, file_storage in request.files.items():
if file_storage and allowed_sample(file_storage.filename):
safe_new_name = secure_filename(file_storage.filename)
if not safe_new_name: safe_new_name = f"sample_{int(time.time())}.wav"
save_path = os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name)
file_storage.save(save_path)
replacements[original_name] = safe_new_name
if not replacements:
return jsonify({"error": "Nenhum arquivo válido enviado."}), 400
if update_xml_paths_exact(project_filename, replacements):
return process_and_build(project_filename)
else:
return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500
@app.route('/api/upload/sample', methods=['POST'])
@login_required # <--- PROTEGIDO
def upload_sample_standalone():
if 'sample_file' not in request.files: return jsonify({'error': 'Nenhum arquivo'}), 400
file = request.files['sample_file']
subfolder = request.form.get('subfolder', '').strip()
if file and allowed_sample(file.filename):
filename = secure_filename(file.filename)
safe_subfolder = subfolder.replace('..', '').strip('/')
target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
os.makedirs(target_dir, exist_ok=True)
file.save(os.path.join(target_dir, filename))
generate_manifests(SRC_MMPSEARCH)
run_jekyll_build()
return jsonify({'message': 'Sample enviado!'}), 200
return jsonify({'error': 'Erro no arquivo'}), 400
# Inicializa o Flask-Admin
admin = Admin(
app,
name='MMP Admin',
url='/api/admin',
index_view=SecureIndexView()
)
# Adiciona a visualização da tabela de Usuários
admin.add_view(SecureModelView(User, db.session, name="Gerenciar Usuários"))
if __name__ == "__main__":
# DEBUG: Isso vai mostrar no log todas as rotas que o Flask reconheceu
print(app.url_map)
print("Iniciando servidor na porta 33002 (HTTP Mode)...")
app.run(host="0.0.0.0", port=33002, debug=True)