3.2分支会话管理功能详细解析

一、改动背景与目标

3.2分支(3-2-session-management)的核心目标是为AI微服务网关添加会话管理功能,以支持长连接、实时交互场景,如AI聊天、持续对话等。在3.1分支(3-1-init-project)中,项目仅完成了基础架构搭建,缺乏会话状态管理能力,无法支持需要保持上下文的AI服务交互。

二、具体改动内容

1. 新增目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ai-mcp-gateway-domain/
└── src/main/java/cn/bugstack/ai/domain/session/
├── adapter/
│ ├── port/
│ └── repository/
├── model/
│ ├── aggregate/
│ ├── entity/
│ └── valobj/
│ └── SessionConfigVO.java
└── service/
├── ISessionManagementService.java
└── impl/
└── SessionManagementService.java

2. 核心类详解

2.1 会话管理服务接口(ISessionManagementService)

1
2
3
4
5
6
7
8
9
10
11
12
public interface ISessionManagementService {
// 创建会话
SessionConfigVO createSession(String gatewayId);
// 删除会话
void removeSession(String sessionId);
// 获取会话
SessionConfigVO getSession(String sessionId);
// 清理过期会话
void cleanupExpiredSessions();
// 关闭服务时清理资源
void shutdown();
}

设计意图:定义会话管理的核心操作,为实现类提供规范,符合DDD中服务接口的设计原则。

2.2 会话配置值对象(SessionConfigVO)

会话配置值对象包含会话的核心信息,如会话ID、SSE的Sink、活跃状态、最后访问时间等。

设计意图:使用值对象(Value Object)表示会话配置,符合DDD中值对象的设计原则,确保会话信息的不可变性和完整性。

2.3 会话管理服务实现(SessionManagementService)

这是核心实现类,包含以下关键功能:

2.3.1 初始化与资源管理
1
2
3
4
5
6
7
8
9
10
11
// 会话超时时间(分钟)
private static final long SESSION_TIMEOUT_MINUTES = 30;
// 定时任务调度
private final ScheduledExecutorService cleanupScheduler = Executors.newSingleThreadScheduledExecutor();
// 活跃会话存储器,key->sessionId,ConcurrentHashMap 确保线程安全
private final Map<String, SessionConfigVO> activeSessions = new ConcurrentHashMap<>();

public SessionManagementService() {
cleanupScheduler.scheduleAtFixedRate(this::cleanupExpiredSessions, 5, 5, TimeUnit.MINUTES);
log.info("会话管理服务已启动,会话超时时间: {} 分钟", SESSION_TIMEOUT_MINUTES);
}

技术点

  • 使用ConcurrentHashMap存储活跃会话,确保线程安全
  • 初始化时启动定时清理任务,每5分钟执行一次
2.3.2 会话创建(createSession)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public SessionConfigVO createSession(String gatewayId) {
log.info("创建会话 gatewayId:{}", gatewayId);

String sessionId = UUID.randomUUID().toString();

Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer();

// 发送端点消息 - 告知客户端消息请求地址
String messageEndpoint = "/" + gatewayId + "/mcp/message?sessionId=" + sessionId;
sink.tryEmitNext(ServerSentEvent.<String>builder()
.event("endpoint")
.data(messageEndpoint)
.build());

SessionConfigVO sessionConfigVO = new SessionConfigVO(sessionId, sink);

activeSessions.put(sessionId, sessionConfigVO);

log.info("创建会话 gatewayId:{} sessionId:{},当前活跃会话数:{}", gatewayId, sessionId, activeSessions.size());

return sessionConfigVO;
}

技术点

  • 生成唯一会话ID(UUID)
  • 创建SSE的Sink,支持服务器向客户端推送消息
  • 发送消息端点信息,告知客户端后续消息的请求地址
  • 将会话配置存储到活跃会话集合
2.3.3 会话删除(removeSession)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public void removeSession(String sessionId) {
log.info("删除会话配置 sessionId:{}", sessionId);
SessionConfigVO sessionConfigVO = activeSessions.remove(sessionId);
if (null == sessionConfigVO) return;

sessionConfigVO.markInactive();

try {
sessionConfigVO.getSink().tryEmitComplete();
} catch (Exception e) {
log.warn("关闭会话Sink时出错:{}", e.getMessage());
}

log.info("移除会话:{},剩余活跃会话数:{}", sessionId, activeSessions.size());
}

技术点

  • 从活跃会话集合中移除会话
  • 标记会话为非活跃状态
  • 尝试完成SSE的Sink,关闭连接
  • 异常处理,确保即使关闭Sink失败也能继续执行
