From b474a2a1818dc03761ac5bf5864a4300cdaccc6f Mon Sep 17 00:00:00 2001 From: JotaChina Date: Thu, 26 Mar 2026 20:45:04 -0300 Subject: [PATCH] auth --- _config.yml | 2 +- scripts/handler/upload_server.py | 401 ++++++++++++++++++------------- 2 files changed, 241 insertions(+), 162 deletions(-) diff --git a/_config.yml b/_config.yml index ccfe93ed..136b5b5a 100755 --- a/_config.yml +++ b/_config.yml @@ -15,7 +15,7 @@ plugins: page_gen-dirs: true page_gen: - - data: all + - data: all_leve template: projetos dir: projetos index_files: false diff --git a/scripts/handler/upload_server.py b/scripts/handler/upload_server.py index 2b29e2d9..11b31c97 100755 --- a/scripts/handler/upload_server.py +++ b/scripts/handler/upload_server.py @@ -1,9 +1,7 @@ import os import subprocess import xml.etree.ElementTree as ET -import gzip import zipfile -import zlib import json import shutil import time @@ -17,20 +15,42 @@ from datetime import datetime from flask_sqlalchemy import SQLAlchemy from flask_bcrypt import Bcrypt -from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user +from flask_login import ( + LoginManager, + UserMixin, + login_user, + login_required, + logout_user, + current_user, +) from flask_admin import Admin, AdminIndexView, expose from flask_admin.contrib.sqla import ModelView -from main import process_single_file, rebuild_indexes, generate_manifests, slugify -from utils import ALLOWED_EXTENSIONS, ALLOWED_SAMPLE_EXTENSIONS, MMP_FOLDER, MMPZ_FOLDER, DATA_FOLDER, BASE_DATA, SRC_MMPSEARCH, SAMPLE_SRC, METADATA_FOLDER, XML_IMPORTED_PATH_PREFIX, SAMPLE_MANIFEST +from main import process_single_file, generate_manifests, slugify +from utils import ( + ALLOWED_EXTENSIONS, + ALLOWED_SAMPLE_EXTENSIONS, + MMP_FOLDER, + MMPZ_FOLDER, + DATA_FOLDER, + BASE_DATA, + SRC_MMPSEARCH, + SAMPLE_SRC, + XML_IMPORTED_PATH_PREFIX, + SAMPLE_MANIFEST, +) app = Flask(__name__) # --- CONFIGURAÇÃO DE SEGURANÇA E BANCO --- -app.config['SECRET_KEY'] = '25de5592bf94c2ca18e27baa0ae2d4ee22a63012f32e1be719d31f530c215a387b9ec0c9d96be38e80a7ccdd859e04408facefff8fd9119e7f5a2d987d85abb7' -app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(DATA_FOLDER, 'users.db') -app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False +app.config["SECRET_KEY"] = ( + "25de5592bf94c2ca18e27baa0ae2d4ee22a63012f32e1be719d31f530c215a387b9ec0c9d96be38e80a7ccdd859e04408facefff8fd9119e7f5a2d987d85abb7" +) +app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///" + os.path.join( + DATA_FOLDER, "users.db" +) +app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False # CORS precisa suportar credenciais para o cookie de login funcionar CORS(app, supports_credentials=True) @@ -38,28 +58,30 @@ CORS(app, supports_credentials=True) db = SQLAlchemy(app) bcrypt = Bcrypt(app) login_manager = LoginManager(app) -login_manager.login_view = 'login' +login_manager.login_view = "login" # ============================================================================== # CLASSES UTILIZADAS # ============================================================================== + # --- MODELO DE USUÁRIO --- class User(UserMixin, db.Model): id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(150), unique=True, nullable=False) password = db.Column(db.String(150), nullable=False) is_admin = db.Column(db.Boolean, default=False) - + # Detalhes do usuário bio = db.Column(db.String(240), default="") - tags = db.Column(db.String(500), default="") # Vamos salvar separado por vírgula + tags = db.Column(db.String(500), default="") # Vamos salvar separado por vírgula avatar_url = db.Column(db.String(255), default="/assets/img/default_avatar.png") cover_url = db.Column(db.String(255), default="/assets/img/default_cover.jpg") - projects = db.relationship('Project', backref='owner', lazy=True) - samples = db.relationship('Sample', backref='owner', lazy=True) - recordings = db.relationship('Recording', backref='owner', lazy=True) + projects = db.relationship("Project", backref="owner", lazy=True) + samples = db.relationship("Sample", backref="owner", lazy=True) + recordings = db.relationship("Recording", backref="owner", lazy=True) + class SecureModelView(ModelView): def is_accessible(self): @@ -69,97 +91,124 @@ class SecureModelView(ModelView): # Retorna erro JSON ou redireciona (como é painel, melhor redirecionar ou negar) return jsonify({"error": "Acesso restrito a administradores."}), 403 + class SecureIndexView(AdminIndexView): - @expose('/') + @expose("/") def index(self): if not current_user.is_authenticated or not current_user.is_admin: # Em vez de retornar JSON, redireciona para a home # O usuário verá o botão de login lá. - return redirect('/') - + return redirect("/") + # OU, se quiser ser mais explícito, retorna 403 mas em HTML (opcional) # return "

