AI Agent
2026-05-20
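Agents are the current frontier of AI applications. If RAG gives a model "eyes", an agent gives it "hands": it plans on its own, calls tools, and completes complex multi-step tasks.
2025 was the pivotal year in which agents moved from experiments into production.
1. What Is an Agent
1.1 Core Components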
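┌─────────────────────────────────────────┐
│                AI Agent                 │
│  ┌─────────┐   ┌────────────────────┐   │
│  │   LLM   │   │       Tools        │   │
│  │ (brain) │   │ search/code/API... │   │
│  └────┬────┘   └─────────┬──────────┘   │
│       │                  │              │
│  ┌────▼──────────────────▼──────────┐   │
│  │              Memory              │   │
│  │ short-term (chat history) +      │   │
│  │ long-term (vector store)         │   │
│  └──────────────────────────────────┘   │
└─────────────────────────────────────────┘
- LLM: handles reasoning, planning, and decisions (the "brain")
- Tools: callable functions that let the model affect the outside world (the "hands")
- Memory: maintains context and state to support long-running tasks
- Perceive-plan-act loop: iterates until the goal is reached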
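The four components can be seen working together in a very small loop. The sketch below is only an illustration: `run_loop`, `scripted_decide`, and `toy_tools` are hypothetical names, and a scripted function stands in for the LLM so the example runs without any API.

```python
from typing import Callable

def run_loop(goal: str, decide: Callable[[str, list], dict],
             tools: dict, max_steps: int = 5) -> str:
    """Iterate decide -> act -> observe until `decide` returns a final answer."""
    memory = []  # short-term memory: (decision, observation) pairs
    for _ in range(max_steps):
        decision = decide(goal, memory)  # the "brain" picks the next step
        if decision["type"] == "final":
            return decision["answer"]
        # The "hands": call the chosen tool and observe its result
        observation = tools[decision["tool"]](decision["input"])
        memory.append((decision, observation))
    return "stopped: step limit reached"

def scripted_decide(goal: str, memory: list) -> dict:
    """Scripted stand-in for the model: call one tool, then answer."""
    if not memory:
        return {"type": "tool", "tool": "add", "input": (2, 3)}
    return {"type": "final", "answer": f"The result is {memory[-1][1]}"}

toy_tools = {"add": lambda pair: pair[0] + pair[1]}
print(run_loop("add 2 and 3", scripted_decide, toy_tools))
```

The step limit matters as much as the loop itself: without it, a model that never produces a final answer would run forever.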
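1.2 Agent vs. Plain LLM Application
| | Plain LLM call | Agent |
|---|---|---|
| Interaction | Single turn or a fixed number of turns | Dynamic, iterative loop |
| Tool use | None | Can call external tools |
| Goal | Answer a question | Complete a task |
| Autonomy | Low | High (plans its own steps) |
| Best for | Q&A, generation | Code execution, data processing, automation |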
2. ReAct (Reasoning + Acting)
2.1 Core Pattern
Reason → Act → Observe → repeat
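Question: What is Tesla's current market cap, and what is its ratio to Apple's?
Thought: I need to look up both companies' market caps, then compute the ratio.
Action: search("Tesla TSLA market cap 2025")
Observation: Tesla's market cap is about $780 billion (May 2025)
Thought: Good. Next I need Apple's market cap.
Action: search("Apple AAPL market cap 2025")
Observation: Apple's market cap is about $3.2 trillion
Thought: Now compute the ratio: 780 / 3200 ≈ 0.244
Action: calculate("780 / 3200")
Observation: 0.24375
Answer: Tesla's market cap is about $780 billion versus Apple's $3.2 trillion, or roughly 24.4% of Apple's.
2.2 Full Implementation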
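Before the full implementation, it may help to see how the text trace above could be driven mechanically. The sketch below is a toy: it parses an `Action:` line with a regular expression and produces the matching `Observation:` line. `parse_action`, `react_step`, and `TOY_TOOLS` are hypothetical names, and the tools return canned values rather than real search results.

```python
import re

def search(query: str) -> str:
    # Canned stand-in for a real search API
    return "about $780 billion" if "Tesla" in query else "about $3.2 trillion"

def calculate(expression: str) -> str:
    # Accept only digits, whitespace, and basic arithmetic symbols before evaluating
    if not re.fullmatch(r"[\d\s.+\-*/()]+", expression):
        raise ValueError(f"unsupported expression: {expression!r}")
    return str(eval(expression, {"__builtins__": {}}))

TOY_TOOLS = {"search": search, "calculate": calculate}

# Matches lines such as: Action: search("some query")
ACTION_RE = re.compile(r'^Action:\s*(\w+)\("(.*)"\)\s*$', re.MULTILINE)

def parse_action(model_output: str):
    """Return (tool_name, argument) for the first Action line, or None."""
    match = ACTION_RE.search(model_output)
    return (match.group(1), match.group(2)) if match else None

def react_step(model_output: str) -> str:
    """Run the action found in one model turn and format the Observation line."""
    action = parse_action(model_output)
    if action is None:
        return ""  # no action: the model has produced its final answer
    name, argument = action
    tool = TOY_TOOLS.get(name)
    result = tool(argument) if tool else f"unknown tool: {name}"
    return f"Observation: {result}"

print(react_step('Thought: compute the ratio\nAction: calculate("780 / 3200")'))
```

Parsing free text like this is fragile, which is one reason the implementation that follows relies on the API's structured tool calls instead.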
import anthropic
import json

client = anthropic.Anthropic()

# Tool definitions
TOOLS = [
    {
        "name": "web_search",
        "description": "Search the web for up-to-date information. Use it for real-time data, news, prices, and similar queries.",
        "input_schema": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query"}
            },
            "required": ["query"]
        }
    },
    {
        "name": "python_eval",
        "description": "Run Python code to perform calculations. Returns the print() output or the value of the expression.",
        "input_schema": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "Python code to run"}
            },
            "required": ["code"]
        }
    },
    {
        "name": "read_file",
        "description": "Read the contents of a local file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path"}
            },
            "required": ["path"]
        }
    },
    {
        "name": "write_file",
        "description": "Write content to a local file.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path"},
                "content": {"type": "string", "description": "Content to write"}
            },
            "required": ["path", "content"]
        }
    }
]
def execute_tool(name: str, inputs: dict) -> str:
    """Execute a tool and return its result as a string."""
    if name == "web_search":
        # In a real project, call a real search API (Serper, Brave, Tavily, etc.)
        # A mock is used here
        query = inputs["query"]
        return f"[mock search result] Results for '{query}': ..."
    elif name == "python_eval":
        # WARNING: running model-generated code with exec/eval is unsafe outside a sandbox
        import io, contextlib
        code = inputs["code"]
        output = io.StringIO()
        try:
            with contextlib.redirect_stdout(output):
                try:
                    # A single expression: evaluate it once and keep its value
                    value = eval(compile(code, "<agent>", "eval"), {})
                except SyntaxError:
                    # Not an expression: run it as statements
                    exec(code, {})
                    value = None
            result = output.getvalue()
            if not result and value is not None:
                result = str(value)
            return result.strip() or "Code executed successfully (no output)"
        except Exception as e:
            return f"Error: {type(e).__name__}: {e}"
    elif name == "read_file":
        try:
            with open(inputs["path"], "r", encoding="utf-8") as f:
                content = f.read()
            return content[:5000]  # cap the returned length
        except FileNotFoundError:
            return f"Error: file {inputs['path']} does not exist"
        except Exception as e:
            return f"Read failed: {e}"
    elif name == "write_file":
        try:
            with open(inputs["path"], "w", encoding="utf-8") as f:
                f.write(inputs["content"])
            return f"Wrote {len(inputs['content'])} characters to {inputs['path']}"
        except Exception as e:
            return f"Write failed: {e}"
    return f"Unknown tool: {name}"
class Agent:
    def __init__(
        self,
        system: str = "",
        model: str = "claude-sonnet-4-6",
        max_iterations: int = 15,
        verbose: bool = True
    ):
        self.system = system or """You are a capable AI assistant that can use tools to complete tasks.
Principles:
1. Analyze the task and plan the steps to solve it
2. Use tools sensibly to gather information or take actions
3. Verify that results are correct
4. If you hit an error, analyze the cause and adjust your strategy
5. When the task is done, give a clear summary"""
        self.model = model
        self.max_iterations = max_iterations
        self.verbose = verbose
        self.history = []

    def run(self, task: str) -> str:
        """Run the agent until the task is complete."""
        self.history = [{"role": "user", "content": task}]
        if self.verbose:
            print(f"\n{'='*60}")
            print(f"Task: {task}")
            print('='*60)
        for iteration in range(self.max_iterations):
            response = client.messages.create(
                model=self.model,
                max_tokens=4096,
                system=self.system,
                tools=TOOLS,
                messages=self.history
            )
            if self.verbose:
                print(f"\n[Iteration {iteration+1}/{self.max_iterations}] "
                      f"stop_reason: {response.stop_reason}")
            # Append the model's response to the history
            self.history.append({
                "role": "assistant",
                "content": response.content
            })
            # Task complete
            if response.stop_reason == "end_turn":
                # Collect the text blocks of the reply
                final_text = "".join(
                    block.text for block in response.content if block.type == "text"
                )
                if self.verbose:
                    print(f"\n{'='*60}")
                    print("Task complete!")
                    print(f"Final reply: {final_text[:200]}...")
                return final_text
            # Execute tool calls
            elif response.stop_reason == "tool_use":
                tool_results = []
                for block in response.content:
                    if block.type == "tool_use":
                        if self.verbose:
                            print(f"  → Calling {block.name}: {json.dumps(block.input, ensure_ascii=False)[:100]}")
                        result = execute_tool(block.name, block.input)
                        if self.verbose:
                            print(f"  ← Result: {str(result)[:200]}")
                        tool_results.append({
                            "type": "tool_result",
                            "tool_use_id": block.id,
                            "content": str(result)
                        })
                # Tool results go back to the model as a user message
                self.history.append({
                    "role": "user",
                    "content": tool_results
                })
            else:
                # Any other stop reason (for example "max_tokens") ends the run early
                return f"Stopped early: stop_reason={response.stop_reason}"
        return "Reached the maximum number of iterations; task not completed."
# Usage
agent = Agent(verbose=True)
result = agent.run("Compute the sum of the squares of all odd numbers from 1 to 100")
3. Multi-Agent Systems
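3.1 Why Multiple Agents?
Limits of a single agent:
- The context window is finite (complex tasks overflow it)
- No single agent can be expert in every domain
- Parallel processing is limited
A multi-agent system lets specialized agents each handle their own part and cooperate on complex tasks.
3.2 Orchestrator-Worker Pattern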
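class OrchestratorAgent:
    """Orchestrator agent: decomposes a task and assigns subtasks to specialist agents."""

    def __init__(self):
        # Assumption: each worker is the Agent class from section 2.2
        # with a role-specific system prompt
        self.workers = {
            "researcher": Agent(system="You are a researcher. Gather the facts the subtask needs.", verbose=False),
            "analyst": Agent(system="You are an analyst. Analyze the material you are given.", verbose=False),
            "writer": Agent(system="You are a writer. Turn the material into clear prose.", verbose=False)
        }

    def decompose_task(self, task: str) -> list[dict]:
        """Ask the LLM to break the task into subtasks."""
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{
                "role": "user",
                "content": f"""Break the following task into an ordered list of subtasks.
Output JSON in this format:
{{"tasks": [{{"id": 1, "agent": "researcher/analyst/writer", "description": "...", "depends_on": []}}]}}
Task: {task}"""
            }]
        )
        text = response.content[0].text
        # The model may wrap the JSON in prose or a code fence; keep only the outermost object
        payload = text[text.find("{"): text.rfind("}") + 1]
        return json.loads(payload)["tasks"]

    def run(self, task: str) -> str:
        """Run the whole task pipeline."""
        subtasks = self.decompose_task(task)
        results = {}
        for subtask in subtasks:
            # Subtasks run in list order, so dependencies must come before their dependents
            previous = {k: v for k, v in results.items()
                        if k in subtask.get("depends_on", [])}
            agent = self.workers.get(subtask["agent"])
            if agent:
                # Agent.run expects a string, so the context is rendered as text
                prompt = (f"Subtask: {subtask['description']}\n"
                          f"Results of earlier subtasks: {json.dumps(previous, ensure_ascii=False)}")
                results[subtask["id"]] = agent.run(prompt)
        # Final synthesis
        return self._synthesize(task, results)

    def _synthesize(self, original_task: str, results: dict) -> str:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"""Combine the following subtask results into a final answer:
Original task: {original_task}
Subtask results: {json.dumps(results, ensure_ascii=False, indent=2)}
Produce a complete, coherent final report."""
            }]
        )
        return response.content[0].text
3.3 Parallel Agents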
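When subtasks carry `depends_on` lists, as in the decomposition format of section 3.2, subtasks with no unmet dependencies can run at the same time. One way to find them is to group subtask ids into "waves". The helper below is a hypothetical sketch (`plan_waves` is not part of any library) and assumes every dependency refers to an id in the same list.

```python
def plan_waves(subtasks: list[dict]) -> list[list[int]]:
    """Group subtask ids into waves; each wave depends only on earlier waves."""
    remaining = {t["id"]: set(t.get("depends_on", [])) for t in subtasks}
    done: set[int] = set()
    waves = []
    while remaining:
        # A subtask is ready once all of its dependencies have finished
        ready = sorted(tid for tid, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("cyclic or unknown dependency among subtasks")
        waves.append(ready)
        done.update(ready)
        for tid in ready:
            del remaining[tid]
    return waves

subtasks = [
    {"id": 1, "agent": "researcher", "depends_on": []},
    {"id": 2, "agent": "researcher", "depends_on": []},
    {"id": 3, "agent": "writer", "depends_on": [1, 2]},
]
print(plan_waves(subtasks))
```

Each wave can then be dispatched to a thread pool, with the next wave starting only after the current one has finished.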
from concurrent.futures import ThreadPoolExecutor

def run_agent_parallel(tasks: list[str]) -> list[str]:
    """Run several agents in parallel (a thread pool suits IO-bound work)."""
    def run_one(task: str) -> str:
        # Each task gets its own Agent: run() mutates self.history,
        # so sharing one instance across threads would mix up conversations
        return Agent(verbose=False).run(task)
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(run_one, tasks))

# Example: analyze several documents at once
documents = ["Document 1 content...", "Document 2 content...", "Document 3 content..."]
tasks = [f"Summarize the key points of the following document: {doc}" for doc in documents]
summaries = run_agent_parallel(tasks)
4. MCP (Model Context Protocol)
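4.1 What Is MCP
MCP is an open protocol introduced by Anthropic that standardizes how AI models connect to external tools and data sources.
Analogy: MCP is to AI applications what USB is to computers: a uniform interface that lets any tool plug in and work, with no custom integration code per tool.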
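On the wire, MCP messages are JSON-RPC 2.0. As a rough illustration of what "uniform interface" means, the sketch below builds a `tools/call` request and a matching text result. The helper names and the `read_file` tool are hypothetical, and only the common fields are shown; the MCP specification remains the authority on the exact message shapes.

```python
import json

def make_tool_call(request_id: int, tool: str, arguments: dict) -> str:
    """Serialize a JSON-RPC 2.0 request asking an MCP server to run a tool."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    })

def make_tool_result(request_id: int, text: str) -> str:
    """Serialize the server's reply carrying a single text content item."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,  # the same id ties the response to its request
        "result": {"content": [{"type": "text", "text": text}], "isError": False},
    })

print(make_tool_call(1, "read_file", {"path": "notes.txt"}))
print(make_tool_result(1, "file contents here"))
```

Because every tool is invoked through the same envelope, a client only needs the tool's name and input schema to call it.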
Claude (MCP client)
↕ JSON-RPC over stdio/HTTP
MCP server (tool provider)
↕ native API/SDK
Actual tools (file system, database, GitHub...)
4.2 Existing MCP Servers
Claude Desktop configuration (claude_desktop_config.json; on macOS it lives in ~/Library/Application Support/Claude/):
{
  "mcpServers": {
    "filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/allowed/dir"]
    },
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {"GITHUB_PERSONAL_ACCESS_TOKEN": "your-token"}
    },
    "postgres": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-postgres", "postgresql://localhost/mydb"]
    },
    "brave-search": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-brave-search"],
      "env": {"BRAVE_API_KEY": "your-key"}
    }
  }
}
4.3 Custom MCP Server
# Create a custom MCP server with the Python SDK
import asyncio
import json

from mcp.server import Server
from mcp.types import Tool, TextContent
import mcp.server.stdio

app = Server("my-custom-server")

@app.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="query_database",
            description="Query the internal database for business data",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "SQL query"},
                    "limit": {"type": "integer", "default": 100}
                },
                "required": ["sql"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name == "query_database":
        # Run the database query (arbitrary SQL: use a read-only role in practice)
        import asyncpg
        conn = await asyncpg.connect("postgresql://localhost/mydb")
        try:
            rows = await conn.fetch(arguments["sql"])
        finally:
            await conn.close()  # close the connection even if the query fails
        result = [dict(row) for row in rows[:arguments.get("limit", 100)]]
        # default=str covers values such as dates and decimals that JSON cannot encode
        return [TextContent(type="text", text=json.dumps(result, ensure_ascii=False, default=str))]
    raise ValueError(f"Unknown tool: {name}")

# Run the server
async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())
5. Agent Memory Systems
5.1 Short-Term Memory (Conversation History)
A finite context window has to be managed actively:
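class ConversationMemory:
    def __init__(self, max_tokens: int = 50000):
        self.messages = []
        self.summary = ""  # summary of trimmed history; pass it via the API's `system` parameter
        self.max_tokens = max_tokens

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        self._trim_if_needed()

    def _trim_if_needed(self):
        """Compress old turns once the history grows too long."""
        # Rough estimate of 2 characters per token; tune the divisor for your language and tokenizer
        total = sum(len(m["content"]) // 2 for m in self.messages)
        if total > self.max_tokens and len(self.messages) > 6:
            # Keep the 6 most recent messages and compress the rest
            old_messages = self.messages[:-6]
            self.messages = self.messages[-6:]
            # The Messages API takes the system prompt as a top-level parameter,
            # not as a "system" role inside `messages`, so the summary is kept separately
            self.summary = self._summarize(old_messages)

    def _summarize(self, messages: list) -> str:
        content = "\n".join(f"{m['role']}: {m['content'][:200]}" for m in messages)
        if self.summary:
            # Carry the previous summary forward so earlier context is not lost
            content = f"Earlier summary: {self.summary}\n{content}"
        response = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Compress the following conversation into a summary of at most 100 words, keeping the key information:\n{content}"
            }]
        )
        return response.content[0].text
5.2 Long-Term Memory (Vector Database)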
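Conceptually, long-term memory stores each fact together with a vector and retrieves the stored facts whose vectors are closest to the query. The dependency-free toy below shows that idea. It is an illustration under a loud assumption: word counts stand in for real embeddings, and `ToyMemory`, `embed`, and `cosine` are hypothetical names.

```python
import math
import re
from collections import Counter

def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts (a real system would use an embedding model)."""
    return Counter(re.findall(r"\w+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = (math.sqrt(sum(v * v for v in a.values()))
            * math.sqrt(sum(v * v for v in b.values())))
    return dot / norm if norm else 0.0

class ToyMemory:
    def __init__(self):
        self.items: list[tuple[str, Counter]] = []

    def remember(self, content: str) -> None:
        self.items.append((content, embed(content)))

    def recall(self, query: str, n: int = 2) -> list[str]:
        """Return the n stored texts most similar to the query."""
        q = embed(query)
        ranked = sorted(self.items, key=lambda item: cosine(q, item[1]), reverse=True)
        return [text for text, _ in ranked[:n]]

memory = ToyMemory()
memory.remember("The user prefers Python for data analysis")
memory.remember("The production database runs PostgreSQL 15")
memory.remember("Weekly reports are due every Friday")
print(memory.recall("which database is used in production", n=1))
```

A vector database plays the same role at scale, with learned embeddings that match by meaning instead of exact words, plus persistence and indexing.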
import hashlib
from datetime import datetime

import chromadb

class LongTermMemory:
    def __init__(self):
        self.client = chromadb.PersistentClient(path="./agent_memory")
        self.collection = self.client.get_or_create_collection("agent_facts")

    def remember(self, content: str, category: str = "general"):
        """Store an important piece of information."""
        # A short content hash as the id, so storing the same text again overwrites it
        doc_id = hashlib.md5(content.encode()).hexdigest()[:8]
        self.collection.upsert(
            documents=[content],
            metadatas=[{"category": category, "timestamp": datetime.now().isoformat()}],
            ids=[doc_id]
        )

    def recall(self, query: str, n: int = 5) -> list[str]:
        """Retrieve relevant memories."""
        results = self.collection.query(
            query_texts=[query],
            n_results=n,
            include=["documents"]
        )
        return results["documents"][0] if results["documents"] else []

    def inject_relevant_memories(self, query: str) -> str:
        """Format relevant memories for injection into the prompt."""
        memories = self.recall(query)
        if not memories:
            return ""
        return "Relevant memories:\n" + "\n".join(f"- {m}" for m in memories)
6. Agents in Practice
6.1 Common Pitfalls
Infinite loops: an agent can get stuck repeating the same action over and over.
# Loop detection
class LoopDetector:
    def __init__(self, max_same_action: int = 3):
        self.action_history = []
        self.max_same = max_same_action

    def check(self, tool_name: str, inputs: dict) -> bool:
        # Identical tool name and inputs count as the same action
        action_key = f"{tool_name}:{json.dumps(inputs, sort_keys=True)}"
        same_count = self.action_history.count(action_key)
        if same_count >= self.max_same:
            return True  # loop detected
        self.action_history.append(action_key)
        return False
Unclear tool descriptions: the model chooses tools based on their descriptions, so a vague description leads to misuse.
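# ❌ Poor description
{"name": "db_query", "description": "Query the database"}
# ✅ Good description
{
    "name": "db_query",
    "description": "Query the business database for historical data. Use for: user information, order records, product inventory. Do not use for: real-time data, stock prices, weather.",
    "input_schema": {...}
}
Overly broad tool permissions: an agent may delete files, send email, or take other irreversible actions by mistake.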
# Dangerous operations require human confirmation
DANGEROUS_TOOLS = {"delete_file", "send_email", "execute_sql_write"}

def execute_tool_safe(name: str, inputs: dict) -> str:
    if name in DANGEROUS_TOOLS:
        print("\n⚠️ Confirm dangerous operation")
        print(f"Tool: {name}")
        print(f"Arguments: {json.dumps(inputs, ensure_ascii=False, indent=2)}")
        confirm = input("Proceed? (y/n): ")
        if confirm.lower() != 'y':
            return "Operation cancelled"
    return execute_tool(name, inputs)
6.2 Observability
An agent's behavior must be traceable:
import logging
from datetime import datetime

class AgentLogger:
    def __init__(self, log_file: str = "agent.log"):
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format='%(asctime)s - %(message)s'
        )
        self.log = logging.getLogger("agent")
        self.session_id = datetime.now().strftime("%Y%m%d_%H%M%S")

    def log_tool_call(self, iteration: int, tool: str, inputs: dict, result: str):
        # One JSON object per line keeps the log easy to parse later
        self.log.info(json.dumps({
            "session": self.session_id,
            "iteration": iteration,
            "type": "tool_call",
            "tool": tool,
            "inputs": inputs,
            "result": result[:200],
            "result_length": len(result)
        }, ensure_ascii=False))

    def log_final(self, task: str, answer: str, iterations: int, success: bool):
        self.log.info(json.dumps({
            "session": self.session_id,
            "type": "final",
            "task": task[:100],
            "iterations": iterations,
            "success": success,
            "answer_length": len(answer)
        }, ensure_ascii=False))
6.3 Choosing a Framework
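| Framework | Recommended for | Avoid when |
|---|---|---|
| Direct API calls | Highly customized needs that require precise control | Rapid prototyping |
| LangChain | A rich tool ecosystem is needed | Latency matters (abstraction-layer overhead) |
| LlamaIndex | RAG-centric applications | Complex tool calling |
| AutoGen | Multi-agent conversations | Simple single-agent tasks |
| CrewAI | Role-based multi-agent setups | Fine-grained control is required |
Personal advice: first understand native API calls and the underlying logic of tool calling, then bring in a framework as your needs dictate. In many cases, calling the API directly with a small amount of custom code is easier to maintain than adopting a complex framework.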