MCP 网关开发实战:从协议框架到完整消息处理(3-4 → 3-5 分支演进)

前言

在前一阶段(3-4 分支)中,我们完成了 MCP(Model Context Protocol)网关的基础框架搭建,包括会话管理、SSE 通信通道和消息路由机制。但所有的消息处理器(Handler)都还是空实现。本文将详细介绍 3-5 分支 如何将这个框架完善为一个功能完整的 MCP 服务器,实现真正的消息协议处理。


一、整体架构演进

1.1 3-4 分支:骨架阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
┌─────────────────────────────────────────┐
│ McpGatewayController │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ SSE 连接 │ │ 消息接收 │ │
│ │ /mcp/sse │ │ POST 接口 │ │
│ └──────┬──────┘ └──────┬──────┘ │
└─────────┼──────────────────┼───────────┘
│ │
▼ ▼
┌─────────────────────────────────────────┐
│ SessionMessageService │
│ (消息分发器) │
│ │ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ Handler 策略路由 │ ←── 空实现 │
│ └─────────────────┘ │
└─────────────────────────────────────────┘

1.2 3-5 分支:完整实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
┌─────────────────────────────────────────┐
│ McpGatewayController │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ SSE 连接 │◄───┤ 消息响应推送 │ │
│ │ /mcp/sse │ │ (新增) │ │
│ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────┘


┌─────────────────────────────────────────┐
│ SessionMessageService │
│ ┌─────────┐ ┌─────────┐ ┌──────────┐ │
│ │ Request │ │Response │ │Notification│ │
│ └────┬────┘ └─────────┘ └──────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────┐ │
│ │ InitializeHandler ──► 协议握手 │ │
│ │ ToolsListHandler ───► 工具列表 │ │
│ │ ToolsCallHandler ───► 工具调用 │ │
│ │ ResourcesListHandler ► 资源列表│ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘

二、核心改动详解

2.1 消息类型体系完善

2.1.1 新增 JSONRPCNotification 类型

在 MCP 协议中,除了标准的请求-响应模式,还存在通知消息(Notification)——即不需要服务端响应的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// McpSchemaVO.java
public sealed interface JSONRPCMessage
permits JSONRPCRequest, JSONRPCNotification, JSONRPCResponse {
String jsonrpc();
}

// 新增通知类型(无 id 字段)
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public record JSONRPCNotification(
@JsonProperty("jsonrpc") String jsonrpc,
@Property("method") String method,
@JsonProperty("params") Object params
) implements JSONRPCMessage {
}

设计意图

  • 使用 Sealed Interface 限制消息类型,编译器可检查完整性
  • 通知消息用于服务端主动推送或客户端单向告知场景

2.1.2 消息反序列化增强

1
2
3
4
5
6
7
8
9
10
11
12
public static JSONRPCMessage deserializeJsonRpcMessage(String message) {
// ...
if (map.containsKey("method") && map.containsKey("id")) {
return objectMapper.convertValue(map, JSONRPCRequest.class);
} else if (map.containsKey("method") && !map.containsKey("id")) {
// 3-5 新增:识别通知消息
return objectMapper.convertValue(map, JSONRPCNotification.class);
} else if (map.containsKey("result") || map.containsKey("error")) {
return objectMapper.convertValue(map, JSONRPCResponse.class);
}
throw new IllegalArgumentException("...");
}

2.2 SessionMessageService:消息分发中枢重构

2.2.1 接口签名调整

1
2
3
4
5
// 3-4 分支
McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCRequest message);

// 3-5 分支
McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCMessage message);

改动原因:服务需要处理三种消息类型,不仅限于 Request。

2.2.2 多类型消息处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCMessage message) {

// 处理响应消息(通常是客户端收到通知后的回执)
if (message instanceof McpSchemaVO.JSONRPCResponse response) {
log.info("收到结果消息");
}

// 处理请求消息(核心业务流程)
if (message instanceof McpSchemaVO.JSONRPCRequest request) {
String method = request.method();
log.info("开始处理请求,方法: {}", method);

// 策略路由:method → Handler
SessionMessageHandlerMethodEnum handlerEnum =
SessionMessageHandlerMethodEnum.getByMethod(method);
// ... 调用对应 Handler
}

// 处理通知消息
if (message instanceof McpSchemaVO.JSONRPCNotification notification) {
log.info("收到通知 {} {}", notification.method(),
JSON.toJSONString(notification.params()));
}

return null;
}

