基于python的MCP使用简单教程

MCP是一种新型的agent通信协议,其突出贡献是统一了各模型厂商的function call各不相同的局面。这里基于windows11 和python 3.11环境对MCP的使用作简单示例(模型调用的是类openai API的格式,现在各大厂商一般都支持)。 接下来让我们5分钟上手mcp:

功能说明:

该MCP client以python命令行形式交互,支持一次query调用多个工具及分步调用多个工具(REACT),建议使用deepseek V3 0324版本API,效果体验更佳~

一、新建环境

安装annaconda(ps:官方uv安装一堆bug,这里就用conda替代了)

conda create -n mcp python==3.11

conda activate mcp

pip install pqi

pqi use tuna

pip install httpx fastmcp asyncio python-dotenv openai mcp loguru

新建mcp-test文件夹,并cd到其中。

二、mcp-test文件夹中放入这三个文件:

1、客户端文件

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import asyncio
from typing import Optional
from contextlib import AsyncExitStack
import os
import json
from loguru import logger

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

from openai import OpenAI
from dotenv import load_dotenv

# 从 .env 文件中加载环境变量
load_dotenv()

# 获取必要的环境变量
api_key = os.environ["DS_API_KEY"]
base_url = os.environ["DS_API_BASE"]
model_name = os.environ["API_MODEL_NAME"]
max_tool_calls_allowed = 5 # 每轮对话中允许最多调用 5 次工具

logger.debug("FastMCP 服务器启动中...")

class MCPClient:
def __init__(self):
# 初始化 session 和客户端对象
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()

# 初始化 OpenAI 客户端
self.openai = OpenAI(api_key=api_key, base_url=base_url)

async def connect_to_server(self, server_script_path: str):
"""连接到 MCP 服务器

参数:
server_script_path: 服务器脚本路径 (.py 或 .js 文件)
"""
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
if not (is_python or is_js):
raise ValueError("服务器脚本必须是 .py 或 .js 文件")

command = "python" if is_python else "node"
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)

# 启动子进程并建立标准输入输出通信
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)

# 初始化客户端 session
await self.session.initialize()

# 列出可用工具
response = await self.session.list_tools()
tools = response.tools
print(
"\n成功连接到服务器,检测到的工具:",
[[tool.name, tool.description, tool.inputSchema] for tool in tools],
)

async def process_query(self, query: str) -> str:
"""处理用户查询,支持多轮工具调用"""
messages = [{"role": "user", "content": query}]

# 获取当前可用工具
response = await self.session.list_tools()
available_tools = [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": getattr(tool, "inputSchema", {}),
},
}
for tool in response.tools
]

# 开始对话循环
current_tool_calls_count = 0
while True: # 不限制总调用次数,但会受到 max_tool_calls_allowed 控制
# 调用模型生成回复
model_response = self.openai.chat.completions.create(
model=model_name,
messages=messages,
tools=available_tools,
max_tokens=1000
)

assistant_message = model_response.choices[0].message
logger.debug(f"助手返回消息: {assistant_message}")

# 将助手回复加入对话消息中
messages.append({
"role": "assistant",
"content": assistant_message.content or "",
"tool_calls": getattr(assistant_message, "tool_calls", None)
})

# 判断是否需要调用工具
if not hasattr(assistant_message, "tool_calls") or not assistant_message.tool_calls or max_tool_calls_allowed <= current_tool_calls_count:
# 无需调用工具,直接返回最终回复
return assistant_message.content or ""

# 当前轮处理所有工具调用
for tool_call in assistant_message.tool_calls:
try:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

logger.debug(f"执行工具: {tool_name},参数: {tool_args}")
result = await self.session.call_tool(tool_name, tool_args)
logger.debug(f"工具返回结果: {result}")

# 保证结果是字符串
if isinstance(result, bytes):
result = result.decode('utf-8', errors='replace')
elif not isinstance(result, str):
result = str(result)

# 工具调用结果加入对话
messages.append({
"role": "tool",
"content": result,
"tool_call_id": tool_call.id
})

except Exception as e:
error_msg = f"工具调用失败: {str(e)}"
logger.error(error_msg)
messages.append({
"role": "tool",
"content": f"Error: {str(e)}",
"tool_call_id": tool_call.id
})