2.3.4 会话获取(getSession)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public SessionConfigVO getSession(String sessionId) {
if (null == sessionId || sessionId.isEmpty()) {
return null;
}

SessionConfigVO sessionConfigVO = activeSessions.get(sessionId);
if (null != sessionConfigVO && sessionConfigVO.isActive()) {
sessionConfigVO.updateLastAccessed();
return sessionConfigVO;
}

return null;
}

技术点

  • 空值检查,避免空指针异常
  • 验证会话是否存在且活跃
  • 更新会话的最后访问时间,延长会话生命周期
  • 确保返回有效的会话配置
2.3.5 过期会话清理(cleanupExpiredSessions)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void cleanupExpiredSessions() {
int cleanedCount = 0;

for (Map.Entry<String, SessionConfigVO> entry : activeSessions.entrySet()) {
SessionConfigVO sessionConfigVO = entry.getValue();

if (!sessionConfigVO.isActive() || sessionConfigVO.isExpired(SESSION_TIMEOUT_MINUTES)) {
removeSession(sessionConfigVO.getSessionId());
cleanedCount++;
}

}

// 记录清理日志
if (cleanedCount > 0) {
log.info("清理了 {} 个过期会话,剩余活跃会话数: {}", cleanedCount, activeSessions.size());
}
}

技术点

  • 遍历所有活跃会话
  • 检查会话是否非活跃或已过期
  • 清理符合条件的会话
  • 记录清理日志,便于监控
2.3.6 服务关闭(shutdown)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public void shutdown() {
log.info("关闭会话管理服务...");

for (String sessionId : activeSessions.keySet()) {
removeSession(sessionId);
}

// 关闭清理调度器

cleanupScheduler.shutdown();

try {
// 等待5秒让正在执行的任务完成
if (!cleanupScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
// 超时强制关闭
cleanupScheduler.shutdown();
}
} catch (InterruptedException e) {
// 异常强制关闭
cleanupScheduler.shutdown();
Thread.currentThread().interrupt();
}

log.info("关闭会话管理服务完成");
}

技术点

  • 清理所有活跃会话
  • 关闭定时清理调度器
  • 优雅停机,等待正在执行的任务完成
  • 异常处理,确保服务能够正确关闭

三、技术选型与设计考量

1. 线程安全设计

  • ConcurrentHashMap:用于存储活跃会话,确保多线程环境下的线程安全
  • 单线程调度器:定时清理任务使用单线程,避免并发冲突

2. 响应式编程支持

  • Spring WebFlux SSE:使用Sinks.Many实现服务器向客户端的实时消息推送
  • Backpressure处理:使用onBackpressureBuffer策略,处理消息积压情况

3. 领域驱动设计(DDD)实践

  • 服务接口与实现分离ISessionManagementService接口定义核心操作,SessionManagementService提供实现
  • 值对象SessionConfigVO作为值对象,封装会话配置信息
  • 分层架构:清晰的目录结构,符合DDD的分层设计原则

4. 资源管理

  • 定时清理:定期清理过期会话,避免资源泄漏
  • 优雅关闭:服务关闭时清理所有资源,确保系统稳定

四、业务价值与技术价值

1. 业务价值

  • 支持长连接场景:为AI聊天、持续对话等场景提供会话保持能力
  • 实时消息推送:通过SSE实现服务器向客户端的实时消息推送
  • 会话状态管理:维护会话的生命周期,确保服务的连续性和一致性

2. 技术价值

  • 线程安全:使用并发工具类确保多线程环境下的安全
  • 资源优化:自动清理过期会话,提高系统资源利用率
  • 可扩展性:模块化设计,便于后续功能扩展
  • 可维护性:清晰的代码结构和设计模式,便于维护

五、代码优化建议

  1. 配置外部化:将会话超时时间、清理间隔等配置抽取到配置文件中,提高可配置性
  2. 异常处理增强:在关键操作中添加更详细的异常处理和日志记录
  3. 监控指标:添加会话数量、清理频率等监控指标,便于运维监控
  4. 单元测试:为核心功能编写单元测试,确保代码质量
  5. 会话持久化:考虑将会话信息持久化到数据库或Redis,提高系统可靠性

六、总结

3.2分支通过新增会话管理功能,为AI微服务网关提供了关键的会话状态管理能力。实现上采用了现代化的技术栈和设计模式,包括线程安全的并发处理、响应式编程的SSE支持、领域驱动设计的分层架构等。这些改动不仅满足了业务需求,也体现了良好的代码质量和架构设计,为项目的后续发展奠定了坚实的基础。

通过会话管理功能,AI微服务网关能够更好地支持长连接、实时交互场景,提升用户体验,同时也为后续的功能扩展(如多模型支持、消息历史管理等)做好了准备。