Files
recommend/web_app.py
赵杰 Jie Zhao (雄狮汽车科技) f65abdef0f feat: complete web app features and fix encoding
2025-11-02 22:23:10 +08:00

586 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
网页端应用 - 个性化饮食推荐助手 + 背诵排序功能
"""
import json
import logging
import os
import random
import re
from datetime import datetime
from pathlib import Path

from flask import Flask, render_template, request, jsonify, session, Response

# Business modules
from core.base import BaseConfig, AppCore, ModuleManager, ModuleType, initialize_app
from modules.data_collection import DataCollectionModule
from modules.ai_analysis import AIAnalysisModule
from modules.recommendation_engine import RecommendationEngine
from modules.ocr_calorie_recognition import OCRCalorieRecognitionModule
# Logging configuration.
# FileHandler opens logs/web_app.log as soon as it is constructed, and this
# module-level code runs before the __main__ block at the bottom of the file,
# so the directory has to be created here or startup fails with
# FileNotFoundError on a fresh checkout.
Path('logs').mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/web_app.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# The session cookie (which stores user_id) is signed with this key. Read it
# from the SECRET_KEY environment variable when set; the historical
# placeholder is kept only as a fallback so existing setups keep working.
# NOTE(review): the fallback value is public and must not be used in production.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-here')
# Pick up template changes from disk without restarting the server.
app.jinja_env.auto_reload = True
app.config['TEMPLATES_AUTO_RELOAD'] = True
# Make sure HTML and JSON responses advertise UTF-8.
@app.after_request
def after_request(response):
    """Add a UTF-8 charset to HTML/JSON Content-Type headers that lack one.

    A response without any Content-Type header is labelled
    ``text/html; charset=utf-8``. Headers that already carry a ``charset=``
    parameter, and content types other than HTML/JSON, are left as they are.
    """
    html_utf8 = 'text/html; charset=utf-8'
    json_utf8 = 'application/json; charset=utf-8'

    headers = response.headers
    content_type = headers.get('Content-Type', html_utf8)
    headers['Content-Type'] = content_type

    # Nothing more to do when a charset is already declared.
    if 'charset=' in content_type:
        return response

    if content_type.startswith('text/html'):
        headers['Content-Type'] = html_utf8
    elif content_type.startswith('application/json'):
        headers['Content-Type'] = json_utf8
    return response
# Application core singleton. It is built lazily on first use so that merely
# importing this module does not start the business modules.
app_core = None
config = None


def get_app_core():
    """Return the shared AppCore instance, creating it on first use.

    The core is built in a local variable and only published to the
    module-level ``app_core`` once every module has been registered,
    initialised and the core has been started. If any of those steps raises,
    ``app_core`` stays ``None`` and the next call retries from scratch instead
    of handing out a half-initialised core.

    Returns:
        The started AppCore, or ``None`` when ``initialize_app`` reports
        failure.
    """
    global app_core, config
    if app_core is not None:
        return app_core

    config = BaseConfig()
    if not initialize_app(config):
        logger.error("应用核心初始化失败")
        return None

    core = AppCore(config)
    # Register every business module, in the same order as before.
    module_manager = ModuleManager(config)
    module_classes = (
        DataCollectionModule,
        AIAnalysisModule,
        RecommendationEngine,
        OCRCalorieRecognitionModule,
    )
    for module_cls in module_classes:
        module_manager.register_module(module_cls(config))
    module_manager.initialize_all()
    core.module_manager = module_manager
    core.start()

    # Publish only after the core is fully up.
    app_core = core
    logger.info("应用核心初始化完成")
    return app_core
class RecitationSorter:
    """Extract recitation items from pasted study notes and shuffle them."""

    # Substrings that mark table-header rows. The list previously also held
    # an empty string; '' is a substring of every line, so every line was
    # treated as a header and extraction always returned nothing.
    # NOTE(review): if a specific marker character was intended in its place,
    # add it here explicitly.
    _HEADER_KEYWORDS = ('章节', '知识点', '选择题', '主观题', '完成')
    # Substrings that mark instructions / boilerplate rather than content.
    _NOTE_KEYWORDS = ('使用说明', '祝:', '凯程', '框架', '理解', '背诵')
    # Page markers such as "第3页" or "共10页".
    _PAGE_RE = re.compile(r'^[第共]\d+页')
    # Numbered headings: "1. X", "2、X", "第一章 X", "第一章:X". The separator
    # now includes '.' / '.' / ':' so it is stripped from the item; a dot
    # followed by a digit (e.g. "1.5") is not treated as a separator.
    _NUMBERED_RE = re.compile(
        r'^[第]?[一二三四五六七八九十\d]+[章节]?\s*'
        r'(?:[::、]|[..](?!\d))?\s*(.+)'
    )
    # Bullet list entries starting with "-" or "•".
    _BULLET_RE = re.compile(r'^[-•]\s*(.+)')
    # Lines made only of tick/cross marks and whitespace.
    _MARKS_ONLY_RE = re.compile(r'^[✓×√✗\s]+$')
    # ASCII table rules such as "|---+---|".
    _TABLE_RULE_RE = re.compile(r'^[|+\-\s]+$')
    # Any CJK unified ideograph.
    _CJK_RE = re.compile(r'[\u4e00-\u9fff]')
    # An extracted item needs at least this many characters.
    _MIN_ITEM_LENGTH = 2

    def __init__(self):
        self.items = []

    def extract_items(self, text):
        """Return the unique recitation items found in ``text``.

        Each non-empty line is stripped, filtered for headers, page markers
        and boilerplate, then matched against the numbered, bullet and
        free-text patterns. Duplicates are dropped, keeping first-seen order.

        Args:
            text: The raw pasted text (a ``str``).

        Returns:
            A list of item strings, possibly empty.
        """
        candidates = []
        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line or self._is_noise(line):
                continue
            item = self._parse_line(line)
            if item is not None:
                candidates.append(item)
        # dict preserves insertion order, giving an order-stable de-dup.
        return list(dict.fromkeys(candidates))

    def _is_noise(self, line):
        """Tell whether a stripped line is a header, page marker or note."""
        if any(keyword in line for keyword in self._HEADER_KEYWORDS):
            return True
        if self._PAGE_RE.match(line):
            return True
        return any(keyword in line for keyword in self._NOTE_KEYWORDS)

    def _parse_line(self, line):
        """Return the item carried by a stripped line, or None if there is none."""
        # Patterns 1 and 2: numbered headings and bullet entries. A line that
        # matches the pattern but yields a too-short item is discarded.
        for pattern in (self._NUMBERED_RE, self._BULLET_RE):
            match = pattern.match(line)
            if match:
                item = match.group(1).strip()
                return item if len(item) >= self._MIN_ITEM_LENGTH else None
        # Pattern 3: free text (e.g. a table cell). It must contain Chinese
        # and must not be only tick marks or a table rule.
        if (
            len(line) > 2
            and not self._MARKS_ONLY_RE.match(line)
            and self._CJK_RE.search(line)
            and not self._TABLE_RULE_RE.match(line)
        ):
            return line
        return None

    def random_sort(self, items):
        """Return a new list with ``items`` in random order.

        The input is not modified; any sequence (not just a list) is accepted.
        """
        shuffled = list(items)
        random.shuffle(shuffled)
        return shuffled
# Module-level sorter instance shared by the recitation API routes.
sorter = RecitationSorter()
@app.route('/')
def index():
    """Render the landing page."""
    # Keyword arguments to render_template become template variables, so
    # this only exposes a variable named `encoding` to the template.
    context = {'encoding': 'utf-8'}
    return render_template('index.html', **context)
@app.route('/recitation')
def recitation():
    """Render the recitation-sorting page."""
    # Keyword arguments to render_template become template variables, so
    # this only exposes a variable named `encoding` to the template.
    context = {'encoding': 'utf-8'}
    return render_template('recitation.html', **context)
@app.route('/data-collection')
def data_collection():
    """Render the data-collection page."""
    # Keyword arguments to render_template become template variables, so
    # this only exposes a variable named `encoding` to the template.
    context = {'encoding': 'utf-8'}
    return render_template('data_collection.html', **context)
@app.route('/recommendation')
def recommendation():
    """Render the recommendation page."""
    # Keyword arguments to render_template become template variables, so
    # this only exposes a variable named `encoding` to the template.
    context = {'encoding': 'utf-8'}
    return render_template('recommendation.html', **context)
@app.route('/analysis')
def analysis():
    """Render the analysis page."""
    # Keyword arguments to render_template become template variables, so
    # this only exposes a variable named `encoding` to the template.
    context = {'encoding': 'utf-8'}
    return render_template('analysis.html', **context)
@app.route('/api/extract', methods=['POST'])
def extract_items():
    """Extract recitation items from the text posted by the client.

    Expects a JSON object with a ``text`` string.

    Returns:
        200 with ``{'success': True, 'items': [...], 'count': n}``;
        400 when the text is missing/not a string or nothing was recognised;
        500 on unexpected errors.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so bad requests get the 400 below rather than
        # being caught by the blanket handler and reported as a 500.
        data = request.get_json(silent=True)
        text = data.get('text', '') if isinstance(data, dict) else ''
        if not text or not isinstance(text, str):
            return jsonify({
                'success': False,
                'message': '请输入要处理的文本'
            }), 400

        items = sorter.extract_items(text)
        if not items:
            return jsonify({
                'success': False,
                'message': '未能识别到背诵内容,请检查文本格式'
            }), 400

        logger.info(f"提取到 {len(items)} 个背诵项目")
        return jsonify({
            'success': True,
            'items': items,
            'count': len(items)
        })
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"提取项目失败: {e}")
        return jsonify({
            'success': False,
            'message': f'处理失败: {str(e)}'
        }), 500
