系统概述

飞书AWS小助手是一个基于Dify AI平台构建的智能聊天机器人,专注于解答AWS相关问题。它不仅仅是一个简单的问答工具,而是一个结合了知识库检索与智能对话生成的综合系统。机器人能够在飞书的私聊和群聊环境中无缝工作,支持文本、图片及富文本等多种交互方式,为用户提供准确、自然且连贯的AWS技术支持服务。

核心技术架构

"Agent+助手"双层处理模型

系统的核心是一个创新的双层AI处理架构,这种设计融合了检索式和生成式AI的优势:

用户查询 → Agent检索知识库 → 信息提取与整合 → 助手优化表达 → 最终回复

这一架构的工作原理详述如下:

  1. Agent层(知识检索):当用户提出问题时,Agent组件首先接收查询,并在专门的AWS知识库中进行深度检索。Agent使用流式处理模式(streaming),能够执行复杂的认知任务,包括信息搜索、筛选、比较和综合。Agent会生成"思考过程"(thoughts)和初步回答。

  2. 助手层(表达优化):Agent检索到的原始信息随后传递给智能助手,助手使用阻塞模式(blocking)接收增强查询:

enhanced_query = f"""用户原始问题: {query}

以下是Agent检索到的相关信息:
{agent_response}

请基于上述信息,为用户提供准确、全面且易于理解的回答。如果Agent提供的信息不足以回答用户问题,请基于你自身的知识给出最佳回答。
回复时直接回答用户问题,不要提及Agent或引用上述信息。"""

助手将这些专业信息重新组织为流畅、自然的对话式回答,确保技术准确性的同时提升用户体验。

技术实现细节

双层架构通过精心设计的API调用实现,两个组件使用不同的API密钥和处理模式:

# Agent调用 - 使用流式响应和Agent会话ID
agent_response, agent_thoughts = ask_dify_agent(
    query,
    agent_conversation_id,
    user_id,
    with_image,
    image_file_id
)

# 助手调用 - 使用阻塞响应和用户会话ID
assistant_response = ask_dify(enhanced_query, conversation_id, user_id)

这种设计使系统能够平衡处理速度与回复质量,Agent的快速检索和助手的细致优化相互补充,形成一个高效的问答系统。

深度解析会话上下文管理

上下文管理是智能对话系统的核心挑战,本系统实现了多层次的上下文管理机制,使机器人能够"记住"与用户的对话历史,提供连贯的交互体验。

多维上下文存储体系

系统维护了一个复杂的上下文存储结构:

user_conversations = {}      # 存储用户ID到助手会话ID的映射
agent_conversations = {}     # 存储用户ID到Agent会话ID的映射
group_conversations = {}     # 存储群聊ID到会话ID的映射
user_conversation_times = {} # 记录会话最后活跃时间
agent_conversation_times = {} # 记录Agent会话最后活跃时间

这种设计使系统能够同时管理多个用户的多轮对话,并区分私聊和群聊场景的上下文需求。特别是,系统为Agent和助手分别维护会话ID,确保两个组件各自保持连贯的对话流程。

上下文传递与更新机制

上下文在系统中的传递遵循精心设计的流程:

  1. 会话识别:每次收到消息,系统首先识别发送者,并查找相关会话:

    conversation_id = user_conversations.get(sender_id)
    agent_conversation_id = agent_conversations.get(sender_id)
  2. 上下文应用:系统将会话ID包含在AI请求中,使AI能够基于历史对话生成回复:

    if conversation_id:
        data["conversation_id"] = conversation_id
  3. 上下文保存:AI响应后,系统提取并保存新的会话ID:

    if "conversation_id" in response_json:
        user_conversations[user_id] = response_json["conversation_id"]
  4. 上下文时间更新:每次交互都会更新会话的活跃时间戳:

    update_conversation_time(user_id)
  5. 群聊同步:在群聊中,系统会将用户的会话ID同步到群聊存储:

    if reply_type == "chat_id" and assistant_response:
        group_conversations[reply_id] = user_conversations.get(sender_id)

智能会话生命周期管理

系统实现了完整的会话生命周期管理,包括创建、使用、过期和清理:

  1. 会话过期检测:系统会检查会话是否已超过设定的超时时间:

    def is_conversation_expired(user_id):
        current_time = time.time()
        last_time = user_conversation_times.get(user_id, 0)
        return (current_time - last_time) > CONTEXT_EXPIRY_SECONDS
  2. 过期会话清理:后台线程会定期清理过期会话,释放系统资源:

    def cleanup_expired_conversations():
        while True:
            # 检查并清理过期会话
            current_time = time.time()
            for user_id, last_time in list(user_conversation_times.items()):
                if (current_time - last_time) > CONTEXT_EXPIRY_SECONDS:
                    # 清理过期会话
  3. 上下文重置:当检测到会话过期时,系统会创建新会话:

    if is_conversation_expired(user_id):
        # 清除过期会话
        if user_id in user_conversations:
            del user_conversations[user_id]
        # ...创建新会话
  4. 动态超时配置:系统提供API接口,允许动态调整上下文超时时间:

    @app.post('/config/context-timeout')
    def set_context_timeout():
        # 设置上下文超时时间

这种全面的上下文管理机制确保了系统能够在保持对话连贯性的同时,高效管理内存资源,适应各种对话场景。

消息处理全景分析

全面的消息类型支持

系统支持处理多种消息类型,每种类型都有专门的处理逻辑:

  • 纯文本消息:直接提取文本内容进行处理

  • 图片消息:获取图片资源,分析图片内容

  • 富文本消息:解析复杂的JSON结构,提取文本和图片内容

  • 文本+图片混合消息:同时处理文本和图片,提供综合分析

代码中使用专门的处理函数应对不同类型的消息:

def process_text_message(event_data, sender_id):
    # 处理文本消息

def process_image_message(event_data, sender_id):
    # 处理图片消息

def process_post_message(event_data, sender_id):
    # 处理富文本消息

深入消息处理流程

以处理文本消息为例,系统的处理流程如下:

  1. 消息解析:系统从飞书事件中提取消息内容、发送者ID、会话类型等信息

  2. 会话类型判断:区分私聊与群聊,在群聊中检查是否@了机器人

  3. 上下文检索:查找与发送者关联的会话ID

  4. 用户即时反馈:发送"正在思考"提示,提升用户体验

  5. 异步处理启动:创建专门线程处理消息,避免阻塞主线程

  6. 双层AI处理:依次通过Agent和助手处理消息

  7. 结果返回:将最终回答发送给用户

特别值得注意的是系统的异步处理机制:

# 启动异步线程处理消息
processing_thread = threading.Thread(
    target=process_message_async_group,
    args=(sender_id, text_content, "text", None, conversation_id, None, reply_id, reply_type, message_id)
)
processing_thread.daemon = True
processing_thread.start()

# 立即返回响应,不阻塞主线程
return HTTPResponse(
    status=200,
    body=json.dumps({"code": 0, "msg": "success"}),
    headers={'Content-Type': 'application/json'}
)

这种设计使系统能够立即响应飞书平台的请求,避免超时问题,同时在后台处理复杂的AI任务。

群聊智能识别

系统在群聊中实现了智能的@识别机制:

def is_bot_mentioned(mentions):
    """检查消息中是否@了机器人"""
    if not mentions:
        return False

    for mention in mentions:
        # 通过名称或ID匹配
        mention_name = mention.get("name", "")
        mention_id = mention.get("id", {}).get("open_id", "")

        # 检查是否匹配机器人名称或ID
        if (BOT_NAME and BOT_NAME in mention_name) or (BOT_OPEN_ID and BOT_OPEN_ID == mention_id):
            return True

    return False

当检测到机器人被@时,系统还会智能去除@部分,只处理实际问题内容:

def remove_mentions(text, mentions):
    """从文本中删除@部分"""
    if not mentions:
        return text

    for mention in mentions:
        mention_name = mention.get("name", "")
        if mention_name:
            # 删除@用户名
            text = text.replace(f"@{mention_name}", "").strip()

    return text

图片处理技术解析

完整的图片处理流程

