Case模块运行机制详解

一、整体架构

case模块采用了责任链模式策略模式的组合设计,通过多个节点串联处理MCP会话请求。整体架构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
┌─────────────────────────────────────────────────────────────┐
│ Case Module │
│ │
│ ┌──────────────┐ ┌──────────────────────────────┐ │
│ │ Service │─────▶│ Session Factory │ │
│ │ │ │ │ │
│ │ McpMessage │ │ DefaultMcpSessionFactory │ │
│ │ Service │ │ │ │
│ └──────────────┘ └──────────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ Abstract Support │ │
│ │ │ │
│ │ AbstractMcpSessionSupport│ │
│ └──────────────┬───────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────┐ │
│ │ Responsibility Chain Nodes │ │
│ │ │ │
│ │ RootNode → VerifyNode → SessionNode → EndNode │
│ │ ↓ ↓ ↓ ↓ │
│ │ 路由 鉴权 创建会话 返回响应 │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

二、核心组件解析

1. 服务层

McpMessageService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class McpMessageService implements IMcpSessionService {

@Resource
private DefaultMcpSessionFactory defaultMcpSessionFactory;

@Override
public Flux<ServerSentEvent<String>> createMcpSession(String gatewayId) throws Exception {
// 获取策略处理器
StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> strategyHandler =
defaultMcpSessionFactory.strategyHandler();

// 执行处理链
return strategyHandler.apply(gatewayId, new DynamicContext());
}
}

职责

  • 作为服务入口,接收客户端请求
  • 通过工厂获取策略处理器
  • 启动责任链处理流程

2. 工厂层

DefaultMcpSessionFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class DefaultMcpSessionFactory {

@Resource
private RootNode rootNode;

public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> strategyHandler() {
return rootNode; // 返回责任链的起点
}

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class DynamicContext {
private SessionConfigVO sessionConfigVO; // 动态上下文,用于节点间传递数据
}
}

职责

  • 创建策略处理器(责任链起点)
  • 定义动态上下文,用于节点间数据传递
  • 管理责任链的初始化

3. 抽象支持层

AbstractMcpSessionSupport

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class AbstractMcpSessionSupport 
extends AbstractMultiThreadStrategyRouter<String, DynamicContext, Flux<ServerSentEvent<String>>> {

@Resource
protected ISessionManagementService sessionManagementService; // 注入领域服务

@Override
protected void multiThread(String requestParameter, DynamicContext dynamicContext)
throws ExecutionException, InterruptedException, TimeoutException {
// 多线程处理逻辑(可扩展)
}
}

职责

  • 继承自框架的抽象路由器,提供责任链的基础能力
  • 注入领域服务,供子类使用
  • 定义多线程处理扩展点

关键特性

  • 继承 AbstractMultiThreadStrategyRouter,支持多线程策略路由
  • 提供统一的依赖注入管理
  • 定义了节点的基本行为模式

三、责任链节点详解

节点执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Client Request

┌─────────────┐
│ RootNode │ ──── 日志记录、参数校验
└──────┬──────┘

┌─────────────┐
│ VerifyNode │ ──── 身份验证、权限检查
└──────┬──────┘

┌─────────────┐
│ SessionNode │ ──── 创建会话、初始化资源
└──────┬──────┘

┌─────────────┐
│ EndNode │ ──── 返回响应、资源清理
└─────────────┘

