"""Flask API for MMP Search: authentication, project upload and packaging.

The module exposes JSON endpoints to register/login users, upload LMMS
projects (.mmp / .mmpz), resolve missing samples, upload standalone samples
and download a project as a ZIP package.
"""
import gzip
import io
import json
import os
import shutil
import subprocess
import time
import xml.etree.ElementTree as ET
import zipfile
import zlib

from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.utils import secure_filename

# --- Authentication imports ---
from flask_sqlalchemy import SQLAlchemy
from flask_bcrypt import Bcrypt
from flask_login import (
    LoginManager,
    UserMixin,
    login_user,
    login_required,
    logout_user,
    current_user,
)

from main import process_single_file, rebuild_indexes, generate_manifests, slugify
from utils import (
    ALLOWED_EXTENSIONS,
    ALLOWED_SAMPLE_EXTENSIONS,
    MMP_FOLDER,
    MMPZ_FOLDER,
    CERT_PATH,
    KEY_PATH,
    BASE_DATA,
    SRC_MMPSEARCH,
    SAMPLE_SRC,
    METADATA_FOLDER,
    XML_IMPORTED_PATH_PREFIX,
    SAMPLE_MANIFEST,
)

app = Flask(__name__)

# --- Security and database configuration ---
# IMPORTANT: set the SECRET_KEY environment variable in production. The
# literal below is only a fallback so existing deployments keep working.
app.config['SECRET_KEY'] = os.environ.get(
    'SECRET_KEY', 'chave_secreta_super_segura_mmp_ecosystem_2025'
)
# The SQLite database file (users.db) is stored inside BASE_DATA.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(BASE_DATA, 'users.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# CORS must support credentials so the login cookie works.
CORS(app, supports_credentials=True)