@app.route('/api/sort', methods=['POST'])
def sort_items():
    """Shuffle the items posted by the client.

    Expects a JSON object with a non-empty ``items`` list.

    Returns:
        200 with ``{'success': True, 'items': [...], 'count': n}``;
        400 when ``items`` is missing, empty or not a list;
        500 on unexpected errors.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # such requests are answered with 400 instead of a 500.
        data = request.get_json(silent=True)
        items = data.get('items', []) if isinstance(data, dict) else []
        # A string or object here would otherwise crash inside the shuffle.
        if not items or not isinstance(items, list):
            return jsonify({
                'success': False,
                'message': '请先提取背诵项目'
            }), 400

        sorted_items = sorter.random_sort(items)
        logger.info(f"{len(sorted_items)} 个项目进行随机排序")
        return jsonify({
            'success': True,
            'items': sorted_items,
            'count': len(sorted_items)
        })
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"排序失败: {e}")
        return jsonify({
            'success': False,
            'message': f'排序失败: {str(e)}'
        }), 500
@app.route('/api/export/sorted', methods=['POST'])
def export_sorted():
    """Return the posted items as a downloadable txt, json or csv file.

    Expects a JSON object with a non-empty ``items`` list and an optional
    ``format`` of ``'txt'`` (default), ``'json'`` or ``'csv'``; any other
    value falls back to txt.

    Returns:
        200 with the file as an attachment;
        400 when ``items`` is missing, empty or not a list;
        500 on unexpected errors.
    """
    from urllib.parse import quote

    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # such requests are answered with 400 instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        items = data.get('items', [])
        export_format = data.get('format', 'txt')  # txt, json, csv
        if not items or not isinstance(items, list):
            return jsonify({
                'success': False,
                'message': '没有可导出的数据'
            }), 400

        # Take the timestamp once so the file name and the embedded export
        # time cannot disagree.
        now = datetime.now()
        file_stamp = now.strftime('%Y%m%d_%H%M%S')
        exported_at = now.strftime('%Y-%m-%d %H:%M:%S')

        if export_format == 'json':
            content = json.dumps({
                'items': items,
                'count': len(items),
                'export_time': exported_at
            }, ensure_ascii=False, indent=2)
            extension = 'json'
            content_type = 'application/json; charset=utf-8'
        elif export_format == 'csv':
            import csv
            import io
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(['序号', '知识点'])
            writer.writerows(enumerate(items, 1))
            content = output.getvalue()
            extension = 'csv'
            content_type = 'text/csv; charset=utf-8'
        else:  # txt
            content_lines = ['背诵排序结果', '=' * 50, '']
            content_lines.extend(f'{i}. {item}' for i, item in enumerate(items, 1))
            content_lines.extend(['', '=' * 50, f'导出时间: {exported_at}'])
            content = '\n'.join(content_lines)
            extension = 'txt'
            content_type = 'text/plain; charset=utf-8'

        # Header values must be latin-1 encodable, which the Chinese file
        # name is not. Send an ASCII fallback in `filename` and the real name
        # percent-encoded in `filename*` (RFC 6266 / RFC 5987).
        filename = f'背诵排序结果_{file_stamp}.{extension}'
        ascii_filename = f'recitation_sorted_{file_stamp}.{extension}'
        disposition = (
            f'attachment; filename="{ascii_filename}"; '
            f"filename*=UTF-8''{quote(filename)}"
        )

        # content_type sets the header verbatim; passing a value that already
        # contains a charset through `mimetype` would get a second charset
        # appended for text/* types.
        return Response(
            content.encode('utf-8'),
            content_type=content_type,
            headers={'Content-Disposition': disposition}
        )
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"导出失败: {e}")
        return jsonify({
            'success': False,
            'message': f'导出失败: {str(e)}'
        }), 500
# ==================== Business API ====================
@app.route('/api/user/login', methods=['POST'])
def user_login():
    """Log a user in by ID and remember the ID in the session.

    Expects a JSON object with a ``user_id`` string.

    Returns:
        200 with ``{'success': True, 'user_id': ..., 'name': ...}``;
        400 when ``user_id`` is missing, blank or not a string;
        500 when the user data cannot be loaded or on unexpected errors.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # such requests are answered with 400 instead of a 500.
        data = request.get_json(silent=True)
        raw_user_id = data.get('user_id', '') if isinstance(data, dict) else ''
        # A non-string ID (e.g. a JSON number) has no .strip(); treat it as missing.
        user_id = raw_user_id.strip() if isinstance(raw_user_id, str) else ''
        if not user_id:
            return jsonify({
                'success': False,
                'message': '请输入用户ID'
            }), 400

        core = get_app_core()
        user_data = core.get_user_data(user_id)
        if not user_data:
            return jsonify({
                'success': False,
                'message': '用户数据获取失败'
            }), 500

        session['user_id'] = user_id
        return jsonify({
            'success': True,
            'user_id': user_id,
            'name': user_data.profile.get('name', '未设置')
        })
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"用户登录失败: {e}")
        return jsonify({
            'success': False,
            'message': f'登录失败: {str(e)}'
        }), 500