设计亮点

  • 使用 Java 17+ 的 Pattern Matching for instanceof 进行类型分发
  • 单入口处理多种消息类型,保持代码整洁

2.3 Handler 策略实现:从空壳到业务

2.3.1 InitializeHandler(协议握手)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service("initializeHandler")
public class InitializeHandler implements IRequestHandler {

@Override
public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) {
return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of(
"protocolVersion", "2024-11-05",
"capabilities", Map.of(
"tools", Map.of(),
"resources", Map.of()
),
"serverInfo", Map.of(
"name", "MCP Word Util Proxy Server", // 3-5 更新
"version", "1.0.0"
)
), null);
}
}

MCP 协议要点

  • protocolVersion:声明支持的协议版本
  • capabilities:告知客户端服务端支持的能力(工具、资源等)
  • serverInfo:服务端标识信息

2.3.2 ToolsListHandler(工具发现)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service("toolsListHandler")
public class ToolsListHandler implements IRequestHandler {

@Override
public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) {
return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of(
"tools", new Object[]{
Map.of(
"name", "toUpperCase",
"description", "小写转大写",
"inputSchema", Map.of(
"type", "object",
"properties", Map.of(
"word", Map.of(
"type", "string",
"description", "单词,字符串"
)
),
"required", new String[]{"word"}
)
)
}
), null);
}
}

关键设计inputSchema 遵循 JSON Schema 规范,客户端可据此自动生成调用界面或参数校验。

2.3.3 ToolsCallHandler(工具执行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Service("toolsCallHandler")
public class ToolsCallHandler implements IRequestHandler {

@Override
public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) {
Object params = message.params();

// 参数校验
if (!(params instanceof Map)) {
return new McpSchemaVO.JSONRPCResponse("2.0",
message.id(),
null,
new McpSchemaVO.JSONRPCResponse.JSONRPCError(
McpErrorCodes.INVALID_PARAMS,
"无效参数",
null
));
}

Map<String, Object> paramsMap = (Map<String, Object>) params;
String toolName = (String) paramsMap.get("name");
Map<String, Object> arguments = (Map<String, Object>) paramsMap.get("arguments");

// 工具路由
if ("toUpperCase".equals(toolName)) {
String word = arguments.get("word").toString();

// 返回 MCP 标准格式的工具执行结果
return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of(
"content", new Object[]{
Map.of(
"type", "text",
"text", word.toUpperCase()
)
}
), null);
}

// 工具未找到
return new McpSchemaVO.JSONRPCResponse("2.0",
message.id(),
null,
new McpSchemaVO.JSONRPCResponse.JSONRPCError(
McpErrorCodes.METHOD_NOT_FOUND,
"方法未找到",
null
));
}
}

业务示例toUpperCase 是一个简单的单词转大写工具,用于验证端到端的工具调用流程。

2.3.4 ResourcesListHandler(资源列表)

1
2
3
4
5
6
7
8
9
10
11
12
@Service("resourcesListHandler")
public class ResourcesListHandler implements IRequestHandler {

@Override
public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) {
return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of(
"resources", Map.of(
"resources", new Object[]{} // 当前版本暂无资源
)
), null);
}
}

2.4 错误处理体系:McpErrorCodes

3-5 分支新增了完整的错误码定义,符合 JSON-RPC 2.0 规范:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final class McpErrorCodes {
// JSON-RPC 2.0 标准错误码
public static final int PARSE_ERROR = -32700; // 解析错误
public static final int INVALID_REQUEST = -32600; // 无效请求
public static final int METHOD_NOT_FOUND = -32601; // 方法未找到
public static final int INVALID_PARAMS = -32602; // 无效参数
public static final int INTERNAL_ERROR = -32603; // 内部错误

// MCP 特定错误码(-32000 到 -32099)
public static final int SESSION_NOT_FOUND = -32000;
public static final int SESSION_EXPIRED = -32001;
public static final int SERVER_SHUTTING_DOWN = -32002;
public static final int TOOL_NOT_FOUND = -32003;
public static final int TOOL_EXECUTION_FAILED = -32004;
public static final int RESOURCE_NOT_FOUND = -32005;
public static final int INSUFFICIENT_PERMISSIONS = -32006;
public static final int UNSUPPORTED_PROTOCOL_VERSION = -32007;
}

设计原则

  • -32768-32000:JSON-RPC 2.0 保留范围
  • -32000-32099:服务端自定义错误码

