"""Flask app that proxies subscription-conversion requests with a local cache.

Authenticated users build a proxy URL via ``/convert``; ``/api/sub`` forwards
the query parameters to one of the configured upstream conversion APIs and
caches successful responses in memory and in ``cache.json``.
"""
import hashlib
import json
import os
import random
import time
import urllib.parse

import requests
from dotenv import load_dotenv
from flask import Flask, render_template, request, redirect, url_for, flash, jsonify, Response
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
from werkzeug.security import check_password_hash, generate_password_hash

# Load environment variables from a .env file, if present.
load_dotenv()

app = Flask(__name__)
# NOTE(review): the fallback secret key is a hard-coded constant; set
# SECRET_KEY in the environment for any real deployment.
app.secret_key = os.environ.get('SECRET_KEY', 'starsky_secret_key')

# Login manager configuration.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'


class User(UserMixin):
    """Minimal in-memory user record used by Flask-Login."""

    def __init__(self, id, username, password_hash):
        self.id = id
        self.username = username
        self.password_hash = password_hash


# Default account, overridable through the environment.
# NOTE(review): the fallback password is a hard-coded constant; set
# ADMIN_PASSWORD in the environment for any real deployment.
default_username = os.environ.get('ADMIN_USERNAME', 'admin')
default_password = os.environ.get('ADMIN_PASSWORD', 'admin123')

# User store, keyed by username (which is also used as the user id).
users = {
    default_username: User(
        default_username,
        default_username,
        generate_password_hash(default_password)
    )
}