@app.route('/api/user/register', methods=['POST'])
def user_register():
    """Register a user: store the display name and log the user in.

    Expects a JSON object with ``user_id`` and ``name`` strings.

    Returns:
        200 with ``{'success': True, 'user_id': ..., 'name': ...}``;
        400 when either field is missing, blank or not a string;
        500 when the user data cannot be loaded or on unexpected errors.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body, so
        # such requests are answered with 400 instead of a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        raw_user_id = data.get('user_id', '')
        raw_name = data.get('name', '')
        # Non-string values have no .strip(); treat them as missing.
        user_id = raw_user_id.strip() if isinstance(raw_user_id, str) else ''
        name = raw_name.strip() if isinstance(raw_name, str) else ''
        if not user_id or not name:
            return jsonify({
                'success': False,
                'message': '请输入用户ID和姓名'
            }), 400

        core = get_app_core()
        user_data = core.get_user_data(user_id)
        # Same guard as the login route: fail clearly instead of raising an
        # AttributeError on a missing record.
        if not user_data:
            return jsonify({
                'success': False,
                'message': '用户数据获取失败'
            }), 500

        # Store the display name and persist the record.
        user_data.profile['name'] = name
        core.data_manager.save_user_data(user_data)
        session['user_id'] = user_id
        return jsonify({
            'success': True,
            'user_id': user_id,
            'name': name
        })
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"用户注册失败: {e}")
        return jsonify({
            'success': False,
            'message': f'注册失败: {str(e)}'
        }), 500
@app.route('/api/questionnaire/submit', methods=['POST'])
def submit_questionnaire():
    """Forward questionnaire answers to the data-collection module.

    Expects a JSON object with optional ``type`` (default ``'basic'``),
    ``answers`` and, when there is no session, ``user_id``.

    Returns:
        200 on success; 401 when no user can be determined; 400 when the
        body is not a JSON object; 500 when processing fails.
    """
    try:
        # Parse the body once. silent=True returns None for a missing or
        # malformed body, so an anonymous bad request reaches the 401 below
        # instead of crashing on `request.json.get`.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}

        # NOTE(review): falling back to a client-supplied user_id lets an
        # unauthenticated caller act as any user; confirm this is intended.
        user_id = session.get('user_id') or data.get('user_id')
        if not user_id:
            return jsonify({
                'success': False,
                'message': '请先登录'
            }), 401
        if not isinstance(payload, dict):
            return jsonify({
                'success': False,
                'message': '请求数据格式错误'
            }), 400

        input_data = {
            'type': 'questionnaire',
            'questionnaire_type': data.get('type', 'basic'),  # basic, taste, physiological
            'answers': data.get('answers', {})
        }
        core = get_app_core()
        result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id)
        if result and result.result.get('success', False):
            return jsonify({
                'success': True,
                'message': '问卷提交成功'
            })
        return jsonify({
            'success': False,
            'message': result.result.get('message', '问卷提交失败') if result else '处理失败'
        }), 500
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"提交问卷失败: {e}")
        return jsonify({
            'success': False,
            'message': f'提交失败: {str(e)}'
        }), 500
@app.route('/api/meal/record', methods=['POST'])
def record_meal():
    """Record a meal through the data-collection module.

    Expects a JSON object; every meal field is optional and defaults as
    shown in the code (today's date, ``'lunch'``, empty lists, 0 calories,
    satisfaction 3, empty notes). ``user_id`` is read from the body only
    when there is no session.

    Returns:
        200 on success; 401 when no user can be determined; 400 when the
        body is not a JSON object; 500 when processing fails.
    """
    try:
        # Parse the body once. silent=True returns None for a missing or
        # malformed body, so an anonymous bad request reaches the 401 below
        # instead of crashing on `request.json.get`.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}

        # NOTE(review): falling back to a client-supplied user_id lets an
        # unauthenticated caller act as any user; confirm this is intended.
        user_id = session.get('user_id') or data.get('user_id')
        if not user_id:
            return jsonify({
                'success': False,
                'message': '请先登录'
            }), 401
        # Reject non-object bodies rather than recording an all-defaults meal.
        if not isinstance(payload, dict):
            return jsonify({
                'success': False,
                'message': '请求数据格式错误'
            }), 400

        input_data = {
            'type': 'meal_record',
            'date': data.get('date', datetime.now().strftime('%Y-%m-%d')),
            'meal_type': data.get('meal_type', 'lunch'),  # breakfast, lunch, dinner
            'foods': data.get('foods', []),
            'quantities': data.get('quantities', []),
            'calories': data.get('calories', 0),
            'satisfaction_score': data.get('satisfaction_score', 3),
            'notes': data.get('notes', '')
        }
        core = get_app_core()
        result = core.process_user_request(ModuleType.DATA_COLLECTION, input_data, user_id)
        if result and result.result.get('success', False):
            return jsonify({
                'success': True,
                'message': '餐食记录成功'
            })
        return jsonify({
            'success': False,
            'message': result.result.get('message', '记录失败') if result else '处理失败'
        }), 500
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"记录餐食失败: {e}")
        return jsonify({
            'success': False,
            'message': f'记录失败: {str(e)}'
        }), 500
@app.route('/api/recommendation/get', methods=['POST'])
def get_recommendations():
    """Ask the recommendation module for meal suggestions.

    Expects a JSON object with optional ``meal_type`` (default ``'lunch'``),
    ``preferences``, ``context`` and, when there is no session, ``user_id``.

    Returns:
        200 with ``{'success': True, 'recommendations': [...]}``; 401 when no
        user can be determined; 400 when the body is not a JSON object; 500
        when processing fails. Failure responses past the login check carry
        an empty ``recommendations`` list.
    """
    try:
        # Parse the body once. silent=True returns None for a missing or
        # malformed body, so an anonymous bad request reaches the 401 below
        # instead of crashing on `request.json.get`.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}

        # NOTE(review): falling back to a client-supplied user_id lets an
        # unauthenticated caller act as any user; confirm this is intended.
        user_id = session.get('user_id') or data.get('user_id')
        if not user_id:
            return jsonify({
                'success': False,
                'message': '请先登录'
            }), 401
        if not isinstance(payload, dict):
            return jsonify({
                'success': False,
                'message': '请求数据格式错误',
                'recommendations': []
            }), 400

        input_data = {
            'type': 'meal_recommendation',
            'meal_type': data.get('meal_type', 'lunch'),
            'preferences': data.get('preferences', {}),
            'context': data.get('context', {})
        }
        core = get_app_core()
        result = core.process_user_request(ModuleType.RECOMMENDATION, input_data, user_id)
        if result and result.result.get('success', False):
            return jsonify({
                'success': True,
                'recommendations': result.result.get('recommendations', [])
            })
        return jsonify({
            'success': False,
            'message': result.result.get('message', '推荐失败') if result else '处理失败',
            'recommendations': []
        }), 500
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"获取推荐失败: {e}")
        return jsonify({
            'success': False,
            'message': f'获取失败: {str(e)}',
            'recommendations': []
        }), 500
@app.route('/api/analysis/nutrition', methods=['POST'])
def analyze_nutrition():
    """Run a nutrition analysis on the posted meal data.

    Expects a JSON object with optional ``meal_data`` and, when there is no
    session, ``user_id``.

    Returns:
        200 with ``{'success': True, 'analysis': {...}}``; 401 when no user
        can be determined; 400 when the body is not a JSON object; 500 when
        processing fails.
    """
    try:
        # Parse the body once. silent=True returns None for a missing or
        # malformed body, so an anonymous bad request reaches the 401 below
        # instead of crashing on `request.json.get`.
        payload = request.get_json(silent=True)
        data = payload if isinstance(payload, dict) else {}

        # NOTE(review): falling back to a client-supplied user_id lets an
        # unauthenticated caller act as any user; confirm this is intended.
        user_id = session.get('user_id') or data.get('user_id')
        if not user_id:
            return jsonify({
                'success': False,
                'message': '请先登录'
            }), 401
        if not isinstance(payload, dict):
            return jsonify({
                'success': False,
                'message': '请求数据格式错误'
            }), 400

        input_data = {
            'type': 'nutrition_analysis',
            'meal_data': data.get('meal_data', {})
        }
        core = get_app_core()
        result = core.process_user_request(ModuleType.USER_ANALYSIS, input_data, user_id)
        if result and result.result.get('success', False):
            return jsonify({
                'success': True,
                'analysis': result.result.get('analysis', {})
            })
        return jsonify({
            'success': False,
            'message': result.result.get('message', '分析失败') if result else '处理失败'
        }), 500
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"营养分析失败: {e}")
        return jsonify({
            'success': False,
            'message': f'分析失败: {str(e)}'
        }), 500
@app.route('/health')
def health():
    """Liveness probe: always answers with a fixed JSON status."""
    status_payload = {'status': 'ok'}
    return jsonify(status_payload)
if __name__ == '__main__':
    # Create the directories the app expects next to the working directory.
    for directory in ('templates', 'static', 'logs'):
        Path(directory).mkdir(exist_ok=True)

    # The server listens on all interfaces, and the Werkzeug debugger allows
    # code execution from the browser, so debug mode is opt-in: set
    # FLASK_DEBUG=1 (or true/yes) for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)