3.2分支会话管理功能详细解析
一、改动背景与目标
3.2分支(3-2-session-management)的核心目标是为AI微服务网关添加会话管理功能,以支持长连接、实时交互场景,如AI聊天、持续对话等。在3.1分支(3-1-init-project)中,项目仅完成了基础架构搭建,缺乏会话状态管理能力,无法支持需要保持上下文的AI服务交互。
二、具体改动内容
1. 新增目录结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ai-mcp-gateway-domain/ └── src/main/java/cn/bugstack/ai/domain/session/ ├── adapter/ │ ├── port/ │ └── repository/ ├── model/ │ ├── aggregate/ │ ├── entity/ │ └── valobj/ │ └── SessionConfigVO.java └── service/ ├── ISessionManagementService.java └── impl/ └── SessionManagementService.java
|
2. 核心类详解
2.1 会话管理服务接口(ISessionManagementService)
1 2 3 4 5 6 7 8 9 10 11 12
| public interface ISessionManagementService { SessionConfigVO createSession(String gatewayId); void removeSession(String sessionId); SessionConfigVO getSession(String sessionId); void cleanupExpiredSessions(); void shutdown(); }
|
设计意图:定义会话管理的核心操作,为实现类提供规范,符合DDD中服务接口的设计原则。
2.2 会话配置值对象(SessionConfigVO)
会话配置值对象包含会话的核心信息,如会话ID、SSE的Sink、活跃状态、最后访问时间等。
设计意图:使用值对象(Value Object)表示会话配置,符合DDD中值对象的设计原则,确保会话信息的不可变性和完整性。
2.3 会话管理服务实现(SessionManagementService)
这是核心实现类,包含以下关键功能:
2.3.1 初始化与资源管理
1 2 3 4 5 6 7 8 9 10 11
| private static final long SESSION_TIMEOUT_MINUTES = 30;
private final ScheduledExecutorService cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<String, SessionConfigVO> activeSessions = new ConcurrentHashMap<>();
public SessionManagementService() { cleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredSessions, 5, 5, TimeUnit.MINUTES); log.info("会话管理服务已启动,会话超时时间: {} 分钟", SESSION_TIMEOUT_MINUTES); }
|
技术点:
- 使用
ConcurrentHashMap存储活跃会话,确保线程安全
- 初始化时启动定时清理任务,每5分钟执行一次
2.3.2 会话创建(createSession)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public SessionConfigVO createSession(String gatewayId) { log.info("创建会话 gatewayId:{}", gatewayId);
String sessionId = UUID.randomUUID().toString();
Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();
String messageEndpoint = "/" + gatewayId + "/mcp/message?sessionId=" + sessionId; sink.tryEmitNext(ServerSentEvent.<String>builder() .event("endpoint") .data(messageEndpoint) .build());
SessionConfigVO sessionConfigVO = new SessionConfigVO(sessionId, sink);
activeSessions.put(sessionId, sessionConfigVO);
log.info("创建会话 gatewayId:{} sessionId:{},当前活跃会话数:{}", gatewayId, sessionId, activeSessions.size());
return sessionConfigVO; }
|
技术点:
- 生成唯一会话ID(UUID)
- 创建SSE的Sink,支持服务器向客户端推送消息
- 发送消息端点信息,告知客户端后续消息的请求地址
- 将会话配置存储到活跃会话集合
2.3.3 会话删除(removeSession)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Override public void removeSession(String sessionId) { log.info("删除会话配置 sessionId:{}", sessionId); SessionConfigVO sessionConfigVO = activeSessions.remove(sessionId); if (null == sessionConfigVO) return;
sessionConfigVO.markInactive();
try { sessionConfigVO.getSink().tryEmitComplete(); } catch (Exception e) { log.warn("关闭会话Sink时出错:{}", e.getMessage()); }
log.info("移除会话:{},剩余活跃会话数:{}", sessionId, activeSessions.size()); }
|
技术点:
- 从活跃会话集合中移除会话
- 标记会话为非活跃状态
- 尝试完成SSE的Sink,关闭连接
- 异常处理,确保即使关闭Sink失败也能继续执行
2.3.4 会话获取(getSession)
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Override public SessionConfigVO getSession(String sessionId) { if (null == sessionId || sessionId.isEmpty()) { return null; }
SessionConfigVO sessionConfigVO = activeSessions.get(sessionId); if (null != sessionConfigVO && sessionConfigVO.isActive()) { sessionConfigVO.updateLastAccessed(); return sessionConfigVO; }
return null; }
|
技术点:
- 空值检查,避免空指针异常
- 验证会话是否存在且活跃
- 更新会话的最后访问时间,延长会话生命周期
- 确保返回有效的会话配置
2.3.5 过期会话清理(cleanupExpiredSessions)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public void cleanupExpiredSessions() { int cleanedCount = 0;
for (Map.Entry<String, SessionConfigVO> entry : activeSessions.entrySet()) { SessionConfigVO sessionConfigVO = entry.getValue();
if (!sessionConfigVO.isActive() || sessionConfigVO.isExpired(SESSION_TIMEOUT_MINUTES)) { removeSession(sessionConfigVO.getSessionId()); cleanedCount++; }
}
if (cleanedCount > 0) { log.info("清理了 {} 个过期会话,剩余活跃会话数: {}", cleanedCount, activeSessions.size()); } }
|
技术点:
- 遍历所有活跃会话
- 检查会话是否非活跃或已过期
- 清理符合条件的会话
- 记录清理日志,便于监控
2.3.6 服务关闭(shutdown)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Override public void shutdown() { log.info("关闭会话管理服务...");
for (String sessionId : activeSessions.keySet()) { removeSession(sessionId); }
cleanupScheduler.shutdown();
try { if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) { cleanupScheduler.shutdown(); } } catch (InterruptedException e) { cleanupScheduler.shutdown(); Thread.currentThread().interrupt(); }
log.info("关闭会话管理服务完成"); }
|
技术点:
- 清理所有活跃会话
- 关闭定时清理调度器
- 优雅停机,等待正在执行的任务完成
- 异常处理,确保服务能够正确关闭
三、技术选型与设计考量
1. 线程安全设计
- ConcurrentHashMap:用于存储活跃会话,确保多线程环境下的线程安全
- 单线程调度器:定时清理任务使用单线程,避免并发冲突
2. 响应式编程支持
- Spring WebFlux SSE:使用
Sinks.Many实现服务器向客户端的实时消息推送
- Backpressure处理:使用
onBackpressureBuffer策略,处理消息积压情况
3. 领域驱动设计(DDD)实践
- 服务接口与实现分离:
ISessionManagementService接口定义核心操作,SessionManagementService提供实现
- 值对象:
SessionConfigVO作为值对象,封装会话配置信息
- 分层架构:清晰的目录结构,符合DDD的分层设计原则
4. 资源管理
- 定时清理:定期清理过期会话,避免资源泄漏
- 优雅关闭:服务关闭时清理所有资源,确保系统稳定
四、业务价值与技术价值
1. 业务价值
- 支持长连接场景:为AI聊天、持续对话等场景提供会话保持能力
- 实时消息推送:通过SSE实现服务器向客户端的实时消息推送
- 会话状态管理:维护会话的生命周期,确保服务的连续性和一致性
2. 技术价值
- 线程安全:使用并发工具类确保多线程环境下的安全
- 资源优化:自动清理过期会话,提高系统资源利用率
- 可扩展性:模块化设计,便于后续功能扩展
- 可维护性:清晰的代码结构和设计模式,便于维护
五、代码优化建议
- 配置外部化:将会话超时时间、清理间隔等配置抽取到配置文件中,提高可配置性
- 异常处理增强:在关键操作中添加更详细的异常处理和日志记录
- 监控指标:添加会话数量、清理频率等监控指标,便于运维监控
- 单元测试:为核心功能编写单元测试,确保代码质量
- 会话持久化:考虑将会话信息持久化到数据库或Redis,提高系统可靠性
六、总结
3.2分支通过新增会话管理功能,为AI微服务网关提供了关键的会话状态管理能力。实现上采用了现代化的技术栈和设计模式,包括线程安全的并发处理、响应式编程的SSE支持、领域驱动设计的分层架构等。这些改动不仅满足了业务需求,也体现了良好的代码质量和架构设计,为项目的后续发展奠定了坚实的基础。
通过会话管理功能,AI微服务网关能够更好地支持长连接、实时交互场景,提升用户体验,同时也为后续的功能扩展(如多模型支持、消息历史管理等)做好了准备。