Acesso Negado

Você precisa ser admin.

", 403 - + return super(SecureIndexView, self).index() - + + # ============================================================================== # CLASSES DE ARTEFATOS # ============================================================================== + # Tabela de Projetos class Project(db.Model): id = db.Column(db.Integer, primary_key=True) - filename = db.Column(db.String(255), nullable=False) # Nome do arquivo físico .mmp - display_name = db.Column(db.String(255), nullable=False) # Nome "bonito" para exibir + filename = db.Column(db.String(255), nullable=False) # Nome do arquivo físico .mmp + display_name = db.Column( + db.String(255), nullable=False + ) # Nome "bonito" para exibir created_at = db.Column(db.DateTime, default=datetime.utcnow) - + # Chave estrangeira ligando ao usuário - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) + class Sample(db.Model): id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) display_name = db.Column(db.String(255), nullable=False) - category = db.Column(db.String(50), default="Uncategorized") # Ex: drums, bass + category = db.Column(db.String(50), default="Uncategorized") # Ex: drums, bass created_at = db.Column(db.DateTime, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) + class Recording(db.Model): id = db.Column(db.Integer, primary_key=True) filename = db.Column(db.String(255), nullable=False) display_name = db.Column(db.String(255), nullable=False) created_at = db.Column(db.DateTime, default=datetime.utcnow) - user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) + user_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False) + # Cria o banco na inicialização se não existir with app.app_context(): db.create_all() + @login_manager.user_loader def load_user(user_id): return User.query.get(int(user_id)) + # --- FUNÇÕES UTILITÁRIAS --- + def allowed_file(filename): return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS + def allowed_sample(filename): - return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS + return ( + "." in filename + and filename.rsplit(".", 1)[1].lower() in ALLOWED_SAMPLE_EXTENSIONS + ) + def run_jekyll_build(): RUBY_BIN_PATH = "/usr/bin/ruby3.2" - BUNDLE_PATH = "/nethome/jotachina/projetos/mmpSearch/vendor/bundle/ruby/3.2.0/bin/bundle" + BUNDLE_PATH = ( + "/nethome/jotachina/projetos/mmpSearch/vendor/bundle/ruby/3.2.0/bin/bundle" + ) # Prepara o ambiente para o subprocesso env_vars = os.environ.copy() # Adiciona o caminho do Ruby ao PATH do usuário www-data temporariamente env_vars["PATH"] = f"{RUBY_BIN_PATH}:{env_vars.get('PATH', '')}" print("Iniciando build do Jekyll...") - command = [BUNDLE_PATH, "exec", "jekyll", "build", "--destination", "/var/www/html/trens/mmpSearch/"] + command = [ + BUNDLE_PATH, + "exec", + "jekyll", + "build", + "--destination", + "/var/www/html/trens/mmpSearch/", + ] try: - # Redirecionamos a saída para DEVNULL para não encher o buffer e travar + # Redirecionamos a saída para DEVNULL para não encher o buffer e travar subprocess.Popen( - command, - cwd=BASE_DATA, + command, + cwd=BASE_DATA, env=env_vars, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, ) print("Jekyll Build iniciado em segundo plano (background).") except Exception as e: print(f"Erro ao iniciar Jekyll: {e}") + def load_manifest_keys(): """Carrega as pastas de fábrica (keys do manifesto).""" if not os.path.exists(SAMPLE_MANIFEST): return ["drums", "instruments", "effects", "presets", "samples"] try: - with open(SAMPLE_MANIFEST, 'r', encoding='utf-8') as f: + with open(SAMPLE_MANIFEST, "r", encoding="utf-8") as f: data = json.load(f) return list(data.keys()) except Exception: return [] + def convert_mmpz_to_mmp(mmpz_path, mmp_target_path): LMMS_PATH = "/usr/bin/lmms" """Tenta descompactar .mmpz usando Python ou LMMS CLI.""" @@ -185,8 +234,10 @@ def convert_mmpz_to_mmp(mmpz_path, mmp_target_path): try: with open(mmp_target_path, "w") as f_out: - subprocess.run(cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars) - + subprocess.run( + cmd, stdout=f_out, stderr=subprocess.PIPE, check=True, env=env_vars + ) + if os.path.exists(mmp_target_path) and os.path.getsize(mmp_target_path) > 0: print("Sucesso: Descompactado via LMMS CLI (--dump).") return True @@ -194,12 +245,15 @@ def convert_mmpz_to_mmp(mmpz_path, mmp_target_path): print("Erro: LMMS CLI rodou, mas arquivo de saída está vazio.") return False except subprocess.CalledProcessError as e: - print(f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}") + print( + f"Falha crítica no LMMS CLI: {e.stderr.decode('utf-8') if e.stderr else str(e)}" + ) return False except Exception as e: print(f"Erro inesperado ao chamar LMMS: {e}") return False + def analyze_mmp_dependencies(mmp_filename): """Analisa um arquivo .mmp XML puro.""" filepath = os.path.join(MMP_FOLDER, mmp_filename) @@ -218,18 +272,20 @@ def analyze_mmp_dependencies(mmp_filename): missing_samples = set() for audio_node in root.findall(".//audiofileprocessor"): - src = audio_node.get('src', '') - if not src: continue - + src = audio_node.get("src", "") + if not src: + continue + is_factory = any(src.startswith(folder + "/") for folder in factory_folders) is_already_imported = src.startswith("src_mmpSearch/samples/imported/") - + if not is_factory and not is_already_imported: file_name = os.path.basename(src) missing_samples.add(file_name) return len(missing_samples) == 0, list(missing_samples) + def update_xml_paths_exact(mmp_filename, replacements): """Substitui caminhos no XML baseando-se no mapeamento exato.""" filepath = os.path.join(MMP_FOLDER, mmp_filename) @@ -238,80 +294,87 @@ def update_xml_paths_exact(mmp_filename, replacements): root = tree.getroot() changes_made = False replacements_lower = {k.lower(): v for k, v in replacements.items()} - + for audio_node in root.findall(".//audiofileprocessor"): - src = audio_node.get('src', '') - if not src: continue + src = audio_node.get("src", "") + if not src: + continue current_basename = os.path.basename(src) current_basename_lower = current_basename.lower() if current_basename_lower in replacements_lower: new_filename = replacements_lower[current_basename_lower] new_src = f"{XML_IMPORTED_PATH_PREFIX}/{new_filename}" - audio_node.set('src', new_src) - audio_node.set('vol', audio_node.get('vol', '1')) + audio_node.set("src", new_src) + audio_node.set("vol", audio_node.get("vol", "1")) print(f"[XML FIX] Substituído: {current_basename} -> {new_src}") changes_made = True if changes_made: - tree.write(filepath, encoding='UTF-8', xml_declaration=True) + tree.write(filepath, encoding="UTF-8", xml_declaration=True) return True - return True + return True except Exception as e: print(f"Erro crítico ao editar XML: {e}") return False + def run_heavy_tasks_in_background(): """Esta função roda isolada sem travar o usuário""" print("--- [BACKGROUND] Iniciando reconstrução de índices ---") try: # 1. Isso é Python puro e PESADO (aqui é onde estava travando) - rebuild_indexes() - + rebuild_indexes() + # 2. Isso gera os manifestos (Python puro) generate_manifests(SRC_MMPSEARCH) - + # 3. Isso chama o subprocesso do Jekyll (Externo) # Mantém sua função original que usa subprocess run_jekyll_build() - + print("--- [BACKGROUND] Tarefas concluídas com sucesso ---") except Exception as e: print(f"--- [BACKGROUND] Erro: {e} ---") + def process_and_build(filename): """Função chamada pela rota de upload""" # Processamento inicial do arquivo (rápido) result = process_single_file(filename) - + if result["success"]: # Em vez de chamar rebuild_indexes() direto, criamos a Thread - # O Flask vai responder o return abaixo imediatamente, + # O Flask vai responder o return abaixo imediatamente, # enquanto a thread continua rodando no servidor. task_thread = threading.Thread(target=run_heavy_tasks_in_background) task_thread.start() - - return jsonify({ - "message": "Sucesso! O projeto foi recebido. O site será atualizado em instantes.", - "data": result["data"] - }), 200 + + return jsonify( + { + "message": "Sucesso! O projeto foi recebido. O site será atualizado em instantes.", + "data": result["data"], + } + ), 200 else: return jsonify({"error": f"Erro no processamento: {result['error']}"}), 500 + # ============================================================================== # ROTAS DE AUTENTICAÇÃO # ============================================================================== -@app.route('/api/register', methods=['POST']) + +@app.route("/api/register", methods=["POST"]) def register(): data = request.json - if not data or not data.get('username') or not data.get('password'): + if not data or not data.get("username") or not data.get("password"): return jsonify({"message": "Dados incompletos"}), 400 - if User.query.filter_by(username=data['username']).first(): + if User.query.filter_by(username=data["username"]).first(): return jsonify({"message": "Usuário já existe"}), 400 - hashed_password = bcrypt.generate_password_hash(data['password']).decode('utf-8') - new_user = User(username=data['username'], password=hashed_password) + hashed_password = bcrypt.generate_password_hash(data["password"]).decode("utf-8") + new_user = User(username=data["username"], password=hashed_password) try: db.session.add(new_user) db.session.commit() @@ -319,41 +382,47 @@ def register(): except Exception as e: return jsonify({"message": f"Erro: {str(e)}"}), 500 -@app.route('/api/login', methods=['POST']) + +@app.route("/api/login", methods=["POST"]) def login(): data = request.json - user = User.query.filter_by(username=data['username']).first() - if user and bcrypt.check_password_hash(user.password, data['password']): + user = User.query.filter_by(username=data["username"]).first() + if user and bcrypt.check_password_hash(user.password, data["password"]): login_user(user) return jsonify({"message": "Login realizado", "user": user.username}), 200 return jsonify({"message": "Credenciais inválidas"}), 401 -@app.route('/api/logout', methods=['POST']) + +@app.route("/api/logout", methods=["POST"]) @login_required def logout(): logout_user() return jsonify({"message": "Logout realizado"}), 200 -@app.route('/api/check_auth', methods=['GET']) + +@app.route("/api/check_auth", methods=["GET"]) def check_auth(): if current_user.is_authenticated: return jsonify({"logged_in": True, "user": current_user.username}) return jsonify({"logged_in": False}) + # No upload_server.py, adicione esta rota -@app.route('/api/user/update', methods=['POST']) +@app.route("/api/user/update", methods=["POST"]) @login_required def update_profile(): data = request.json - new_username = data.get('username') - + new_username = data.get("username") + if new_username and new_username != current_user.username: if User.query.filter_by(username=new_username).first(): return jsonify({"error": "Nome em uso."}), 400 current_user.username = new_username - if 'bio' in data: current_user.bio = data['bio'][:240] - if 'tags' in data: current_user.tags = data['tags'] + if "bio" in data: + current_user.bio = data["bio"][:240] + if "tags" in data: + current_user.tags = data["tags"] try: db.session.commit() @@ -361,50 +430,58 @@ def update_profile(): except Exception: return jsonify({"error": "Erro ao salvar."}), 500 + # Atualize a rota user_profile antiga para retornar os novos dados -@app.route('/api/user/profile', methods=['GET']) +@app.route("/api/user/profile", methods=["GET"]) @login_required def user_profile(): # Projetos user_projects = [] for p in current_user.projects: - user_projects.append({ - "id": p.id, - "display_name": p.display_name, - "filename": p.filename, - "created_at": p.created_at.strftime("%d/%m/%Y"), - "download_link": f"/api/download/{p.filename}" - }) + user_projects.append( + { + "id": p.id, + "display_name": p.display_name, + "filename": p.filename, + "created_at": p.created_at.strftime("%d/%m/%Y"), + "download_link": f"/api/download/{p.filename}", + } + ) # Samples user_samples = [] for s in current_user.samples: # Ajuste do prefixo exato solicitado public_url = f"/mmpSearch/src_mmpSearch/samples/{s.category}/{s.filename}" - - user_samples.append({ - "id": s.id, - "display_name": s.display_name, - "category": s.category, - "created_at": s.created_at.strftime("%d/%m/%Y"), - "download_link": public_url - }) - return jsonify({ - "username": current_user.username, - "bio": current_user.bio, - "tags": current_user.tags, - "avatar": current_user.avatar_url, - "cover": current_user.cover_url, - "projects": user_projects, - "samples": user_samples - }) + user_samples.append( + { + "id": s.id, + "display_name": s.display_name, + "category": s.category, + "created_at": s.created_at.strftime("%d/%m/%Y"), + "download_link": public_url, + } + ) -@app.route('/api/project/', methods=['DELETE']) + return jsonify( + { + "username": current_user.username, + "bio": current_user.bio, + "tags": current_user.tags, + "avatar": current_user.avatar_url, + "cover": current_user.cover_url, + "projects": user_projects, + "samples": user_samples, + } + ) + + +@app.route("/api/project/", methods=["DELETE"]) @login_required def delete_project(project_id): project = Project.query.get_or_404(project_id) - + # Segurança: Só o dono ou admin pode apagar if project.owner != current_user and not current_user.is_admin: return jsonify({"error": "Permissão negada"}), 403 @@ -413,27 +490,29 @@ def delete_project(project_id): # 1. Remove do Banco db.session.delete(project) db.session.commit() - + # 2. Remove o arquivo físico (Opcional, mas recomendado para não lotar o disco) file_path = os.path.join(MMP_FOLDER, project.filename) if os.path.exists(file_path): os.remove(file_path) - + # Nota: Você pode querer rodar o rebuild_indexes() aqui também se apagar o arquivo - + return jsonify({"message": "Projeto removido com sucesso"}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 - + + # ============================================================================== # ROTAS PRINCIPAIS # ============================================================================== + @app.route("/api/download/", methods=["GET"]) def download_project_package(project_name): """Gera um ZIP com caminhos limpos (Não exige login para baixar).""" - if not project_name.lower().endswith('.mmp'): - project_name += '.mmp' + if not project_name.lower().endswith(".mmp"): + project_name += ".mmp" clean_name = secure_filename(project_name) mmp_path = os.path.join(MMP_FOLDER, clean_name) @@ -448,15 +527,15 @@ def download_project_package(project_name): samples_to_pack = set() for audio_node in root.findall(".//audiofileprocessor"): - src = audio_node.get('src', '') + src = audio_node.get("src", "") if src.startswith(XML_IMPORTED_PATH_PREFIX): filename = os.path.basename(src) short_path = f"imported/{filename}" - audio_node.set('src', short_path) + audio_node.set("src", short_path) samples_to_pack.add(filename) - with zipfile.ZipFile(memory_file, 'w', zipfile.ZIP_DEFLATED) as zf: - xml_str = ET.tostring(root, encoding='utf-8', method='xml') + with zipfile.ZipFile(memory_file, "w", zipfile.ZIP_DEFLATED) as zf: + xml_str = ET.tostring(root, encoding="utf-8", method="xml") zf.writestr(clean_name, xml_str) for sample_name in samples_to_pack: @@ -468,9 +547,9 @@ def download_project_package(project_name): memory_file.seek(0) return send_file( memory_file, - mimetype='application/zip', + mimetype="application/zip", as_attachment=True, - download_name=f"{os.path.splitext(clean_name)[0]}.zip" + download_name=f"{os.path.splitext(clean_name)[0]}.zip", ) except Exception as e: return jsonify({"error": f"Erro ao gerar pacote: {str(e)}"}), 500 @@ -493,7 +572,7 @@ def upload_file(): clean_name = slugify(name_without_ext) if not clean_name: clean_name = f"upload-{int(time.time())}" - + final_mmp_filename = f"{clean_name}.mmp" final_mmp_path = os.path.join(MMP_FOLDER, final_mmp_filename) @@ -516,24 +595,26 @@ def upload_file(): # Registra no banco de dados vinculado ao usuário logado new_project = Project( - filename=final_mmp_filename, - display_name=clean_name, # Ou name_without_ext se preferir o original - owner=current_user # Flask-Login pega o user atual + filename=final_mmp_filename, + display_name=clean_name, # Ou name_without_ext se preferir o original + owner=current_user, # Flask-Login pega o user atual ) db.session.add(new_project) db.session.commit() print(f"Projeto {clean_name} associado ao usuário {current_user.username}") # ----------------------------- - # + # is_clean, missing_files = analyze_mmp_dependencies(final_mmp_filename) if not is_clean: - return jsonify({ - "status": "missing_samples", - "message": "Samples externos detectados.", - "project_file": final_mmp_filename, - "missing_files": missing_files - }), 202 + return jsonify( + { + "status": "missing_samples", + "message": "Samples externos detectados.", + "project_file": final_mmp_filename, + "missing_files": missing_files, + } + ), 202 return process_and_build(final_mmp_filename) @@ -546,7 +627,7 @@ def upload_file(): @app.route("/api/upload/resolve", methods=["POST"]) @login_required # <--- PROTEGIDO def resolve_samples(): - project_filename = request.form.get('project_file') + project_filename = request.form.get("project_file") if not project_filename: return jsonify({"error": "Nome do projeto não informado"}), 400 @@ -556,7 +637,8 @@ def resolve_samples(): for original_name, file_storage in request.files.items(): if file_storage and allowed_sample(file_storage.filename): safe_new_name = secure_filename(file_storage.filename) - if not safe_new_name: safe_new_name = f"sample_{int(time.time())}.wav" + if not safe_new_name: + safe_new_name = f"sample_{int(time.time())}.wav" save_path = os.path.join(XML_IMPORTED_PATH_PREFIX, safe_new_name) file_storage.save(save_path) @@ -571,82 +653,79 @@ def resolve_samples(): return jsonify({"error": "Falha ao atualizar o arquivo de projeto."}), 500 -@app.route('/api/upload/sample', methods=['POST']) -@login_required +@app.route("/api/upload/sample", methods=["POST"]) +@login_required def upload_sample_standalone(): - if 'sample_file' not in request.files: - return jsonify({'error': 'Nenhum arquivo'}), 400 - - file = request.files['sample_file'] + if "sample_file" not in request.files: + return jsonify({"error": "Nenhum arquivo"}), 400 + + file = request.files["sample_file"] # Pega a pasta escolhida pelo usuário, padrão 'uncategorized' - raw_subfolder = request.form.get('subfolder', 'uncategorized').strip() - + raw_subfolder = request.form.get("subfolder", "uncategorized").strip() + # Limpa o nome da pasta para evitar ../../etc/passwd safe_subfolder = secure_filename(raw_subfolder) - if not safe_subfolder: - safe_subfolder = 'uncategorized' + if not safe_subfolder: + safe_subfolder = "uncategorized" if file and allowed_sample(file.filename): filename = secure_filename(file.filename) - + # Cria o caminho físico final: SAMPLE_SRC + Categoria + Arquivo target_dir = os.path.join(SAMPLE_SRC, safe_subfolder) os.makedirs(target_dir, exist_ok=True) - + file_path = os.path.join(target_dir, filename) file.save(file_path) - + # Salva no Banco new_sample = Sample( filename=filename, - display_name=filename, + display_name=filename, category=safe_subfolder, - owner=current_user + owner=current_user, ) db.session.add(new_sample) db.session.commit() - - generate_manifests(SRC_MMPSEARCH) + + generate_manifests(SRC_MMPSEARCH) run_jekyll_build() - return jsonify({'message': 'Sample enviado com sucesso!'}), 200 - - return jsonify({'error': 'Tipo de arquivo inválido'}), 400 + return jsonify({"message": "Sample enviado com sucesso!"}), 200 -@app.route('/api/sample/', methods=['DELETE']) + return jsonify({"error": "Tipo de arquivo inválido"}), 400 + + +@app.route("/api/sample/", methods=["DELETE"]) @login_required def delete_sample(sample_id): sample = Sample.query.get_or_404(sample_id) - + if sample.owner != current_user and not current_user.is_admin: return jsonify({"error": "Proibido"}), 403 - + try: # 1. Tenta remover o arquivo físico file_path = os.path.join(SAMPLE_SRC, sample.category, sample.filename) if os.path.exists(file_path): os.remove(file_path) - + # 2. Remove do banco db.session.delete(sample) db.session.commit() - + return jsonify({"message": "Sample apagado"}), 200 except Exception as e: return jsonify({"error": str(e)}), 500 - + + # Inicializa o Flask-Admin -admin = Admin( - app, - name='MMP Admin', - url='/api/admin', - index_view=SecureIndexView() -) +admin = Admin(app, name="MMP Admin", url="/api/admin", index_view=SecureIndexView()) # Adiciona a visualização da tabela de Usuários admin.add_view(SecureModelView(User, db.session, name="Gerenciar Usuários")) if __name__ == "__main__": - admin = Admin(app, name='MMP Admin', url='/api/admin', index_view=SecureIndexView()) + admin = Admin(app, name="MMP Admin", url="/api/admin", index_view=SecureIndexView()) admin.add_view(SecureModelView(User, db.session, name="Gerenciar Usuários")) - app.run(host="0.0.0.0", port=33002, debug=True) \ No newline at end of file + app.run(host="0.0.0.0", port=33002, debug=True)