系统的图片处理能力是其重要特色,涉及多个复杂步骤:

  1. 图片资源获取:通过飞书API获取图片二进制数据:

    image_data = get_message_resource(message_id, image_key)
  2. 双通道上传:将图片分别上传到两个AI服务:

    upload_result = upload_image_to_dify(image_data, sender_id)
    upload_agent_result = upload_image_to_dify_agent(image_data, sender_id)
  3. 多部分表单构建:上传图片时构建复杂的多部分表单请求:

    # 构造表单数据
    body = []
    # 添加用户ID
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="user"'.encode(),
        b'',
        user_id.encode()
    ])
    # 添加文件数据
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="file"; filename="{filename}"'.encode(),
        f'Content-Type: image/jpeg'.encode(),
        b'',
        image_data,
        f'--{boundary}--'.encode()
    ])
  4. 图像分析:通过Agent和助手对图片内容进行分析:

    assistant_response = process_with_agent_then_assistant(
        "请描述这张图片,并提取其中包含的关键信息",
        conversation_id,
        sender_id,
        True,  # with_image
        upload_agent_result["id"]  # 使用Agent的上传ID
    )
  5. 容错处理:实现完善的错误处理,确保即使图片处理失败也能继续处理文本内容:

    if not upload_agent_result or "id" not in upload_agent_result:
        # 如果仅Agent上传失败,则继续使用聊天助手处理
        logger.warning("上传图片到Agent失败,仅使用聊天助手处理")
        assistant_response = ask_dify_with_image(content, upload_result["id"], conversation_id, sender_id)

技术挑战与解决方案

处理图片消息面临多项技术挑战:

  1. 不同消息格式:图片可能出现在纯图片消息或富文本消息中,系统需要使用复杂的提取逻辑:

    def extract_image_key_from_post(content_json):
        """从富文本内容中提取图片key"""
        # 复杂的递归检索逻辑
  2. 二进制数据处理:系统需要正确处理二进制图片数据,包括编码、传输和存储。

  3. 多服务协调:图片需要同时发送给多个AI服务,并协调它们的响应。

  4. 请求超时处理:图片处理通常耗时较长,系统实现了专门的超时处理和状态通知:

    if current_time - last_update_time > AGENT_STATUS_REMIND_SECONDS and not status_update_sent:
        # 向用户发送状态更新
        send_message(open_id=user_id, content="小助手正在搜索和整理资料,这可能需要一些时间,请稍候...")

健壮性设计深度解析

全面的错误处理策略

系统实现了多层次的错误处理机制,确保在各种异常情况下仍能正常运行:

  1. 请求重试机制:使用指数级退避策略处理网络请求失败:

    def http_request_with_retry(req, context=None, max_retries=MAX_RETRIES,
                              initial_delay=INITIAL_RETRY_DELAY, backoff_factor=RETRY_BACKOFF_FACTOR,
                              timeout=None):
        # 实现指数级退避的重试逻辑
  2. 特定错误类型处理:针对不同类型的错误采用不同处理策略:

    # 特殊处理超时错误
    if isinstance(e, socket.timeout) or "timed out" in error_msg.lower():
        logger.error(f"Dify Agent请求超时: {error_msg}")
        return "Agent处理超时,但我们仍会尝试为您解答问题。", None
  3. 用户友好错误消息:将技术错误转换为用户可理解的消息:

    user_friendly_msg = "抱歉,处理您的请求时出现错误。"
    if "timeout" in error_msg.lower() or "timed out" in error_msg.lower():
        user_friendly_msg += "AI系统响应超时,您的问题可能需要更长时间处理。请尝试简化问题或稍后再试。"
  4. 全局异常捕获:确保主流程不会因为单个处理器异常而崩溃:

    try:
        # 处理逻辑
    except Exception as e:
        logger.error(f"处理事件出错: {str(e)}")
        logger.error(traceback.format_exc())
        # 返回错误响应

高级安全处理

系统实现了多项安全处理措施:

  1. 身份验证:验证飞书请求的合法性:

    if token != VERIFICATION_TOKEN:
        logger.warning(f"Token验证失败: {token}")
        return HTTPResponse(status=401, ...)
  2. SSL验证处理:处理HTTPS请求时的SSL验证问题:

    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
  3. 请求去重:避免处理重复事件:

    if event_id in processed_events:
        logger.info(f"跳过重复事件: {event_id}")
        return True
  4. 并发控制:使用锁机制确保线程安全:

    with processing_lock:
        # 处理事件

系统监控与管理

全面的日志系统

系统实现了详细的日志记录,便于监控和问题排查:

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('lark_bot.log'),
        logging.StreamHandler()
    ]
)

日志覆盖了系统运行的各个方面,包括请求处理、错误报告、性能指标等,是系统运维的重要工具。

丰富的管理API

系统提供了一套完整的管理API,用于监控和调整系统行为:

  1. 健康检查:提供基本健康状态监测:

    @app.get('/ping')
    def ping():
        """健康检查接口"""
        return "pong"
  2. 组件测试:验证各AI组件连接状态:

    @app.get('/test-dify')
    @app.get('/test-agent')
  3. 会话管理:查看和清理用户会话:

    @app.get('/clear-session')
    @app.get('/sessions')
  4. 配置调整:动态修改系统参数:

    @app.post('/config/timeout')
    @app.post('/config/agent-update-interval')
    @app.post('/config/context-timeout')

这些API使系统管理员能够在不重启服务的情况下调整系统行为,提高了运维效率。

性能优化措施

系统实现了多项性能优化措施:

  1. 异步处理:将耗时操作放入后台线程,避免阻塞主线程

  2. 会话超时:自动清理长时间不活跃的会话,释放内存资源

  3. 状态通知:在长时间处理时提供进度更新,改善用户体验

  4. 请求缓存:避免重复处理相同的事件,减少不必要的计算

Markdown增强通信

系统实现了对Markdown格式的智能识别和处理,使机器人能够发送格式丰富的回复:

def is_markdown(text):
    """简单判断文本是否包含Markdown格式"""
    # 匹配常见的Markdown格式,如标题、列表、链接、粗体、代码块等
    markdown_patterns = [
        r'#{1,6}\s+\S+',  # 标题
        r'\*\*.*?\*\*',  # 粗体
        # ...更多模式
    ]
    
    for pattern in markdown_patterns:
        if re.search(pattern, text, re.MULTILINE):
            return True

当检测到Markdown格式时,系统会自动使用富文本模式发送消息:

if content and is_markdown(content):
    # 构建富文本消息,使用md标签包装Markdown内容
    post_content = {
        "zh_cn": {
            "title": "",
            "content": [[{"tag": "md", "text": content}]]
        }
    }
    # 使用post类型发送
    data = {
        "receive_id": open_id if open_id else chat_id,
        "msg_type": "post",
        "content": json.dumps(post_content)
    }

这使机器人能够发送包含代码块、表格、列表等复杂格式的回复,大大提升了技术信息的可读性。

部署与维护

系统配置

系统关键配置参数包括:

  • 应用凭证:与飞书平台交互的必要凭证

  • API端点:Dify AI服务的访问地址

  • 超时设置:各类操作的超时参数

  • 上下文配置:会话管理的相关参数

系统初始化

系统启动时会执行一系列初始化操作:

if __name__ == '__main__':
    logger.info("飞书机器人服务启动 - 支持Agent+助手两阶段处理,已加入上下文超时机制")
    logger.info(f"配置信息: APP_ID={APP_ID}, DIFY_API_URL={DIFY_API_URL}")
    
    # 启动上下文清理线程
    cleanup_thread = threading.Thread(target=cleanup_expired_conversations)
    cleanup_thread.daemon = True
    cleanup_thread.start()
    
    # 启动服务
    app.run(host='0.0.0.0', port=8080, debug=True)

主要初始化步骤包括:

  1. 记录启动信息

  2. 启动上下文清理线程

  3. 启动Web服务器

运维建议

  1. 监控日志:定期检查系统日志,关注错误和性能问题

  2. 调整超时:根据实际使用情况调整各类超时参数

  3. 会话管理:监控会话数量,必要时手动清理过期会话

  4. 备份配置:保存关键配置参数的备份

  5. 定期测试:使用测试API验证系统各组件正常工作

实际应用场景

技术支持场景

在AWS技术支持场景中,机器人可以:

  • 解答AWS服务配置问题

  • 分析错误日志截图并提供解决方案

  • 解释AWS文档中的复杂概念

  • 提供最佳实践建议

群组协作场景

在团队协作环境中,机器人可以:

  • 在技术讨论群中响应@询问

  • 解答团队成员的技术问题

  • 分析共享的架构图和配置截图

  • 提供技术决策参考信息

总结

飞书AWS小助手是一个深度集成了多种AI技术的智能聊天机器人系统。通过Agent和助手的创新双层架构,系统成功平衡了知识检索的专业性与对话表达的自然流畅性。先进的上下文管理机制使机器人能够维持连贯对话,而全面的错误处理和异步处理设计确保了系统的稳定性和响应速度。

系统对多种消息类型的支持,特别是图片处理能力,使机器人能够应对复杂多样的用户需求。通过Markdown格式支持,机器人能够以结构化方式呈现技术信息,提升了用户体验。完善的管理接口和监控机制则为系统维护提供了便利。

这一综合系统不仅仅是AWS知识的简单提供者,更是一个能够理解上下文、分析图片、生成专业答案的全功能助手。无论是在个人技术咨询还是团队协作场景,它都能提供准确、自然且连贯的服务,代表了当前聊天机器人技术的先进水平。