db = SQLAlchemy(app)
bcrypt = Bcrypt(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'


# --- User model ---
class User(UserMixin, db.Model):
    """Registered user; ``password`` holds the bcrypt hash, never plaintext."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    password = db.Column(db.String(150), nullable=False)


# Create the database tables at start-up if they do not exist yet.
with app.app_context():
    db.create_all()


@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for the id stored in the session (or None)."""
    return User.query.get(int(user_id))


# --- Utility functions ---
def allowed_file(filename):
    """Return True when ``filename`` has an extension in ALLOWED_EXTENSIONS."""
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS


def allowed_sample(filename):
    """Return True when ``filename`` has an extension in ALLOWED_SAMPLE_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS


def run_jekyll_build():
    """Run ``bundle exec jekyll build`` inside BASE_DATA.

    Failures are reported on stdout and never raised, so a broken site build
    does not turn an otherwise successful request into an error.
    """
    print("Iniciando build do Jekyll...")
    command = [
        "bundle", "exec", "jekyll", "build",
        "--destination", "/var/www/html/trens/mmpSearch/",
    ]
    try:
        subprocess.run(command, check=True, cwd=BASE_DATA, capture_output=True, text=True)
        print("Jekyll Build Sucesso!")
    except subprocess.CalledProcessError as e:
        print(f"ERRO no Jekyll Build: {e.stderr}")
    except OSError as e:
        # e.g. the `bundle` executable is not installed or BASE_DATA is missing.
        print(f"ERRO no Jekyll Build: {e}")


def load_manifest_keys():
    """Load the factory folder names (top-level keys of the sample manifest).

    Returns a default list when the manifest file does not exist and an empty
    list when it exists but cannot be read or parsed.
    """
    if not os.path.exists(SAMPLE_MANIFEST):
        return ["drums", "instruments", "effects", "presets", "samples"]
    try:
        with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return list(data.keys())
    except Exception:
        return []


def convert_mmpz_to_mmp(mmpz_path, mmp_target_path):
    """Try to decompress a .mmpz file into a plain .mmp file.

    Pure-Python strategies are attempted first (ZIP, GZIP, ZLIB, in that
    order); if all fail, the LMMS command-line tool is used as a fallback.

    Args:
        mmpz_path: Path of the compressed project.
        mmp_target_path: Path where the decompressed XML is written.

    Returns:
        True when the target file was written, False otherwise.
    """
    print(f"Tentando converter: {mmpz_path} -> {mmp_target_path}")
    content = None
    try:
        with open(mmpz_path, "rb") as f:
            content = f.read()
    except OSError:
        # Unreadable input: skip the Python strategies and let LMMS try.
        pass

    # Each strategy below is a deliberate best-effort attempt: any failure
    # simply moves on to the next one. `except Exception` (rather than a bare
    # `except`) keeps KeyboardInterrupt/SystemExit propagating.
    if content:
        # 1.1 Try ZIP
        if zipfile.is_zipfile(mmpz_path):
            try:
                with zipfile.ZipFile(mmpz_path, 'r') as z:
                    with open(mmp_target_path, 'wb') as f_out:
                        f_out.write(z.read(z.namelist()[0]))
                print("Sucesso: Descompactado via ZIP.")
                return True
            except Exception:
                pass

        # 1.2 Try GZIP
        try:
            decompressed = gzip.decompress(content)
            with open(mmp_target_path, "wb") as f_out:
                f_out.write(decompressed)
            print("Sucesso: Descompactado via GZIP.")
            return True
        except Exception:
            pass

        # 1.3 Try ZLIB
        try:
            decompressed = zlib.decompress(content)
            with open(mmp_target_path, "wb") as f_out:
                f_out.write(decompressed)
            print("Sucesso: Descompactado via ZLIB.")
            return True
        except Exception:
            pass

    # --- Fallback to the LMMS CLI ---
    print("Métodos Python falharam. Tentando fallback via LMMS CLI...")
    cmd = ["lmms", "--dump", mmpz_path]
    env_vars = os.environ.copy()
    # Run Qt without a display server.
    env_vars["QT_QPA_PLATFORM"] = "offscreen"

    try:
        with open(mmp_target_path, "w") as f_out:
            subprocess.run(cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars)

        if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0:
            print("Sucesso: Descompactado via LMMS CLI (--dump).")
            return True
        print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.")
        return False
    except subprocess.CalledProcessError as e:
        # errors='replace' so undecodable stderr bytes cannot raise here.
        detail = e.stderr.decode('utf-8', errors='replace') if e.stderr else str(e)
        print(f"Falha crítica no LMMS CLI: {detail}")
        return False
    except Exception as e:
        print(f"Erro inesperado ao chamar LMMS: {e}")
        return False


def analyze_mmp_dependencies(mmp_filename):
    """Analyse a plain-XML .mmp file for samples that are not available.

    A sample counts as missing when its ``src`` neither starts with one of
    the factory folders nor with the imported-samples prefix.

    Args:
        mmp_filename: File name relative to MMP_FOLDER.

    Returns:
        ``(is_clean, missing)`` where ``missing`` lists the base names of the
        missing samples. ``(False, [])`` is returned when the file does not
        exist or is not valid XML.
    """
    filepath = os.path.join(MMP_FOLDER, mmp_filename)
    if not os.path.exists(filepath):
        print(f"Erro: Arquivo não encontrado: {filepath}")
        return False, []

    try:
        tree = ET.parse(filepath)
    except ET.ParseError as e:
        print(f"Erro Fatal: O arquivo {mmp_filename} não é um XML válido: {e}")
        return False, []

    root = tree.getroot()
    factory_folders = load_manifest_keys()
    missing_samples = set()

    for audio_node in root.findall(".//audiofileprocessor"):
        src = audio_node.get('src', '')
        if not src:
            continue
        is_factory = any(src.startswith(folder + "/") for folder in factory_folders)
        is_already_imported = src.startswith("src_mmpSearch/samples/imported/")
        if not is_factory and not is_already_imported:
            missing_samples.add(os.path.basename(src))

    return len(missing_samples) == 0, list(missing_samples)


def update_xml_paths_exact(mmp_filename, replacements):
    """Rewrite sample paths in the project XML using an exact mapping.

    Args:
        mmp_filename: File name relative to MMP_FOLDER.
        replacements: Maps an original sample base name to the stored file
            name; base names are matched case-insensitively.

    Returns:
        True when the file was processed (whether or not anything changed),
        False when reading, parsing or writing failed.
    """
    filepath = os.path.join(MMP_FOLDER, mmp_filename)
    try:
        tree = ET.parse(filepath)
        root = tree.getroot()
        changes_made = False
        replacements_lower = {k.lower(): v for k, v in replacements.items()}

        for audio_node in root.findall(".//audiofileprocessor"):
            src = audio_node.get('src', '')
            if not src:
                continue
            current_basename = os.path.basename(src)
            current_basename_lower = current_basename.lower()
            if current_basename_lower in replacements_lower:
                new_filename = replacements_lower[current_basename_lower]
                new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}"
                audio_node.set('src', new_src)
                # Make sure a 'vol' attribute exists, keeping any current value.
                audio_node.set('vol', audio_node.get('vol', '1'))
                print(f"[XML FIX] Substituído: {current_basename} -> {new_src}")
                changes_made = True

        if changes_made:
            tree.write(filepath, encoding='UTF-8', xml_declaration=True)
        return True
    except Exception as e:
        print(f"Erro crítico ao editar XML: {e}")
        return False


def process_and_build(filename):
    """Process one project via ``main.process_single_file`` and rebuild the site.

    Returns:
        A ``(response, status)`` tuple: 200 with the processing data on
        success, 500 with the error message otherwise.
    """
    result = process_single_file(filename)
    if result["success"]:
        rebuild_indexes()
        run_jekyll_build()
        generate_manifests(SRC_MMPSEARCH)
        return jsonify({
            "message": "Sucesso! Projeto processado.",
            "data": result["data"]
        }), 200
    return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500


# ==============================================================================
# AUTHENTICATION ROUTES
# ==============================================================================

@app.route('/api/register', methods=['POST'])
def register():
    """Create a user from a JSON body with ``username`` and ``password``."""
    data = request.json
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Dados incompletos"}), 400

    if User.query.filter_by(username=data['username']).first():
        return jsonify({"message": "Usuário já existe"}), 400

    hashed_password = bcrypt.generate_password_hash(data['password']).decode('utf-8')
    new_user = User(username=data['username'], password=hashed_password)
    try:
        db.session.add(new_user)
        db.session.commit()
        return jsonify({"message": "Usuário criado com sucesso!"}), 201
    except Exception as e:
        # Leave the session usable for subsequent requests.
        db.session.rollback()
        return jsonify({"message": f"Erro: {str(e)}"}), 500


@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user from a JSON body with ``username`` and ``password``."""
    data = request.json
    # Reject a missing or malformed body instead of failing with a 500.
    if not isinstance(data, dict) or not data.get('username') or not data.get('password'):
        return jsonify({"message": "Dados incompletos"}), 400

    user = User.query.filter_by(username=data['username']).first()
    if user and bcrypt.check_password_hash(user.password, data['password']):
        login_user(user)
        return jsonify({"message": "Login realizado", "user": user.username}), 200
    return jsonify({"message": "Credenciais inválidas"}), 401


@app.route('/api/logout', methods=['POST'])
@login_required
def logout():
    """End the current user's session."""
    logout_user()
    return jsonify({"message": "Logout realizado"}), 200


@app.route('/api/check_auth', methods=['GET'])
def check_auth():
    """Report whether the requester is logged in (and the user name if so)."""
    if current_user.is_authenticated:
        return jsonify({"logged_in": True, "user": current_user.username})
    return jsonify({"logged_in": False})


# ==============================================================================
# MAIN ROUTES
# ==============================================================================

# The URL variable is required: the view takes `project_name` as an argument.
@app.route("/api/download/<project_name>", methods=["GET"])
def download_project_package(project_name):
    """Build a ZIP with the project and its imported samples (no login needed).

    Inside the package, sample paths that start with XML_IMPORTED_PATH_PREFIX
    are shortened to ``imported/<file>``; the project stored on disk is not
    modified.
    """
    if not project_name.lower().endswith('.mmp'):
        project_name += '.mmp'
    clean_name = secure_filename(project_name)
    mmp_path = os.path.join(MMP_FOLDER, clean_name)

    if not os.path.exists(mmp_path):
        return jsonify({"error": "Projeto não encontrado"}), 404

    memory_file = io.BytesIO()
    try:
        tree = ET.parse(mmp_path)
        root = tree.getroot()
        samples_to_pack = set()

        for audio_node in root.findall(".//audiofileprocessor"):
            src = audio_node.get('src', '')
            if src.startswith(XML_IMPORTED_PATH_PREFIX):
                filename = os.path.basename(src)
                audio_node.set('src', f"imported/{filename}")
                samples_to_pack.add(filename)

        with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf:
            xml_str = ET.tostring(root, encoding='utf-8', method='xml')
            zf.writestr(clean_name, xml_str)

            for sample_name in samples_to_pack:
                physical_path = os.path.join(XML_IMPORTED_PATH_PREFIX, sample_name)
                zip_entry_name = f"imported/{sample_name}"
                # Samples missing on disk are left out of the package.
                if os.path.exists(physical_path):
                    zf.write(physical_path, arcname=zip_entry_name)

        memory_file.seek(0)
        return send_file(
            memory_file,
            mimetype='application/zip',
            as_attachment=True,
            download_name=f"{os.path.splitext(clean_name)[0]}.zip"
        )
    except Exception as e:
        return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500


@app.route("/api/upload", methods=["POST"])
@login_required  # protected route
def upload_file():
    """Receive a project file (.mmp / .mmpz), normalise it and process it.

    Responds 202 with the list of missing samples when the project references
    samples that are neither factory nor already imported.
    """
    if "project_file" not in request.files:
        return jsonify({"error": "Nenhum arquivo enviado"}), 400

    file = request.files["project_file"]
    if file.filename == "":
        return jsonify({"error": "Nome do arquivo vazio"}), 400

    if file and allowed_file(file.filename):
        original_name = file.filename
        name_without_ext, ext = os.path.splitext(original_name)
        ext = ext.lower()
        clean_name = slugify(name_without_ext)
        if not clean_name:
            # Fall back to a timestamped name when the slug is empty.
            clean_name = f"upload-{int(time.time())}"

        final_mmp_filename = f"{clean_name}.mmp"
        final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename)

        upload_filename = f"{clean_name}{ext}"
        if ext == ".mmpz":
            upload_path = os.path.join(MMPZ_FOLDER, upload_filename)
        else:
            upload_path = os.path.join(MMP_FOLDER, upload_filename)

        try:
            file.save(upload_path)
            print(f"Upload salvo em: {upload_path} (User: {current_user.username})")

            if ext == ".mmpz":
                success = convert_mmpz_to_mmp(upload_path, final_mmp_path)
                if not success:
                    return jsonify({"error": "Falha crítica na descompactação."}), 400
            elif ext == ".mmp" and upload_path != final_mmp_path:
                shutil.move(upload_path, final_mmp_path)

            is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename)
            if not is_clean:
                return jsonify({
                    "status": "missing_samples",
                    "message": "Samples externos detectados.",
                    "project_file": final_mmp_filename,
                    "missing_files": missing_files
                }), 202

            return process_and_build(final_mmp_filename)
        except Exception as e:
            return jsonify({"error": f"Erro interno: {str(e)}"}), 500

    return jsonify({"error": "Tipo de arquivo não permitido"}), 400


