基于MCP SDK创建服务器完全指南
一个标准化的大语言模型上下文提供协议实现
目录
1. 简介
什么是MCP
Model Context Protocol(MCP)是一个标准化协议,旨在让应用程序以标准和安全的方式为大语言模型(LLM)提供上下文。MCP的核心理念是将"提供上下文"与"LLM交互"这两个关注点分离开来。
通过MCP协议,应用程序可以:
- 向LLM暴露数据和资源
- 提供LLM可调用的工具和功能
- 定义交互模式和提示模板
- 管理会话和状态
MCP使用JSON-RPC作为基础通信协议,并定义了一系列标准方法来处理资源访问、工具调用和提示管理。
为什么使用MCP
在传统的LLM应用中,开发者通常需要在每个应用程序中重新实现上下文获取和工具调用的逻辑。这导致:
- 代码重复
- 安全风险
- 实现不一致
- 集成困难
使用MCP可以带来以下好处:
- 标准化接口:使用一致的协议与LLM交互
- 关注点分离:将上下文提供与LLM交互解耦
- 安全性:通过定义明确的协议边界提高安全性
- 可重用性:创建可在多个应用程序中使用的MCP服务
- 扩展性:轻松添加新的数据源和工具
本指南目标
本指南旨在提供一个详细的、步骤式的教程,帮助开发者使用TypeScript MCP SDK创建自己的MCP服务器并将其打包为npm包分享给他人使用。
通过本指南,你将学习:
- 创建一个基本的MCP服务器
- 实现资源、工具和提示
- 配置不同的传输协议
- 管理会话和状态
- 发布你的服务器作为npm包
无论你是想为特定领域创建一个专用的上下文提供者,还是想构建一个通用的工具集成框架,本指南都将帮助你利用MCP SDK实现目标。
2. 环境准备
安装依赖
要开始使用MCP SDK创建服务器,你需要一个Node.js环境和一些基本依赖。
系统要求
- Node.js 18.x 或更高版本
- npm 7.x 或更高版本
创建新项目
首先,创建一个新的目录并初始化npm项目:
mkdir my-mcp-server
cd my-mcp-server
npm init -y
安装MCP SDK
安装MCP SDK作为依赖:
npm install @modelcontextprotocol/sdk
安装其他依赖
根据你的传输选择和功能需求,你可能需要安装一些额外的依赖:
# 如果你使用HTTP传输
npm install express
# 用于参数验证和类型定义
npm install zod
# 如果你需要处理环境变量
npm install dotenv
# 用于TypeScript支持
npm install typescript @types/node @types/express --save-dev
项目初始化
创建TypeScript配置
如果你使用TypeScript(推荐),创建一个tsconfig.json文件:
{
"compilerOptions": {
"target": "ES2020",
"module": "NodeNext",
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"strict": true,
"outDir": "dist",
"declaration": true,
"skipLibCheck": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
项目结构
为你的MCP服务器创建一个基本的项目结构:
my-mcp-server/
├── src/
│ ├── server.ts # 主服务器文件
│ ├── resources/ # 资源实现
│ ├── tools/ # 工具实现
│ ├── prompts/ # 提示实现
│ └── transport/ # 自定义传输(如果需要)
├── tests/ # 测试目录
├── examples/ # 用法示例
├── package.json
├── tsconfig.json
└── README.md
配置package.json
更新你的package.json文件以包含必要的脚本和配置:
{
"name": "my-mcp-server",
"version": "1.0.0",
"description": "My MCP Server Implementation",
"type": "module",
"main": "dist/server.js",
"types": "dist/server.d.ts",
"scripts": {
"build": "tsc",
"start": "node dist/server.js",
"dev": "tsx watch src/server.ts",
"test": "jest"
},
"engines": {
"node": ">=18"
},
"keywords": [
"mcp",
"modelcontextprotocol",
"llm"
],
"license": "MIT"
}
现在,我们已经完成了项目的初始设置,接下来将开始实现MCP服务器的基础部分。
3. 基础实现
创建MCP服务器
MCP SDK提供了McpServer类,它是创建MCP服务器的入口点。这个类处理所有的协议细节、消息路由和生命周期管理,让你可以专注于实现业务逻辑。
基本服务器创建
在项目的src目录中创建一个名为server.ts的文件,并添加以下代码:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
// 创建MCP服务器实例
const server = new McpServer({
name: "my-mcp-server", // 服务器名称
version: "1.0.0" // 服务器版本
});
// 后续将在这里添加资源、工具和提示的定义
// ...
// 导出服务器实例(如果构建为库)
export default server;
服务器配置
McpServer构造函数接受两个参数:服务器信息和可选的服务器选项。
服务器信息定义了服务器的基本标识:
{
name: string; // 服务器名称
version: string; // 服务器版本号
}
服务器选项可以配置服务器的高级功能:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer(
{
name: "my-mcp-server",
version: "1.0.0"
},
{
// 声明服务器能力
capabilities: {
// 启用日志功能
logging: {},
// 可以自定义其他能力
// ...
}
}
);
服务器能力
可以通过capabilities选项声明服务器支持的特殊功能:
- 日志功能: 允许客户端设置日志级别并接收日志消息
capabilities: { logging: {} } - 自定义功能: 你也可以定义自己的服务器能力
capabilities: { myCustomCapability: { param1: "value1", param2: true } }
传输协议选择
MCP SDK支持多种传输协议,用于在客户端和服务器之间传递消息。根据你的部署环境和需求,可以选择最适合的传输方式。
1. 标准输入/输出 (stdio)
最简单的传输方式,适用于命令行工具和直接集成。
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new McpServer({
name: "stdio-server",
version: "1.0.0"
});
// 创建标准输入/输出传输
const transport = new StdioServerTransport();
// 连接服务器和传输
await server.connect(transport);
// 服务器现在开始通过stdin读取请求并通过stdout发送响应
console.error("服务器已启动,通过stdin/stdout进行通信...");
2. Streamable HTTP传输
最推荐的传输方式,基于HTTP协议,支持请求/响应,并通过Server-Sent Events (SSE)提供通知流。
import express from "express";
import { randomUUID } from "node:crypto";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/server/events.js";
// 创建Express应用
const app = express();
app.use(express.json());
// 创建MCP服务器
const server = new McpServer({
name: "http-server",
version: "1.0.0"
});
// 创建事件存储用于恢复会话
const eventStore = new InMemoryEventStore();
// 处理MCP请求
app.all('/mcp', async (req, res) => {
try {
// 创建传输
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore,
});
// 连接服务器和传输
await server.connect(transport);
// 处理请求
await transport.handleRequest(req, res, req.body);
} catch (error) {
console.error("处理请求时出错:", error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: "2.0",
error: {
code: -32603,
message: "内部服务器错误",
},
id: null,
});
}
}
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`MCP HTTP服务器运行在端口 ${PORT}`);
});
3. SSE传输 (已弃用但向后兼容)
旧版的服务器实现,已被Streamable HTTP取代,但SDK仍提供向后兼容支持。
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
const app = express();
app.use(express.json());
// 创建MCP服务器
const server = new McpServer({
name: "sse-server",
version: "1.0.0"
});
// SSE端点用于事件流
app.get('/sse', async (req, res) => {
const transport = new SSEServerTransport();
await server.connect(transport);
await transport.handleRequest(req, res);
});
// 消息端点用于客户端请求
app.post('/messages', async (req, res) => {
// 此处需要自行实现将请求转发到相应的SSE传输
// ...
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`MCP SSE服务器运行在端口 ${PORT}`);
});
连接服务器与传输
创建MCP服务器并选择传输方式后,需要将它们连接起来。MCP服务器通过connect方法连接到传输,开始接收和处理消息。
基本连接方式
连接服务器和传输的基本模式如下:
// 创建服务器
const server = new McpServer({
name: "my-server",
version: "1.0.0"
});
// 创建传输
const transport = new StdioServerTransport(); // 或其他传输实现
// 连接服务器和传输
await server.connect(transport);
会话管理与客户端连接
在HTTP传输中,你需要管理客户端会话,确保同一客户端的请求被路由到正确的传输实例:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";
import express from "express";
const app = express();
app.use(express.json());
// 跟踪会话和传输
const transports: Map = new Map();
// 创建MCP服务器工厂函数
const createServer = () => {
return new McpServer({
name: "session-aware-server",
version: "1.0.0"
});
};
app.post("/mcp", async (req, res) => {
// 检查会话ID
const sessionId = req.headers["mcp-session-id"] as string;
if (!sessionId) {
// 处理初始化请求
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
});
// 设置传输关闭时的清理
transport.onclose = () => {
if (transport.sessionId) {
transports.delete(transport.sessionId);
}
};
// 连接到新服务器
const server = createServer();
await server.connect(transport);
// 处理请求
await transport.handleRequest(req, res, req.body);
// 存储传输以供未来使用
if (transport.sessionId) {
transports.set(transport.sessionId, transport);
}
} else {
// 查找现有传输
const transport = transports.get(sessionId);
if (!transport) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "无效的会话ID",
},
id: null,
});
return;
}
// 使用现有传输处理请求
await transport.handleRequest(req, res, req.body);
}
});
// 处理SSE流的GET请求
app.get("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
if (!sessionId || !transports.has(sessionId)) {
res.status(400).send("无效或缺失的会话ID");
return;
}
const transport = transports.get(sessionId)!;
await transport.handleSSERequest(req, res);
});
// 处理会话终止的DELETE请求
app.delete("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
if (!sessionId || !transports.has(sessionId)) {
res.status(400).send("无效或缺失的会话ID");
return;
}
const transport = transports.get(sessionId)!;
await transport.handleDeleteRequest(req, res);
});
传输生命周期管理
传输实例有自己的生命周期,需要正确管理:
- 创建和连接:创建传输并将其连接到服务器
- 请求处理:通过传输处理请求和发送响应
- 关闭和清理:在不再需要或客户端断开连接时关闭传输
// 创建传输
const transport = new StreamableHTTPServerTransport({
// 配置...
});
// 监听关闭事件
transport.onclose = () => {
console.log("传输已关闭,进行清理...");
// 执行必要的清理
};
// 连接到服务器
await server.connect(transport);
// ... 处理请求 ...
// 在不再需要时关闭传输
await transport.close();
现在我们已经了解了如何创建基本的MCP服务器并选择适当的传输方式。接下来,我们将深入探讨如何实现服务器的核心功能:资源、工具和提示。
4. 核心功能实现
在MCP中,服务器主要提供三种核心功能:资源、工具和提示。这些功能使LLM能够获取数据、执行操作并遵循特定的交互模式。
资源(Resources)
资源是MCP服务器向LLM暴露数据的方式。它们提供只读信息,不应执行复杂计算或产生副作用(类似于REST API中的GET端点)。
静态资源
静态资源有固定的URI,提供不变或很少变化的数据。
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
name: "resource-server",
version: "1.0.0"
});
// 注册一个简单的静态资源
server.resource(
"config-resource", // 资源名称(仅供标识,不需要唯一)
"https://example.com/config", // 资源URI(必须唯一)
async () => ({
contents: [{
uri: "https://example.com/config", // 必须匹配上面的URI
text: "这是一个配置资源的内容。这里可以是任何文本数据。"
}]
})
);
// 带元数据的静态资源
server.resource(
"document-resource",
"https://example.com/documents/123",
{ mimeType: "text/markdown" }, // 可选元数据
async () => ({
contents: [{
uri: "https://example.com/documents/123",
text: "# 文档标题\n\n这是一个markdown文档内容。"
}]
})
);
动态资源模板
资源模板允许使用URI模板创建动态资源,可以包含变量。
import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
name: "template-resource-server",
version: "1.0.0"
});
// 创建一个资源模板
server.resource(
"user-profile",
new ResourceTemplate("users://{userId}/profile", {
// 列出符合此模板的所有资源的回调(如果不支持列表功能,设为undefined)
list: async () => ({
resources: [
{ uri: "users://1001/profile", name: "用户1001的资料" },
{ uri: "users://1002/profile", name: "用户1002的资料" }
]
}),
// 可选:提供自动完成变量值的回调
complete: {
userId: async (prefix) => {
// 返回匹配前缀的可能userId值
const allIds = ["1001", "1002", "1003", "1004"];
return allIds.filter(id => id.startsWith(prefix));
}
}
}),
async (uri, variables) => {
// 从变量中获取用户ID
const userId = variables.userId;
// 这里可以从数据库或API获取用户资料
// 这个例子使用模拟数据
const userProfiles = {
"1001": "姓名:张三,年龄:30,职位:工程师",
"1002": "姓名:李四,年龄:25,职位:设计师",
// 更多用户...
};
return {
contents: [{
uri: uri.href,
text: userProfiles[userId] || `找不到用户${userId}的资料`
}]
};
}
);
二进制资源
MCP也支持二进制资源(如图像、音频等):
import { readFile } from "fs/promises";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
name: "binary-resource-server",
version: "1.0.0"
});
// 注册一个二进制资源
server.resource(
"image-resource",
"https://example.com/images/logo.png",
{ mimeType: "image/png" },
async () => {
// 读取文件内容
const imageData = await readFile("./assets/logo.png");
return {
contents: [{
uri: "https://example.com/images/logo.png",
// 将Buffer转换为Base64
blob: imageData.toString("base64")
}]
};
}
);
资源更新和状态管理
可以启用或禁用资源,以及发送资源列表变更通知:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
name: "dynamic-resource-server",
version: "1.0.0"
});
// 注册一个资源
const configResource = server.resource(
"config-resource",
"https://example.com/config",
async () => ({
contents: [{
uri: "https://example.com/config",
text: "初始配置内容"
}]
})
);
// 禁用资源
configResource.disable();
// 重新启用资源
configResource.enable();
// 更新资源
configResource.update({
name: "updated-config",
uri: "https://example.com/updated-config",
callback: async () => ({
contents: [{
uri: "https://example.com/updated-config",
text: "更新后的配置内容"
}]
})
});
// 移除资源
configResource.remove();
// 通知客户端资源列表已更改
server.sendResourceListChanged();
工具(Tools)
工具允许LLM执行操作并产生副作用,类似于REST API中的POST或PUT方法。工具可以执行计算、调用外部API、写入数据库等。
基本工具实现
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "tool-server",
version: "1.0.0"
});
// 注册一个简单工具,不带参数
server.tool(
"current-time", // 工具名称
"返回当前服务器时间", // 工具描述
async () => ({
content: [
{
type: "text",
text: `当前时间是: ${new Date().toISOString()}`
}
]
})
);
// 带参数的工具
server.tool(
"calculate-sum",
"计算两个数字的和",
{
a: z.number().describe("第一个数字"),
b: z.number().describe("第二个数字")
},
async ({ a, b }) => {
const sum = a + b;
return {
content: [
{
type: "text",
text: `${a} + ${b} = ${sum}`
}
]
};
}
);
具有注解的工具
工具可以包含特殊注解,提供额外的元数据:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "annotated-tool-server",
version: "1.0.0"
});
// 带注解的工具
server.tool(
"search-database",
"在数据库中搜索信息",
{
query: z.string().describe("搜索查询"),
limit: z.number().optional().default(10).describe("结果限制")
},
{
// 工具注解
title: "数据库搜索工具", // 更友好的显示名称
readOnlyHint: true, // 提示此工具不会修改数据
openWorldHint: false // 提示此工具不接受开放式参数
},
async ({ query, limit }) => {
// 模拟数据库搜索
const results = [`结果1: ${query}`, `结果2: ${query}`, `结果3: ${query}`]
.slice(0, limit);
return {
content: [
{
type: "text",
text: `搜索结果:\n${results.join('\n')}`
}
]
};
}
);
发送进度通知的工具
工具可以在执行期间发送通知,提供进度或中间结果:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "notification-tool-server",
version: "1.0.0"
}, { capabilities: { logging: {} } });
// 发送通知的工具
server.tool(
"long-running-task",
"执行长时间运行的任务,并发送进度通知",
{
steps: z.number().min(1).max(10).describe("任务步骤数")
},
async ({ steps }, { sendNotification }) => {
const sleep = (ms) => new Promise(resolve => setTimeout(resolve, ms));
// 发送开始通知
await sendNotification({
method: "notifications/message",
params: { level: "info", data: "任务开始" }
});
// 执行任务步骤
for (let i = 1; i <= steps; i++) {
// 等待1秒模拟工作
await sleep(1000);
// 发送进度通知
await sendNotification({
method: "notifications/message",
params: { level: "info", data: `完成步骤 ${i}/${steps}` }
});
}
// 返回最终结果
return {
content: [
{
type: "text",
text: `成功完成了所有${steps}个步骤。`
}
]
};
}
);
工具错误处理
工具可以通过多种方式处理错误:
import { McpServer, McpError, ErrorCode } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "error-handling-tool-server",
version: "1.0.0"
});
// 使用try-catch处理错误
server.tool(
"divide-numbers",
"除法计算器",
{
numerator: z.number().describe("分子"),
denominator: z.number().describe("分母")
},
async ({ numerator, denominator }) => {
try {
if (denominator === 0) {
throw new Error("除数不能为零");
}
const result = numerator / denominator;
return {
content: [
{
type: "text",
text: `${numerator} / ${denominator} = ${result}`
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `计算错误: ${error.message}`
}
],
isError: true // 标记为错误响应
};
}
}
);
// 抛出MCP错误
server.tool(
"fetch-data",
"从外部API获取数据",
{
apiUrl: z.string().url().describe("API URL")
},
async ({ apiUrl }) => {
// 检查URL是否在允许列表中
const allowedDomains = ["api.example.com", "data.mycompany.com"];
const url = new URL(apiUrl);
if (!allowedDomains.includes(url.hostname)) {
throw new McpError(
ErrorCode.InvalidParams,
`不允许的域名: ${url.hostname}`
);
}
// 继续执行fetch操作...
return {
content: [
{
type: "text",
text: `从 ${apiUrl} 获取的数据...`
}
]
};
}
);
动态工具管理
工具可以被启用、禁用、更新或移除:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "dynamic-tool-server",
version: "1.0.0"
});
// 注册一个工具
const calculatorTool = server.tool(
"calculator",
"基本计算器工具",
{
operation: z.enum(["add", "subtract", "multiply", "divide"]),
a: z.number(),
b: z.number()
},
async ({ operation, a, b }) => {
let result;
switch (operation) {
case "add": result = a + b; break;
case "subtract": result = a - b; break;
case "multiply": result = a * b; break;
case "divide": result = a / b; break;
}
return {
content: [
{
type: "text",
text: `结果: ${result}`
}
]
};
}
);
// 禁用工具
calculatorTool.disable();
// 重新启用工具
calculatorTool.enable();
// 更新工具
calculatorTool.update({
name: "advanced-calculator",
description: "增强版计算器",
paramsSchema: {
operation: z.enum(["add", "subtract", "multiply", "divide", "power"]),
a: z.number(),
b: z.number()
},
callback: async ({ operation, a, b }) => {
let result;
switch (operation) {
case "add": result = a + b; break;
case "subtract": result = a - b; break;
case "multiply": result = a * b; break;
case "divide": result = a / b; break;
case "power": result = Math.pow(a, b); break;
}
return {
content: [
{
type: "text",
text: `结果: ${result}`
}
]
};
}
});
// 移除工具
calculatorTool.remove();
// 通知客户端工具列表已更改
server.sendToolListChanged();
提示(Prompts)
提示是预定义的交互模板,可以帮助LLM理解如何与你的服务器交互。它们为特定任务定义特定的消息格式。
基本提示定义
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "prompt-server",
version: "1.0.0"
});
// 注册一个简单提示
server.prompt(
"greeting-prompt", // 提示名称
"一个友好的问候提示", // 提示描述
{
name: z.string().describe("要问候的名字")
},
async ({ name }) => ({
messages: [
{
role: "user",
content: {
type: "text",
text: `请以友好的方式问候${name}。`
}
}
]
})
);
复杂提示示例
提示可以包含多条消息、系统提示和多种内容类型:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "complex-prompt-server",
version: "1.0.0"
});
// 复杂提示示例
server.prompt(
"data-analysis-prompt",
"用于分析数据的提示模板",
{
dataType: z.enum(["sales", "inventory", "customers"]).describe("数据类型"),
timeFrame: z.string().describe("时间范围"),
focusArea: z.string().optional().describe("可选的关注点")
},
async ({ dataType, timeFrame, focusArea }) => {
// 构建提示内容
const promptText = `
我需要分析以下${dataType}数据:
- 时间范围: ${timeFrame}
${focusArea ? `- 重点关注: ${focusArea}` : ''}
请提供以下分析:
1. 主要趋势和模式
2. 异常数据点和可能的原因
3. 对未来的预测
4. 行动建议
`.trim();
return {
messages: [
{
role: "system",
content: {
type: "text",
text: "你是一名数据分析专家,擅长分析业务数据并提供见解。"
}
},
{
role: "user",
content: {
type: "text",
text: promptText
}
}
]
};
}
);
结合资源的提示
提示可以引用资源,创建更丰富的上下文:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "resource-prompt-server",
version: "1.0.0"
});
// 先定义一个资源
server.resource(
"company-guidelines",
"https://example.com/guidelines",
async () => ({
contents: [{
uri: "https://example.com/guidelines",
text: "公司回复客户邮件的指导方针:\n1. 始终保持礼貌\n2. 在24小时内回复\n3. 提供清晰的解决方案\n4. 签名中包含联系信息"
}]
})
);
// 使用资源的提示
server.prompt(
"email-response-prompt",
"生成客户邮件回复的提示",
{
customerName: z.string().describe("客户名称"),
emailContent: z.string().describe("客户邮件内容"),
priority: z.enum(["high", "medium", "low"]).describe("优先级")
},
async ({ customerName, emailContent, priority }) => {
return {
messages: [
{
role: "system",
content: {
type: "text",
text: "你是一名客户服务代表,需要回复客户邮件。"
}
},
{
role: "user",
content: [
{
type: "text",
text: `请根据公司指导方针,对以下客户邮件撰写一个专业的回复。优先级: ${priority}`
},
{
type: "embedded_resource",
resource: {
uri: "https://example.com/guidelines"
}
},
{
type: "text",
text: `客户邮件:\n\n来自: ${customerName}\n\n${emailContent}`
}
]
}
]
};
}
);
提示管理
与工具和资源类似,提示也可以被管理:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "prompt-management-server",
version: "1.0.0"
});
// 注册一个提示
const tutorialPrompt = server.prompt(
"coding-tutorial",
"编程教程提示",
{
language: z.string().describe("编程语言"),
topic: z.string().describe("教程主题"),
level: z.enum(["beginner", "intermediate", "advanced"]).describe("难度级别")
},
async ({ language, topic, level }) => ({
messages: [
{
role: "user",
content: {
type: "text",
text: `请创建一个${level}级别的${language}编程教程,主题是${topic}。`
}
}
]
})
);
// 禁用提示
tutorialPrompt.disable();
// 重新启用提示
tutorialPrompt.enable();
// 更新提示
tutorialPrompt.update({
name: "interactive-coding-tutorial",
description: "交互式编程教程提示",
argsSchema: {
language: z.string().describe("编程语言"),
topic: z.string().describe("教程主题"),
level: z.enum(["beginner", "intermediate", "advanced"]).describe("难度级别"),
includeExercises: z.boolean().default(true).describe("是否包含练习")
},
callback: async ({ language, topic, level, includeExercises }) => ({
messages: [
{
role: "user",
content: {
type: "text",
text: `请创建一个${level}级别的${language}编程教程,主题是${topic}。${
includeExercises ? "请包含实践练习。" : ""
}`
}
}
]
})
});
// 移除提示
tutorialPrompt.remove();
// 通知客户端提示列表已更改
server.sendPromptListChanged();
到此为止,我们已经学习了如何实现MCP服务器的三个核心功能:资源、工具和提示。这些功能共同构成了与LLM交互的完整接口。接下来,我们将探讨一些高级配置选项,使你的MCP服务器更加强大和灵活。
5. 高级配置
完成基本功能实现后,你可能需要一些高级配置来处理更复杂的场景、提高性能和安全性。在本节中,我们将探讨会话管理、事件存储、权限控制和错误处理等高级主题。
会话管理
MCP协议是有状态的,这意味着客户端与服务器之间建立会话并在多个请求中维持这种会话状态。高效的会话管理对于扩展服务和提供良好的用户体验至关重要。
会话创建与标识
在HTTP传输中,会话通过会话ID进行识别和管理:
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";
// 创建带会话ID生成器的传输
const transport = new StreamableHTTPServerTransport({
// 提供会话ID生成器函数
sessionIdGenerator: () => randomUUID(),
// 会话初始化回调
onsessioninitialized: (sessionId) => {
console.log(`会话初始化:${sessionId}`);
// 在这里可以进行会话初始化的其他操作
}
});
会话存储
为了管理多个活动会话,你可以使用会话存储:
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";
const app = express();
app.use(express.json());
// 存储活动会话
const sessions = new Map();
// 会话清理定时任务
setInterval(() => {
const now = new Date();
for (const [sessionId, session] of sessions.entries()) {
// 移除超过30分钟不活跃的会话
if (now.getTime() - session.lastActivity.getTime() > 30 * 60 * 1000) {
console.log(`移除不活跃会话: ${sessionId}`);
session.transport.close();
sessions.delete(sessionId);
}
}
}, 5 * 60 * 1000); // 每5分钟检查一次
// 处理MCP请求
app.post("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
if (sessionId && sessions.has(sessionId)) {
// 更新会话最后活动时间
sessions.get(sessionId)!.lastActivity = new Date();
// 使用现有传输处理请求
await sessions.get(sessionId)!.transport.handleRequest(req, res, req.body);
} else {
// 创建新会话
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID()
});
const server = new McpServer({
name: "session-manager-server",
version: "1.0.0"
});
await server.connect(transport);
// 处理请求
await transport.handleRequest(req, res, req.body);
// 如果生成了会话ID,则存储会话
if (transport.sessionId) {
sessions.set(transport.sessionId, {
transport,
server,
lastActivity: new Date()
});
// 设置关闭处理
transport.onclose = () => {
if (transport.sessionId) {
sessions.delete(transport.sessionId);
}
};
}
}
});
// 其他会话管理端点 (GET, DELETE等)
// ...
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`会话管理MCP服务器运行在端口 ${PORT}`);
});
会话恢复
断线重连时,客户端可以通过Last-Event-ID头部恢复会话状态:
app.get("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
const lastEventId = req.headers["last-event-id"] as string;
if (!sessionId || !sessions.has(sessionId)) {
res.status(400).send("无效或缺失的会话ID");
return;
}
const session = sessions.get(sessionId)!;
// 更新会话最后活动时间
session.lastActivity = new Date();
// 使用lastEventId处理SSE请求,支持恢复通知流
await session.transport.handleSSERequest(req, res, lastEventId);
});
事件存储
事件存储是会话恢复的基础,它保存通知事件,以便客户端重连时可以恢复错过的事件。
内存事件存储
对于简单场景,可以使用内存事件存储:
import { InMemoryEventStore } from "@modelcontextprotocol/sdk/shared/inMemoryEventStore.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
// 创建内存事件存储
const eventStore = new InMemoryEventStore({
// 设置事件过期时间(毫秒)
maxAgeMs: 30 * 60 * 1000, // 30分钟
// 设置每个会话最大事件数量
maxEventsPerSession: 1000
});
// 使用事件存储创建传输
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore
});
自定义持久化事件存储
对于生产环境,通常需要持久化事件存储,例如使用Redis或数据库:
import { EventStore, EventData } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import redis from "redis";
// 创建Redis客户端
const redisClient = redis.createClient({
url: process.env.REDIS_URL
});
await redisClient.connect();
// 实现Redis事件存储
class RedisEventStore implements EventStore {
// 存储事件
async storeEvent(sessionId: string, eventId: string, event: EventData): Promise {
// 设置事件存储到Redis,带过期时间
await redisClient.set(
`mcp:events:${sessionId}:${eventId}`,
JSON.stringify(event),
{ EX: 1800 } // 30分钟过期
);
// 更新会话的事件列表
await redisClient.lPush(`mcp:events:${sessionId}:list`, eventId);
await redisClient.expire(`mcp:events:${sessionId}:list`, 1800);
}
// 获取最近事件ID之后的所有事件
async getEventsSince(sessionId: string, lastEventId?: string): Promise
权限控制
在生产环境中,确保MCP服务器的安全性至关重要。MCP SDK提供了多种方式来实现权限控制。
基本认证
可以使用HTTP标准认证机制保护MCP服务器:
import express from "express";
import basicAuth from "express-basic-auth";
const app = express();
// 添加基本认证中间件
app.use(
"/mcp",
basicAuth({
users: { "admin": "secretpassword" },
challenge: true,
realm: "MCP Server"
})
);
// MCP路由...
app.all("/mcp", async (req, res) => {
// 处理MCP请求...
});
OAuth 2.0集成
对于更高级的认证,可以集成OAuth 2.0:
import express from "express";
import { auth } from "@modelcontextprotocol/sdk/server/auth/oauth.js";
const app = express();
// 配置OAuth中间件
const oauthMiddleware = auth.oauth({
// OAuth配置
issuerUrl: "https://auth.example.com",
clientId: "mcp-server-client-id",
audience: "https://api.example.com"
});
// 应用到MCP端点
app.use("/mcp", oauthMiddleware);
// MCP路由...
app.all("/mcp", async (req, res) => {
// req.user 现在包含已验证的用户信息
// 处理MCP请求...
});
资源和工具访问控制
可以根据用户角色或权限控制对特定资源和工具的访问:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "secure-server",
version: "1.0.0"
});
// 创建带权限检查的工具
server.tool(
"admin-tool",
"需要管理员权限的工具",
{
param: z.string()
},
async ({ param }, extra) => {
// 从请求上下文中获取用户信息
const user = extra.requestContext?.user;
// 检查权限
if (!user || !user.roles?.includes("admin")) {
throw new Error("需要管理员权限");
}
// 执行管理操作...
return {
content: [
{
type: "text",
text: `管理操作成功执行: ${param}`
}
]
};
}
);
// 在HTTP层设置请求上下文,传递给MCP服务器
app.all("/mcp", async (req, res) => {
// 假设我们已经验证了用户,并从某处获取了用户信息
const user = req.user;
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
// 设置请求上下文
requestContext: { user }
});
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
});
错误处理
健壮的错误处理对于可靠的MCP服务器至关重要。MCP SDK提供了多种机制来处理和报告错误。
使用McpError
使用SDK提供的McpError类抛出标准错误:
import { McpServer, McpError, ErrorCode } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "error-handling-server",
version: "1.0.0"
});
server.tool(
"validate-data",
"验证输入数据",
{
data: z.string()
},
async ({ data }) => {
// 业务逻辑验证
if (data.length < 10) {
throw new McpError(
ErrorCode.InvalidParams,
"数据过短,至少需要10个字符"
);
}
if (data.includes("forbidden")) {
throw new McpError(
ErrorCode.InternalError,
"数据包含禁止的内容",
{ details: "检测到禁用词" } // 附加错误数据
);
}
return {
content: [
{
type: "text",
text: "数据验证通过"
}
]
};
}
);
错误日志和监控
记录和监控错误以便于排查问题:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({
name: "monitored-server",
version: "1.0.0"
});
// 设置全局错误处理器
server.server.onerror = (error, method, params) => {
console.error(`处理${method}时出错:`, error);
// 这里可以发送错误到监控系统
sendErrorToMonitoring({
error,
method,
params,
timestamp: new Date().toISOString()
});
};
// 示例监控函数
function sendErrorToMonitoring(errorData) {
// 实现向监控系统发送错误的逻辑
// 例如 Sentry, ELK, CloudWatch 等
}
请求超时处理
设置请求超时以防止长时间阻塞:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
// 创建传输时设置超时
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
requestTimeoutMs: 30000 // 30秒超时
});
// 或者通过服务器选项全局设置
const server = new McpServer(
{
name: "timeout-aware-server",
version: "1.0.0"
},
{
// 设置默认请求超时
defaultRequestTimeoutMs: 30000
}
);
优雅地处理异步错误
处理工具回调中的异步错误:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { z } from "zod";
const server = new McpServer({
name: "async-error-server",
version: "1.0.0"
});
server.tool(
"fetch-external-data",
"从外部API获取数据",
{
endpoint: z.string().url()
},
async ({ endpoint }) => {
try {
// 使用超时控制fetch
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 5000);
const response = await fetch(endpoint, {
signal: controller.signal
});
clearTimeout(timeoutId);
if (!response.ok) {
throw new Error(`API返回错误: ${response.status}`);
}
const data = await response.text();
return {
content: [
{
type: "text",
text: `获取到的数据: ${data}`
}
]
};
} catch (error) {
// 处理不同类型的错误
if (error.name === 'AbortError') {
return {
content: [
{
type: "text",
text: "请求超时"
}
],
isError: true
};
}
// 其他错误
return {
content: [
{
type: "text",
text: `获取数据时出错: ${error.message}`
}
],
isError: true
};
}
}
);
通过合理配置会话管理、事件存储、权限控制和错误处理,你可以构建更加健壮、安全和可扩展的MCP服务器。这些高级配置对于生产环境部署尤为重要。接下来,我们将探讨不同的MCP服务器部署选项。
6. 部署选项
MCP服务器可以以多种方式部署,从简单的单节点到复杂的分布式架构。根据你的需求和资源,可以选择最适合的部署模式。
无状态模式
无状态模式是最简单的部署方式,适用于不需要维护会话状态的场景。在这种模式下,每个请求都被独立处理,无需关心之前的交互。
配置无状态模式
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
const app = express();
app.use(express.json());
// 创建MCP服务器
const server = new McpServer({
name: "stateless-server",
version: "1.0.0"
});
// 添加资源和工具
// ...
// 处理MCP请求 - 无状态模式
app.all("/mcp", async (req, res) => {
// 创建无状态传输(不生成会话ID)
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined // 禁用会话ID生成
});
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`无状态MCP服务器运行在端口 ${PORT}`);
});
无状态模式的优势
- 简单性:不需要会话管理逻辑
- 可扩展性:可以轻松水平扩展,因为请求可以路由到任何服务器实例
- 资源效率:不需要存储会话状态,内存使用更高效
- 更少的维护开销:没有会话超时或清理问题
无状态模式的限制
- 无通知支持:无法发送服务器通知(如进度更新)
- 无交互状态:无法记住之前的请求或维护上下文
- 不支持长时间运行的操作:客户端无法断开连接后重新连接
持久存储模式
持久存储模式使用外部存储(如数据库或Redis)来保存会话状态,允许任何服务器实例处理来自同一客户端的请求。
实现持久存储模式
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";
import redis from "redis";
const app = express();
app.use(express.json());
// 创建Redis客户端
const redisClient = redis.createClient({
url: process.env.REDIS_URL
});
await redisClient.connect();
// 创建Redis事件存储
class RedisEventStore {
// 实现事件存储逻辑(如前文所述)
// ...
}
// 创建服务器工厂函数
const createServer = () => {
const server = new McpServer({
name: "persistent-storage-server",
version: "1.0.0"
});
// 添加资源和工具
// ...
return server;
};
// 处理MCP请求
app.all("/mcp", async (req, res) => {
// 创建使用Redis事件存储的传输
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore: new RedisEventStore()
});
// 从Redis获取会话状态(如果有)
const sessionId = req.headers["mcp-session-id"] as string;
if (sessionId) {
const sessionData = await redisClient.get(`mcp:session:${sessionId}`);
if (sessionData) {
// 恢复会话状态(如果需要)
// ...
}
}
// 连接到新服务器
const server = createServer();
await server.connect(transport);
// 处理请求
await transport.handleRequest(req, res, req.body);
// 如果生成了新会话ID,则存储会话信息
if (transport.sessionId) {
await redisClient.set(`mcp:session:${transport.sessionId}`, JSON.stringify({
created: new Date().toISOString(),
// 其他会话元数据
}));
// 设置过期时间
await redisClient.expire(`mcp:session:${transport.sessionId}`, 1800); // 30分钟
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`持久存储MCP服务器运行在端口 ${PORT}`);
});
持久存储模式的优势
- 水平扩展性:可以部署多个服务器实例,共享相同的存储
- 高可用性:即使某个实例故障,其他实例仍可处理请求
- 会话连续性:保持客户端会话状态,支持通知和交互式操作
- 负载均衡:可以在实例之间均衡分配负载
持久存储模式的挑战
- 存储依赖:依赖外部存储系统,增加了复杂性和潜在故障点
- 性能开销:每个请求可能需要多次存储访问
- 一致性管理:需要确保不同实例间的状态一致性
本地状态与消息路由
本地状态模式在每个服务器实例上维护状态,但使用消息路由确保同一会话的请求总是路由到相同的实例。这种方法结合了无状态和有状态模式的优点。
使用粘性会话实现
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { randomUUID } from "node:crypto";
import redis from "redis";
import cluster from "node:cluster";
import os from "node:os";
// 使用Node.js集群实现多进程
if (cluster.isPrimary) {
console.log(`主进程 ${process.pid} 正在运行`);
// 创建与CPU核心数量相同的工作进程
const numCPUs = os.cpus().length;
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker) => {
console.log(`工作进程 ${worker.process.pid} 已退出`);
// 重启工作进程
cluster.fork();
});
} else {
// 工作进程代码
const app = express();
app.use(express.json());
// 创建Redis客户端用于服务发现
const redisClient = redis.createClient({
url: process.env.REDIS_URL
});
await redisClient.connect();
// 会话映射表
const sessions = new Map();
// 生成唯一的工作进程ID
const workerId = `worker-${process.pid}`;
// 注册自己到服务发现
await redisClient.set(`mcp:worker:${workerId}`, JSON.stringify({
pid: process.pid,
started: new Date().toISOString(),
host: os.hostname()
}));
// 创建MCP服务器
const server = new McpServer({
name: "local-state-server",
version: "1.0.0"
});
// 添加资源和工具
// ...
// 处理MCP请求
app.all("/mcp", async (req, res) => {
const sessionId = req.headers["mcp-session-id"] as string;
if (sessionId) {
// 检查会话是否已分配给某个工作进程
const assignedWorker = await redisClient.get(`mcp:session-worker:${sessionId}`);
if (assignedWorker && assignedWorker !== workerId) {
// 会话属于另一个工作进程,返回重定向指令
return res.status(307).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Session belongs to another worker",
data: { workerId: assignedWorker }
},
id: null
});
}
// 会话属于当前工作进程或尚未分配
if (!assignedWorker) {
// 分配会话到当前工作进程
await redisClient.set(`mcp:session-worker:${sessionId}`, workerId);
await redisClient.expire(`mcp:session-worker:${sessionId}`, 1800); // 30分钟
}
// 从本地会话缓存获取传输
const transport = sessions.get(sessionId);
if (transport) {
await transport.handleRequest(req, res, req.body);
return;
}
}
// 创建新会话
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
});
transport.onclose = () => {
if (transport.sessionId) {
sessions.delete(transport.sessionId);
redisClient.del(`mcp:session-worker:${transport.sessionId}`).catch(console.error);
}
};
await server.connect(transport);
await transport.handleRequest(req, res, req.body);
// 存储新会话
if (transport.sessionId) {
sessions.set(transport.sessionId, transport);
await redisClient.set(`mcp:session-worker:${transport.sessionId}`, workerId);
await redisClient.expire(`mcp:session-worker:${transport.sessionId}`, 1800); // 30分钟
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`工作进程 ${process.pid} 的本地状态MCP服务器运行在端口 ${PORT}`);
});
}
使用负载均衡器
在实际部署中,可以使用负载均衡器(如Nginx、HAProxy或云服务提供商的负载均衡)实现粘性会话:
# Nginx配置示例
upstream mcp_servers {
hash $http_mcp_session_id consistent; # 使用会话ID的一致性哈希
server mcp1.example.com:3000;
server mcp2.example.com:3000;
server mcp3.example.com:3000;
}
server {
listen 80;
server_name mcp.example.com;
location /mcp {
proxy_pass http://mcp_servers;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 保留原始会话ID头部
proxy_set_header MCP-Session-ID $http_mcp_session_id;
}
}
本地状态与消息路由的优势
- 性能:无需在请求间序列化和反序列化状态
- 资源效率:避免了频繁的外部存储访问
- 简单性:无需复杂的状态同步逻辑
- 可扩展性:仍然可以水平扩展,只需确保正确路由
本地状态与消息路由的挑战
- 实例故障:如果实例故障,对应的会话将丢失
- 不均衡负载:可能导致某些实例负载过重
- 扩缩容复杂性:添加或移除实例时需要处理会话重新分配
选择哪种部署方式取决于你的应用需求、规模和复杂性。对于简单场景,无状态模式可能足够;对于需要强大会话支持但又要高可用性的场景,持久存储模式是更好的选择;而对于需要平衡性能和可扩展性的场景,本地状态与消息路由提供了一个很好的折中方案。
7. 打包发布
完成MCP服务器的开发后,可以将其打包为npm包,方便其他开发者使用。本节将介绍如何创建、测试和发布一个高质量的MCP服务器npm包。
创建npm包
首先,需要组织你的代码结构,使其适合作为npm包分发:
目录结构
my-mcp-server/
├── src/ # 源代码
│ ├── index.ts # 主入口
│ ├── server.ts # 服务器核心
│ ├── resources/ # 资源模块
│ ├── tools/ # 工具模块
│ └── prompts/ # 提示模块
├── dist/ # 编译后的代码
├── examples/ # 示例代码
├── tests/ # 测试文件
├── package.json # 包配置
├── tsconfig.json # TypeScript配置
└── README.md # 文档
入口文件
创建一个清晰的入口文件(src/index.ts),导出你的服务器及其功能:
// src/index.ts
export { createServer } from './server.js';
export * from './resources/index.js';
export * from './tools/index.js';
export * from './prompts/index.js';
// 重新导出必要的MCP类型
export {
McpServer,
ResourceTemplate,
ServerOptions,
CallToolResult,
ReadResourceResult,
GetPromptResult
} from '@modelcontextprotocol/sdk/server/mcp.js';
// 导出传输
export { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
export { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
主服务器文件
实现一个工厂函数来创建预配置的服务器(src/server.ts):
// src/server.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { registerResources } from './resources/index.js';
import { registerTools } from './tools/index.js';
import { registerPrompts } from './prompts/index.js';
export interface ServerConfig {
name?: string;
version?: string;
logging?: boolean;
// 其他配置选项...
}
export function createServer(config: ServerConfig = {}) {
const server = new McpServer({
name: config.name || 'my-mcp-server',
version: config.version || '1.0.0'
}, {
capabilities: {
// 根据配置启用日志功能
...(config.logging ? { logging: {} } : {})
}
});
// 注册所有资源
registerResources(server);
// 注册所有工具
registerTools(server);
// 注册所有提示
registerPrompts(server);
return server;
}
配置package.json
更新package.json文件,包含必要的打包信息:
{
"name": "my-mcp-server",
"version": "1.0.0",
"description": "自定义MCP服务器实现",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist",
"README.md",
"LICENSE"
],
"scripts": {
"build": "tsc",
"test": "jest",
"prepublishOnly": "npm run build && npm test"
},
"exports": {
".": {
"import": "./dist/index.js",
"require": "./dist/cjs/index.js",
"types": "./dist/index.d.ts"
}
},
"keywords": [
"mcp",
"llm",
"ai",
"modelcontextprotocol"
],
"author": "你的名字",
"license": "MIT",
"peerDependencies": {
"@modelcontextprotocol/sdk": "^1.0.0"
},
"devDependencies": {
// 开发依赖...
},
"dependencies": {
// 运行时依赖...
}
}
发布到npm
准备好代码后,可以将包发布到npm仓库:
登录到npm
npm login
测试打包
在发布前,应该测试你的包能否正确安装和使用:
# 创建测试链接
npm link
# 在另一个项目中测试
cd ../test-project
npm link my-mcp-server
发布包
# 返回你的包目录
cd ../my-mcp-server
# 发布到npm
npm publish
如果只想发布给特定组织或使用私有仓库:
# 发布到组织范围
npm publish --access public
# 或发布到私有仓库
npm publish --registry=https://npm.example.com
版本管理
良好的版本管理对于npm包至关重要,遵循语义化版本(Semantic Versioning)原则:
- 主版本号:不兼容的API变更(例如:2.0.0)
- 次版本号:向后兼容的功能新增(例如:1.1.0)
- 修订号:向后兼容的问题修复(例如:1.0.1)
使用npm命令更新版本:
# 增加修订号 1.0.0 -> 1.0.1
npm version patch
# 增加次版本号 1.0.1 -> 1.1.0
npm version minor
# 增加主版本号 1.1.0 -> 2.0.0
npm version major
每次版本更新后,确保更新CHANGELOG.md文件,记录所有更改。
版本发布检查清单
在发布新版本前,确保完成以下步骤:
- 更新文档:确保README.md和其他文档与最新代码一致
- 运行测试:确保所有单元测试和集成测试通过
- 编译代码:确保TypeScript编译无错误
- 更新CHANGELOG:记录所有更改、新功能和修复
- 更新版本号:使用npm version命令更新版本
- 创建git标签:为新版本创建git标签(npm version会自动创建)
- 发布到npm:使用npm publish发布新版本
通过将你的MCP服务器发布为npm包,其他开发者可以轻松地将其集成到自己的项目中,为LLM提供特定域的上下文和功能。接下来,我们将看一些实际的MCP服务器示例实现。
8. 示例实现
本节提供几个实际的MCP服务器示例实现,从简单到复杂,帮助你理解如何应用前面章节中的概念。
示例1:基础文件系统MCP服务器
这个简单的MCP服务器提供对文件系统的访问,允许LLM读取、写入和列出文件。
主要文件
首先,创建主服务器文件(server.ts):
// server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { fileResource } from "./resources/file.js";
import { directoryResource } from "./resources/directory.js";
import { writeFileTool } from "./tools/writeFile.js";
// 创建MCP服务器
const server = new McpServer({
name: "file-system-mcp",
version: "1.0.0"
});
// 注册资源
server.registerResource(fileResource);
server.registerResource(directoryResource);
// 注册工具
server.registerTool(writeFileTool);
// 创建传输
const transport = new StdioServerTransport();
// 连接服务器到传输
await server.connect(transport);
console.error("文件系统MCP服务器已启动");
资源实现
接下来,实现文件资源(resources/file.ts):
// resources/file.ts
import { readFile } from "fs/promises";
import { ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import path from "path";
export const fileResource: ResourceTemplate = {
// 资源名称
name: "file",
// 验证资源ID
validateId: async (id: string) => {
// 简单验证:确保ID不包含无效字符
return !id.includes("..") && !id.includes("\\") && id.length > 0;
},
// 读取资源
read: async (id: string) => {
try {
// 获取文件的绝对路径
const filePath = path.resolve(id);
// 读取文件内容
const content = await readFile(filePath, "utf-8");
return {
content,
metadata: {
path: filePath,
size: content.length,
type: path.extname(filePath).slice(1) || "txt"
}
};
} catch (error) {
throw new Error(`无法读取文件 "${id}": ${error.message}`);
}
}
};
然后,实现目录资源(resources/directory.ts):
// resources/directory.ts
import { readdir, stat } from "fs/promises";
import { ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import path from "path";
export const directoryResource: ResourceTemplate = {
// 资源名称
name: "directory",
// 验证资源ID
validateId: async (id: string) => {
// 简单验证:确保ID不包含无效字符
return !id.includes("..") && !id.includes("\\");
},
// 读取资源
read: async (id: string) => {
try {
// 获取目录的绝对路径
const dirPath = path.resolve(id || ".");
// 读取目录内容
const entries = await readdir(dirPath);
// 获取每个条目的信息
const contentsPromises = entries.map(async (entry) => {
const entryPath = path.join(dirPath, entry);
const stats = await stat(entryPath);
return {
name: entry,
path: entryPath,
isDirectory: stats.isDirectory(),
size: stats.size,
modifiedTime: stats.mtime.toISOString()
};
});
const contents = await Promise.all(contentsPromises);
return {
content: JSON.stringify(contents, null, 2),
metadata: {
path: dirPath,
entries: contents.length,
directories: contents.filter(e => e.isDirectory).length,
files: contents.filter(e => !e.isDirectory).length
}
};
} catch (error) {
throw new Error(`无法读取目录 "${id}": ${error.message}`);
}
}
};
工具实现
最后,实现写入文件工具(tools/writeFile.ts):
// tools/writeFile.ts
import { writeFile } from "fs/promises";
import { ToolTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import path from "path";
export const writeFileTool: ToolTemplate = {
// 工具名称
name: "writeFile",
// 工具描述
description: "将内容写入文件",
// 参数定义
parameters: {
type: "object",
required: ["path", "content"],
properties: {
path: {
type: "string",
description: "要写入的文件路径"
},
content: {
type: "string",
description: "要写入的内容"
},
append: {
type: "boolean",
description: "是否追加而不是覆盖",
default: false
}
}
},
// 执行工具
execute: async (params) => {
const { path: filePath, content, append } = params;
try {
// 获取文件的绝对路径
const absolutePath = path.resolve(filePath);
// 写入或追加内容
const options = { flag: append ? "a" : "w" };
await writeFile(absolutePath, content, options);
return {
success: true,
message: `成功${append ? "追加到" : "写入"}文件 "${filePath}"`,
path: absolutePath
};
} catch (error) {
throw new Error(`无法写入文件 "${filePath}": ${error.message}`);
}
}
};
使用方法
使用这个MCP服务器的命令如下:
# 编译TypeScript
npx tsc
# 运行MCP服务器
node dist/server.js
客户端可以使用这个服务器来:
- 读取文件(
file资源) - 列出目录内容(
directory资源) - 写入文件(
writeFile工具)
这个简单的示例展示了如何创建一个基本的MCP服务器,为LLM提供文件系统访问能力。
示例2:数据库查询MCP服务器
这个更复杂的示例展示如何创建一个MCP服务器,允许LLM查询和修改数据库。
服务器设置
首先,创建主服务器文件(server.ts):
// server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express from "express";
import { createDbConnection } from "./db.js";
import { registerSchemaResources } from "./resources/index.js";
import { registerQueryTools } from "./tools/index.js";
async function main() {
// 创建Express应用
const app = express();
app.use(express.json());
// 创建数据库连接
const db = await createDbConnection();
// 创建MCP服务器
const server = new McpServer({
name: "db-query-mcp",
version: "1.0.0"
});
// 注册资源和工具,传入数据库连接
registerSchemaResources(server, db);
registerQueryTools(server, db);
// 设置HTTP端点
app.all("/mcp", async (req, res) => {
// 为每个请求创建新的传输
const transport = new StreamableHTTPServerTransport();
// 连接MCP服务器到传输
await server.connect(transport);
// 处理HTTP请求
await transport.handleRequest(req, res, req.body);
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`数据库查询MCP服务器运行在端口 ${PORT}`);
});
}
main().catch(console.error);
数据库连接
接下来,设置数据库连接(db.ts):
// db.ts
import { Pool } from "pg";
export async function createDbConnection() {
const pool = new Pool({
host: process.env.DB_HOST || "localhost",
port: parseInt(process.env.DB_PORT || "5432"),
database: process.env.DB_NAME || "mydb",
user: process.env.DB_USER || "postgres",
password: process.env.DB_PASSWORD || "postgres"
});
// 测试连接
try {
const client = await pool.connect();
console.log("数据库连接成功");
client.release();
return pool;
} catch (error) {
console.error("数据库连接失败:", error);
throw error;
}
}
示例3:数据库模式和查询MCP服务器
这个示例展示了如何创建一个MCP服务器,允许LLM访问和操作数据库,同时包含适当的安全防护措施。
服务器设置
首先,创建主服务器文件(server.ts):
// server.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express from "express";
import { createDbConnection } from "./db.js";
import { registerSchemaResources } from "./resources/index.js";
import { registerQueryTools } from "./tools/index.js";
async function main() {
// 创建Express应用
const app = express();
app.use(express.json());
// 创建数据库连接
const db = await createDbConnection();
// 创建MCP服务器
const server = new McpServer({
name: "db-schema-query-mcp",
version: "1.0.0"
});
// 注册资源和工具,传入数据库连接
registerSchemaResources(server, db);
registerQueryTools(server, db);
// 设置HTTP端点
app.all("/mcp", async (req, res) => {
// 为每个请求创建新的传输
const transport = new StreamableHTTPServerTransport();
// 连接MCP服务器到传输
await server.connect(transport);
// 处理HTTP请求
await transport.handleRequest(req, res, req.body);
});
// 启动服务器
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`数据库模式和查询MCP服务器运行在端口 ${PORT}`);
});
}
main().catch(console.error);
数据库连接
接下来,设置数据库连接(db.ts):
// db.ts
import { Pool } from "pg";
export async function createDbConnection() {
const pool = new Pool({
host: process.env.DB_HOST || "localhost",
port: parseInt(process.env.DB_PORT || "5432"),
database: process.env.DB_NAME || "mydb",
user: process.env.DB_USER || "postgres",
password: process.env.DB_PASSWORD || "postgres"
});
// 测试连接
try {
const client = await pool.connect();
console.log("数据库连接成功");
client.release();
return pool;
} catch (error) {
console.error("数据库连接失败:", error);
throw error;
}
}
资源实现
实现数据库模式资源(resources/index.ts和resources/schema.ts):
// resources/index.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Pool } from "pg";
import { schemaResource } from "./schema.js";
import { tableResource } from "./table.js";
export function registerSchemaResources(server: McpServer, db: Pool) {
server.registerResource(schemaResource(db));
server.registerResource(tableResource(db));
}
// resources/schema.ts
import { ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Pool } from "pg";
export function schemaResource(db: Pool): ResourceTemplate {
return {
name: "db-schema",
validateId: async (id: string) => {
// 空ID表示请求整个数据库模式
return true;
},
read: async (id: string) => {
const client = await db.connect();
try {
// 获取所有表和视图
const tablesQuery = `
SELECT
table_schema,
table_name,
table_type
FROM
information_schema.tables
WHERE
table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY
table_schema, table_name;
`;
const tablesResult = await client.query(tablesQuery);
// 获取表之间的关系(外键)
const relationshipsQuery = `
SELECT
tc.table_schema,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM
information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY';
`;
const relationshipsResult = await client.query(relationshipsQuery);
// 构建数据库模式信息
const schema = {
tables: tablesResult.rows,
relationships: relationshipsResult.rows
};
return {
content: JSON.stringify(schema, null, 2),
metadata: {
tables: tablesResult.rowCount,
relationships: relationshipsResult.rowCount
}
};
} finally {
client.release();
}
}
};
}
// resources/table.ts
import { ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Pool } from "pg";
export function tableResource(db: Pool): ResourceTemplate {
return {
name: "db-table",
validateId: async (id: string) => {
// ID格式: schema.table_name
return id.includes(".") && id.split(".").length === 2;
},
read: async (id: string) => {
const [schema, tableName] = id.split(".");
const client = await db.connect();
try {
// 获取表结构
const structureQuery = `
SELECT
column_name,
data_type,
is_nullable,
column_default
FROM
information_schema.columns
WHERE
table_schema = $1 AND table_name = $2
ORDER BY
ordinal_position;
`;
const structureResult = await client.query(structureQuery, [schema, tableName]);
// 获取表约束
const constraintsQuery = `
SELECT
tc.constraint_name,
tc.constraint_type,
kcu.column_name
FROM
information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
WHERE
tc.table_schema = $1 AND tc.table_name = $2;
`;
const constraintsResult = await client.query(constraintsQuery, [schema, tableName]);
// 获取前10行数据样本
const sampleDataQuery = `
SELECT * FROM "${schema}"."${tableName}" LIMIT 10;
`;
const sampleDataResult = await client.query(sampleDataQuery);
// 构建表信息
const tableInfo = {
name: tableName,
schema: schema,
columns: structureResult.rows,
constraints: constraintsResult.rows,
sampleData: sampleDataResult.rows
};
return {
content: JSON.stringify(tableInfo, null, 2),
metadata: {
columns: structureResult.rowCount,
constraints: constraintsResult.rowCount,
sampleSize: sampleDataResult.rowCount
}
};
} finally {
client.release();
}
}
};
}
工具实现
最后,实现数据库查询工具(tools/index.ts和tools/query.ts):
// tools/index.ts
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Pool } from "pg";
import { queryTool } from "./query.js";
export function registerQueryTools(server: McpServer, db: Pool) {
server.registerTool(queryTool(db));
}
// tools/query.ts
import { ToolTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { Pool } from "pg";
export function queryTool(db: Pool): ToolTemplate {
return {
name: "db-query",
description: "执行SQL查询并返回结果",
parameters: {
type: "object",
required: ["sql"],
properties: {
sql: {
type: "string",
description: "要执行的SQL查询"
},
params: {
type: "array",
items: {
type: "string"
},
description: "查询参数(用于防止SQL注入)"
}
}
},
execute: async (params) => {
const { sql, params: queryParams = [] } = params;
const client = await db.connect();
try {
// 检查是否是只读查询
const isReadOnly = sql.trim().toLowerCase().startsWith("select");
if (!isReadOnly) {
// 可以增加额外的安全检查,例如只允许特定角色执行写操作
}
const result = await client.query(sql, queryParams);
return {
success: true,
rowCount: result.rowCount,
rows: result.rows,
fields: result.fields.map(f => ({
name: f.name,
dataTypeID: f.dataTypeID
}))
};
} catch (error) {
return {
success: false,
error: error.message
};
} finally {
client.release();
}
}
};
}
使用方法
使用这个MCP服务器的命令如下:
# 设置数据库环境变量
export DB_HOST=localhost
export DB_PORT=5432
export DB_NAME=mydb
export DB_USER=postgres
export DB_PASSWORD=mysecretpassword
# 编译TypeScript
npx tsc
# 运行MCP服务器
node dist/server.js
客户端可以使用这个服务器来:
- 查看数据库模式(
db-schema资源) - 检查表结构和样本数据(
db-table资源) - 执行SQL查询(
db-query工具)
这个更复杂的示例展示了如何创建一个MCP服务器,使LLM能够访问和操作数据库,同时包含适当的安全防护措施。
9. 排错指南
在开发和部署MCP服务器的过程中,你可能会遇到各种问题。本节将帮助你诊断和解决常见问题。
连接问题
客户端无法连接到服务器
如果客户端无法连接到MCP服务器,请检查以下几点:
- 服务器是否正在运行? - 确保MCP服务器进程正在运行,并检查日志输出。
- 网络连接 - 确保客户端可以访问服务器的网络地址和端口。
- 防火墙设置 - 检查防火墙是否阻止了连接。
- 正确的端点 - 验证客户端是否使用了正确的URL端点(例如
http://yourserver:3000/mcp)。 - 传输配置 - 确保客户端和服务器使用相同的传输协议(HTTP或标准输入/输出)。
HTTP 4xx 或 5xx 错误
如果收到HTTP错误,可能意味着:
- 400 Bad Request - 客户端发送了无效的MCP请求格式。
- 401 Unauthorized - 需要认证。检查认证头部。
- 403 Forbidden - 客户端没有权限执行请求的操作。
- 404 Not Found - 服务器端点不存在,检查URL。
- 500 Internal Server Error - 服务器内部错误,检查服务器日志。
资源和工具问题
资源不可用
如果尝试读取资源时收到错误,请检查:
- 资源注册 - 确保已正确注册资源模板。
- 资源ID - 验证资源ID是否有效且正确。
- 权限 - 确保客户端有权限访问该资源。
- 实现错误 - 检查资源的
read方法是否有错误,可以临时添加日志来调试。
工具执行失败
如果工具执行失败,请检查:
- 工具注册 - 确保已正确注册工具模板。
- 参数验证 - 确保传递的参数符合工具模板的
parameters定义。 - 错误处理 - 检查工具的
execute方法中的错误处理逻辑。 - 外部依赖 - 如果工具依赖外部服务(如数据库),确保这些服务可访问。
会话管理问题
会话状态丢失
如果会话状态在请求之间丢失,可能的原因包括:
- 无状态模式 - 确认你的服务器是否配置为维护会话状态。
- 会话ID处理 - 验证客户端是否正确地传递和处理
MCP-Session-ID头部。 - 会话超时 - 检查会话是否已超时并被清理。
- 负载均衡问题 - 在多实例部署中,确保请求正确地路由到保存会话的实例。
通知问题
如果客户端未收到或处理通知,请检查:
- 事件存储 - 确保事件存储正确配置且工作正常。
- SSE连接 - 验证Server-Sent Events连接是否正确建立和维护。
- 事件发送 - 在服务器代码中检查事件是否正确发送。
- 客户端处理 - 确保客户端正确地接收和处理SSE事件。
性能问题
高延迟
如果服务器响应缓慢,请考虑:
- 资源消耗 - 监控CPU和内存使用情况,检查是否有资源瓶颈。
- 外部服务延迟 - 如果依赖外部服务,这些服务可能成为瓶颈。
- 日志记录 - 过多的日志记录可能影响性能。
- 连接池 - 对于数据库或其他资源,确保连接池配置得当。
- 长操作优化 - 考虑对长时间运行的操作使用异步处理或分页。
内存泄漏
如果服务器内存使用随时间增长,可能存在内存泄漏:
- 会话清理 - 确保不活跃的会话和相关资源被正确清理。
- 事件存储 - 检查事件存储是否正确实现过期策略。
- 缓存管理 - 如果使用缓存,确保缓存有适当的大小限制和过期策略。
- 资源关闭 - 确保正确关闭和释放不再需要的资源,如数据库连接。
调试策略
启用详细日志
MCP SDK提供了日志功能,可通过以下方式启用:
const server = new McpServer({
name: "my-mcp-server",
version: "1.0.0"
}, {
capabilities: {
logging: {
level: "debug" // 或 "info", "warn", "error"
}
}
});
使用调试工具
可以使用Node.js调试工具来调试服务器:
- Node.js inspect -
node --inspect server.js,然后使用Chrome DevTools连接。 - VS Code调试 - 配置VS Code的调试设置以调试Node.js应用。
- 日志记录 - 在关键点添加
console.log或使用日志库如Winston。
请求/响应检查
使用代理工具检查HTTP请求和响应:
- Postman - 测试HTTP端点并检查响应。
- curl - 使用命令行测试请求,例如:
curl -X POST -H "Content-Type: application/json" -d '...' http://localhost:3000/mcp - 网络监控 - 使用浏览器开发工具的网络面板监控请求和响应。
通过系统地检查和排除这些常见问题区域,你应该能够识别和解决大多数MCP服务器相关的问题。对于更复杂的问题,可能需要查阅更详细的文档或寻求社区支持。
10. 参考资源
为了深入学习和掌握MCP服务器开发,以下资源将非常有用:
官方文档和资源
- Model Context Protocol GitHub仓库 - MCP的官方实现和文档。
- MCP SDK文档 - 详细的SDK使用指南。
- MCP示例代码 - 各种用例的实际示例。
相关技术文档
- Node.js文档 - 深入了解Node.js运行时和API。
- TypeScript文档 - TypeScript语言和类型系统的完整指南。
- Express.js文档 - 用于构建HTTP服务器的流行Node.js框架。
- Server-Sent Events (SSE) - MDN关于SSE的文档,用于实时通知。
API和协议规范
- MCP协议规范 - MCP协议的详细技术规范。
- JSON Schema文档 - 理解用于参数验证的JSON Schema。
- JSON-RPC 2.0规范 - MCP基于的远程过程调用协议。
安全和最佳实践
- OWASP Top 10 - Web应用安全风险和防护措施。
- OWASP Cheat Sheet Series - 特定安全场景的实用指南。
- Twelve-Factor App - 构建可伸缩服务的方法论。
社区和支持
- Anthropic Discord社区 - 与其他开发者交流和获取帮助。
- Stack Overflow MCP标签 - 技术问题和答案。
- GitHub Issues - 报告问题或请求功能。
相关LLM和AI工具
- Claude API文档 - Claude模型的使用指南。
- OpenAI API文档 - 与OpenAI模型集成。
- LangChain文档 - 用于构建LLM应用的框架。
书籍和进阶学习
- 《Designing Web APIs》 - API设计的原则和最佳实践。
- 《Building Microservices》 - 微服务架构和部署策略。
- 《Effective TypeScript》 - 掌握TypeScript的高级指南。
通过充分利用这些资源,你可以不断提升你的MCP服务器开发技能,构建更加强大、安全和高效的应用程序。