1. RootNode(根节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@Service
public class RootNode extends AbstractMcpSessionSupport {

@Resource
private VerifyNode verifyNode; // 下一个节点

@Override
protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception {
try {
log.info("创建会话 mcp session RootNode:{}", requestParameter);

// 执行当前节点的业务逻辑
// ...

// 路由到下一个节点
return router(requestParameter, dynamicContext);
} catch (Exception e) {
log.error("创建会话 mcp session RootNode 异常:{}", requestParameter, e);
throw e;
}
}

@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception {
return verifyNode; // 返回下一个节点
}
}

职责

  • 作为责任链的入口节点
  • 记录请求日志
  • 执行初步的参数校验
  • 路由到下一个节点

设计模式

  • 模板方法模式doApply 定义了节点的处理流程
  • 策略模式get 方法返回下一个节点,实现动态路由

2. VerifyNode(鉴权节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class VerifyNode extends AbstractMcpSessionSupport {

@Resource
private SessionNode sessionNode; // 下一个节点

@Override
protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception {
// 鉴权逻辑(待实现)
// TODO: 验证 gatewayId 的合法性
// TODO: 验证 API Key
// TODO: 检查权限

// 路由到下一个节点
return router(requestParameter, dynamicContext);
}

@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception {
return sessionNode; // 返回下一个节点
}
}

职责

  • 验证请求参数的合法性
  • 验证用户身份
  • 检查访问权限
  • 记录鉴权日志

扩展点

  • 可添加 API Key 验证
  • 可添加 IP 白名单检查
  • 可添加请求频率限制

3. SessionNode(会话节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Slf4j
@Service
public class SessionNode extends AbstractMcpSessionSupport {

@Resource
private EndNode endNode; // 下一个节点

@Override
protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception {
log.info("创建会话-SessionNode:{}", requestParameter);

// 调用领域服务创建会话
SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter);

// 将会话配置写入上下文,供后续节点使用
dynamicContext.setSessionConfigVO(sessionConfigVO);

// 路由到下一个节点
return router(requestParameter, dynamicContext);
}

@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception {
return endNode; // 返回下一个节点
}
}

职责

  • 调用领域服务创建会话
  • 生成唯一的会话ID
  • 创建SSE的Sink
  • 将会话配置写入上下文

关键操作

1
2
3
4
5
6
7
8
// 1. 创建会话
SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter);

// 2. 写入上下文
dynamicContext.setSessionConfigVO(sessionConfigVO);

// 3. 传递给下一个节点
return router(requestParameter, dynamicContext);

4. EndNode(结束节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Slf4j
@Service
public class EndNode extends AbstractMcpSessionSupport {

@Override
protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception {
log.info("创建会话-EndNode:{}", requestParameter);

// 从上下文中获取会话配置
SessionConfigVO sessionConfigVO = dynamicContext.getSessionConfigVO();
String sessionId = sessionConfigVO.getSessionId();

Sinks.Many<ServerSentEvent<String>> sink = sessionConfigVO.getSink();

// 构建响应流
return sink.asFlux()
.mergeWith(
// 心跳机制 - 防止连接超时
Flux.interval(Duration.ofSeconds(60))
.map(i -> ServerSentEvent.<String>builder()
.event("ping")
.data("ping")
.build())
)
// 连接取消时的清理逻辑
.doOnCancel(() -> {
log.info("SSE连接取消,会话ID: {}", sessionId);
sessionManagementService.removeSession(sessionId);
})
// 连接终止时的清理逻辑
.doOnTerminate(() -> {
log.info("SSE连接终止,会话ID: {}", sessionId);
sessionManagementService.removeSession(sessionId);
});
}

@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception {
return defaultStrategyHandler; // 返回默认处理器(责任链结束)
}
}

职责

  • 构建最终的SSE响应流
  • 添加心跳机制,保持连接活跃
  • 处理连接取消和终止事件
  • 清理会话资源

关键特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 1. SSE流式响应
return sink.asFlux()

// 2. 心跳机制
.mergeWith(
Flux.interval(Duration.ofSeconds(60))
.map(i -> ServerSentEvent.<String>builder()
.event("ping")
.data("ping")
.build())
)

// 3. 连接生命周期管理
.doOnCancel(() -> { /* 清理逻辑 */ })
.doOnTerminate(() -> { /* 清理逻辑 */ })

四、运行流程详解

完整执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1. 客户端发起请求

2. McpGatewayController 接收请求

3. 调用 McpMessageService.createMcpSession(gatewayId)

4. McpMessageService 通过 DefaultMcpSessionFactory 获取策略处理器

5. DefaultMcpSessionFactory 返回 RootNode(责任链起点)

6. 执行 RootNode.doApply()
- 记录日志
- 调用 router() 方法
- router() 调用 get() 获取下一个节点
- router() 调用下一个节点的 apply() 方法

7. 执行 VerifyNode.doApply()
- 执行鉴权逻辑
- 调用 router() 方法
- 路由到 SessionNode

8. 执行 SessionNode.doApply()
- 调用 sessionManagementService.createSession(gatewayId)
- 创建会话配置
- 写入上下文
- 路由到 EndNode

9. 执行 EndNode.doApply()
- 从上下文获取会话配置
- 构建SSE响应流
- 添加心跳机制
- 添加连接生命周期管理
- 返回 Flux<ServerSentEvent<String>>

10. 响应返回给客户端

时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Client          Controller      McpMessageService    Factory      RootNode    VerifyNode    SessionNode    EndNode      Domain
│ │ │ │ │ │ │ │ │
│──── Request ────▶│ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │── createSession ──▶│ │ │ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │─ strategyHandler() ────────▶│ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │◀───── return rootNode ─────│ │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │──── apply() ────────────────────────────▶│ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │── doApply() │ │ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │── router() ────────────────────────────────────────────▶│
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │◀───── get() ──│ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │── doApply() ──│ │ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │── router() ─────────────────────────────────▶│
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │◀─ get() ───│ │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │─ doApply() ─────────────────────▶│
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │◀─ createSession ─│
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │── router() ─────▶│
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │◀─ get() ──── │
│ │ │ │ │ │ │ │ │
│ │ │ │ │ │ │ │─ doApply() ─────▶│
│ │ │ │ │ │ │ │ │
│ │ │◀─────────────────────────────────────────────────────────────── return Flux ────│
│ │ │ │ │ │ │ │ │
│◀─────────────────────────────────────────────────────────────────────────────── SSE Stream ──────────────────────────────│

五、关键技术点

1. 责任链模式

实现方式

  • 每个节点继承 AbstractMcpSessionSupport
  • 实现 doApply() 方法定义当前节点的业务逻辑
  • 实现 get() 方法返回下一个节点
  • 通过 router() 方法串联节点

优势

  • 解耦:每个节点职责单一,互不影响
  • 灵活:可动态调整节点顺序或添加新节点
  • 可测试:每个节点可独立测试

2. 策略模式

实现方式

  • get() 方法返回下一个节点的策略处理器
  • 支持根据参数动态选择下一个节点

扩展能力

1
2
3
4
5
6
7
8
9
@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) {
// 根据条件动态选择下一个节点
if (needVerification(requestParameter)) {
return verifyNode;
} else {
return sessionNode; // 跳过鉴权节点
}
}

3. 上下文传递

DynamicContext 的作用

  • 在节点间传递数据
  • 避免全局变量
  • 保证线程安全(每个请求独立的上下文)

使用示例

1
2
3
4
5
6
// SessionNode 写入上下文
SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter);
dynamicContext.setSessionConfigVO(sessionConfigVO);