@app.route("/api/upload/resolve", methods=["POST"])
@login_required  # protected route
def resolve_samples():
    """Store the uploaded samples and rewrite the project to point at them.

    Each multipart field name is taken as the original sample name that the
    uploaded file replaces.
    """
    project_filename = request.form.get('project_file')
    if not project_filename:
        return jsonify({"error": "Nome do projeto não informado"}), 400

    # The name is joined into a filesystem path below, so it must be a bare
    # file name: reject anything carrying directory components.
    if os.path.basename(project_filename) != project_filename or project_filename in (".", ".."):
        return jsonify({"error": "Nome do projeto inválido"}), 400

    os.makedirs(XML_IMPORTED_PATH_PREFIX, exist_ok=True)
    replacements = {}

    for original_name, file_storage in request.files.items():
        if file_storage and allowed_sample(file_storage.filename):
            safe_new_name = secure_filename(file_storage.filename)
            if not safe_new_name:
                safe_new_name = f"sample_{int(time.time())}.wav"
            save_path = os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name)
            file_storage.save(save_path)
            replacements[original_name] = safe_new_name

    if not replacements:
        return jsonify({"error": "Nenhum arquivo válido enviado."}), 400

    if update_xml_paths_exact(project_filename, replacements):
        return process_and_build(project_filename)
    return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500


