import os import subprocess import xml.etree.ElementTree as ET import zipfile import json import shutil import time import io import threading from flask import Flask, request, jsonify, send_file, redirect from flask_cors import CORS from werkzeug.utils import secure_filename from datetime import datetime from flask_sqlalchemy import SQLAlchemy from flask_bcrypt import Bcrypt from flask_login import ( LoginManager, UserMixin, login_user, login_required, logout_user, current_user, ) from flask_admin import Admin, AdminIndexView, expose from flask_admin.contrib.sqla import ModelView from main import process_single_file, generate_manifests, slugify from utils import ( ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, DATA_FOLDER, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST, ) app = Flask(__name__) # --- CONFIGURAÇÃO DE SEGURANÇA E BANCO --- app.config["SECRET_KEY"] = ( "25de5592bf94c2ca18e27baa0ae2d4ee22a63012f32e1be719d31f530c215a387b9ec0c9d96be38e80a7ccdd859e04408facefff8fd9119e7f5a2d987d85abb7" ) app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join( DATA_FOLDER, "users.db" ) app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False # CORS precisa suportar credenciais para o cookie de login funcionar CORS(app, supports_credentials=True) db = SQLAlchemy(app) bcrypt = Bcrypt(app) login_manager = LoginManager(app) login_manager.login_view = "login" # ============================================================================== # CLASSES UTILIZADAS # ============================================================================== # --- MODELO DE USUÁRIO --- class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password = db.Column(db.String(150), nullable=False) is_admin = db.Column(db.Boolean, default=False) # Detalhes do usuário bio = db.Column(db.String(240), default="") tags = db.Column(db.String(500), default="") # Vamos salvar separado por vírgula avatar_url = db.Column(db.String(255), default="/assets/img/default_avatar.png") cover_url = db.Column(db.String(255), default="/assets/img/default_cover.jpg") projects = db.relationship("Project", backref="owner", lazy=True) samples = db.relationship("Sample", backref="owner", lazy=True) recordings = db.relationship("Recording", backref="owner", lazy=True) class SecureModelView(ModelView): def is_accessible(self): return current_user.is_authenticated and current_user.is_admin def inaccessible_callback(self, name, **kwargs): # Retorna erro JSON ou redireciona (como é painel, melhor redirecionar ou negar) return jsonify({"error": "Acesso restrito a administradores."}), 403 class SecureIndexView(AdminIndexView): @expose("/") def index(self): if not current_user.is_authenticated or not current_user.is_admin: # Em vez de retornar JSON, redireciona para a home # O usuário verá o botão de login lá. return redirect("/") # OU, se quiser ser mais explícito, retorna 403 mas em HTML (opcional) # return "
Você precisa ser admin.
", 403 return super(SecureIndexView, self).index() # ============================================================================== # CLASSES DE ARTEFATOS # ============================================================================== # Tabela de Projetos class Project(db.Model): id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) # Nome do arquivo físico .mmp display_name = db.Column( db.String(255), nullable=False ) # Nome "bonito" para exibir created_at = db.Column(db.DateTime, default=datetime.utcnow) # Chave estrangeira ligando ao usuário user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) class Sample(db.Model): id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) display_name = db.Column(db.String(255), nullable=False) category = db.Column(db.String(50), default="Uncategorized") # Ex: drums, bass created_at = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) class Recording(db.Model): id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) display_name = db.Column(db.String(255), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) # Cria o banco na inicialização se não existir with app.app_context(): db.create_all() @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) # --- FUNÇÕES UTILITÁRIAS --- def allowed_file(filename): return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS def allowed_sample(filename): return ( "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS ) def run_jekyll_build(): RUBY_BIN_PATH = "/usr/bin/ruby3.2" BUNDLE_PATH = ( "/nethome/jotachina/projetos/mmpSearch/vendor/bundle/ruby/3.2.0/bin/bundle" ) # Prepara o ambiente para o subprocesso env_vars = os.environ.copy() # Adiciona o caminho do Ruby ao PATH do usuário www-data temporariamente env_vars["PATH"] = f"{RUBY_BIN_PATH}:{env_vars.get('PATH', '')}" print("Iniciando build do Jekyll...") command = [ BUNDLE_PATH, "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/", ] try: # Redirecionamos a saída para DEVNULL para não encher o buffer e travar subprocess.Popen( command, cwd=BASE_DATA, env=env_vars, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) print("Jekyll Build iniciado em segundo plano (background).") except Exception as e: print(f"Erro ao iniciar Jekyll: {e}") def load_manifest_keys(): """Carrega as pastas de fábrica (keys do manifesto).""" if not os.path.exists(SAMPLE_MANIFEST): return ["drums", "instruments", "effects", "presets", "samples"] try: with open(SAMPLE_MANIFEST, "r", encoding="utf-8") as f: data = json.load(f) return list(data.keys()) except Exception: return [] def convert_mmpz_to_mmp(mmpz_path, mmp_target_path): LMMS_PATH = "/usr/bin/lmms" """Tenta descompactar .mmpz usando Python ou LMMS CLI.""" print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}") content = None try: with open(mmpz_path, "rb") as f: content = f.read() except Exception: pass if content: # --- Fallback para LMMS CLI --- # VERIFICAÇÃO DE SEGURANÇA: O binário existe? if not os.path.exists(LMMS_PATH): print(f"ERRO CRÍTICO: Executável do LMMS não encontrado em {LMMS_PATH}") return False print("Métodos Python falharam. Tentando fallback via LMMS CLI...") cmd = [LMMS_PATH, "--dump", mmpz_path] env_vars = os.environ.copy() env_vars["QT_QPA_PLATFORM"] = "offscreen" try: with open(mmp_target_path, "w") as f_out: subprocess.run( cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars ) if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0: print("Sucesso: Descompactado via LMMS CLI (--dump).") return True else: print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.") return False except subprocess.CalledProcessError as e: print( f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}" ) return False except Exception as e: print(f"Erro inesperado ao chamar LMMS: {e}") return False def analyze_mmp_dependencies(mmp_filename): """Analisa um arquivo .mmp XML puro.""" filepath = os.path.join(MMP_FOLDER, mmp_filename) if not os.path.exists(filepath): print(f"Erro: Arquivo não encontrado: {filepath}") return False, [] try: tree = ET.parse(filepath) except ET.ParseError as e: print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}") return False, [] root = tree.getroot() factory_folders = load_manifest_keys() missing_samples = set() for audio_node in root.findall(".//audiofileprocessor"): src = audio_node.get("src", "") if not src: continue is_factory = any(src.startswith(folder + "/") for folder in factory_folders) is_already_imported = src.startswith("src_mmpSearch/samples/imported/") if not is_factory and not is_already_imported: file_name = os.path.basename(src) missing_samples.add(file_name) return len(missing_samples) == 0, list(missing_samples) def update_xml_paths_exact(mmp_filename, replacements): """Substitui caminhos no XML baseando-se no mapeamento exato.""" filepath = os.path.join(MMP_FOLDER, mmp_filename) try: tree = ET.parse(filepath) root = tree.getroot() changes_made = False replacements_lower = {k.lower(): v for k, v in replacements.items()} for audio_node in root.findall(".//audiofileprocessor"): src = audio_node.get("src", "") if not src: continue current_basename = os.path.basename(src) current_basename_lower = current_basename.lower() if current_basename_lower in replacements_lower: new_filename = replacements_lower[current_basename_lower] new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}" audio_node.set("src", new_src) audio_node.set("vol", audio_node.get("vol", "1")) print(f"[XML FIX] Substituído: {current_basename} -> {new_src}") changes_made = True if changes_made: tree.write(filepath, encoding="UTF-8", xml_declaration=True) return True return True except Exception as e: print(f"Erro crítico ao editar XML: {e}") return False def run_heavy_tasks_in_background(): """Esta função roda isolada sem travar o usuário""" print("--- [BACKGROUND] Iniciando reconstrução de índices ---") try: # 1. Isso é Python puro e PESADO (aqui é onde estava travando) rebuild_indexes() # 2. Isso gera os manifestos (Python puro) generate_manifests(SRC_MMPSEARCH) # 3. Isso chama o subprocesso do Jekyll (Externo) # Mantém sua função original que usa subprocess run_jekyll_build() print("--- [BACKGROUND] Tarefas concluídas com sucesso ---") except Exception as e: print(f"--- [BACKGROUND] Erro: {e} ---") def process_and_build(filename): """Função chamada pela rota de upload""" # Processamento inicial do arquivo (rápido) result = process_single_file(filename) if result["success"]: # Em vez de chamar rebuild_indexes() direto, criamos a Thread # O Flask vai responder o return abaixo imediatamente, # enquanto a thread continua rodando no servidor. task_thread = threading.Thread(target=run_heavy_tasks_in_background) task_thread.start() return jsonify( { "message": "Sucesso! O projeto foi recebido. O site será atualizado em instantes.", "data": result["data"], } ), 200 else: return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500 # ============================================================================== # ROTAS DE AUTENTICAÇÃO # ============================================================================== @app.route("/api/register", methods=["POST"]) def register(): data = request.json if not data or not data.get("username") or not data.get("password"): return jsonify({"message": "Dados incompletos"}), 400 if User.query.filter_by(username=data["username"]).first(): return jsonify({"message": "Usuário já existe"}), 400 hashed_password = bcrypt.generate_password_hash(data["password"]).decode("utf-8") new_user = User(username=data["username"], password=hashed_password) try: db.session.add(new_user) db.session.commit() return jsonify({"message": "Usuário criado com sucesso!"}), 201 except Exception as e: return jsonify({"message": f"Erro: {str(e)}"}), 500 @app.route("/api/login", methods=["POST"]) def login(): data = request.json user = User.query.filter_by(username=data["username"]).first() if user and bcrypt.check_password_hash(user.password, data["password"]): login_user(user) return jsonify({"message": "Login realizado", "user": user.username}), 200 return jsonify({"message": "Credenciais inválidas"}), 401 @app.route("/api/logout", methods=["POST"]) @login_required def logout(): logout_user() return jsonify({"message": "Logout realizado"}), 200 @app.route("/api/check_auth", methods=["GET"]) def check_auth(): if current_user.is_authenticated: return jsonify({"logged_in": True, "user": current_user.username}) return jsonify({"logged_in": False}) # No upload_server.py, adicione esta rota @app.route("/api/user/update", methods=["POST"]) @login_required def update_profile(): data = request.json new_username = data.get("username") if new_username and new_username != current_user.username: if User.query.filter_by(username=new_username).first(): return jsonify({"error": "Nome em uso."}), 400 current_user.username = new_username if "bio" in data: current_user.bio = data["bio"][:240] if "tags" in data: current_user.tags = data["tags"] try: db.session.commit() return jsonify({"message": "Perfil atualizado!"}), 200 except Exception: return jsonify({"error": "Erro ao salvar."}), 500 # Atualize a rota user_profile antiga para retornar os novos dados @app.route("/api/user/profile", methods=["GET"]) @login_required def user_profile(): # Projetos user_projects = [] for p in current_user.projects: user_projects.append( { "id": p.id, "display_name": p.display_name, "filename": p.filename, "created_at": p.created_at.strftime("%d/%m/%Y"), "download_link": f"/api/download/{p.filename}", } ) # Samples user_samples = [] for s in current_user.samples: # Ajuste do prefixo exato solicitado public_url = f"/mmpSearch/src_mmpSearch/samples/{s.category}/{s.filename}" user_samples.append( { "id": s.id, "display_name": s.display_name, "category": s.category, "created_at": s.created_at.strftime("%d/%m/%Y"), "download_link": public_url, } ) return jsonify( { "username": current_user.username, "bio": current_user.bio, "tags": current_user.tags, "avatar": current_user.avatar_url, "cover": current_user.cover_url, "projects": user_projects, "samples": user_samples, } ) @app.route("/api/project/