import gzip
import io
import json
import os
import shutil
import subprocess
import threading
import time
import xml.etree.ElementTree as ET
import zipfile
import zlib
from datetime import datetime

from flask import Flask, request, jsonify, send_file, redirect
from flask_admin import Admin, AdminIndexView, expose
from flask_admin.contrib.sqla import ModelView
from flask_bcrypt import Bcrypt
from flask_cors import CORS
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    login_required,
    logout_user,
    current_user,
)
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename

from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import (
    ALLOWED_EXTENSIONS,
    ALLOWED_SAMPLE_EXTENSIONS,
    MMP_FOLDER,
    MMPZ_FOLDER,
    DATA_FOLDER,
    BASE_DATA,
    SRC_MMPSEARCH,
    SAMPLE_SRC,
    METADATA_FOLDER,
    XML_IMPORTED_PATH_PREFIX,
    SAMPLE_MANIFEST,
)

app = Flask(__name__)

# --- SECURITY AND DATABASE CONFIGURATION ---
# The session-signing key can be supplied through the MMPSEARCH_SECRET_KEY
# environment variable so that it does not have to live in version control.
# The previous literal is kept as the fallback so existing deployments (and
# the sessions they already issued) keep working unchanged.
app.config['SECRET_KEY'] = os.environ.get(
    'MMPSEARCH_SECRET_KEY',
    '25de5592bf94c2ca18e27baa0ae2d4ee22a63012f32e1be719d31f530c215a387b9ec0c9d96be38e80a7ccdd859e04408facefff8fd9119e7f5a2d987d85abb7',
)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(DATA_FOLDER, 'users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# CORS must allow credentials, otherwise the login cookie is not sent back.
CORS(app, supports_credentials=True)

db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'


# ==============================================================================
# CLASSES IN USE
# ==============================================================================

# --- USER MODEL ---
class User(UserMixin, db.Model):
    """Account record; also the Flask-Login user object (via UserMixin)."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Holds the bcrypt hash produced at registration, never the plain text.
    password = db.Column(db.String(150), nullable=False)
    is_admin = db.Column(db.Boolean, default=False)

    # Profile details
    bio = db.Column(db.String(240), default="")
    tags = db.Column(db.String(500), default="")  # stored comma-separated
    avatar_url = db.Column(db.String(255), default="/assets/img/default_avatar.png")
    cover_url = db.Column(db.String(255), default="/assets/img/default_cover.jpg")

    # One-to-many links; each child row gets an `owner` back-reference.
    projects = db.relationship('Project', backref='owner', lazy=True)
    samples = db.relationship('Sample', backref='owner', lazy=True)
    recordings = db.relationship('Recording', backref='owner', lazy=True)
class SecureModelView(ModelView):
    """Flask-Admin model view that only administrators may open."""

    def is_accessible(self):
        """Allow access only to a logged-in user whose `is_admin` flag is set."""
        return current_user.is_authenticated and current_user.is_admin

    def inaccessible_callback(self, name, **kwargs):
        """Answer a denied request with a JSON error body and HTTP 403."""
        denial = jsonify({"error": "Acesso restrito a administradores."})
        return denial, 403


class SecureIndexView(AdminIndexView):
    """Admin landing page that sends non-admin visitors back to the site root."""

    @expose('/')
    def index(self):
        """Render the admin index for admins; redirect everyone else to '/'."""
        has_admin_session = current_user.is_authenticated and current_user.is_admin
        if not has_admin_session:
            # Redirect to the home page (where the login button lives) rather
            # than answering with JSON or an HTML 403 page.
            return redirect('/')
        return super().index()
# ==============================================================================
# ARTIFACT CLASSES
# ==============================================================================

# Projects table
class Project(db.Model):
    """An uploaded project file owned by one user."""

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)      # name of the physical .mmp file
    display_name = db.Column(db.String(255), nullable=False)  # human-friendly name to show
    # Naive UTC timestamp taken when the row is created.
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    # Foreign key linking the project to its owner.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)


class Sample(db.Model):
    """An uploaded audio sample owned by one user."""

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    display_name = db.Column(db.String(255), nullable=False)
    category = db.Column(db.String(50), default="Uncategorized")  # e.g. drums, bass
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)


class Recording(db.Model):
    """A recording owned by one user."""

    id = db.Column(db.Integer, primary_key=True)
    filename = db.Column(db.String(255), nullable=False)
    display_name = db.Column(db.String(255), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)


# Create the tables at start-up if they do not exist yet.
with app.app_context():
    db.create_all()


@login_manager.user_loader
def load_user(user_id):
    """Return the User for a session id, or None when the id is unusable.

    Flask-Login expects the loader to return None rather than raise when the
    stored id is not valid, so a malformed (non-numeric) id is treated as
    "no such user" instead of surfacing as a server error.
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(numeric_id)


# --- UTILITY FUNCTIONS ---
def _has_allowed_extension(filename, allowed_extensions):
    """Tell whether the text after the last dot (lower-cased) is in the given set."""
    return "." in filename and filename.rsplit(".", 1)[1].lower() in allowed_extensions


def allowed_file(filename):
    """Return True when `filename` has an extension listed in ALLOWED_EXTENSIONS."""
    return _has_allowed_extension(filename, ALLOWED_EXTENSIONS)


def allowed_sample(filename):
    """Return True when `filename` has an extension listed in ALLOWED_SAMPLE_EXTENSIONS."""
    return _has_allowed_extension(filename, ALLOWED_SAMPLE_EXTENSIONS)
def run_jekyll_build():
    """Launch `bundle exec jekyll build` as a detached background process.

    The call returns as soon as the process has been spawned; the build's
    outcome is not awaited or checked. Start-up failures are printed and
    swallowed so the caller is never interrupted.
    """
    RUBY_BIN_PATH = "/usr/bin/ruby3.2"
    BUNDLE_PATH = "/nethome/jotachina/projetos/mmpSearch/vendor/bundle/ruby/3.2.0/bin/bundle"

    # Build the environment handed to the subprocess.
    env_vars = os.environ.copy()
    # Prepend the Ruby location to PATH for this subprocess only.
    # NOTE(review): RUBY_BIN_PATH looks like an executable, not a directory;
    # PATH entries are normally directories — confirm this has the intended effect.
    env_vars["PATH"] = f"{RUBY_BIN_PATH}:{env_vars.get('PATH', '')}"

    print("Iniciando build do Jekyll...")
    command = [BUNDLE_PATH, "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"]
    try:
        # Output goes to DEVNULL so a full pipe buffer can never stall the build.
        subprocess.Popen(
            command,
            cwd=BASE_DATA,
            env=env_vars,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        print("Jekyll Build iniciado em segundo plano (background).")
    except Exception as e:
        print(f"Erro ao iniciar Jekyll: {e}")


def load_manifest_keys():
    """Return the factory folder names, i.e. the top-level keys of the sample manifest.

    Returns:
        A built-in default list when the manifest file does not exist, the
        manifest's keys when it loads, or an empty list when it exists but
        cannot be read or is not a JSON object (the reason is printed).
    """
    if not os.path.exists(SAMPLE_MANIFEST):
        return ["drums", "instruments", "effects", "presets", "samples"]
    try:
        with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return list(data.keys())
    except Exception as e:
        # Best effort: callers get "no factory folders", but the cause is logged
        # instead of being discarded silently.
        print(f"Erro ao ler manifesto de samples: {e}")
        return []


def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
    """Decompress a .mmpz project into `mmp_target_path` using `lmms --dump`.

    Args:
        mmpz_path: Path of the compressed project to convert.
        mmp_target_path: Path the decompressed output is written to.

    Returns:
        True when LMMS produced a non-empty output file, False in every other
        case (input missing/unreadable/empty, LMMS binary absent, LMMS
        failure). On failure no empty or partial target file is left behind.
    """
    LMMS_PATH = "/usr/bin/lmms"
    print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")

    # Probe one byte: enough to know the input is readable and non-empty
    # without pulling the whole archive into memory.
    try:
        with open(mmpz_path, "rb") as f:
            has_content = bool(f.read(1))
    except (OSError, TypeError, ValueError) as e:
        print(f"Erro ao ler {mmpz_path}: {e}")
        return False
    if not has_content:
        print(f"Erro: arquivo vazio: {mmpz_path}")
        return False

    # Safety check: the LMMS executable must exist before we try to run it.
    if not os.path.exists(LMMS_PATH):
        print(f"ERRO CRÍTICO: Executável do LMMS não encontrado em {LMMS_PATH}")
        return False

    print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
    cmd = [LMMS_PATH, "--dump", mmpz_path]
    env_vars = os.environ.copy()
    # Run Qt headless; there is no display on the server.
    env_vars["QT_QPA_PLATFORM"] = "offscreen"

    converted = False
    try:
        with open(mmp_target_path, "w") as f_out:
            subprocess.run(cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars)
        converted = os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0
        if converted:
            print("Sucesso: Descompactado via LMMS CLI (--dump).")
        else:
            print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
    except subprocess.CalledProcessError as e:
        print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}")
    except Exception as e:
        print(f"Erro inesperado ao chamar LMMS: {e}")

    if not converted:
        # Opening the target for writing already created/truncated it; remove
        # the leftover so a failed conversion is not mistaken for a project.
        try:
            os.remove(mmp_target_path)
        except (OSError, TypeError):
            pass  # nothing was created, or it is already gone
    return converted
def analyze_mmp_dependencies(mmp_filename):
    """Find the samples a plain-XML .mmp project references but the site lacks.

    Every `audiofileprocessor` element's `src` is inspected. A source counts
    as available when it starts with one of the factory folders (the keys
    returned by `load_manifest_keys()`) or with the imported-samples prefix.

    Args:
        mmp_filename: File name of the project inside MMP_FOLDER.

    Returns:
        A tuple `(is_complete, missing)`: `is_complete` is True when nothing
        is missing, and `missing` is the sorted list of base names of the
        unavailable samples. `(False, [])` is returned when the file does not
        exist, cannot be read, or is not valid XML.
    """
    filepath = os.path.join(MMP_FOLDER, mmp_filename)
    if not os.path.exists(filepath):
        print(f"Erro: Arquivo não encontrado: {filepath}")
        return False, []

    # NOTE(review): if these files come from user uploads, consider hardening
    # the parse — the stdlib XML parsers are documented as not secure against
    # maliciously constructed data.
    try:
        tree = ET.parse(filepath)
    except ET.ParseError as e:
        print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
        return False, []
    except OSError as e:
        # The file can vanish or be unreadable between the check and the parse.
        print(f"Erro ao ler {filepath}: {e}")
        return False, []

    root = tree.getroot()
    # Built once; str.startswith accepts a tuple of alternatives.
    factory_prefixes = tuple(folder + "/" for folder in load_manifest_keys())
    # NOTE(review): presumably the same location as XML_IMPORTED_PATH_PREFIX —
    # confirm and share the constant if so.
    imported_prefix = "src_mmpSearch/samples/imported/"

    missing_samples = set()
    for audio_node in root.findall(".//audiofileprocessor"):
        src = audio_node.get('src', '')
        if not src:
            continue
        if src.startswith(factory_prefixes) or src.startswith(imported_prefix):
            continue
        missing_samples.add(os.path.basename(src))

    # Sorted so the result is stable across runs instead of following set order.
    return not missing_samples, sorted(missing_samples)


def update_xml_paths_exact(mmp_filename, replacements):
    """Point sample references in a project at their imported copies.

    For every `audiofileprocessor` element whose `src` base name matches a key
    of `replacements` (case-insensitively), `src` is rewritten to
    `XML_IMPORTED_PATH_PREFIX/<mapped name>` and a `vol` attribute of '1' is
    added when the element has none. The file is rewritten only when at least
    one element changed.

    Args:
        mmp_filename: File name of the project inside MMP_FOLDER.
        replacements: Mapping of original base name -> new file name.

    Returns:
        True when the file was processed (changed or not), False when any
        error occurred while reading, editing or writing it.
    """
    filepath = os.path.join(MMP_FOLDER, mmp_filename)
    try:
        tree = ET.parse(filepath)
        root = tree.getroot()
        replacements_lower = {k.lower(): v for k, v in replacements.items()}

        changes_made = False
        for audio_node in root.findall(".//audiofileprocessor"):
            src = audio_node.get('src', '')
            if not src:
                continue
            current_basename = os.path.basename(src)
            lookup_key = current_basename.lower()
            if lookup_key not in replacements_lower:
                continue
            new_src = f"{XML_IMPORTED_PATH_PREFIX}/{replacements_lower[lookup_key]}"
            audio_node.set('src', new_src)
            # Keep an existing volume; default to '1' when the attribute is absent.
            audio_node.set('vol', audio_node.get('vol', '1'))
            print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
            changes_made = True

        if changes_made:
            tree.write(filepath, encoding='UTF-8', xml_declaration=True)
        return True
    except Exception as e:
        print(f"Erro crítico ao editar XML: {e}")
        return False
def run_heavy_tasks_in_background():
    """Rebuild indexes, regenerate manifests and start the Jekyll build.

    Meant to run on its own thread so the uploading user is not kept waiting.
    Any exception is printed and swallowed.
    """
    print("--- [BACKGROUND] Iniciando reconstrução de índices ---")
    try:
        # Step 1: pure-Python index rebuild (the slow part).
        rebuild_indexes()
        # Step 2: regenerate the manifests.
        generate_manifests(SRC_MMPSEARCH)
        # Step 3: hand over to the external Jekyll build.
        run_jekyll_build()
        print("--- [BACKGROUND] Tarefas concluídas com sucesso ---")
    except Exception as background_error:
        print(f"--- [BACKGROUND] Erro: {background_error} ---")


def process_and_build(filename):
    """Process an uploaded file and schedule the site rebuild; used by the upload route.

    Returns:
        A `(response, status)` pair: HTTP 200 with the processing data when
        the file was processed, HTTP 500 with the error text otherwise.
    """
    # Quick, synchronous part: process just this file.
    outcome = process_single_file(filename)
    if not outcome["success"]:
        failure = jsonify({"error": f"Erro no processamento: {outcome['error']}"})
        return failure, 500

    # The heavy rebuild runs on a separate thread so the response below is
    # sent right away while the work continues on the server.
    threading.Thread(target=run_heavy_tasks_in_background).start()

    payload = {
        "message": "Sucesso! O projeto foi recebido. O site será atualizado em instantes.",
        "data": outcome["data"],
    }
    return jsonify(payload), 200
# ==============================================================================
# AUTHENTICATION ROUTES
# ==============================================================================
def _json_payload():
    """Return the request's JSON body as a dict, or {} when absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _is_filled_text(value):
    """Tell whether `value` is a non-empty string."""
    return isinstance(value, str) and bool(value)


@app.route('/api/register', methods=['POST'])
def register():
    """Create an account from a JSON body holding `username` and `password`.

    Returns:
        201 on success; 400 when a field is missing/invalid or the username is
        taken; 500 when the database write fails.
    """
    data = _json_payload()
    username = data.get('username')
    password = data.get('password')
    if not _is_filled_text(username) or not _is_filled_text(password):
        return jsonify({"message": "Dados incompletos"}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({"message": "Usuário já existe"}), 400

    hashed_password = bcrypt.generate_password_hash(password).decode('utf-8')
    new_user = User(username=username, password=hashed_password)
    try:
        db.session.add(new_user)
        db.session.commit()
        return jsonify({"message": "Usuário criado com sucesso!"}), 201
    except Exception as e:
        # Reset the session so the failed transaction cannot affect later use.
        db.session.rollback()
        return jsonify({"message": f"Erro: {str(e)}"}), 500


@app.route('/api/login', methods=['POST'])
def login():
    """Log a user in from a JSON body holding `username` and `password`.

    Returns:
        200 with the username on success; 400 when a field is missing or
        invalid; 401 when the credentials do not match.
    """
    data = _json_payload()
    username = data.get('username')
    password = data.get('password')
    # Same validation as registration, so a bad body is a 400 and not a crash.
    if not _is_filled_text(username) or not _is_filled_text(password):
        return jsonify({"message": "Dados incompletos"}), 400

    user = User.query.filter_by(username=username).first()
    if user and bcrypt.check_password_hash(user.password, password):
        login_user(user)
        return jsonify({"message": "Login realizado", "user": user.username}), 200
    return jsonify({"message": "Credenciais inválidas"}), 401


@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
    """End the current user's session."""
    logout_user()
    return jsonify({"message": "Logout realizado"}), 200


@app.route('/api/check_auth', methods=['GET'])
def check_auth():
    """Report whether the caller is logged in, with the username when they are."""
    if current_user.is_authenticated:
        return jsonify({"logged_in": True, "user": current_user.username})
    return jsonify({"logged_in": False})


@app.route('/api/user/update', methods=['POST'])
@login_required
def update_profile():
    """Update the logged-in user's `username`, `bio` and/or `tags`.

    Only the keys present in the JSON body are touched. `bio` is cut to 240
    characters and `tags` to 500, matching their column sizes.

    Returns:
        200 on success; 400 when a value is not text or the new username is
        taken; 500 when the database write fails.
    """
    data = _json_payload()

    new_username = data.get('username')
    if new_username and new_username != current_user.username:
        if not isinstance(new_username, str):
            return jsonify({"error": "Dados inválidos."}), 400
        if User.query.filter_by(username=new_username).first():
            return jsonify({"error": "Nome em uso."}), 400
        current_user.username = new_username

    if 'bio' in data:
        bio = data['bio']
        if not isinstance(bio, str):
            return jsonify({"error": "Dados inválidos."}), 400
        current_user.bio = bio[:240]

    if 'tags' in data:
        tags = data['tags']
        if not isinstance(tags, str):
            return jsonify({"error": "Dados inválidos."}), 400
        current_user.tags = tags[:500]

    try:
        db.session.commit()
        return jsonify({"message": "Perfil atualizado!"}), 200
    except Exception:
        # Discard the pending changes so the session stays usable.
        db.session.rollback()
        return jsonify({"error": "Erro ao salvar."}), 500
# Profile route: returns the profile fields together with the user's uploads.
@app.route('/api/user/profile', methods=['GET'])
@login_required
def user_profile():
    """Return the logged-in user's profile, projects and samples as JSON.

    Dates are formatted as day/month/year. Project links point at the
    download API; sample links point at the public samples folder for the
    sample's category.
    """
    date_format = "%d/%m/%Y"

    project_entries = [
        {
            "id": project.id,
            "display_name": project.display_name,
            "filename": project.filename,
            "created_at": project.created_at.strftime(date_format),
            "download_link": f"/api/download/{project.filename}",
        }
        for project in current_user.projects
    ]

    sample_entries = [
        {
            "id": sample.id,
            "display_name": sample.display_name,
            "category": sample.category,
            "created_at": sample.created_at.strftime(date_format),
            # Public URL under the exact prefix the front end expects.
            "download_link": f"/mmpSearch/src_mmpSearch/samples/{sample.category}/{sample.filename}",
        }
        for sample in current_user.samples
    ]

    profile = {
        "username": current_user.username,
        "bio": current_user.bio,
        "tags": current_user.tags,
        "avatar": current_user.avatar_url,
        "cover": current_user.cover_url,
        "projects": project_entries,
        "samples": sample_entries,
    }
    return jsonify(profile)


# NOTE: the next route (its decorator begins with @app.route('/api/project/)
# is cut off at the end of this excerpt and is intentionally left untouched.