refactor: 移除冗余文件并优化代码结构
- 删除多个不再使用的脚本和配置文件,包括 `auto_push.bat`, `check_and_fix_users.py`, `init.sql` 等。 - 新增 `git_push.bat` 和 `git_push.sh` 脚本以简化 Git 推送流程。 - 更新 `README.md` 以反映最新的功能和结构变化。 - 优化前端代码,添加新的页面和组件,提升用户体验。 此提交旨在清理项目结构并增强代码可维护性。
This commit is contained in:
144
src/core/auth_manager.py
Normal file
144
src/core/auth_manager.py
Normal file
@@ -0,0 +1,144 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
认证管理器
|
||||
处理用户登录、注册、会话管理等功能
|
||||
"""
|
||||
|
||||
import jwt
|
||||
import hashlib
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional, Dict, Any
|
||||
from flask import current_app
|
||||
|
||||
from src.core.database import db_manager
|
||||
from src.core.models import User
|
||||
|
||||
|
||||
class AuthManager:
|
||||
"""认证管理器"""
|
||||
|
||||
def __init__(self):
|
||||
self.secret_key = "your-secret-key-change-this-in-production" # 应该从配置中读取
|
||||
self.token_expiry = timedelta(hours=24)
|
||||
|
||||
def hash_password(self, password: str) -> str:
|
||||
"""密码哈希"""
|
||||
return hashlib.sha256(password.encode()).hexdigest()
|
||||
|
||||
def verify_password(self, password: str, password_hash: str) -> bool:
|
||||
"""验证密码"""
|
||||
return self.hash_password(password) == password_hash
|
||||
|
||||
def generate_token(self, user_data: dict) -> str:
|
||||
"""生成JWT token"""
|
||||
payload = {
|
||||
'user_id': user_data['id'],
|
||||
'username': user_data['username'],
|
||||
'exp': datetime.utcnow() + self.token_expiry,
|
||||
'iat': datetime.utcnow()
|
||||
}
|
||||
return jwt.encode(payload, self.secret_key, algorithm='HS256')
|
||||
|
||||
def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
|
||||
"""验证JWT token"""
|
||||
try:
|
||||
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
|
||||
return payload
|
||||
except jwt.ExpiredSignatureError:
|
||||
return None
|
||||
except jwt.InvalidTokenError:
|
||||
return None
|
||||
|
||||
def authenticate_user(self, username: str, password: str) -> Optional[dict]:
|
||||
"""用户认证"""
|
||||
print(f"[DEBUG] 开始认证用户: {username}")
|
||||
with db_manager.get_session() as session:
|
||||
user = session.query(User).filter_by(username=username).first()
|
||||
print(f"[DEBUG] 找到用户: {user is not None}")
|
||||
|
||||
if user and user.is_active and self.verify_password(password, user.password_hash):
|
||||
print(f"[DEBUG] 用户认证成功: {user.username}")
|
||||
|
||||
# 立即访问所有需要的属性,确保在会话内
|
||||
user_id = user.id
|
||||
username_val = user.username
|
||||
password_hash_val = user.password_hash
|
||||
email_val = user.email
|
||||
name_val = user.name
|
||||
role_val = user.role
|
||||
is_active_val = user.is_active
|
||||
created_at_val = user.created_at
|
||||
last_login_val = datetime.now()
|
||||
|
||||
print(f"[DEBUG] 访问用户属性成功: name={name_val}, role={role_val}")
|
||||
|
||||
# 更新最后登录时间
|
||||
user.last_login = last_login_val
|
||||
session.commit()
|
||||
print("[DEBUG] 数据库更新成功")
|
||||
|
||||
# 返回用户数据字典,避免SQLAlchemy会话绑定问题
|
||||
user_dict = {
|
||||
'id': user_id,
|
||||
'username': username_val,
|
||||
'password_hash': password_hash_val,
|
||||
'email': email_val,
|
||||
'name': name_val,
|
||||
'role': role_val,
|
||||
'is_active': is_active_val,
|
||||
'created_at': created_at_val,
|
||||
'last_login': last_login_val
|
||||
}
|
||||
print(f"[DEBUG] 返回用户字典: {user_dict['username']}")
|
||||
return user_dict
|
||||
else:
|
||||
print("[DEBUG] 用户认证失败")
|
||||
return None
|
||||
|
||||
def get_user_by_id(self, user_id: int) -> Optional[User]:
|
||||
"""根据ID获取用户"""
|
||||
with db_manager.get_session() as session:
|
||||
return session.query(User).filter_by(id=user_id).first()
|
||||
|
||||
def get_user_by_token(self, token: str) -> Optional[User]:
|
||||
"""根据token获取用户"""
|
||||
payload = self.verify_token(token)
|
||||
if payload:
|
||||
return self.get_user_by_id(payload['user_id'])
|
||||
return None
|
||||
|
||||
def create_user(self, username: str, password: str, name: str, email: str = None, role: str = 'user') -> Optional[User]:
|
||||
"""创建新用户"""
|
||||
try:
|
||||
with db_manager.get_session() as session:
|
||||
# 检查用户名是否已存在
|
||||
existing_user = session.query(User).filter_by(username=username).first()
|
||||
if existing_user:
|
||||
return None
|
||||
|
||||
user = User(
|
||||
username=username,
|
||||
name=name,
|
||||
email=email,
|
||||
role=role,
|
||||
is_active=True
|
||||
)
|
||||
user.set_password(password)
|
||||
|
||||
session.add(user)
|
||||
session.commit()
|
||||
return user
|
||||
except Exception as e:
|
||||
print(f"创建用户失败: {e}")
|
||||
return None
|
||||
|
||||
def create_default_admin(self):
|
||||
"""创建默认管理员用户"""
|
||||
admin = self.create_user('admin', 'admin123', '系统管理员', 'admin@example.com', 'admin')
|
||||
if admin:
|
||||
print("默认管理员用户已创建: admin/admin123")
|
||||
return admin
|
||||
|
||||
|
||||
# 全局认证管理器实例
|
||||
auth_manager = AuthManager()
|
||||
Reference in New Issue
Block a user