"""Flask application factory and Socket.IO event wiring.

Defines the password-protected Flask app (``create_app``) and the
``/rb`` namespace Socket.IO handlers that relay input events to sessions
obtained from ``server.services.remote_browser``.
"""
import hashlib
import hmac
import os
import sys
from datetime import timedelta

# Put the parent directory on sys.path so the ``server`` package imports
# below resolve when this file is executed directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from flask import Flask, redirect, url_for, request, session, render_template  # noqa: E402
from flask_socketio import SocketIO  # noqa: E402
from server.database import init_db  # noqa: E402
from server.routers import accounts, tasks, orders  # noqa: E402

# NOTE(review): all origins are allowed and the Socket.IO handlers below do
# not check the login session themselves -- confirm this is intended.
socketio = SocketIO(cors_allowed_origins="*")

# Access password; can be set through the ADMIN_PASSWORD environment
# variable and defaults to "admin123".
# NOTE(review): the default is a well-known weak value -- override it in any
# real deployment.
ADMIN_PASSWORD = os.environ.get('ADMIN_PASSWORD', 'admin123')


def _safe_next_url(target):
    """Return *target* if it is a same-site absolute path, else ``'/'``.

    Guards the post-login redirect against open-redirect abuse: anything
    that is not a plain ``/path`` (absolute URLs, scheme-relative ``//host``
    forms, values containing backslashes or control characters that browsers
    may normalise into ``//host``) is replaced by the site root.
    """
    if not target or not target.startswith('/'):
        return '/'
    if target.startswith('//'):
        return '/'
    if '\\' in target or any(ch < ' ' for ch in target):
        return '/'
    return target


def create_app():
    """Build and configure the Flask application.

    Sets the template/static folders and secret key, initialises the
    database via ``init_db()``, installs the login/logout routes and the
    ``before_request`` authentication gate, registers the ``accounts``,
    ``tasks`` and ``orders`` blueprints, and binds the module-level
    ``socketio`` instance to the app.

    Returns:
        The configured ``Flask`` application.
    """
    base_dir = os.path.dirname(__file__)
    app = Flask(
        __name__,
        template_folder=os.path.join(base_dir, '..', 'templates'),
        static_folder=os.path.join(base_dir, '..', 'static'),
    )
    # NOTE(review): the fallback secret key is public in the source; set
    # SECRET_KEY in the environment for any real deployment.
    app.secret_key = os.environ.get(
        'SECRET_KEY', 'snatcher-secret-key-change-me')
    # Lifetime applied to sessions marked permanent at login. Configured once
    # here instead of being re-assigned on the app during every login request.
    app.permanent_session_lifetime = timedelta(days=7)

    init_db()

    # -- Login authentication --
    @app.route('/login', methods=['GET', 'POST'])
    def login():
        """Show the login form (GET) or validate the password (POST)."""
        if request.method != 'POST':
            return render_template('login.html')

        pwd = request.form.get('password', '')
        # Constant-time comparison; encode to bytes because compare_digest
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(pwd.encode(), ADMIN_PASSWORD.encode()):
            session['authenticated'] = _hash(pwd)
            session.permanent = True
            # Only redirect to same-site paths to avoid an open redirect.
            return redirect(_safe_next_url(request.args.get('next', '/')))
        return render_template('login.html', error='密码错误')

    @app.route('/logout')
    def logout():
        """Clear the session and send the user back to the login page."""
        session.clear()
        return redirect('/login')

    @app.before_request
    def require_auth():
        """Redirect unauthenticated requests to the login page."""
        # Let the login page and static assets through.
        if request.path == '/login' or request.path.startswith('/static/'):
            return None
        # Let Socket.IO's internal paths through.
        if request.path.startswith('/socket.io'):
            return None
        # The session stores a hash of the password, so changing
        # ADMIN_PASSWORD makes previously issued sessions fail this check.
        if session.get('authenticated') != _hash(ADMIN_PASSWORD):
            return redirect(url_for('login', next=request.path))
        return None

    app.register_blueprint(accounts.bp)
    app.register_blueprint(tasks.bp)
    app.register_blueprint(orders.bp)

    @app.route('/')
    def index():
        """Redirect the site root to the task list view."""
        return redirect(url_for('tasks.list_tasks'))

    socketio.init_app(app)
    _register_socketio_events()

    return app


def _hash(s):
    """Return the first 16 hex characters of the SHA-256 digest of *s*."""
    return hashlib.sha256(s.encode()).hexdigest()[:16]


def _register_socketio_events():
    """Register the ``/rb`` namespace Socket.IO event handlers.

    Each input handler looks up a session by the payload's ``session_id``
    and, if that session has a running event loop, schedules the matching
    coroutine on it with ``asyncio.run_coroutine_threadsafe``. Payloads that
    are not dicts are ignored.
    """
    # Imported lazily, as in the original code, so these modules are only
    # loaded when the handlers are registered.
    import asyncio
    from server.services.remote_browser import close_session, get_session

    def _running_session(data):
        """Return the session named in *data* if its loop is running."""
        if not isinstance(data, dict):
            return None
        s = get_session(data.get('session_id', ''))
        if s and s.loop and s.loop.is_running():
            return s
        return None

    @socketio.on('rb_mouse', namespace='/rb')
    def handle_mouse(data):
        """Schedule ``s.handle_mouse(action, x, y)`` on the session loop."""
        s = _running_session(data)
        if s is None:
            return
        asyncio.run_coroutine_threadsafe(
            s.handle_mouse(
                data.get('action', 'click'),
                data.get('x', 0),
                data.get('y', 0)),
            s.loop)

    @socketio.on('rb_keyboard', namespace='/rb')
    def handle_keyboard(data):
        """Schedule ``s.handle_keyboard(text)`` when ``text`` is non-empty."""
        s = _running_session(data)
        if s is None:
            return
        text = data.get('text', '')
        if text:
            asyncio.run_coroutine_threadsafe(
                s.handle_keyboard(text), s.loop)

    @socketio.on('rb_key', namespace='/rb')
    def handle_key(data):
        """Schedule ``s.handle_key(key)`` when ``key`` is non-empty."""
        s = _running_session(data)
        if s is None:
            return
        key = data.get('key', '')
        if key:
            asyncio.run_coroutine_threadsafe(
                s.handle_key(key), s.loop)

    @socketio.on('rb_close', namespace='/rb')
    def handle_close(data):
        """Close the session identified by the payload's ``session_id``."""
        if not isinstance(data, dict):
            return
        close_session(data.get('session_id', ''))


if __name__ == '__main__':
    app = create_app()
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the
    # interactive debugger on every network interface -- confirm this entry
    # point is only used for local development.
    socketio.run(app, host='0.0.0.0', port=9000, debug=True)