@app.route('/api/upload/sample', methods=['POST'])
@login_required  # protected route
def upload_sample_standalone():
    """Store a single sample under SAMPLE_SRC (optionally in a subfolder)."""
    if 'sample_file' not in request.files:
        return jsonify({'error': 'Nenhum arquivo'}), 400

    file = request.files['sample_file']
    subfolder = request.form.get('subfolder', '').strip()

    if file and allowed_sample(file.filename):
        filename = secure_filename(file.filename)
        if not filename:
            # secure_filename can strip the whole name; saving would then
            # target the directory itself.
            return jsonify({'error': 'Erro no arquivo'}), 400
        safe_subfolder = subfolder.replace('..', '').strip('/')
        target_dir = os.path.join(SAMPLE_SRC, safe_subfolder)
        os.makedirs(target_dir, exist_ok=True)
        file.save(os.path.join(target_dir, filename))

        generate_manifests(SRC_MMPSEARCH)
        run_jekyll_build()
        return jsonify({'message': 'Sample enviado!'}), 200

    return jsonify({'error': 'Erro no arquivo'}), 400


if __name__ == "__main__":
    # When running behind Apache, do NOT use ssl_context: Apache already
    # terminates SSL (https://alice.ufsj.edu.br) and Flask must serve plain
    # HTTP locally. ssl_context was removed to simplify the reverse proxy.
    print("Iniciando servidor na porta 33002 (HTTP Mode)...")
    # The Werkzeug debugger allows arbitrary code execution, so it is only
    # enabled on explicit opt-in (FLASK_DEBUG=1), never by default on 0.0.0.0.
    debug_enabled = os.environ.get("FLASK_DEBUG", "0") == "1"
    app.run(host="0.0.0.0", port=33002, debug=debug_enabled)