import os import requests import urllib.parse from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, Response from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user from werkzeug.security import check_password_hash, generate_password_hash from dotenv import load_dotenv import time # 加载环境变量 load_dotenv() app = Flask(__name__) app.secret_key = os.environ.get('SECRET_KEY', 'starsky_secret_key') # 登录管理配置 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' # 用户类 class User(UserMixin): def __init__(self, id, username, password_hash): self.id = id self.username = username self.password_hash = password_hash # 默认用户 default_username = os.environ.get('ADMIN_USERNAME', 'admin') default_password = os.environ.get('ADMIN_PASSWORD', 'admin123') # 用户存储 users = { default_username: User( default_username, default_username, generate_password_hash(default_password) ) } # 检查 subconverter 是否启动 def check_subconverter(): max_retries = 10 for i in range(max_retries): try: response = requests.get('http://localhost:25500/version', timeout=2) if response.status_code == 200: print(f"Subconverter 服务已启动: {response.text}") return True except: pass print(f"等待 Subconverter 服务启动... ({i+1}/{max_retries})") time.sleep(2) print("警告: Subconverter 服务可能未正常启动") return False # 应用启动时检查 subconverter 服务 check_subconverter() @login_manager.user_loader def load_user(user_id): return users.get(user_id) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form.get('username') password = request.form.get('password') user = users.get(username) if user and check_password_hash(user.password_hash, password): login_user(user) return redirect(url_for('index')) else: flash('用户名或密码错误') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) @app.route('/') @login_required def index(): return render_template('index.html') @app.route('/convert', methods=['POST']) @login_required def convert(): # 获取参数 backend_url = request.form.get('backend_url', 'https://raw.githubusercontent.com/yuanwangokk-1/subscribe/refs/heads/main/ACL4SSR/ACL4SSR.ini') target = request.form.get('target', 'clash') original_url = request.form.get('original_url', '') if not original_url: return jsonify({"status": "error", "message": "订阅链接不能为空"}) try: # 检查subconverter服务是否可用 try: version_check = requests.get('http://localhost:25500/version', timeout=2) if version_check.status_code != 200: return jsonify({"status": "error", "message": "订阅转换服务暂时不可用,请稍后再试"}) except: return jsonify({"status": "error", "message": "订阅转换服务暂时不可用,请稍后再试"}) # 构建转换请求参数 params = { 'target': target, 'url': original_url, 'insert': 'false', 'config': backend_url, 'emoji': 'true', 'list': 'false', 'xudp': 'false', 'udp': 'false', 'tfo': 'false', 'expand': 'true', 'scv': 'false', 'fdn': 'false', 'new_name': 'true' } # 通过URL参数构建转换链接 base_url = request.host_url.rstrip('/') convert_url = f"{base_url}/api/sub?{urllib.parse.urlencode(params)}" return jsonify({"status": "success", "result": convert_url}) except Exception as e: return jsonify({"status": "error", "message": f"处理失败: {str(e)}"}) @app.route('/api/sub') def subscribe_api(): """代理 subconverter 的转换请求""" try: # 获取所有URL参数 params = request.args.to_dict() # 调用本地 subconverter 服务 sub_url = f"http://localhost:25500/sub?{urllib.parse.urlencode(params)}" response = requests.get(sub_url, timeout=30) # 返回原始响应 return Response( response.content, status=response.status_code, content_type=response.headers.get('Content-Type', 'text/plain') ) except Exception as e: return f"转换失败: {str(e)}", 500 # 不要在这里调用 app.run() # 让 Hugging Face 的 WSGI 服务器来运行应用 # 仅在本地开发时使用以下代码 if __name__ == '__main__' and os.environ.get('DEVELOPMENT') == 'true': app.run(host='0.0.0.0', port=7860, debug=True)