Case模块运行机制详解
一、整体架构
case模块采用了责任链模式和策略模式的组合设计,通过多个节点串联处理MCP会话请求。整体架构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| ┌─────────────────────────────────────────────────────────────┐ │ Case Module │ │ │ │ ┌──────────────┐ ┌──────────────────────────────┐ │ │ │ Service │─────▶│ Session Factory │ │ │ │ │ │ │ │ │ │ McpMessage │ │ DefaultMcpSessionFactory │ │ │ │ Service │ │ │ │ │ └──────────────┘ └──────────────┬───────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────┐ │ │ │ Abstract Support │ │ │ │ │ │ │ │ AbstractMcpSessionSupport│ │ │ └──────────────┬───────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ Responsibility Chain Nodes │ │ │ │ │ │ │ │ RootNode → VerifyNode → SessionNode → EndNode │ │ │ ↓ ↓ ↓ ↓ │ │ │ 路由 鉴权 创建会话 返回响应 │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘
|
二、核心组件解析
1. 服务层
McpMessageService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Service public class McpMessageService implements IMcpSessionService {
@Resource private DefaultMcpSessionFactory defaultMcpSessionFactory;
@Override public Flux<ServerSentEvent<String>> createMcpSession(String gatewayId) throws Exception { StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> strategyHandler = defaultMcpSessionFactory.strategyHandler(); return strategyHandler.apply(gatewayId, new DynamicContext()); } }
|
职责:
- 作为服务入口,接收客户端请求
- 通过工厂获取策略处理器
- 启动责任链处理流程
2. 工厂层
DefaultMcpSessionFactory
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Service public class DefaultMcpSessionFactory {
@Resource private RootNode rootNode;
public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> strategyHandler() { return rootNode; }
@Data @Builder @AllArgsConstructor @NoArgsConstructor public static class DynamicContext { private SessionConfigVO sessionConfigVO; } }
|
职责:
- 创建策略处理器(责任链起点)
- 定义动态上下文,用于节点间数据传递
- 管理责任链的初始化
3. 抽象支持层
AbstractMcpSessionSupport
1 2 3 4 5 6 7 8 9 10 11 12
| public abstract class AbstractMcpSessionSupport extends AbstractMultiThreadStrategyRouter<String, DynamicContext, Flux<ServerSentEvent<String>>> {
@Resource protected ISessionManagementService sessionManagementService;
@Override protected void multiThread(String requestParameter, DynamicContext dynamicContext) throws ExecutionException, InterruptedException, TimeoutException { } }
|
职责:
- 继承自框架的抽象路由器,提供责任链的基础能力
- 注入领域服务,供子类使用
- 定义多线程处理扩展点
关键特性:
- 继承
AbstractMultiThreadStrategyRouter,支持多线程策略路由
- 提供统一的依赖注入管理
- 定义了节点的基本行为模式
三、责任链节点详解
节点执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Client Request ↓ ┌─────────────┐ │ RootNode │ ──── 日志记录、参数校验 └──────┬──────┘ ↓ ┌─────────────┐ │ VerifyNode │ ──── 身份验证、权限检查 └──────┬──────┘ ↓ ┌─────────────┐ │ SessionNode │ ──── 创建会话、初始化资源 └──────┬──────┘ ↓ ┌─────────────┐ │ EndNode │ ──── 返回响应、资源清理 └─────────────┘
|
1. RootNode(根节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Slf4j @Service public class RootNode extends AbstractMcpSessionSupport {
@Resource private VerifyNode verifyNode;
@Override protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception { try { log.info("创建会话 mcp session RootNode:{}", requestParameter); return router(requestParameter, dynamicContext); } catch (Exception e) { log.error("创建会话 mcp session RootNode 异常:{}", requestParameter, e); throw e; } }
@Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception { return verifyNode; } }
|
职责:
- 作为责任链的入口节点
- 记录请求日志
- 执行初步的参数校验
- 路由到下一个节点
设计模式:
- 模板方法模式:
doApply 定义了节点的处理流程
- 策略模式:
get 方法返回下一个节点,实现动态路由
2. VerifyNode(鉴权节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class VerifyNode extends AbstractMcpSessionSupport {
@Resource private SessionNode sessionNode;
@Override protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception { return router(requestParameter, dynamicContext); }
@Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception { return sessionNode; } }
|
职责:
- 验证请求参数的合法性
- 验证用户身份
- 检查访问权限
- 记录鉴权日志
扩展点:
- 可添加 API Key 验证
- 可添加 IP 白名单检查
- 可添加请求频率限制
3. SessionNode(会话节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Slf4j @Service public class SessionNode extends AbstractMcpSessionSupport {
@Resource private EndNode endNode;
@Override protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception { log.info("创建会话-SessionNode:{}", requestParameter);
SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter);
dynamicContext.setSessionConfigVO(sessionConfigVO);
return router(requestParameter, dynamicContext); }
@Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception { return endNode; } }
|
职责:
- 调用领域服务创建会话
- 生成唯一的会话ID
- 创建SSE的Sink
- 将会话配置写入上下文
关键操作:
1 2 3 4 5 6 7 8
| SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter);
dynamicContext.setSessionConfigVO(sessionConfigVO);
return router(requestParameter, dynamicContext);
|
4. EndNode(结束节点)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Slf4j @Service public class EndNode extends AbstractMcpSessionSupport {
@Override protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception { log.info("创建会话-EndNode:{}", requestParameter);
SessionConfigVO sessionConfigVO = dynamicContext.getSessionConfigVO(); String sessionId = sessionConfigVO.getSessionId();
Sinks.Many<ServerSentEvent<String>> sink = sessionConfigVO.getSink();
return sink.asFlux() .mergeWith( Flux.interval(Duration.ofSeconds(60)) .map(i -> ServerSentEvent.<String>builder() .event("ping") .data("ping") .build()) ) .doOnCancel(() -> { log.info("SSE连接取消,会话ID: {}", sessionId); sessionManagementService.removeSession(sessionId); }) .doOnTerminate(() -> { log.info("SSE连接终止,会话ID: {}", sessionId); sessionManagementService.removeSession(sessionId); }); }
@Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) throws Exception { return defaultStrategyHandler; } }
|
职责:
- 构建最终的SSE响应流
- 添加心跳机制,保持连接活跃
- 处理连接取消和终止事件
- 清理会话资源
关键特性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| return sink.asFlux()
.mergeWith( Flux.interval(Duration.ofSeconds(60)) .map(i -> ServerSentEvent.<String>builder() .event("ping") .data("ping") .build()) )
.doOnCancel(() -> { }) .doOnTerminate(() -> { })
|
四、运行流程详解
完整执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| 1. 客户端发起请求 ↓ 2. McpGatewayController 接收请求 ↓ 3. 调用 McpMessageService.createMcpSession(gatewayId) ↓ 4. McpMessageService 通过 DefaultMcpSessionFactory 获取策略处理器 ↓ 5. DefaultMcpSessionFactory 返回 RootNode(责任链起点) ↓ 6. 执行 RootNode.doApply() - 记录日志 - 调用 router() 方法 - router() 调用 get() 获取下一个节点 - router() 调用下一个节点的 apply() 方法 ↓ 7. 执行 VerifyNode.doApply() - 执行鉴权逻辑 - 调用 router() 方法 - 路由到 SessionNode ↓ 8. 执行 SessionNode.doApply() - 调用 sessionManagementService.createSession(gatewayId) - 创建会话配置 - 写入上下文 - 路由到 EndNode ↓ 9. 执行 EndNode.doApply() - 从上下文获取会话配置 - 构建SSE响应流 - 添加心跳机制 - 添加连接生命周期管理 - 返回 Flux<ServerSentEvent<String>> ↓ 10. 响应返回给客户端
|
时序图
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| Client Controller McpMessageService Factory RootNode VerifyNode SessionNode EndNode Domain │ │ │ │ │ │ │ │ │ │──── Request ────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── createSession ──▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │─ strategyHandler() ────────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │◀───── return rootNode ─────│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │──── apply() ────────────────────────────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── doApply() │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── router() ────────────────────────────────────────────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │◀───── get() ──│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── doApply() ──│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── router() ─────────────────────────────────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │◀─ get() ───│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │─ doApply() ─────────────────────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │◀─ createSession ─│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │── router() ─────▶│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │◀─ get() ──── │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │─ doApply() ─────▶│ │ │ │ │ │ │ │ │ │ │ │ │◀─────────────────────────────────────────────────────────────── return Flux ────│ │ │ │ │ │ │ │ │ │ │◀─────────────────────────────────────────────────────────────────────────────── SSE Stream ──────────────────────────────│
|
五、关键技术点
1. 责任链模式
实现方式:
- 每个节点继承
AbstractMcpSessionSupport
- 实现
doApply() 方法定义当前节点的业务逻辑
- 实现
get() 方法返回下一个节点
- 通过
router() 方法串联节点
优势:
- 解耦:每个节点职责单一,互不影响
- 灵活:可动态调整节点顺序或添加新节点
- 可测试:每个节点可独立测试
2. 策略模式
实现方式:
get() 方法返回下一个节点的策略处理器
- 支持根据参数动态选择下一个节点
扩展能力:
1 2 3 4 5 6 7 8 9
| @Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) { if (needVerification(requestParameter)) { return verifyNode; } else { return sessionNode; } }
|
3. 上下文传递
DynamicContext 的作用:
- 在节点间传递数据
- 避免全局变量
- 保证线程安全(每个请求独立的上下文)
使用示例:
1 2 3 4 5 6
| SessionConfigVO sessionConfigVO = sessionManagementService.createSession(requestParameter); dynamicContext.setSessionConfigVO(sessionConfigVO);
SessionConfigVO sessionConfigVO = dynamicContext.getSessionConfigVO();
|
4. 响应式编程
Flux 和 SSE 的结合:
1 2 3 4 5 6 7 8 9 10
| return sink.asFlux() .mergeWith( Flux.interval(Duration.ofSeconds(60)) .map(i -> ServerSentEvent.<String>builder() .event("ping") .data("ping") .build()) ) .doOnCancel(() -> { }) .doOnTerminate(() -> { });
|
关键概念:
- Flux:响应式流,支持背压
- Sinks:响应式发射器,支持多播
- ServerSentEvent:SSE事件对象
- mergeWith:合并多个流
- doOnCancel/doOnTerminate:生命周期钩子
六、设计优势
1. 单一职责原则
每个节点只负责一个特定的功能:
- RootNode:日志记录和路由
- VerifyNode:身份验证
- SessionNode:会话创建
- EndNode:响应构建和资源清理
2. 开闭原则
对扩展开放,对修改关闭:
- 新增功能只需添加新节点
- 无需修改现有节点的代码
- 支持动态调整节点顺序
3. 依赖倒置原则
高层模块不依赖低层模块:
- 节点依赖抽象类
AbstractMcpSessionSupport
- 通过接口
ISessionManagementService 调用领域服务
- 通过依赖注入获取下一个节点
4. 接口隔离原则
接口设计精简:
IMcpSessionService 只定义必要的方法
- 每个节点只实现需要的方法
七、扩展建议
1. 添加新节点
例如,添加限流节点:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Service public class RateLimitNode extends AbstractMcpSessionSupport {
@Resource private VerifyNode verifyNode;
@Override protected Flux<ServerSentEvent<String>> doApply(String requestParameter, DynamicContext dynamicContext) throws Exception { if (isRateLimited(requestParameter)) { throw new AppException("请求频率过高,请稍后再试"); } return router(requestParameter, dynamicContext); }
@Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) { return verifyNode; } }
|
2. 条件路由
根据不同条件选择不同的处理链:
1 2 3 4 5 6 7 8
| @Override public StrategyHandler<String, DynamicContext, Flux<ServerSentEvent<String>>> get(String requestParameter, DynamicContext dynamicContext) { if (isPremiumUser(requestParameter)) { return premiumSessionNode; } else { return verifyNode; } }
|
3. 异步处理
利用 multiThread 方法实现异步处理:
1 2 3 4 5 6 7 8
| @Override protected void multiThread(String requestParameter, DynamicContext dynamicContext) throws ExecutionException, InterruptedException, TimeoutException { CompletableFuture.runAsync(() -> { }); }
|
八、总结
case模块通过责任链模式和策略模式的组合,实现了一个灵活、可扩展、高性能的会话处理框架。其核心设计思想包括:
- 分层设计:服务层、工厂层、节点层各司其职
- 责任链模式:节点串联,职责单一
- 策略模式:动态路由,灵活扩展
- 上下文传递:节点间数据传递,线程安全
- 响应式编程:支持SSE实时通信,背压控制
这种设计不仅提高了代码的可维护性和可测试性,还为未来的功能扩展提供了良好的基础。通过添加新的节点或调整节点顺序,可以轻松应对不同的业务场景需求。