// EndNode 读取上下文
SessionConfigVO sessionConfigVO = dynamicContext.getSessionConfigVO();

4. 响应式编程

Flux 和 SSE 的结合

1
2
3
4
5
6
7
8
9
10
return sink.asFlux()  // 将 Sink 转换为 Flux
.mergeWith( // 合并心跳流
Flux.interval(Duration.ofSeconds(60))
.map(i -> ServerSentEvent.<String>builder()
.event("ping")
.data("ping")
.build())
)
.doOnCancel(() -> { /* 清理逻辑 */ }) // 连接取消时执行
.doOnTerminate(() -> { /* 清理逻辑 */ }); // 连接终止时执行

关键概念

  • Flux:响应式流,支持背压
  • Sinks:响应式发射器,支持多播
  • ServerSentEvent:SSE事件对象
  • mergeWith:合并多个流
  • doOnCancel/doOnTerminate:生命周期钩子

六、设计优势

1. 单一职责原则

每个节点只负责一个特定的功能:

  • RootNode:日志记录和路由
  • VerifyNode:身份验证
  • SessionNode:会话创建
  • EndNode:响应构建和资源清理

2. 开闭原则

对扩展开放,对修改关闭:

  • 新增功能只需添加新节点
  • 无需修改现有节点的代码
  • 支持动态调整节点顺序

3. 依赖倒置原则

高层模块不依赖低层模块:

  • 节点依赖抽象类 AbstractMcpSessionSupport
  • 通过接口 ISessionManagementService 调用领域服务
  • 通过依赖注入获取下一个节点

4. 接口隔离原则

接口设计精简:

  • IMcpSessionService 只定义必要的方法
  • 每个节点只实现需要的方法

七、扩展建议

1. 添加新节点

例如,添加限流节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
public class RateLimitNode extends AbstractMcpSessionSupport {

@Resource
private VerifyNode verifyNode;

@Override
protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception {
// 检查请求频率
if (isRateLimited(requestParameter)) {
throw new AppException("请求频率过高,请稍后再试");
}

return router(requestParameter, dynamicContext);
}

@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) {
return verifyNode;
}
}

2. 条件路由

根据不同条件选择不同的处理链:

1
2
3
4
5
6
7
8
@Override
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) {
if (isPremiumUser(requestParameter)) {
return premiumSessionNode; // 高级用户走快速通道
} else {
return verifyNode; // 普通用户需要鉴权
}
}

3. 异步处理

利用 multiThread 方法实现异步处理:

1
2
3
4
5
6
7
8
@Override
protected void multiThread(String requestParameter, DynamicContext dynamicContext)
throws ExecutionException, InterruptedException, TimeoutException {
// 异步执行耗时操作
CompletableFuture.runAsync(() -> {
// 执行异步任务
});
}

八、总结

case模块通过责任链模式策略模式的组合,实现了一个灵活、可扩展、高性能的会话处理框架。其核心设计思想包括:

  1. 分层设计:服务层、工厂层、节点层各司其职
  2. 责任链模式:节点串联,职责单一
  3. 策略模式:动态路由,灵活扩展
  4. 上下文传递:节点间数据传递,线程安全
  5. 响应式编程:支持SSE实时通信,背压控制

这种设计不仅提高了代码的可维护性和可测试性,还为未来的功能扩展提供了良好的基础。通过添加新的节点或调整节点顺序,可以轻松应对不同的业务场景需求。