完整代码:

      
# !/usr/bin/env python3
# -*- coding: utf-8 -*-

from bottle import Bottle, request, HTTPResponse
import json
import urllib.request
import urllib.parse
import urllib.error
import logging
import os
import traceback
import ssl
import time
import threading
import uuid
import base64
import re
from collections import deque
from io import BytesIO
import socket  # 用于捕获超时异常

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('lark_bot.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# 创建Bottle应用
app = Bottle()

# 应用凭证
VERIFICATION_TOKEN = ""
APP_ID = ""
APP_SECRET = ""
BOT_NAME = "AWS 小助手"  # 机器人的名称,用于识别@消息
BOT_OPEN_ID = ""  # 如果有机器人的open_id,可以填写

# Dify API配置
DIFY_API_URL = ""
DIFY_API_KEY = ""

# Dify Agent API配置
DIFY_AGENT_API_URL = ""
DIFY_AGENT_API_KEY = ""

# 临时文件目录
TEMP_DIR = "temp_images"
os.makedirs(TEMP_DIR, exist_ok=True)

# 用户会话存储
user_conversations = {}
group_conversations = {}  # 新增:群聊会话存储

# 新增:Agent会话存储
agent_conversations = {}

# 新增:会话时间记录
user_conversation_times = {}  # 记录会话最后活跃时间
agent_conversation_times = {}  # 记录Agent会话最后活跃时间
CONTEXT_EXPIRY_SECONDS = 300  # 上下文过期时间:5分钟

# 请求去重
processed_events = deque(maxlen=100)
processing_lock = threading.RLock()

# 重试机制配置
MAX_RETRIES = 3
INITIAL_RETRY_DELAY = 2  # 初始重试延迟(秒)
RETRY_BACKOFF_FACTOR = 1.5  # 延迟增长因子
DIFY_TIMEOUT = 60  # Dify API超时时间(秒)

# Agent状态更新时间配置
AGENT_STATUS_UPDATE_SECONDS = 15  # 每15秒发送一次状态更新

# Agent状态更新时间配置
AGENT_STATUS_REMIND_SECONDS = 31  # 每31秒发送一次状态更新


# 判断文本是否为Markdown格式
def is_markdown(text):
    """简单判断文本是否包含Markdown格式"""
    # 匹配常见的Markdown格式,如标题、列表、链接、粗体、代码块等
    markdown_patterns = [
        r'#{1,6}\s+\S+',  # 标题
        r'\*\*.*?\*\*',  # 粗体
        r'\*.*?\*',  # 斜体
        r'`.*?`',  # 行内代码
        r'```[\s\S]*?```',  # 代码块
        r'!$$.*?$$$$.*?$$',  # 图片
        r'$$.*?$$$$.*?$$',  # 链接
        r'^\s*[*+-]\s+',  # 无序列表
        r'^\s*\d+\.\s+',  # 有序列表
        r'^\s*>\s+',  # 引用
        r'^-{3,}$',  # 水平线
    ]

    for pattern in markdown_patterns:
        if re.search(pattern, text, re.MULTILINE):
            return True

    return False


# 增强版带重试的HTTP请求函数,支持指数级退避和超时处理
def http_request_with_retry(req, context=None, max_retries=MAX_RETRIES,
                            initial_delay=INITIAL_RETRY_DELAY, backoff_factor=RETRY_BACKOFF_FACTOR,
                            timeout=None):
    """执行HTTP请求,使用指数级退避策略自动重试失败的请求"""
    retries = 0
    current_delay = initial_delay

    # 如果设置了timeout,需要使用socket.setdefaulttimeout
    old_timeout = None
    if timeout:
        old_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(timeout)

    try:
        while retries <= max_retries:
            try:
                if context:
                    with urllib.request.urlopen(req, context=context) as response:
                        return response.read()
                else:
                    with urllib.request.urlopen(req) as response:
                        return response.read()
            except (urllib.error.URLError, socket.timeout) as e:
                retries += 1

                # 记录错误信息
                error_msg = str(e)
                if isinstance(e, urllib.error.HTTPError) and hasattr(e, 'code'):
                    error_msg = f"HTTP Error {e.code}: {e.reason}"
                    # 尝试读取更详细的错误信息
                    if hasattr(e, 'read'):
                        try:
                            error_content = e.read().decode('utf-8')
                            logger.error(f"错误详情: {error_content}")
                        except:
                            pass
                elif isinstance(e, socket.timeout):
                    error_msg = "Connection timed out"

                # 检查是否继续重试
                if retries <= max_retries:
                    logger.warning(f"请求失败: {error_msg},将在{current_delay:.1f}秒后重试 ({retries}/{max_retries})")
                    time.sleep(current_delay)
                    current_delay *= backoff_factor  # 指数级增加延迟
                else:
                    logger.error(f"请求失败,已达最大重试次数: {error_msg}")
                    raise e
    finally:
        # 恢复原来的超时设置
        if timeout and old_timeout is not None:
            socket.setdefaulttimeout(old_timeout)

    return None


# 获取tenant_access_token
def get_tenant_access_token():
    """获取tenant_access_token用于API调用"""
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    headers = {
        "Content-Type": "application/json"
    }
    data = {
        "app_id": APP_ID,
        "app_secret": APP_SECRET
    }

    data_bytes = json.dumps(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, headers=headers, method="POST")

    try:
        response_data = http_request_with_retry(req)
        if response_data:
            response_json = json.loads(response_data.decode('utf-8'))
            token = response_json.get("tenant_access_token")
            logger.info(f"成功获取tenant_access_token: {token[:10]}...")
            return token
        return None
    except Exception as e:
        logger.error(f"获取tenant_access_token失败: {e}")
        return None


# 发送消息到飞书 - 支持Markdown
def send_message(open_id=None, chat_id=None, content=None):
    """发送消息到用户或群组,支持文本和Markdown格式"""
    base_url = "https://open.feishu.cn/open-apis/im/v1/messages"

    params = {"receive_id_type": "open_id" if open_id else "chat_id"}
    url = f"{base_url}?{urllib.parse.urlencode(params)}"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {get_tenant_access_token()}"
    }

    # 检测是否为Markdown格式,如果是则使用富文本(post)格式发送
    if content and is_markdown(content):
        logger.info("检测到Markdown格式内容,使用富文本格式发送")
        # 构建富文本消息,使用md标签包装Markdown内容
        post_content = {
            "zh_cn": {  # 简体中文,可以根据需要添加其他语言
                "title": "",  # 空标题
                "content": [
                    [
                        {
                            "tag": "md",
                            "text": content
                        }
                    ]
                ]
            }
        }

        data = {
            "receive_id": open_id if open_id else chat_id,
            "msg_type": "post",
            "content": json.dumps(post_content)
        }
    else:
        # 普通文本消息
        msg_content = {"text": content} if content else {"text": "Hello, I'm a bot!"}

        data = {
            "receive_id": open_id if open_id else chat_id,
            "msg_type": "text",
            "content": json.dumps(msg_content)
        }

    data_bytes = json.dumps(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, headers=headers, method="POST")

    try:
        response_data = http_request_with_retry(req)
        if response_data:
            response_json = json.loads(response_data.decode('utf-8'))
            logger.info(f"消息发送成功: {response_json}")
            return response_json
        return {"code": -1, "msg": "请求失败"}
    except Exception as e:
        logger.error(f"发送消息失败: {e}")
        return {"code": -1, "msg": str(e)}


# 获取消息中的资源文件 - 使用正确的API
def get_message_resource(message_id, file_key):
    """获取消息中的资源文件"""
    # 确保提供了必需的参数
    if not message_id or not file_key:
        logger.error(f"获取资源文件失败: 缺少必需参数, message_id={message_id}, file_key={file_key}")
        return None

    # 正确的API URL格式
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{message_id}/resources/{file_key}?type=image"

    headers = {
        "Authorization": f"Bearer {get_tenant_access_token()}",
        "Content-Type": "application/json; charset=utf-8"
    }

    req = urllib.request.Request(url, headers=headers)

    try:
        logger.info(f"尝试获取消息资源: message_id={message_id}, file_key={file_key}")

        # 创建SSL上下文,忽略证书验证
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        response_data = http_request_with_retry(req, ctx)
        if response_data:
            logger.info(f"消息资源获取成功: file_key={file_key}, size={len(response_data)} bytes")
            return response_data

        logger.error(f"消息资源获取失败: message_id={message_id}, file_key={file_key}")
        return None
    except Exception as e:
        logger.error(f"获取消息资源失败: {e}, URL={url}")
        traceback_info = traceback.format_exc()
        logger.error(f"错误详情: {traceback_info}")
        return None


