feat: 新增飞书长连接模式,无需公网域名
## 🚀 重大更新 ### 飞书集成升级 - ✅ 迁移到飞书官方 SDK 的事件订阅 2.0(长连接模式) - ✅ 无需公网域名和 webhook 配置 - ✅ 支持内网部署 - ✅ 自动重连机制 ### 核心功能优化 - ✅ 优化群聊隔离机制(每个用户在每个群独立会话) - ✅ 增强日志输出(emoji 标记便于快速识别) - ✅ 完善错误处理和异常恢复 - ✅ 添加 SSL 证书问题解决方案 ### 新增文件 - `src/integrations/feishu_longconn_service.py` - 飞书长连接服务 - `start_feishu_bot.py` - 启动脚本 - `test_feishu_connection.py` - 连接诊断工具 - `docs/FEISHU_LONGCONN.md` - 详细使用文档 - `README.md` - 项目说明文档 ### 技术改进 - 添加 lark-oapi==1.3.5 官方 SDK - 升级 certifi 包以支持 SSL 验证 - 优化配置加载逻辑 - 改进会话管理机制 ### 文档更新 - 新增飞书长连接模式完整文档 - 更新快速开始指南 - 添加常见问题解答(SSL、权限、部署等) - 完善架构说明和技术栈介绍 ## 📝 使用方式 启动飞书长连接服务(无需公网域名): ```bash python3 start_feishu_bot.py ``` 详见:docs/FEISHU_LONGCONN.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,7 +6,7 @@
|
||||
import logging
|
||||
import json
|
||||
import threading
|
||||
from flask import Blueprint, request, jsonify
|
||||
from flask import Blueprint, request, jsonify, current_app
|
||||
from src.integrations.feishu_service import FeishuService
|
||||
from src.web.service_manager import service_manager
|
||||
|
||||
@@ -16,66 +16,117 @@ logger = logging.getLogger(__name__)
|
||||
# 创建蓝图
|
||||
feishu_bot_bp = Blueprint('feishu_bot', __name__, url_prefix='/api/feishu/bot')
|
||||
|
||||
# 在模块级别实例化飞书服务,以便复用
|
||||
# 注意:这假设配置在启动时是固定的。如果配置可热更新,则需要调整。
|
||||
feishu_service = FeishuService()
|
||||
def _process_message_in_background(app, event_data: dict):
|
||||
"""
|
||||
在后台线程中处理消息,避免阻塞飞书的回调请求。
|
||||
|
||||
def _process_message_in_background(app_context, event_data: dict):
|
||||
Args:
|
||||
app: Flask应用实例
|
||||
event_data: 飞书事件数据
|
||||
"""
|
||||
在后台线程中处理消息,避免阻塞飞书的回调请求。
|
||||
"""
|
||||
with app_context:
|
||||
with app.app_context():
|
||||
# 每个线程创建独立的飞书服务实例,避免token共享问题
|
||||
feishu_service = FeishuService()
|
||||
|
||||
try:
|
||||
# 1. 解析事件数据
|
||||
message_id = event_data['event']['message']['message_id']
|
||||
chat_id = event_data['event']['message']['chat_id']
|
||||
# 内容是一个JSON字符串,需要再次解析
|
||||
content_json = json.loads(event_data['event']['message']['content'])
|
||||
text_content = content_json.get('text', '').strip()
|
||||
event = event_data.get('event', {})
|
||||
message = event.get('message', {})
|
||||
|
||||
message_id = message.get('message_id')
|
||||
chat_id = message.get('chat_id')
|
||||
|
||||
if not message_id or not chat_id:
|
||||
logger.error(f"[Feishu Bot] 事件数据缺少必要字段: {event_data}")
|
||||
return
|
||||
|
||||
# 内容是一个JSON字符串,需要再次解析
|
||||
try:
|
||||
content_json = json.loads(message.get('content', '{}'))
|
||||
text_content = content_json.get('text', '').strip()
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"[Feishu Bot] 解析消息内容失败: {e}")
|
||||
return
|
||||
|
||||
logger.info(f"[Feishu Bot] 后台开始处理消息ID: {message_id}, 内容: '{text_content}'")
|
||||
|
||||
# 2. 移除@机器人的部分
|
||||
# 飞书的@消息格式通常是 "@机器人名 实际内容"
|
||||
if event_data['event']['message'].get('mentions'):
|
||||
for mention in event_data['event']['message']['mentions']:
|
||||
# mention['key']是@内容,例如"@_user_1"
|
||||
mentions = message.get('mentions', [])
|
||||
if mentions:
|
||||
for mention in mentions:
|
||||
# mention['key']是@内容,例如"@_user_1"
|
||||
# mention['name']是显示的名字
|
||||
bot_mention_text = f"@{mention['name']}"
|
||||
if text_content.startswith(bot_mention_text):
|
||||
text_content = text_content[len(bot_mention_text):].strip()
|
||||
break
|
||||
mention_name = mention.get('name', '')
|
||||
if mention_name:
|
||||
# 尝试多种@格式
|
||||
for prefix in [f"@{mention_name}", f"@{mention_name} "]:
|
||||
if text_content.startswith(prefix):
|
||||
text_content = text_content[len(prefix):].strip()
|
||||
break
|
||||
|
||||
if not text_content:
|
||||
logger.warning(f"[Feishu Bot] 移除@后内容为空,不处理。消息ID: {message_id}")
|
||||
logger.warning(f"[Feishu Bot] 移除@后内容为空,不处理。消息ID: {message_id}")
|
||||
# 仍然回复一个提示
|
||||
feishu_service.reply_to_message(message_id, "您好!请问有什么可以帮助您的吗?")
|
||||
return
|
||||
|
||||
logger.info(f"[Feishu Bot] 清理后的消息内容: '{text_content}'")
|
||||
|
||||
# 3. 调用核心服务获取回复
|
||||
assistant = service_manager.get_assistant()
|
||||
# 注意:process_message_agent 是一个异步方法,需要处理
|
||||
# 在同步线程中运行异步方法
|
||||
import asyncio
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError: # 'RuntimeError: There is no current event loop...'
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
# 3. 获取或创建该飞书用户的会话(支持群聊隔离)
|
||||
chat_manager = service_manager.get_chat_manager()
|
||||
|
||||
# 调用对话服务
|
||||
logger.info(f"[Feishu Bot] 调用Agent服务处理消息...")
|
||||
response_data = loop.run_until_complete(
|
||||
assistant.process_message_agent(message=text_content, user_id=f"feishu_{chat_id}")
|
||||
# 获取发送者ID(从event中提取)
|
||||
sender_id = event.get('sender', {}).get('sender_id', {}).get('user_id', 'unknown')
|
||||
|
||||
# 群聊隔离:每个用户在每个群都有独立会话
|
||||
# 格式:feishu_群聊ID_用户ID
|
||||
user_id = f"feishu_{chat_id}_{sender_id}"
|
||||
|
||||
logger.info(f"[Feishu Bot] 会话用户标识: {user_id}")
|
||||
|
||||
# 检查是否已有活跃会话
|
||||
active_sessions = chat_manager.get_active_sessions()
|
||||
session_id = None
|
||||
for session in active_sessions:
|
||||
if session.get('user_id') == user_id:
|
||||
session_id = session.get('session_id')
|
||||
logger.info(f"[Feishu Bot] 找到已有会话: {session_id}")
|
||||
break
|
||||
|
||||
# 如果没有会话,创建新会话
|
||||
if not session_id:
|
||||
session_id = chat_manager.create_session(user_id=user_id, work_order_id=None)
|
||||
logger.info(f"[Feishu Bot] 为用户 {sender_id} 在群聊 {chat_id} 创建新会话: {session_id}")
|
||||
|
||||
# 4. 调用实时对话接口处理消息
|
||||
logger.info(f"[Feishu Bot] 调用实时对话接口处理消息...")
|
||||
response_data = chat_manager.process_message(
|
||||
session_id=session_id,
|
||||
user_message=text_content,
|
||||
ip_address=None,
|
||||
invocation_method="feishu_bot"
|
||||
)
|
||||
logger.info(f"[Feishu Bot] Agent服务返回结果: {response_data}")
|
||||
logger.info(f"[Feishu Bot] 实时对话接口返回结果: {response_data}")
|
||||
|
||||
# 4. 提取回复并发送
|
||||
reply_text = response_data.get("message", "抱歉,我暂时无法回答这个问题。")
|
||||
if isinstance(reply_text, dict): # 有时候返回的可能是字典
|
||||
# 5. 提取回复并发送
|
||||
if response_data.get("success"):
|
||||
reply_text = response_data.get("response") or response_data.get("content", "抱歉,我暂时无法回答这个问题。")
|
||||
else:
|
||||
error_msg = response_data.get('error', '未知错误')
|
||||
reply_text = f"抱歉,处理您的问题时遇到了一些问题。请稍后重试或联系客服。\n错误信息: {error_msg}"
|
||||
logger.error(f"[Feishu Bot] 处理消息失败: {error_msg}")
|
||||
|
||||
# 确保回复是字符串
|
||||
if isinstance(reply_text, dict):
|
||||
reply_text = reply_text.get('content', str(reply_text))
|
||||
|
||||
logger.info(f"[Feishu Bot] 准备发送回复到飞书: '{reply_text}'")
|
||||
if not isinstance(reply_text, str):
|
||||
reply_text = str(reply_text)
|
||||
|
||||
logger.info(f"[Feishu Bot] 准备发送回复到飞书 (长度: {len(reply_text)})")
|
||||
logger.debug(f"[Feishu Bot] 回复内容: {reply_text}")
|
||||
|
||||
success = feishu_service.reply_to_message(message_id, reply_text)
|
||||
|
||||
if success:
|
||||
@@ -83,8 +134,16 @@ def _process_message_in_background(app_context, event_data: dict):
|
||||
else:
|
||||
logger.error(f"[Feishu Bot] 回复消息到飞书失败。消息ID: {message_id}")
|
||||
|
||||
except KeyError as e:
|
||||
logger.error(f"[Feishu Bot] 事件数据格式错误,缺少字段: {e}", exc_info=True)
|
||||
except Exception as e:
|
||||
logger.error(f"[Feishu Bot] 后台处理消息时发生严重错误: {e}", exc_info=True)
|
||||
# 尝试发送错误提示给用户
|
||||
try:
|
||||
if 'message_id' in locals():
|
||||
feishu_service.reply_to_message(message_id, "抱歉,系统遇到了一些问题,请稍后重试。")
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
@feishu_bot_bp.route('/event', methods=['POST'])
|
||||
@@ -94,30 +153,46 @@ def handle_feishu_event():
|
||||
"""
|
||||
# 1. 解析请求
|
||||
data = request.json
|
||||
logger.info(f"[Feishu Bot] 收到飞书事件回调:\n{json.dumps(data, indent=2)}")
|
||||
|
||||
if not data:
|
||||
logger.warning("[Feishu Bot] 收到空的请求数据")
|
||||
return jsonify({"status": "error", "message": "empty request"}), 400
|
||||
|
||||
logger.info(f"[Feishu Bot] 收到飞书事件回调:\n{json.dumps(data, indent=2, ensure_ascii=False)}")
|
||||
|
||||
# 2. 安全校验 (如果配置了)
|
||||
# 此处可以添加Verification Token的校验逻辑
|
||||
# headers = request.headers
|
||||
# ...
|
||||
# 可以在这里添加Verification Token的校验逻辑
|
||||
# from src.config.unified_config import get_config
|
||||
# config = get_config()
|
||||
# if config.feishu.verification_token:
|
||||
# token = request.headers.get('X-Lark-Request-Token')
|
||||
# if token != config.feishu.verification_token:
|
||||
# logger.warning("[Feishu Bot] Token验证失败")
|
||||
# return jsonify({"status": "error", "message": "invalid token"}), 403
|
||||
|
||||
# 3. 处理URL验证挑战
|
||||
if data and data.get("type") == "url_verification":
|
||||
if data.get("type") == "url_verification":
|
||||
challenge = data.get("challenge", "")
|
||||
logger.info(f"[Feishu Bot] 收到URL验证请求,返回challenge: {challenge}")
|
||||
logger.info(f"[Feishu Bot] 收到URL验证请求,返回challenge: {challenge}")
|
||||
return jsonify({"challenge": challenge})
|
||||
|
||||
# 4. 处理事件回调
|
||||
if data and data.get("header", {}).get("event_type") == "im.message.receive_v1":
|
||||
# 立即响应飞书,防止超时重试
|
||||
event_type = data.get("header", {}).get("event_type")
|
||||
|
||||
if event_type == "im.message.receive_v1":
|
||||
# 获取当前Flask应用实例
|
||||
app = current_app._get_current_object()
|
||||
|
||||
# 立即在后台线程中处理,避免阻塞飞书回调
|
||||
threading.Thread(
|
||||
target=_process_message_in_background,
|
||||
args=(request.environ['werkzeug.request'].environ['flask.app'].app_context(), data)
|
||||
args=(app, data),
|
||||
daemon=True # 设置为守护线程
|
||||
).start()
|
||||
|
||||
logger.info("[Feishu Bot] 已将消息处理任务推送到后台线程,并立即响应200 OK")
|
||||
logger.info("[Feishu Bot] 已将消息处理任务推送到后台线程,并立即响应200 OK")
|
||||
return jsonify({"status": "processing"})
|
||||
|
||||
# 5. 对于其他未知事件,也返回成功,避免飞书重试
|
||||
logger.warning(f"[Feishu Bot] 收到未知类型的事件: {data.get('header', {}).get('event_type')}")
|
||||
# 5. 对于其他未知事件,也返回成功,避免飞书重试
|
||||
logger.warning(f"[Feishu Bot] 收到未知类型的事件: {event_type}")
|
||||
return jsonify({"status": "ignored"})
|
||||
|
||||
Reference in New Issue
Block a user