class ProxySystem:
    """Forward conversion requests to upstream APIs and cache the results."""

    def __init__(self):
        # Upstream conversion APIs that may be tried.
        self.apis = [
            "https://api.v1.mk/sub",
            "https://pub-api-1.bianyuan.xyz/sub"
        ]

        # Simple cache: key -> {'content', 'content_type'} and key -> timestamp.
        self._cache_file = "cache.json"
        self._cache = {}
        self._cache_time = {}
        self._cache_ttl = 3600  # entries are considered fresh for one hour
        self._load_cache()

        # Counters reported by get_stats().
        self.stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "api_successes": 0,
            "api_failures": 0
        }

    def _load_cache(self):
        """Load the cache from disk; on any failure start with an empty cache."""
        try:
            if os.path.exists(self._cache_file):
                with open(self._cache_file, 'r', encoding='utf-8') as f:
                    cache_data = json.load(f)
                self._cache = cache_data.get('cache', {})
                self._cache_time = cache_data.get('cache_time', {})
                print(f"已加载 {len(self._cache)} 条缓存数据")
        except Exception as e:
            # Best effort: a missing or corrupt cache file must not stop startup.
            print(f"加载缓存失败: {e}")
            self._cache = {}
            self._cache_time = {}

    def _save_cache(self):
        """Persist the cache to disk, keeping at most 100 entries.

        Failures are reported with ``print`` and otherwise ignored.
        """
        try:
            overflow = len(self._cache) - 100
            if overflow > 0:
                # Drop the entries with the oldest timestamps first.
                oldest_first = sorted(self._cache_time.items(), key=lambda item: item[1])
                for key, _ in oldest_first[:overflow]:
                    self._cache.pop(key, None)
                    self._cache_time.pop(key, None)

            cache_data = {
                'cache': self._cache,
                'cache_time': self._cache_time
            }
            with open(self._cache_file, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f)
        except Exception as e:
            print(f"保存缓存失败: {e}")

    def clear_cache(self):
        """Drop every cached entry and persist the now-empty cache."""
        self._cache = {}
        self._cache_time = {}
        self._save_cache()

    def get_random_api(self):
        """Return one of the configured API URLs chosen at random."""
        return random.choice(self.apis)

    def get_cache_key(self, params):
        """Return a cache key for ``params`` that ignores parameter order."""
        # Sort so that the same parameters always produce the same key.
        sorted_params = sorted(params.items())
        param_str = urllib.parse.urlencode(sorted_params)
        return hashlib.md5(param_str.encode()).hexdigest()

    def get_from_cache(self, params):
        """Return the cached entry for ``params`` or ``None``.

        An entry counts as a hit only when it is younger than the TTL.
        Updates the ``cache_hits`` / ``cache_misses`` counters.
        """
        key = self.get_cache_key(params)
        age = time.time() - self._cache_time.get(key, 0)

        if key in self._cache and age < self._cache_ttl:
            self.stats["cache_hits"] += 1
            return self._cache.get(key)

        self.stats["cache_misses"] += 1
        return None

    def save_to_cache(self, params, data):
        """Store ``data`` for ``params`` and periodically flush to disk."""
        key = self.get_cache_key(params)
        self._cache[key] = data
        self._cache_time[key] = time.time()

        # Flush to disk whenever the miss counter is a multiple of five.
        if self.stats["cache_misses"] % 5 == 0:
            self._save_cache()

    def fetch_from_apis(self, params):
        """Request a conversion from the upstream APIs, bypassing the cache lookup.

        The APIs are tried in random order; the first HTTP 200 response is
        stored in the cache and returned.

        Returns:
            ``(content, content_type)`` where ``content`` is the raw bytes.

        Raises:
            RuntimeError: if no API returned HTTP 200. The message lists the
                failure recorded for each API.
        """
        errors = []

        # Randomize the order in which the APIs are tried.
        apis = list(self.apis)
        random.shuffle(apis)

        for api_url in apis:
            try:
                response = requests.get(api_url, params=params, timeout=30)
            except Exception as e:
                errors.append(f"API {api_url} 请求失败: {str(e)}")
                self.stats["api_failures"] += 1
                continue

            if response.status_code != 200:
                # A non-200 answer is a failed attempt too, so count it.
                errors.append(f"API {api_url} 返回状态码: {response.status_code}")
                self.stats["api_failures"] += 1
                continue

            content = response.content
            content_type = response.headers.get('Content-Type', 'text/plain')

            # The cache is JSON-serialized, so the body is stored as text.
            cache_data = {
                'content': content.decode('utf-8', errors='ignore'),
                'content_type': content_type
            }
            self.save_to_cache(params, cache_data)

            self.stats["api_successes"] += 1
            return content, content_type

        # Every API failed.
        error_message = "所有转换API均不可用:\n" + "\n".join(errors)
        raise RuntimeError(error_message)

    def convert(self, params):
        """Return the conversion result for ``params``, preferring the cache.

        Returns:
            ``(content, content_type)`` with ``content`` as bytes, for both
            cache hits and fresh upstream responses.

        Raises:
            RuntimeError: if the cache misses and every upstream API fails.
        """
        cache_result = self.get_from_cache(params)
        if cache_result:
            # Cached bodies are stored as text; hand back bytes so that hits
            # and misses have the same return shape.
            content = cache_result.get('content', '').encode('utf-8')
            content_type = cache_result.get('content_type', 'text/plain')
            return content, content_type

        return self.fetch_from_apis(params)

    def get_stats(self):
        """Return cache and API counters plus the cache hit ratio."""
        hits = self.stats["cache_hits"]
        misses = self.stats["cache_misses"]
        lookups = hits + misses
        return {
            "cache_size": len(self._cache),
            "cache_hits": hits,
            "cache_misses": misses,
            "api_successes": self.stats["api_successes"],
            "api_failures": self.stats["api_failures"],
            # Guard against division by zero before any lookup has happened.
            "cache_hit_ratio": hits / lookups if lookups > 0 else 0
        }


# Shared proxy system instance used by the routes below.
proxy_system = ProxySystem()


@login_manager.user_loader
def load_user(user_id):
    """Return the stored user for ``user_id``, or ``None`` if unknown."""
    return users.get(user_id)


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and process submitted credentials."""
    if request.method == 'POST':
        username = request.form.get('username')
        # Default to '' so a missing field fails the check instead of raising.
        password = request.form.get('password', '')

        user = users.get(username)
        if user and check_password_hash(user.password_hash, password):
            login_user(user)
            return redirect(url_for('index'))
        else:
            flash('用户名或密码错误')

    return render_template('login.html')


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and redirect to the login page."""
    logout_user()
    return redirect(url_for('login'))