# 兼容旧代码的下载图片函数 - 重定向到正确的API
def download_image(image_key, message_id=None):
    """
    从飞书下载图片 - 兼容旧代码,实际转发到get_message_resource
    注意:必须提供message_id才能使用正确的API
    """
    logger.warning(
        f"使用已废弃的download_image函数,请直接使用get_message_resource。image_key={image_key}, message_id={message_id}")

    # 如果没有message_id,无法使用正确的API
    if not message_id:
        logger.error(f"download_image: 缺少message_id参数,无法下载图片")
        return None

    # 转发到正确的API函数
    return get_message_resource(message_id, image_key)


# 上传图片到Dify
def upload_image_to_dify(image_data, user_id):
    """上传图片到Dify"""
    url = f"{DIFY_API_URL}/files/upload"

    # 生成唯一的文件名
    filename = f"{uuid.uuid4()}.jpg"

    # 创建多部分表单数据
    boundary = '----WebKitFormBoundary' + ''.join(['%02x' % c for c in os.urandom(16)])

    headers = {
        'Authorization': f'Bearer {DIFY_API_KEY}',
        'Content-Type': f'multipart/form-data; boundary={boundary}'
    }

    # 构造表单数据
    body = []
    # 添加用户ID
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="user"'.encode(),
        b'',
        user_id.encode()
    ])

    # 添加文件数据
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="file"; filename="{filename}"'.encode(),
        f'Content-Type: image/jpeg'.encode(),
        b'',
        image_data,
        f'--{boundary}--'.encode()
    ])

    # 连接所有部分
    body_data = b'\r\n'.join(body)

    req = urllib.request.Request(url, data=body_data, headers=headers, method="POST")

    try:
        # 禁用SSL验证
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        logger.info(f"开始上传图片到Dify: user_id={user_id}, file_size={len(image_data)} bytes")

        response_data = http_request_with_retry(req, ctx)
        if response_data:
            response_json = json.loads(response_data.decode('utf-8'))
            logger.info(f"上传图片到Dify成功: {response_json}")
            return response_json
        return None
    except Exception as e:
        logger.error(f"上传图片到Dify失败: {e}")
        return None


# 上传图片到Dify Agent
def upload_image_to_dify_agent(image_data, user_id):
    """上传图片到Dify Agent"""
    url = f"{DIFY_AGENT_API_URL}/files/upload"

    # 生成唯一的文件名
    filename = f"{uuid.uuid4()}.jpg"

    # 创建多部分表单数据
    boundary = '----WebKitFormBoundary' + ''.join(['%02x' % c for c in os.urandom(16)])

    headers = {
        'Authorization': f'Bearer {DIFY_AGENT_API_KEY}',
        'Content-Type': f'multipart/form-data; boundary={boundary}'
    }

    # 构造表单数据
    body = []
    # 添加用户ID
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="user"'.encode(),
        b'',
        user_id.encode()
    ])

    # 添加文件数据
    body.extend([
        f'--{boundary}'.encode(),
        f'Content-Disposition: form-data; name="file"; filename="{filename}"'.encode(),
        f'Content-Type: image/jpeg'.encode(),
        b'',
        image_data,
        f'--{boundary}--'.encode()
    ])

    # 连接所有部分
    body_data = b'\r\n'.join(body)

    req = urllib.request.Request(url, data=body_data, headers=headers, method="POST")

    try:
        # 禁用SSL验证
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        logger.info(f"开始上传图片到Dify Agent: user_id={user_id}, file_size={len(image_data)} bytes")

        response_data = http_request_with_retry(req, ctx)
        if response_data:
            response_json = json.loads(response_data.decode('utf-8'))
            logger.info(f"上传图片到Dify Agent成功: {response_json}")
            return response_json
        return None
    except Exception as e:
        logger.error(f"上传图片到Dify Agent失败: {e}")
        return None


# 新增:更新会话时间函数
def update_conversation_time(user_id):
    """更新用户会话的最后活跃时间"""
    current_time = time.time()
    user_conversation_times[user_id] = current_time
    agent_conversation_times[user_id] = current_time


# 新增:检查会话是否过期
def is_conversation_expired(user_id):
    """检查用户会话是否已过期"""
    current_time = time.time()
    last_time = user_conversation_times.get(user_id, 0)
    return (current_time - last_time) > CONTEXT_EXPIRY_SECONDS


# 新增:清理过期会话函数
def cleanup_expired_conversations():
    """后台任务:清理过期会话"""
    while True:
        try:
            current_time = time.time()
            expired_users = []

            # 检查过期会话
            for user_id, last_time in list(user_conversation_times.items()):
                if (current_time - last_time) > CONTEXT_EXPIRY_SECONDS:
                    expired_users.append(user_id)

            # 清理过期会话
            for user_id in expired_users:
                if user_id in user_conversations:
                    del user_conversations[user_id]
                if user_id in agent_conversations:
                    del agent_conversations[user_id]
                if user_id in user_conversation_times:
                    del user_conversation_times[user_id]
                if user_id in agent_conversation_times:
                    del agent_conversation_times[user_id]
                logger.info(f"自动清理过期会话: 用户{user_id}")

            # 每60秒检查一次
            time.sleep(60)
        except Exception as e:
            logger.error(f"清理过期会话时出错: {str(e)}")
            time.sleep(60)  # 出错后等待一分钟再次尝试


# 向Dify Agent发送请求并获取响应 - 修改支持会话ID
def ask_dify_agent(query, conversation_id=None, user_id="default_user", with_image=False, image_file_id=None):
    """向Dify Agent API发送请求,获取文档检索结果"""
    url = f"{DIFY_AGENT_API_URL}/chat-messages"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DIFY_AGENT_API_KEY}"
    }

    data = {
        "query": query,
        "inputs": {},
        "response_mode": "streaming",  # Agent模式下必须使用流式返回
        "user": user_id
    }

    # 如果有图片,添加到请求中
    if with_image and image_file_id:
        data["files"] = [
            {
                "type": "image",
                "transfer_method": "local_file",
                "upload_file_id": image_file_id
            }
        ]

    # 如果有会话ID,添加到请求中
    if conversation_id:
        data["conversation_id"] = conversation_id

    data_bytes = json.dumps(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, headers=headers, method="POST")

    try:
        # 创建SSL上下文,忽略证书验证
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        opener = urllib.request.build_opener(urllib.request.HTTPRedirectHandler())
        urllib.request.install_opener(opener)

        logger.info(f"向Dify Agent发送请求: URL={url}, 用户={user_id}, 会话ID={conversation_id}")

        # 使用流式响应处理
        agent_response = ""
        agent_thoughts = []
        start_time = time.time()
        last_update_time = start_time
        status_update_sent = False

        # 打开流式连接
        with urllib.request.urlopen(req, context=ctx, timeout=DIFY_TIMEOUT) as response:
            buffer = b""

            while True:
                # 检查是否需要发送状态更新
                current_time = time.time()
                if current_time - last_update_time > AGENT_STATUS_REMIND_SECONDS and not status_update_sent:
                    # 向用户发送状态更新
                    send_message(open_id=user_id, content="小助手正在搜索和整理资料,这可能需要一些时间,请稍候...")
                    status_update_sent = True
                    last_update_time = current_time

                chunk = response.read(1024)
                if not chunk:
                    break

                buffer += chunk

                # 处理完整的SSE事件
                while b"\n\n" in buffer:
                    event, buffer = buffer.split(b"\n\n", 1)
                    if event.startswith(b"data: "):
                        event_data = event[6:]  # 移除 "data: " 前缀
                        try:
                            event_json = json.loads(event_data)

                            # 处理不同类型的事件
                            if event_json.get("event") == "agent_message":
                                # 累积agent回复
                                agent_response += event_json.get("answer", "")
                            elif event_json.get("event") == "agent_thought":
                                # 收集agent思考过程
                                thought = event_json.get("thought", "")
                                if thought:
                                    agent_thoughts.append(thought)
                            elif event_json.get("event") == "message_end":
                                # 流结束
                                # 保存会话ID - 新增的部分
                                if "conversation_id" in event_json:
                                    agent_conversations[user_id] = event_json["conversation_id"]
                                    logger.info(f"保存Agent会话ID: {user_id} -> {event_json['conversation_id']}")
                                break
                            elif event_json.get("event") == "error":
                                # 错误处理
                                logger.error(f"Agent API错误: {event_json}")
                                return f"Agent处理出错: {event_json.get('message', '未知错误')}", None

                        except json.JSONDecodeError:
                            logger.error(f"解析Agent响应JSON失败: {event_data}")

        # 提取agent思考作为参考
        agent_thought_text = "\n".join(agent_thoughts) if agent_thoughts else ""

        # 返回Agent的回复和思考内容
        logger.info(f"Agent响应完成,处理时间: {time.time() - start_time:.2f}秒")
        return agent_response, agent_thought_text

    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)

        # 特殊处理超时错误
        if isinstance(e, socket.timeout) or "timed out" in error_msg.lower():
            logger.error(f"Dify Agent请求超时: {error_msg}")
            return "Agent处理超时,但我们仍会尝试为您解答问题。", None

        logger.error(f"向Dify Agent发送请求时发生异常({error_type}): {error_msg}")
        logger.error(traceback.format_exc())
        return f"Agent处理出错: {error_msg}", None