current_tool_calls_count += 1
if current_tool_calls_count >= max_tool_calls_allowed:
logger.warning("工具调用次数过多,停止调用。")

# 循环继续,根据模型判断是否继续调用工具

async def chat_loop(self):
"""运行交互式聊天循环"""
print("\nMCP 客户端已启动!")
print("输入你的问题,或输入 'quit' 退出。")

while True:
try:
query = input("\nQuery: ").strip()

if query.lower() == "quit":
break

response = await self.process_query(query)
print("\n" + response)

except Exception as e:
logger.exception("聊天循环中出错")
print(f"\n出错了: {str(e)}")

async def cleanup(self):
"""清理资源"""
await self.exit_stack.aclose()


async def main():
if len(sys.argv) < 2:
print("用法: python client.py <服务器脚本路径>")
sys.exit(1)

client = MCPClient()
try:
await client.connect_to_server(sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()


if __name__ == "__main__":
import sys

asyncio.run(main())

2、服务端文件

tools.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP
from loguru import logger

# 初始化 FastMCP 服务器
mcp = FastMCP("tools")
logger.debug("FastMCP 服务器启动中...")

@mcp.tool()
async def count_letters(word: str, letter: str) -> str:
"""统计单词中特定字母出现的次数。

参数:
word: 要分析的单词
letter: 要统计的字母(单个字符)
"""
# 确保 letter 是单个字符
if len(letter) != 1:
return "请提供单个字母进行统计"

# 将输入转换为小写以忽略大小写差异
word = word.lower()
letter = letter.lower()

# 统计字母出现次数
count = word.count(letter)

return f"单词 '{word}' 中有 {count} 个 '{letter}'"

@mcp.tool()
async def compare_expressions(expr1: str, expr2: str) -> str:
"""比较两个数字或数学表达式的大小。

支持数字的四则运算(+,-,*,/),可以比较类似"99-1"和"98"这样的表达式大小。

参数:
expr1: 第一个表达式,如 "99", "2*3+4"
expr2: 第二个表达式,如 "98", "10*2"
"""
try:
# 计算第一个表达式的值
value1 = eval(expr1)

# 计算第二个表达式的值
value2 = eval(expr2)

# 比较结果
if value1 > value2:
return f"表达式 \"{expr1}\" (计算结果: {value1}) 比 \"{expr2}\" (计算结果: {value2}) 大"
elif value1 < value2:
return f"表达式 \"{expr1}\" (计算结果: {value1}) 比 \"{expr2}\" (计算结果: {value2}) 小"
else:
return f"表达式 \"{expr1}\" 和 \"{expr2}\" 相等 (都等于 {value1})"

except Exception as e:
# 处理计算错误
return f"计算错误: {str(e)}。请确保输入的是有效的数学表达式。"

@mcp.tool()
async def get_current_date_time() -> str:
"""获取当前时间和日期。"""
import datetime
now = datetime.datetime.now()
return f'当前时间为:{now.strftime("%Y-%m-%d %H:%M:%S")}'

@mcp.tool()
async def save_to_file(content) -> str:
"""将内容保存到文件中。

参数:
content: 要保存的内容
"""
try:
with open('data.txt', 'w') as f:
f.write(content)
return f"内容已保存到文件 '{'data.txt'}'"
except Exception as e:
return f"保存文件时出现错误: {e}"

if __name__ == "__main__":
# 初始化并运行服务器
mcp.run(transport='stdio')

3、配置文件

.env

1
2
3
DS_API_KEY=xxx  #根据你的api key填写
DS_API_BASE=https://xxx/api/v1 #根据你请求的具体地址填写
API_MODEL_NAME=xxx

三、运行带服务的mcp客户端

在mcp-test路径下执行:

1
python client.py tools.py

服务开始运行,输入你的指令,如

query:当前时间是?

image-20250322214756793

query:比较4.88+1*22与1-3%2谁更大,将大的结果存储到文件中

PixPin_2025-04-12_11-45-32

好了,以上你已经入门MCP了,更加详细的内容可以参考官方博客: https://modelcontextprotocol.info/docs/introduction/


基于python的MCP使用简单教程
https://linxkon.github.io/基于python的MCP使用简单教程.html
作者
linxkon
发布于
2025年3月22日
许可协议