MCP 网关开发实战:从协议框架到完整消息处理(3-4 → 3-5 分支演进)
前言
在前一阶段(3-4 分支)中,我们完成了 MCP(Model Context Protocol)网关的基础框架搭建,包括会话管理、SSE 通信通道和消息路由机制。但所有的消息处理器(Handler)都还是空实现。本文将详细介绍 3-5 分支 如何将这个框架完善为一个功能完整的 MCP 服务器,实现真正的消息协议处理。
一、整体架构演进
1.1 3-4 分支:骨架阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ┌─────────────────────────────────────────┐ │ McpGatewayController │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ SSE 连接 │ │ 消息接收 │ │ │ │ /mcp/sse │ │ POST 接口 │ │ │ └──────┬──────┘ └──────┬──────┘ │ └─────────┼──────────────────┼───────────┘ │ │ ▼ ▼ ┌─────────────────────────────────────────┐ │ SessionMessageService │ │ (消息分发器) │ │ │ │ │ ▼ │ │ ┌─────────────────┐ │ │ │ Handler 策略路由 │ ←── 空实现 │ │ └─────────────────┘ │ └─────────────────────────────────────────┘
|
1.2 3-5 分支:完整实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| ┌─────────────────────────────────────────┐ │ McpGatewayController │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ SSE 连接 │◄───┤ 消息响应推送 │ │ │ │ /mcp/sse │ │ (新增) │ │ │ └─────────────┘ └─────────────┘ │ └─────────────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────┐ │ SessionMessageService │ │ ┌─────────┐ ┌─────────┐ ┌──────────┐ │ │ │ Request │ │Response │ │Notification│ │ │ └────┬────┘ └─────────┘ └──────────┘ │ │ │ │ │ ▼ │ │ ┌─────────────────────────────────┐ │ │ │ InitializeHandler ──► 协议握手 │ │ │ │ ToolsListHandler ───► 工具列表 │ │ │ │ ToolsCallHandler ───► 工具调用 │ │ │ │ ResourcesListHandler ► 资源列表│ │ │ └─────────────────────────────────┘ │ └─────────────────────────────────────────┘
|
二、核心改动详解
2.1 消息类型体系完善
2.1.1 新增 JSONRPCNotification 类型
在 MCP 协议中,除了标准的请求-响应模式,还存在通知消息(Notification)——即不需要服务端响应的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public sealed interface JSONRPCMessage permits JSONRPCRequest, JSONRPCNotification, JSONRPCResponse { String jsonrpc(); }
@JsonInclude(JsonInclude.Include.NON_ABSENT) public record JSONRPCNotification( @JsonProperty("jsonrpc") String jsonrpc, @Property("method") String method, @JsonProperty("params") Object params ) implements JSONRPCMessage { }
|
设计意图:
- 使用 Sealed Interface 限制消息类型,编译器可检查完整性
- 通知消息用于服务端主动推送或客户端单向告知场景
2.1.2 消息反序列化增强
1 2 3 4 5 6 7 8 9 10 11 12
| public static JSONRPCMessage deserializeJsonRpcMessage(String message) { if (map.containsKey("method") && map.containsKey("id")) { return objectMapper.convertValue(map, JSONRPCRequest.class); } else if (map.containsKey("method") && !map.containsKey("id")) { return objectMapper.convertValue(map, JSONRPCNotification.class); } else if (map.containsKey("result") || map.containsKey("error")) { return objectMapper.convertValue(map, JSONRPCResponse.class); } throw new IllegalArgumentException("..."); }
|
2.2 SessionMessageService:消息分发中枢重构
2.2.1 接口签名调整
1 2 3 4 5
| McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCRequest message);
McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCMessage message);
|
改动原因:服务需要处理三种消息类型,不仅限于 Request。
2.2.2 多类型消息处理逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Override public McpSchemaVO.JSONRPCResponse processHandlerMessage(McpSchemaVO.JSONRPCMessage message) { if (message instanceof McpSchemaVO.JSONRPCResponse response) { log.info("收到结果消息"); } if (message instanceof McpSchemaVO.JSONRPCRequest request) { String method = request.method(); log.info("开始处理请求,方法: {}", method); SessionMessageHandlerMethodEnum handlerEnum = SessionMessageHandlerMethodEnum.getByMethod(method); } if (message instanceof McpSchemaVO.JSONRPCNotification notification) { log.info("收到通知 {} {}", notification.method(), JSON.toJSONString(notification.params())); } return null; }
|
设计亮点:
- 使用 Java 17+ 的 Pattern Matching for instanceof 进行类型分发
- 单入口处理多种消息类型,保持代码整洁
2.3 Handler 策略实现:从空壳到业务
2.3.1 InitializeHandler(协议握手)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Service("initializeHandler") public class InitializeHandler implements IRequestHandler { @Override public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) { return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of( "protocolVersion", "2024-11-05", "capabilities", Map.of( "tools", Map.of(), "resources", Map.of() ), "serverInfo", Map.of( "name", "MCP Word Util Proxy Server", "version", "1.0.0" ) ), null); } }
|
MCP 协议要点:
protocolVersion:声明支持的协议版本
capabilities:告知客户端服务端支持的能力(工具、资源等)
serverInfo:服务端标识信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Service("toolsListHandler") public class ToolsListHandler implements IRequestHandler { @Override public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) { return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of( "tools", new Object[]{ Map.of( "name", "toUpperCase", "description", "小写转大写", "inputSchema", Map.of( "type", "object", "properties", Map.of( "word", Map.of( "type", "string", "description", "单词,字符串" ) ), "required", new String[]{"word"} ) ) } ), null); } }
|
关键设计:inputSchema 遵循 JSON Schema 规范,客户端可据此自动生成调用界面或参数校验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Service("toolsCallHandler") public class ToolsCallHandler implements IRequestHandler { @Override public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) { Object params = message.params(); if (!(params instanceof Map)) { return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), null, new McpSchemaVO.JSONRPCResponse.JSONRPCError( McpErrorCodes.INVALID_PARAMS, "无效参数", null )); } Map<String, Object> paramsMap = (Map<String, Object>) params; String toolName = (String) paramsMap.get("name"); Map<String, Object> arguments = (Map<String, Object>) paramsMap.get("arguments"); if ("toUpperCase".equals(toolName)) { String word = arguments.get("word").toString(); return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of( "content", new Object[]{ Map.of( "type", "text", "text", word.toUpperCase() ) } ), null); } return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), null, new McpSchemaVO.JSONRPCResponse.JSONRPCError( McpErrorCodes.METHOD_NOT_FOUND, "方法未找到", null )); } }
|
业务示例:toUpperCase 是一个简单的单词转大写工具,用于验证端到端的工具调用流程。
2.3.4 ResourcesListHandler(资源列表)
1 2 3 4 5 6 7 8 9 10 11 12
| @Service("resourcesListHandler") public class ResourcesListHandler implements IRequestHandler { @Override public McpSchemaVO.JSONRPCResponse handle(McpSchemaVO.JSONRPCRequest message) { return new McpSchemaVO.JSONRPCResponse("2.0", message.id(), Map.of( "resources", Map.of( "resources", new Object[]{} ) ), null); } }
|
2.4 错误处理体系:McpErrorCodes
3-5 分支新增了完整的错误码定义,符合 JSON-RPC 2.0 规范:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public final class McpErrorCodes { public static final int PARSE_ERROR = -32700; public static final int INVALID_REQUEST = -32600; public static final int METHOD_NOT_FOUND = -32601; public static final int INVALID_PARAMS = -32602; public static final int INTERNAL_ERROR = -32603; public static final int SESSION_NOT_FOUND = -32000; public static final int SESSION_EXPIRED = -32001; public static final int SERVER_SHUTTING_DOWN = -32002; public static final int TOOL_NOT_FOUND = -32003; public static final int TOOL_EXECUTION_FAILED = -32004; public static final int RESOURCE_NOT_FOUND = -32005; public static final int INSUFFICIENT_PERMISSIONS = -32006; public static final int UNSUPPORTED_PROTOCOL_VERSION = -32007; }
|
设计原则:
-32768 到 -32000:JSON-RPC 2.0 保留范围
-32000 到 -32099:服务端自定义错误码
2.5 控制器层完善:SSE 响应推送
3-5 分支最关键的实现是响应消息的主动推送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @PostMapping(value = "{gatewayId}/mcp/sse", consumes = MediaType.APPLICATION_JSON_VALUE) public Mono<ResponseEntity<Void>> handleMessage( @PathVariable("gatewayId") String gatewayId, @RequestParam String sessionId, @RequestBody String messageBody) { try { SessionConfigVO session = sessionManagementService.getSession(sessionId); if (null == session) { return Mono.just(ResponseEntity.notFound().build()); } McpSchemaVO.JSONRPCMessage jsonrpcMessage = McpSchemaVO.deserializeJsonRpcMessage(messageBody); McpSchemaVO.JSONRPCResponse jsonrpcResponse = serviceMessageService.processHandlerMessage(jsonrpcMessage); if (null != jsonrpcResponse) { String responseJson = objectMapper.writeValueAsString(jsonrpcResponse); session.getSink().tryEmitNext( ServerSentEvent.<String>builder() .event("message") .data(responseJson) .build() ); } return Mono.just(ResponseEntity.accepted().build()); } catch (Exception e) { log.error("处理 MCP SSE 消息失败", e); return Mono.just(ResponseEntity.internalServerError().build()); } }
|
通信流程:
1 2 3 4 5 6 7 8 9 10
| 客户端 服务端 │ │ ├────── POST /mcp/sse ─────────►│ │ {jsonrpc, method, id,...} │ │ │ │◄──────── 202 Accepted ────────┤ (立即返回) │ │ │◄──── SSE event: message ──────┤ (异步推送响应) │ {jsonrpc, id, result,...} │ │ │
|
三、设计模式总结
| 模式 |
应用场景 |
实现位置 |
| 策略模式 |
不同 method 对应不同处理逻辑 |
SessionMessageService + IRequestHandler |
| 简单工厂 |
根据 method 字符串获取 Handler |
SessionMessageHandlerMethodEnum |
| 密封接口 |
限制消息类型的继承体系 |
McpSchemaVO.JSONRPCMessage |
| 记录类 (Record) |
不可变的数据传输对象 |
JSONRPCRequest, JSONRPCResponse 等 |
| 响应式编程 |
异步非阻塞的 SSE 通信 |
McpGatewayController 使用 Mono/Flux |
四、测试验证
3-5 分支新增了测试方法 sseMcpClient03(),用于验证 MCP JSON-RPC 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void sseMcpClient03() { HttpClientSseClientTransport sseClientTransport = HttpClientSseClientTransport .builder("http://127.0.0.1:8777") .sseEndpoint("/sse") .build();
McpSyncClient mcpSyncClient = McpClient .sync(sseClientTransport) .requestTimeout(Duration.ofMinutes(36000)) .build(); var init = mcpSyncClient.initialize(); log.info("Initialized: {}", init); var tools = mcpSyncClient.listTools(); log.info("Tools: {}", tools); var result = mcpSyncClient.callTool("toUpperCase", Map.of("word", "hello")); log.info("Result: {}", result); }
|
五、总结与展望
5.1 3-5 分支完成的核心能力
✅ 完整的消息协议处理:支持 Request/Response/Notification 三种消息类型
✅ 标准的 MCP 生命周期:initialize → listTools → callTool
✅ 可扩展的 Handler 体系:新增方法只需实现 IRequestHandler
✅ 规范的错误处理:符合 JSON-RPC 2.0 的错误码体系
✅ 端到端的功能验证:toUpperCase 工具演示完整调用链
5.2 后续可扩展方向
- 工具动态注册:从数据库或配置中心加载工具定义
- 资源服务实现:完善 ResourcesListHandler 和 ResourcesReadHandler
- 认证授权:在 initialize 阶段加入 Token 校验
- 会话持久化:将会话状态存储到 Redis 等分布式缓存
- 监控埋点:在 Handler 层加入 metrics 收集
附录:相关源码链接
以上就是 3-4 → 3-5 分支的完整改动解析。通过这一阶段的开发,我们的 MCP 网关从一个空框架演进为一个可运行的、符合协议规范的服务端实现,为后续接入真实的大模型和工具链奠定了坚实基础。