# 向Dify发送带图片的请求 - 增强版本
def ask_dify_with_image(query, image_file_id, conversation_id=None, user_id="default_user"):
    """向Dify API发送带图片的问题并获取回答 - 添加特殊重试逻辑"""
    url = f"{DIFY_API_URL}/chat-messages"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DIFY_API_KEY}"
    }

    data = {
        "query": query,
        "inputs": {},
        "response_mode": "blocking",
        "user": user_id,
        "files": [
            {
                "type": "image",
                "transfer_method": "local_file",
                "upload_file_id": image_file_id
            }
        ]
    }

    # 如果有会话ID,添加到请求中
    if conversation_id:
        data["conversation_id"] = conversation_id

    data_bytes = json.dumps(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, headers=headers, method="POST")

    try:
        # 创建自定义的opener,支持重定向和忽略SSL证书验证问题
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        opener = urllib.request.build_opener(urllib.request.HTTPRedirectHandler())
        urllib.request.install_opener(opener)

        logger.info(
            f"向Dify发送带图片的请求: URL={url}, 用户={user_id}, 会话ID={conversation_id}, 图片ID={image_file_id}")

        # 使用增强的重试函数,设置更长的超时
        response_data = http_request_with_retry(req, ctx, timeout=DIFY_TIMEOUT)
        if not response_data:
            return "抱歉,无法连接到AI服务,请稍后再试"

        response_json = json.loads(response_data.decode('utf-8'))

        # 保存会话ID
        if "conversation_id" in response_json:
            user_conversations[user_id] = response_json["conversation_id"]
            logger.info(f"保存用户会话ID: {user_id} -> {response_json['conversation_id']}")

        if "answer" in response_json:
            return response_json["answer"]
        else:
            logger.warning(f"未找到回答字段: {response_json}")
            return "抱歉,无法获取回答"
    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)

        # 特殊处理超时错误
        if isinstance(e, socket.timeout) or "timed out" in error_msg.lower():
            logger.error(f"Dify请求超时: {error_msg}")
            return "抱歉,AI生成回复超时,您的问题可能需要更长时间处理。请尝试简化问题或稍后再试。"

        logger.error(f"向Dify发送请求时发生异常({error_type}): {error_msg}")
        logger.error(traceback.format_exc())
        return f"抱歉,处理请求时出错: {error_msg}"


# 向Dify发送纯文本请求 - 增强版本
def ask_dify(query, conversation_id=None, user_id="default_user"):
    """向Dify API发送问题并获取回答 - 添加特殊重试逻辑"""
    url = f"{DIFY_API_URL}/chat-messages"

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {DIFY_API_KEY}"
    }

    data = {
        "query": query,
        "inputs": {},
        "response_mode": "blocking",  # 使用阻塞模式
        "user": user_id
    }

    # 如果有会话ID,添加到请求中
    if conversation_id:
        data["conversation_id"] = conversation_id

    data_bytes = json.dumps(data).encode('utf-8')

    req = urllib.request.Request(url, data=data_bytes, headers=headers, method="POST")

    try:
        # 创建自定义的opener,支持重定向和忽略SSL证书验证问题
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

        opener = urllib.request.build_opener(urllib.request.HTTPRedirectHandler())
        urllib.request.install_opener(opener)

        logger.info(f"向Dify发送请求: URL={url}, 用户={user_id}, 会话ID={conversation_id}")

        # 使用增强的重试函数,设置更长的超时
        response_data = http_request_with_retry(req, ctx, timeout=DIFY_TIMEOUT)
        if not response_data:
            return "抱歉,无法连接到AI服务,请稍后再试"

        response_json = json.loads(response_data.decode('utf-8'))

        # 保存会话ID
        if "conversation_id" in response_json:
            user_conversations[user_id] = response_json["conversation_id"]
            logger.info(f"保存用户会话ID: {user_id} -> {response_json['conversation_id']}")

        if "answer" in response_json:
            return response_json["answer"]
        else:
            logger.warning(f"未找到回答字段: {response_json}")
            return "抱歉,无法获取回答"
    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)

        # 特殊处理超时错误
        if isinstance(e, socket.timeout) or "timed out" in error_msg.lower():
            logger.error(f"Dify请求超时: {error_msg}")
            return "抱歉,AI生成回复超时,您的问题可能需要更长时间处理。请尝试简化问题或稍后再试。"

        logger.error(f"向Dify发送请求时发生异常({error_type}): {error_msg}")
        logger.error(traceback.format_exc())
        return f"抱歉,处理请求时出错: {error_msg}"


# 增强版处理流程 - 先询问Agent再使用聊天助手
def process_with_agent_then_assistant(query, conversation_id=None, user_id="default_user",
                                      with_image=False, image_file_id=None):
    """
    使用Agent + Assistant串联处理用户请求
    先询问Agent获取知识库检索结果,再将结果和原始查询一起发送给聊天助手
    """
    try:
        logger.info(f"开始两阶段处理: 用户={user_id}, 会话ID={conversation_id}")

        # 检查会话是否过期 - 新增部分
        if is_conversation_expired(user_id):
            # 清除过期会话
            if user_id in user_conversations:
                del user_conversations[user_id]
            if user_id in agent_conversations:
                del agent_conversations[user_id]
            logger.info(f"用户{user_id}的会话已过期,创建新会话")
            conversation_id = None

        # 更新会话时间 - 新增部分
        update_conversation_time(user_id)

        # 获取Agent会话ID - 新增部分
        agent_conversation_id = agent_conversations.get(user_id)

        # 第一阶段: 询问Agent - 传入Agent会话ID
        agent_response, agent_thoughts = ask_dify_agent(
            query,
            agent_conversation_id,  # 使用Agent的会话ID
            user_id,
            with_image,
            image_file_id
        )

        if not agent_response:
            agent_response = "Agent未返回结果,将直接由助手回答您的问题。"

        # 构建发送给聊天助手的增强查询
        enhanced_query = f"""用户原始问题: {query}

以下是Agent检索到的相关信息:
{agent_response}

请基于上述信息,为用户提供准确、全面且易于理解的回答。如果Agent提供的信息不足以回答用户问题,请基于你自身的知识给出最佳回答。
回复时直接回答用户问题,不要提及Agent或引用上述信息。"""

        # 第二阶段: 发送给聊天助手
        assistant_response = ask_dify(enhanced_query, conversation_id, user_id)

        # 返回聊天助手的回复
        return assistant_response

    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)
        logger.error(f"两阶段处理过程中出错({error_type}): {error_msg}")
        logger.error(traceback.format_exc())

        # 尝试直接使用聊天助手回答
        try:
            return ask_dify(query, conversation_id, user_id)
        except:
            return f"抱歉,处理您的请求时出现错误。请稍后再试。"


# 从富文本内容中提取图片key
def extract_image_key_from_post(content_json):
    """从富文本内容中提取图片key"""
    # 记录原始json以便调试
    logger.info(f"提取图片key,原始JSON: {json.dumps(content_json)}")

    # 首先检查直接包含在content_json中的image_key
    if "image_key" in content_json:
        return content_json.get("image_key")

    if "content" in content_json:
        post_content = content_json["content"]

        # 内容可能是字符串形式的JSON
        if isinstance(post_content, str):
            try:
                post_content = json.loads(post_content)
            except:
                logger.error(f"无法解析post_content字符串: {post_content}")
                return None

        # 遍历所有可能的位置寻找图片
        if isinstance(post_content, list):
            for item in post_content:
                if isinstance(item, list):
                    for element in item:
                        if isinstance(element, dict) and element.get("tag") == "img":
                            return element.get("image_key")
                elif isinstance(item, dict) and item.get("tag") == "img":
                    return item.get("image_key")

        # 处理嵌套字典的情况
        elif isinstance(post_content, dict):
            for key, value in post_content.items():
                if isinstance(value, list):
                    for item in value:
                        if isinstance(item, list):
                            for element in item:
                                if isinstance(element, dict) and element.get("tag") == "img":
                                    return element.get("image_key")
                        elif isinstance(item, dict) and item.get("tag") == "img":
                            return item.get("image_key")

    return None


# 检查消息是否@了机器人
def is_bot_mentioned(mentions):
    """检查消息中是否@了机器人"""
    if not mentions:
        return False

    for mention in mentions:
        # 通过名称或ID匹配
        mention_name = mention.get("name", "")
        mention_id = mention.get("id", {}).get("open_id", "")

        # 检查是否匹配机器人名称或ID
        if (BOT_NAME and BOT_NAME in mention_name) or (BOT_OPEN_ID and BOT_OPEN_ID == mention_id):
            return True

    return False