2.5 控制器层完善:SSE 响应推送

3-5 分支最关键的实现是响应消息的主动推送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@PostMapping(value = "{gatewayId}/mcp/sse", consumes = MediaType.APPLICATION_JSON_VALUE)
public Mono<ResponseEntity<Void>> handleMessage(
@PathVariable("gatewayId") String gatewayId,
@RequestParam String sessionId,
@RequestBody String messageBody) {

try {
// 1. 查找会话
SessionConfigVO session = sessionManagementService.getSession(sessionId);
if (null == session) {
return Mono.just(ResponseEntity.notFound().build());
}

// 2. 反序列化消息
McpSchemaVO.JSONRPCMessage jsonrpcMessage =
McpSchemaVO.deserializeJsonRpcMessage(messageBody);

// 3. 调用业务处理器
McpSchemaVO.JSONRPCResponse jsonrpcResponse =
serviceMessageService.processHandlerMessage(jsonrpcMessage);

// 4. 【关键】通过 SSE 推送响应给客户端
if (null != jsonrpcResponse) {
String responseJson = objectMapper.writeValueAsString(jsonrpcResponse);
session.getSink().tryEmitNext(
ServerSentEvent.<String>builder()
.event("message")
.data(responseJson)
.build()
);
}

// 5. 立即返回 202 Accepted(异步处理模式)
return Mono.just(ResponseEntity.accepted().build());

} catch (Exception e) {
log.error("处理 MCP SSE 消息失败", e);
return Mono.just(ResponseEntity.internalServerError().build());
}
}

通信流程

1
2
3
4
5
6
7
8
9
10
客户端                          服务端
│ │
├────── POST /mcp/sse ─────────►│
│ {jsonrpc, method, id,...} │
│ │
│◄──────── 202 Accepted ────────┤ (立即返回)
│ │
│◄──── SSE event: message ──────┤ (异步推送响应)
│ {jsonrpc, id, result,...} │
│ │

三、设计模式总结

模式 应用场景 实现位置
策略模式 不同 method 对应不同处理逻辑 SessionMessageService + IRequestHandler
简单工厂 根据 method 字符串获取 Handler SessionMessageHandlerMethodEnum
密封接口 限制消息类型的继承体系 McpSchemaVO.JSONRPCMessage
记录类 (Record) 不可变的数据传输对象 JSONRPCRequest, JSONRPCResponse
响应式编程 异步非阻塞的 SSE 通信 McpGatewayController 使用 Mono/Flux

四、测试验证

3-5 分支新增了测试方法 sseMcpClient03(),用于验证 MCP JSON-RPC 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void sseMcpClient03() {
HttpClientSseClientTransport sseClientTransport =
HttpClientSseClientTransport
.builder("http://127.0.0.1:8777")
.sseEndpoint("/sse")
.build();

McpSyncClient mcpSyncClient = McpClient
.sync(sseClientTransport)
.requestTimeout(Duration.ofMinutes(36000))
.build();

// 1. 初始化握手
var init = mcpSyncClient.initialize();
log.info("Initialized: {}", init);

// 2. 获取工具列表
var tools = mcpSyncClient.listTools();
log.info("Tools: {}", tools);

// 3. 调用工具
var result = mcpSyncClient.callTool("toUpperCase",
Map.of("word", "hello"));
log.info("Result: {}", result); // 输出: HELLO
}

五、总结与展望

5.1 3-5 分支完成的核心能力

完整的消息协议处理:支持 Request/Response/Notification 三种消息类型
标准的 MCP 生命周期:initialize → listTools → callTool
可扩展的 Handler 体系:新增方法只需实现 IRequestHandler
规范的错误处理:符合 JSON-RPC 2.0 的错误码体系
端到端的功能验证toUpperCase 工具演示完整调用链

5.2 后续可扩展方向

  • 工具动态注册:从数据库或配置中心加载工具定义
  • 资源服务实现:完善 ResourcesListHandler 和 ResourcesReadHandler
  • 认证授权:在 initialize 阶段加入 Token 校验
  • 会话持久化:将会话状态存储到 Redis 等分布式缓存
  • 监控埋点:在 Handler 层加入 metrics 收集

附录:相关源码链接


以上就是 3-43-5 分支的完整改动解析。通过这一阶段的开发,我们的 MCP 网关从一个空框架演进为一个可运行的、符合协议规范的服务端实现,为后续接入真实的大模型和工具链奠定了坚实基础。