@app.route('/')
@login_required
def index():
    """Render the main page together with the current cache hit ratio."""
    try:
        stats = proxy_system.get_stats()
        cache_hit_ratio = f"{stats['cache_hit_ratio']*100:.1f}%" if stats['cache_hit_ratio'] > 0 else "0%"
    except Exception:
        # The page should still render if the statistics are unavailable.
        cache_hit_ratio = "计算中"

    return render_template('index.html',
                           conversion_mode="隐私代理+本地缓存",
                           cache_hit_ratio=cache_hit_ratio)


@app.route('/convert', methods=['POST'])
@login_required
def convert():
    """Build the ``/api/sub`` proxy URL for the submitted subscription link."""
    backend_url = request.form.get('backend_url', 'https://raw.githubusercontent.com/yuanwangokk-1/subscribe/refs/heads/main/ACL4SSR/ACL4SSR.ini')
    target = request.form.get('target', 'clash')
    original_url = request.form.get('original_url', '')

    if not original_url:
        return jsonify({"status": "error", "message": "订阅链接不能为空"})

    try:
        # Query parameters forwarded to the conversion API.
        params = {
            'target': target,
            'url': original_url,
            'config': backend_url,
            'emoji': 'true',
            'list': 'false',
            'udp': 'false',
            'tfo': 'false',
            'expand': 'true',
            'scv': 'false',
            'fdn': 'false',
            'new_name': 'true'
        }

        # The returned URL points at this server's own proxy endpoint.
        server_url = request.url_root.rstrip('/')
        proxy_url = f"{server_url}/api/sub?{urllib.parse.urlencode(params)}"

        return jsonify({
            "status": "success",
            "result": proxy_url,
            "mode": "proxy"
        })
    except Exception as e:
        return jsonify({"status": "error", "message": f"处理失败: {str(e)}"})


@app.route('/api/sub')
def api_sub_proxy():
    """Proxy a conversion request, serving from the cache when possible.

    The ``X-Cache`` response header is ``HIT`` or ``MISS`` accordingly.
    Failures are returned as plain text with status 500.
    """
    try:
        # All query parameters are forwarded to the upstream API unchanged.
        params = request.args.to_dict()
    except Exception as e:
        return f"请求处理失败: {str(e)}", 500

    try:
        # Look the cache up exactly once so each request is counted once.
        cache_result = proxy_system.get_from_cache(params)
        if cache_result:
            content = cache_result.get('content', '').encode('utf-8')
            content_type = cache_result.get('content_type', 'text/plain')
            cache_status = 'HIT'
        else:
            content, content_type = proxy_system.fetch_from_apis(params)
            cache_status = 'MISS'

        headers = {
            'Content-Type': content_type,
            'X-Proxy-By': 'Your Private Space',
            'X-Cache': cache_status,
            'X-Content-Type-Options': 'nosniff',
            'Cache-Control': 'max-age=3600'
        }
        return Response(content, headers=headers)
    except Exception as e:
        return f"转换失败: {str(e)}", 500


# Statistics endpoint.
@app.route('/stats')
@login_required
def show_stats():
    """Return the proxy system's statistics as JSON."""
    stats = proxy_system.get_stats()
    return jsonify(stats)


# Cache-clearing endpoint.
@app.route('/clear-cache', methods=['POST'])
@login_required
def clear_cache():
    """Empty the cache and report the outcome as JSON."""
    try:
        proxy_system.clear_cache()
        return jsonify({"status": "success", "message": "缓存已清除"})
    except Exception as e:
        return jsonify({"status": "error", "message": f"清除缓存失败: {str(e)}"})


# Only start the built-in server for local development.
if __name__ == '__main__' and os.environ.get('DEVELOPMENT') == 'true':
    app.run(host='0.0.0.0', port=7860, debug=True)