# 从@消息中删除@部分
def remove_mentions(text, mentions):
    """从文本中删除@部分"""
    if not mentions:
        return text

    for mention in mentions:
        mention_name = mention.get("name", "")
        if mention_name:
            # 删除@用户名
            text = text.replace(f"@{mention_name}", "").strip()

    return text


# 异步处理消息 - 支持群聊回复
def process_message_async_group(sender_id, content, message_type="text", image_key=None, conversation_id=None,
                                image_data=None, reply_id=None, reply_type="open_id", message_id=None):
    """异步处理消息,支持文本和图片,使用Agent+助手双重处理,支持群聊回复"""
    try:
        # 保存原始请求信息,用于重试
        start_time = time.time()
        logger.info(f"开始处理消息: type={message_type}, sender={sender_id}, reply_to={reply_id}({reply_type})")

        # 检查上下文是否过期 - 新增部分
        if is_conversation_expired(sender_id):
            # 清除过期会话
            if sender_id in user_conversations:
                del user_conversations[sender_id]
            if sender_id in agent_conversations:
                del agent_conversations[sender_id]
            logger.info(f"用户{sender_id}的会话已过期,创建新会话")
            conversation_id = None

        # 更新会话时间 - 新增部分
        update_conversation_time(sender_id)

        # 使用发送者ID检索会话
        if conversation_id is None:
            # 根据回复类型选择会话存储
            if reply_type == "chat_id":
                conversation_id = group_conversations.get(reply_id)
            else:
                conversation_id = user_conversations.get(sender_id)

        if message_type == "text":
            # 纯文本消息 - 使用Agent+助手处理
            assistant_response = process_with_agent_then_assistant(content, conversation_id, sender_id)

            # 保存会话ID
            if reply_type == "chat_id" and assistant_response:
                # 如果是群聊,使用群ID保存会话
                group_conversations[reply_id] = user_conversations.get(sender_id)

            # 根据回复类型选择发送方式
            if reply_type == "open_id":
                send_message(open_id=reply_id, content=assistant_response)
            else:
                send_message(chat_id=reply_id, content=assistant_response)

        elif message_type == "image":
            # 纯图片消息
            if not image_data and image_key and message_id:
                image_data = get_message_resource(message_id, image_key)

            if not image_data:
                if reply_type == "open_id":
                    send_message(open_id=reply_id, content="抱歉,无法获取您发送的图片。请尝试重新发送。")
                else:
                    send_message(chat_id=reply_id, content="抱歉,无法获取您发送的图片。请尝试重新发送。")
                return

            # 上传图片到Dify和Agent
            upload_result = upload_image_to_dify(image_data, sender_id)
            upload_agent_result = upload_image_to_dify_agent(image_data, sender_id)

            if not upload_result or "id" not in upload_result:
                if reply_type == "open_id":
                    send_message(open_id=reply_id, content="抱歉,上传图片到AI系统失败")
                else:
                    send_message(chat_id=reply_id, content="抱歉,上传图片到AI系统失败")
                return

            if not upload_agent_result or "id" not in upload_agent_result:
                # 如果仅Agent上传失败,则继续使用聊天助手处理
                logger.warning("上传图片到Agent失败,仅使用聊天助手处理")
                assistant_response = ask_dify_with_image("请描述这张图片", upload_result["id"], conversation_id,
                                                         sender_id)

                # 保存会话ID
                if reply_type == "chat_id" and assistant_response:
                    group_conversations[reply_id] = user_conversations.get(sender_id)

                if reply_type == "open_id":
                    send_message(open_id=reply_id, content=assistant_response)
                else:
                    send_message(chat_id=reply_id, content=assistant_response)
                return

            # 发送图片到Dify Agent和助手
            assistant_response = process_with_agent_then_assistant(
                "请描述这张图片,并提取其中包含的关键信息",
                conversation_id,
                sender_id,
                True,  # with_image
                upload_agent_result["id"]  # 使用Agent的上传ID
            )

            # 保存会话ID
            if reply_type == "chat_id" and assistant_response:
                group_conversations[reply_id] = user_conversations.get(sender_id)

            if reply_type == "open_id":
                send_message(open_id=reply_id, content=assistant_response)
            else:
                send_message(chat_id=reply_id, content=assistant_response)

        elif message_type == "text_with_image":
            # 文本+图片混合消息
            if not image_data and image_key and message_id:
                image_data = get_message_resource(message_id, image_key)

            if not image_data:
                if reply_type == "open_id":
                    send_message(open_id=reply_id, content="抱歉,无法获取您发送的图片。将只处理文本部分。")
                else:
                    send_message(chat_id=reply_id, content="抱歉,无法获取您发送的图片。将只处理文本部分。")

                # 只处理文本部分
                if content.strip():
                    assistant_response = process_with_agent_then_assistant(content, conversation_id, sender_id)

                    # 保存会话ID
                    if reply_type == "chat_id" and assistant_response:
                        group_conversations[reply_id] = user_conversations.get(sender_id)

                    if reply_type == "open_id":
                        send_message(open_id=reply_id, content=assistant_response)
                    else:
                        send_message(chat_id=reply_id, content=assistant_response)
                return

            # 上传图片到Dify和Agent
            upload_result = upload_image_to_dify(image_data, sender_id)
            upload_agent_result = upload_image_to_dify_agent(image_data, sender_id)

            if not upload_result or "id" not in upload_result:
                if reply_type == "open_id":
                    send_message(open_id=reply_id, content="抱歉,上传图片到AI系统失败。将只处理文本部分。")
                else:
                    send_message(chat_id=reply_id, content="抱歉,上传图片到AI系统失败。将只处理文本部分。")

                # 只处理文本部分
                if content.strip():
                    assistant_response = process_with_agent_then_assistant(content, conversation_id, sender_id)

                    # 保存会话ID
                    if reply_type == "chat_id" and assistant_response:
                        group_conversations[reply_id] = user_conversations.get(sender_id)

                    if reply_type == "open_id":
                        send_message(open_id=reply_id, content=assistant_response)
                    else:
                        send_message(chat_id=reply_id, content=assistant_response)
                return

            if not upload_agent_result or "id" not in upload_agent_result:
                # 如果仅Agent上传失败,则继续使用聊天助手处理
                logger.warning("上传图片到Agent失败,仅使用聊天助手处理")
                assistant_response = ask_dify_with_image(content, upload_result["id"], conversation_id, sender_id)

                # 保存会话ID
                if reply_type == "chat_id" and assistant_response:
                    group_conversations[reply_id] = user_conversations.get(sender_id)

                if reply_type == "open_id":
                    send_message(open_id=reply_id, content=assistant_response)
                else:
                    send_message(chat_id=reply_id, content=assistant_response)
                return

            # 发送文本+图片到Dify Agent和助手
            assistant_response = process_with_agent_then_assistant(
                content,
                conversation_id,
                sender_id,
                True,  # with_image
                upload_agent_result["id"]  # 使用Agent的上传ID
            )

            # 保存会话ID
            if reply_type == "chat_id" and assistant_response:
                group_conversations[reply_id] = user_conversations.get(sender_id)

            if reply_type == "open_id":
                send_message(open_id=reply_id, content=assistant_response)
            else:
                send_message(chat_id=reply_id, content=assistant_response)

        processing_time = time.time() - start_time
        logger.info(f"消息处理完成: type={message_type}, 用时={processing_time:.2f}秒")

    except Exception as e:
        error_type = type(e).__name__
        error_msg = str(e)
        logger.error(f"异步处理消息时出错({error_type}): {error_msg}")
        logger.error(traceback.format_exc())

        # 友好的错误消息
        user_friendly_msg = "抱歉,处理您的请求时出现错误。"
        if "timeout" in error_msg.lower() or "timed out" in error_msg.lower():
            user_friendly_msg += "AI系统响应超时,您的问题可能需要更长时间处理。请尝试简化问题或稍后再试。"
        else:
            user_friendly_msg += f"错误信息: {error_msg}"

        if reply_type == "open_id":
            send_message(open_id=reply_id, content=user_friendly_msg)
        else:
            send_message(chat_id=reply_id, content=user_friendly_msg)


# 处理文本消息 - 支持群聊@
def process_text_message(event_data, sender_id):
    """处理文本消息,支持私聊和群聊@"""
    try:
        event = event_data.get("event", {})
        message = event.get("message", {})

        # 解析消息内容
        content_json = json.loads(message.get("content", "{}"))
        text_content = content_json.get("text", "")

        # 获取消息元数据
        chat_type = message.get("chat_type")
        chat_id = message.get("chat_id")
        message_id = message.get("message_id")

        logger.info(f"处理文本消息: chat_type={chat_type}, chat_id={chat_id}, text={text_content}")

        # 检查是否是群聊@消息
        is_mention = False
        mentions = message.get("mentions", [])

        # 如果是群聊且有@提及
        if chat_type == "group" and mentions:
            logger.info(f"检测到群聊消息,mentions={mentions}")
            # 检查是否@了机器人
            is_mention = is_bot_mentioned(mentions)
            if is_mention:
                # 去除@机器人部分的文本,只保留实际问题
                text_content = remove_mentions(text_content, mentions)
                logger.info(f"机器人被@,处理请求: {text_content}")

        # 确定回复方式:私聊用open_id,群聊@用chat_id
        reply_id = chat_id if is_mention and chat_type == "group" else sender_id
        reply_type = "chat_id" if is_mention and chat_type == "group" else "open_id"

        # 获取会话ID (基于用户ID)
        conversation_id = user_conversations.get(sender_id)

        # 仅处理私聊消息或群聊中@机器人的消息
        if chat_type != "group" or (chat_type == "group" and is_mention):
            # 发送"正在思考"的消息
            if reply_type == "open_id":
                send_message(open_id=reply_id, content="正在思考中,请稍候...")
            else:
                send_message(chat_id=reply_id, content="正在思考中,请稍候...")

            # 启动异步线程处理消息
            processing_thread = threading.Thread(
                target=process_message_async_group,
                args=(sender_id, text_content, "text", None, conversation_id, None, reply_id, reply_type, message_id)
            )
            processing_thread.daemon = True
            processing_thread.start()

            return True
        else:
            # 不是@机器人的群聊消息,忽略
            logger.info(f"忽略群聊非@消息")
            return True
    except Exception as e:
        logger.error(f"处理文本消息时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return False


# 处理图片消息 - 使用正确的API获取图片
def process_image_message(event_data, sender_id):
    """处理图片消息,支持私聊和群聊"""
    try:
        event = event_data.get("event", {})
        message = event.get("message", {})
        message_id = message.get("message_id")
        chat_type = message.get("chat_type")
        chat_id = message.get("chat_id")

        # 如果没有message_id,无法处理
        if not message_id:
            logger.error("处理图片消息失败: 没有message_id")
            if chat_type == "group":
                send_message(chat_id=chat_id, content="抱歉,无法处理这条消息")
            else:
                send_message(open_id=sender_id, content="抱歉,无法处理这条消息")
            return False

        # 检查是否是群聊@消息
        is_mention = False
        mentions = message.get("mentions", [])

        # 如果是群聊且有@提及
        if chat_type == "group" and mentions:
            is_mention = is_bot_mentioned(mentions)

        # 根据消息类型确定image_key
        image_key = None
        if message.get("message_type") == "image":
            # 直接图片消息
            content_json = json.loads(message.get("content", "{}"))
            image_key = content_json.get("image_key")
            logger.info(f"从图片消息中获取image_key: {image_key}")
        else:
            # 富文本消息中的图片
            content_json = json.loads(message.get("content", "{}"))
            image_key = extract_image_key_from_post(content_json)
            logger.info(f"从富文本消息中获取image_key: {image_key}")

        if not image_key:
            logger.error("未找到image_key")
            # 给用户反馈
            if chat_type == "group" and is_mention:
                send_message(chat_id=chat_id, content="抱歉,无法识别您发送的图片。请尝试重新发送。")
            else:
                send_message(open_id=sender_id, content="抱歉,无法识别您发送的图片。请尝试重新发送。")
            return False

        # 确定回复方式:私聊用open_id,群聊@用chat_id
        reply_id = chat_id if is_mention and chat_type == "group" else sender_id
        reply_type = "chat_id" if is_mention and chat_type == "group" else "open_id"

        # 群聊中如果没有@机器人,则不处理
        if chat_type == "group" and not is_mention:
            logger.info(f"忽略群聊非@图片消息")
            return True

        logger.info(f"收到图片消息: image_key={image_key}, message_id={message_id}, reply_type={reply_type}")

        # 获取该用户的会话ID(如果有)
        conversation_id = user_conversations.get(sender_id)

        # 回复正在处理
        if reply_type == "open_id":
            send_message(open_id=reply_id,
                         content="收到您的图片,正在分析中...\n如果您有特定问题,请告诉我您想了解图片中的什么信息。")
        else:
            send_message(chat_id=reply_id,
                         content="收到您的图片,正在分析中...\n如果您有特定问题,请告诉我您想了解图片中的什么信息。")

        # 获取图片数据 - 使用正确的API
        image_data = get_message_resource(message_id, image_key)

        if not image_data:
            if reply_type == "open_id":
                send_message(open_id=reply_id, content="抱歉,无法获取您发送的图片。请尝试重新发送或换一种方式提问。")
            else:
                send_message(chat_id=reply_id, content="抱歉,无法获取您发送的图片。请尝试重新发送或换一种方式提问。")
            return False

        # 启动异步线程处理消息
        processing_thread = threading.Thread(
            target=process_message_async_group,
            args=(
                sender_id, "请描述这张图片并提取其中有用的信息", "image", image_key, conversation_id, image_data,
                reply_id,
                reply_type, message_id)
        )
        processing_thread.daemon = True
        processing_thread.start()

        return True
    except Exception as e:
        logger.error(f"处理图片消息时出错: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            if chat_type == "group" and is_mention:
                send_message(chat_id=chat_id, content="抱歉,处理图片时出现错误,请稍后重试")
            else:
                send_message(open_id=sender_id, content="抱歉,处理图片时出现错误,请稍后重试")
        except:
            logger.error("发送错误消息失败")
        return False


# 处理富文本消息 - 使用正确的API获取图片
def process_post_message(event_data, sender_id):
    """处理富文本消息,支持私聊和群聊"""
    try:
        message = event_data.get("event", {}).get("message", {})
        message_id = message.get("message_id")
        chat_type = message.get("chat_type")
        chat_id = message.get("chat_id")

        # 如果没有message_id,无法处理
        if not message_id:
            logger.error("处理富文本消息失败: 没有message_id")
            if chat_type == "group":
                send_message(chat_id=chat_id, content="抱歉,无法处理这条消息")
            else:
                send_message(open_id=sender_id, content="抱歉,无法处理这条消息")
            return False

        # 检查是否是群聊@消息
        is_mention = False
        mentions = message.get("mentions", [])

        # 如果是群聊且有@提及
        if chat_type == "group" and mentions:
            is_mention = is_bot_mentioned(mentions)

        # 解析富文本内容
        content_json = json.loads(message.get("content", "{}"))

        # 确定回复方式:私聊用open_id,群聊@用chat_id
        reply_id = chat_id if is_mention and chat_type == "group" else sender_id
        reply_type = "chat_id" if is_mention and chat_type == "group" else "open_id"

        # 群聊中如果没有@机器人,则不处理
        if chat_type == "group" and not is_mention:
            logger.info(f"忽略群聊非@富文本消息")
            return True

        # 记录原始json以便调试
        logger.info(f"原始富文本JSON: {json.dumps(content_json)}")

        # 提取文本和图片
        text_content = ""
        image_key = None

        # 处理富文本内容
        if "content" in content_json:
            post_content = content_json["content"]

            # 检查post_content的类型
            if isinstance(post_content, str):
                try:
                    post_content = json.loads(post_content)
                except:
                    logger.error(f"无法解析post_content字符串: {post_content}")

            # 从文本内容中提取文本
            if isinstance(post_content, list):
                for item in post_content:
                    if isinstance(item, list):
                        for element in item:
                            if isinstance(element, dict):
                                if element.get("tag") == "text":
                                    text_content += element.get("text", "")
                                elif element.get("tag") == "img" and not image_key:
                                    image_key = element.get("image_key")
                    elif isinstance(item, dict):
                        if item.get("tag") == "text":
                            text_content += item.get("text", "")
                        elif item.get("tag") == "img" and not image_key:
                            image_key = item.get("image_key")
            elif isinstance(post_content, dict):
                for key, value in post_content.items():
                    if isinstance(value, list):
                        for item in value:
                            if isinstance(item, dict):
                                if item.get("tag") == "text":
                                    text_content += item.get("text", "")
                                elif item.get("tag") == "img" and not image_key:
                                    image_key = item.get("image_key")

        # 如果是群聊@消息,移除@部分
        if is_mention and chat_type == "group":
            text_content = remove_mentions(text_content, mentions)

        logger.info(f"解析富文本消息: 文本={text_content}, 图片key={image_key}, 回复方式={reply_type}")

        # 获取该用户的会话ID(如果有)
        conversation_id = user_conversations.get(sender_id)

        # 回复正在处理
        if reply_type == "open_id":
            send_message(open_id=reply_id, content="正在处理您的消息,请稍候...")
        else:
            send_message(chat_id=reply_id, content="正在处理您的消息,请稍候...")

        # 如果有图片
        if image_key:
            # 获取图片数据 - 使用正确的API
            image_data = get_message_resource(message_id, image_key)

            if image_data:
                # 启动异步线程处理消息
                processing_thread = threading.Thread(
                    target=process_message_async_group,
                    args=(sender_id, text_content, "text_with_image", image_key, conversation_id, image_data, reply_id,
                          reply_type, message_id)
                )
                processing_thread.daemon = True
                processing_thread.start()
                return True
            else:
                logger.warning(f"无法获取富文本中的图片,仅处理文本内容: image_key={image_key}")
                # 图片获取失败,但仍处理文本部分
                if text_content.strip():
                    processing_thread = threading.Thread(
                        target=process_message_async_group,
                        args=(
                            sender_id, text_content, "text", None, conversation_id, None, reply_id, reply_type,
                            message_id)
                    )
                    processing_thread.daemon = True
                    processing_thread.start()
                    return True
                else:
                    if reply_type == "open_id":
                        send_message(open_id=reply_id, content="抱歉,无法处理您的消息,图片获取失败且没有文本内容")
                    else:
                        send_message(chat_id=reply_id, content="抱歉,无法处理您的消息,图片获取失败且没有文本内容")
                    return False
        elif text_content.strip():
            # 只有文本,正常处理
            processing_thread = threading.Thread(
                target=process_message_async_group,
                args=(sender_id, text_content, "text", None, conversation_id, None, reply_id, reply_type, message_id)
            )
            processing_thread.daemon = True
            processing_thread.start()
            return True
        else:
            if reply_type == "open_id":
                send_message(open_id=reply_id, content="抱歉,无法理解您的消息,没有找到文本或图片内容")
            else:
                send_message(chat_id=reply_id, content="抱歉,无法理解您的消息,没有找到文本或图片内容")
            return False
    except Exception as e:
        logger.error(f"处理富文本消息时出错: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            if chat_type == "group" and is_mention:
                send_message(chat_id=chat_id, content="抱歉,处理您的富文本消息时出现错误")
            else:
                send_message(open_id=sender_id, content="抱歉,处理您的富文本消息时出现错误")
        except:
            logger.error("发送错误消息失败")
        return False


# 处理v2.0版本的消息事件 - 优化事件处理
def handle_v2_event(event_data):
    """处理v2.0版本的事件"""
    header = event_data.get("header", {})
    event_type = header.get("event_type")
    event_id = header.get("event_id")

    # 事件去重检查
    if event_id in processed_events:
        logger.info(f"跳过重复事件: {event_id}")
        return True

    # 添加到已处理事件列表
    processed_events.append(event_id)

    # 处理消息事件
    if event_type == "im.message.receive_v1":
        event = event_data.get("event", {})
        sender = event.get("sender", {})
        sender_id = sender.get("sender_id", {}).get("open_id")
        message = event.get("message", {})
        msg_type = message.get("message_type")
        chat_type = message.get("chat_type")

        logger.info(f"收到v2.0消息: 类型={msg_type}, 发送者={sender_id}, 聊天类型={chat_type}")

        if msg_type == "text":
            # 处理文本消息
            return process_text_message(event_data, sender_id)

        elif msg_type == "image":
            # 处理图片消息
            return process_image_message(event_data, sender_id)

        elif msg_type == "post":
            # 处理富文本消息
            return process_post_message(event_data, sender_id)

        else:
            logger.warning(f"未处理的消息类型: {msg_type}")
            # 确定是否需要回复
            if chat_type == "group":
                # 检查是否@机器人
                mentions = message.get("mentions", [])
                if is_bot_mentioned(mentions):
                    send_message(chat_id=message.get("chat_id"), content="抱歉,暂不支持处理此类型的消息")
            else:
                send_message(open_id=sender_id, content="抱歉,暂不支持处理此类型的消息")
            return False

    return False

# 处理v1.0版本的消息事件
def handle_v1_event(event_data):
    """处理v1.0版本的事件"""
    if event_data.get("type") == "event_callback":
        event = event_data.get("event", {})
        event_type = event.get("type")

        # 事件去重检查
        event_id = event_data.get("uuid")
        if event_id in processed_events:
            logger.info(f"跳过重复事件: {event_id}")
            return True

        # 添加到已处理事件列表
        processed_events.append(event_id)

        # 处理消息事件
        if event_type == "im.message.receive_v1" or event_type == "message":
            sender_id = event.get("sender", {}).get("sender_id", {}).get("open_id")
            message = event.get("message", {})
            msg_type = message.get("message_type")
            chat_type = message.get("chat_type")

            logger.info(f"收到v1.0消息: 类型={msg_type}, 发送者={sender_id}, 聊天类型={chat_type}")

            if msg_type == "text":
                # 处理文本消息
                return process_text_message(event_data, sender_id)

            elif msg_type == "image":
                # 处理图片消息
                return process_image_message(event_data, sender_id)

            elif msg_type == "post":
                # 处理富文本消息
                return process_post_message(event_data, sender_id)

            else:
                logger.warning(f"未处理的消息类型: {msg_type}")
                # 确定是否需要回复
                if chat_type == "group":
                    # 检查是否@机器人
                    mentions = message.get("mentions", [])
                    if is_bot_mentioned(mentions):
                        send_message(chat_id=message.get("chat_id"), content="抱歉,暂不支持处理此类型的消息")
                else:
                    send_message(open_id=sender_id, content="抱歉,暂不支持处理此类型的消息")
                return False

    return False


# 事件接收接口
@app.post('/webhook/event')
def event_handler():
    """处理飞书事件"""
    try:
        # 获取请求体
        body = request.body.read().decode('utf-8')
        logger.info(f"收到请求: {body}")

        # 解析JSON
        event_data = json.loads(body)

        # URL验证处理
        if event_data.get("type") == "url_verification":
            challenge = event_data.get("challenge")
            token = event_data.get("token")

            logger.info(f"处理URL验证请求: challenge={challenge}, token={token}")

            # 验证token
            if token != VERIFICATION_TOKEN:
                logger.warning(f"Token验证失败: {token}")
                return HTTPResponse(
                    status=401,
                    body=json.dumps({"error": "invalid token"}),
                    headers={'Content-Type': 'application/json'}
                )

            # 返回验证响应 - 严格按照文档格式
            return HTTPResponse(
                status=200,
                body=json.dumps({"challenge": challenge}),
                headers={'Content-Type': 'application/json'}
            )

        # 验证Token (对于非验证请求)
        if "header" in event_data:  # v2.0 事件
            token = event_data.get("header", {}).get("token")
        else:  # v1.0 事件
            token = event_data.get("token")

        if token != VERIFICATION_TOKEN:
            logger.warning(f"Token验证失败: {token}")
            return HTTPResponse(
                status=401,
                body=json.dumps({"error": "invalid token"}),
                headers={'Content-Type': 'application/json'}
            )

        # 加锁处理事件,防止并发处理相同事件
        with processing_lock:
            # 处理不同版本的事件
            if "schema" in event_data and event_data.get("schema") == "2.0":
                # 处理 v2.0 事件
                handle_v2_event(event_data)
            else:
                # 处理 v1.0 事件
                handle_v1_event(event_data)

        # 立即返回成功响应,后续处理异步进行
        return HTTPResponse(
            status=200,
            body=json.dumps({"code": 0, "msg": "success"}),
            headers={'Content-Type': 'application/json'}
        )

    except Exception as e:
        logger.error(f"处理事件出错: {str(e)}")
        logger.error(traceback.format_exc())
        return HTTPResponse(
            status=500,
            body=json.dumps({"error": str(e)}),
            headers={'Content-Type': 'application/json'}
        )


# 主入口
if __name__ == '__main__':
    logger.info("飞书机器人服务启动 - 支持Agent+助手两阶段处理,已加入上下文超时机制")
    logger.info(f"配置信息: APP_ID={APP_ID}, DIFY_API_URL={DIFY_API_URL}, DIFY_AGENT_API_URL={DIFY_AGENT_API_URL}")
    logger.info(f"超时设置: DIFY_TIMEOUT={DIFY_TIMEOUT}秒, AGENT_STATUS_UPDATE_SECONDS={AGENT_STATUS_UPDATE_SECONDS}秒")
    logger.info(
        f"上下文超时设置: CONTEXT_EXPIRY_SECONDS={CONTEXT_EXPIRY_SECONDS}秒 (约{CONTEXT_EXPIRY_SECONDS / 60:.1f}分钟)")

    # 启动上下文清理线程
    cleanup_thread = threading.Thread(target=cleanup_expired_conversations)
    cleanup_thread.daemon = True
    cleanup_thread.start()
    logger.info("上下文清理线程已启动")

    # 启动服务
    app.run(host='0.0.0.0', port=8080, debug=False)