ZeroClaw-05-工具执行生命周期深度解析 🔗
深入解析 ZeroClaw 的工具系统,理解 AI 如何从"聊天"升级为"行动"。
适合阅读人群:技术负责人、工具开发者、架构师
引言:AI 的行动能力 🔗
从"聊天"到"行动" 🔗
传统 AI:
用户:帮我查看文件内容
AI:抱歉,我无法访问您的文件系统
ZeroClaw:
用户:帮我查看文件内容
ZeroClaw:好的,我来读取文件
[执行 file_read 工具]
文件内容如下:...
这种能力的转变,让 AI 从"顾问"变成了"助手"。
一、工具系统架构 🔗
1.1 工具生态地图 🔗
flowchart TB
subgraph 系统工具["⚙️ 系统工具"]
SHELL["shell
执行命令"]
FILE_R["file_read
读取文件"]
FILE_W["file_write
写入文件"]
FILE_L["file_list
列出目录"]
end
subgraph 开发工具["💻 开发工具"]
GIT["git
版本控制"]
CODE["code_search
代码搜索"]
LINT["lint
代码检查"]
end
subgraph 网络工具["🌐 网络工具"]
HTTP["http_request
HTTP 请求"]
BROWSER["browser_open
浏览器操作"]
SCREEN["screenshot
截图"]
end
subgraph 自动化工具["🤖 自动化工具"]
CRON["cron
定时任务"]
NOTIFY["notify
通知推送"]
DELEGATE["delegate
任务委托"]
end
subgraph 记忆工具["💾 记忆工具"]
MEM_R["memory_read
读取记忆"]
MEM_W["memory_write
写入记忆"]
MEM_S["memory_search
搜索记忆"]
end
subgraph 硬件工具["🔌 硬件工具"]
GPIO["gpio
GPIO 控制"]
I2C["i2c
传感器读取"]
USB["usb
设备发现"]
end
subgraph 核心["🦀 ZeroClaw 核心"]
AGENT["Agent 编排器"]
REGISTRY["工具注册表"]
SANDBOX["执行沙箱"]
end
SHELL & FILE_R & FILE_W & FILE_L --> REGISTRY
GIT & CODE & LINT --> REGISTRY
HTTP & BROWSER & SCREEN --> REGISTRY
CRON & NOTIFY & DELEGATE --> REGISTRY
MEM_R & MEM_W & MEM_S --> REGISTRY
GPIO & I2C & USB --> REGISTRY
REGISTRY --> AGENT
AGENT --> SANDBOX
style AGENT fill:#f9f,stroke:#333,stroke-width:4px
style REGISTRY fill:#bbf,stroke:#333,stroke-width:2px
1.2 Tool Trait 设计 🔗
// src/tools/traits.rs
#[async_trait]
pub trait Tool: Send + Sync {
/// 工具名称(用于 LLM 识别)
fn name(&self) -> &str;
/// 工具描述(LLM 决定何时使用)
fn description(&self) -> &str;
/// JSON Schema 定义参数
fn parameters_schema(&self) -> Value;
/// 执行工具
async fn execute(&self, args: Value) -> Result<ToolResult>;
/// 生成工具定义(给 LLM 的格式)
fn spec(&self) -> ToolSpec {
ToolSpec {
name: self.name().to_string(),
description: self.description().to_string(),
parameters: self.parameters_schema(),
}
}
}
/// 工具执行结果
#[derive(Debug, Clone)]
pub struct ToolResult {
pub success: bool,
pub output: Option<String>,
pub error: Option<String>,
pub metadata: HashMap<String, String>,
}
impl ToolResult {
pub fn success(output: impl Into<String>) -> Self {
Self {
success: true,
output: Some(output.into()),
error: None,
metadata: HashMap::new(),
}
}
pub fn error(error: impl Into<String>) -> Self {
Self {
success: false,
output: None,
error: Some(error.into()),
metadata: HashMap::new(),
}
}
}
为什么这样设计?
| 设计决策 | 理由 |
|---|---|
| JSON Schema 参数 | OpenAI/Anthropic Function Calling 标准 |
| async execute | 工具可能涉及网络/IO |
| ToolResult 封装 | 统一错误处理 |
| metadata 扩展 | 支持工具特有数据 |
二、工具调用完整流程 🔗
2.1 AI 驱动的工具调用循环 🔗
工具调用是一个迭代过程,AI 根据工具返回结果决定下一步操作,直到获得最终答案或达到最大迭代次数。
第 1 轮:理解意图 🔗
AI 分析用户请求,决定需要调用哪些工具。
sequenceDiagram
actor USER as 用户
participant AGENT as Agent
participant AI as AI 模型
USER->>AGENT: "帮我分析这个日志文件"
AGENT->>AI: 用户请求 + 可用工具列表
AI->>AI: 分析意图:需要读取文件
AI-->>AGENT: 工具调用请求
file_read(path="/var/log/app.log")
AI 决策依据:
- 用户意图识别
- 工具描述匹配
- 参数提取(路径、命令等)
工具执行阶段 🔗
Agent 执行 AI 请求的工具调用。
sequenceDiagram
participant AGENT as Agent
participant TOOL as 工具系统
participant EXEC as 执行环境
AGENT->>TOOL: 调用 file_read
TOOL->>TOOL: 参数验证
TOOL->>TOOL: 权限检查(工作区限制)
TOOL->>EXEC: 读取文件
EXEC-->>TOOL: 返回文件内容
TOOL-->>AGENT: ToolResult
success + 内容
安全检查:
- 参数类型验证(JSON Schema)
- 路径检查(防止目录遍历)
- 权限检查(白名单机制)
第 2 轮:分析内容 🔗
AI 根据工具返回结果,决定是否需要进一步操作。
sequenceDiagram
participant AGENT as Agent
participant AI as AI 模型
AGENT->>AI: 用户请求 + 文件内容
AI->>AI: 分析日志发现错误模式
AI-->>AGENT: 工具调用请求
shell(command="grep ERROR | wc -l")
迭代条件:
- 需要更多信息
- 需要执行命令分析
- 需要调用其他工具
第 N 轮:生成最终答案 🔗
当 AI 获得足够信息后,生成最终响应。
sequenceDiagram
actor USER as 用户
participant AGENT as Agent
participant AI as AI 模型
AGENT->>AI: 完整上下文
原始请求 + 所有工具结果
AI->>AI: 综合分析生成答案
AI-->>AGENT: 自然语言响应
"发现3个错误,原因可能是..."
AGENT-->>USER: 返回最终答案
终止条件:
- AI 返回自然语言响应(无工具调用)
- 达到最大迭代次数(默认 10 次)
- 工具执行失败且无法恢复
完整循环概览 🔗
flowchart LR
subgraph 输入["📥 输入"]
U["用户请求"]
end
subgraph 迭代循环["🔄 迭代循环"]
direction TB
AI1["AI 决策"] --> TOOL["工具执行"]
TOOL --> AI1
end
subgraph 输出["📤 输出"]
R["最终响应"]
end
U -->|"第1轮"| AI1
AI1 -->|"完成"| R
style AI1 fill:#e8f5e9,stroke:#333
style TOOL fill:#fff3e0,stroke:#333
### 2.2 工具调用状态机
```rust
// src/agent/tool_loop.rs
pub struct ToolExecutor {
registry: Arc,
max_iterations: usize,
timeout: Duration,
}
impl ToolExecutor {
pub async fn execute_loop(
&self,
initial_prompt: String,
provider: Arc,
) -> Result {
let mut iteration = 0;
let mut messages = vec![
Message::system("You are a helpful assistant with access to tools."),
Message::user(initial_prompt),
];
loop {
if iteration >= self.max_iterations {
return Err(Error::MaxIterationsReached);
}
// 获取工具定义
let tools = self.registry.list_specs();
// 调用 AI
let response = provider.complete_with_tools(
CompletionRequest {
messages: messages.clone(),
tools: Some(tools),
..Default::default()
}
).await?;
// 检查是否有工具调用
if let Some(tool_calls) = response.tool_calls {
// 执行所有工具调用
let mut tool_results = Vec::new();
for call in tool_calls {
info!("执行工具: {}({})", call.name, call.arguments);
let result = self.execute_tool(&call).await;
tool_results.push(ToolMessage {
role: "tool".into(),
tool_call_id: call.id,
content: format_result(&result),
});
}
// 添加助手消息(工具调用请求)
messages.push(Message::assistant_with_tools(
response.content,
tool_calls,
));
// 添加工具结果
for result in tool_results {
messages.push(Message::from(result));
}
iteration += 1;
} else {
// 没有工具调用,得到最终答案
return Ok(response.content);
}
}
}
async fn execute_tool(&self, call: &ToolCall) -> Result {
// 查找工具
let tool = self.registry.get(&call.name)
.ok_or_else(|| Error::ToolNotFound(call.name.clone()))?;
// 解析参数
let args: Value = serde_json::from_str(&call.arguments)
.map_err(|e| Error::InvalidToolArguments(e.to_string()))?;
// 执行(带超时)
let result = tokio::time::timeout(
self.timeout,
tool.execute(args)
).await
.map_err(|_| Error::ToolTimeout)?
.map_err(|e| Error::ToolExecution(e.to_string()))?;
Ok(result)
}
}
2.3 为什么限制迭代次数? 🔗
max_iterations = 10
防止无限循环:
- AI 调用工具 A
- 工具 A 返回结果
- AI 认为需要工具 B
- 工具 B 返回结果
- AI 认为需要工具 A(循环)
成本控制:
- 每次迭代消耗 Token
- 防止意外的高额费用
用户体验:
- 快速响应
- 避免"思考太久"
三、核心工具详解 🔗
3.1 Shell 工具实现 🔗
// src/tools/shell.rs
pub struct ShellTool {
allowed_commands: HashSet<String>,
forbidden_patterns: Vec<Regex>,
runtime: Arc<dyn Runtime>,
}
#[async_trait]
impl Tool for ShellTool {
fn name(&self) -> &str {
"shell"
}
fn description(&self) -> &str {
"Execute shell commands. Use for git, file operations, build tools, etc."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute"
},
"timeout": {
"type": "integer",
"description": "Timeout in seconds (default: 60)",
"default": 60
}
},
"required": ["command"]
})
}
async fn execute(&self, args: Value) -> Result<ToolResult> {
let command = args["command"].as_str()
.ok_or_else(|| Error::MissingArgument("command"))?;
let timeout = args["timeout"].as_u64().unwrap_or(60);
// 1. 解析命令
let parsed = self.parse_command(command)?;
// 2. 安全检查
self.validate_command(&parsed)?;
// 3. 执行
let result = self.runtime.execute(&parsed.full_command, timeout).await?;
// 4. 格式化结果
let output = format!(
"Exit code: {}\nStdout:\n{}\nStderr:\n{}",
result.exit_code,
truncate(&result.stdout, 10000),
truncate(&result.stderr, 5000)
);
if result.exit_code == 0 {
Ok(ToolResult::success(output))
} else {
Ok(ToolResult::error(output))
}
}
}
impl ShellTool {
fn parse_command(&self, input: &str) -> Result<ParsedCommand> {
// 使用 shlex 安全解析,防止注入
let args = shlex::split(input)
.ok_or_else(|| Error::InvalidCommand)?;
if args.is_empty() {
bail!(Error::EmptyCommand);
}
Ok(ParsedCommand {
name: args[0].clone(),
args: args[1..].to_vec(),
full_command: input.to_string(),
})
}
fn validate_command(&self, cmd: &ParsedCommand) -> Result<()> {
// 检查命令在白名单
if !self.allowed_commands.contains(&cmd.name) {
bail!(Error::CommandNotAllowed {
command: cmd.name.clone(),
allowed: self.allowed_commands.iter().cloned().collect(),
});
}
// 检查危险模式
for pattern in &self.forbidden_patterns {
if pattern.is_match(&cmd.full_command) {
bail!(Error::DangerousPattern);
}
}
Ok(())
}
}
// 默认允许的命令
const DEFAULT_ALLOWED_COMMANDS: &[&str] = &[
"git", "ls", "cat", "grep", "find",
"cargo", "npm", "pnpm", "yarn",
"python", "python3", "node",
"head", "tail", "wc", "sort", "uniq",
"pwd", "echo", "mkdir", "touch",
];
// 禁止的危险模式
const FORBIDDEN_PATTERNS: &[&str] = &[
r"rm\s+-rf\s+/",
r">\s*/dev/sda",
r"dd\s+if=/dev/zero",
r"mkfs\.?",
r":\(\)\{\s*:\|:&\};:", // Fork 炸弹
];
3.2 文件工具实现 🔗
// src/tools/file_read.rs
pub struct FileReadTool {
workspace: PathBuf,
}
#[async_trait]
impl Tool for FileReadTool {
fn name(&self) -> &str {
"file_read"
}
fn description(&self) -> &str {
"Read the contents of a file. The file must be within the workspace."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file (relative to workspace)"
},
"limit": {
"type": "integer",
"description": "Maximum number of lines to read",
"default": 1000
}
},
"required": ["path"]
})
}
async fn execute(&self, args: Value) -> Result<ToolResult> {
let relative_path = args["path"].as_str()
.ok_or_else(|| Error::MissingArgument("path"))?;
let limit = args["limit"].as_usize().unwrap_or(1000);
// 构建完整路径并验证
let full_path = self.workspace.join(relative_path);
let canonical = full_path.canonicalize()
.map_err(|_| Error::FileNotFound(relative_path.into()))?;
// 安全检查:必须在 workspace 内
if !canonical.starts_with(&self.workspace) {
bail!(Error::PathOutsideWorkspace);
}
// 读取文件
let content = tokio::fs::read_to_string(&canonical).await
.map_err(|e| Error::FileRead(e.to_string()))?;
// 限制行数
let lines: Vec<&str> = content.lines().collect();
let truncated = if lines.len() > limit {
format!(
"{}\n\n... (truncated, showing {}/{} lines)",
lines[..limit].join("\n"),
limit,
lines.len()
)
} else {
content
};
Ok(ToolResult::success(truncated))
}
}
3.3 文件写入工具 🔗
// src/tools/file_write.rs
pub struct FileWriteTool {
workspace: PathBuf,
}
#[async_trait]
impl Tool for FileWriteTool {
fn name(&self) -> &str {
"file_write"
}
fn description(&self) -> &str {
"Write content to a file. Creates parent directories if needed."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file"
},
"content": {
"type": "string",
"description": "Content to write"
}
},
"required": ["path", "content"]
})
}
async fn execute(&self, args: Value) -> Result<ToolResult> {
let path = args["path"].as_str()
.ok_or_else(|| Error::MissingArgument("path"))?;
let content = args["content"].as_str()
.ok_or_else(|| Error::MissingArgument("content"))?;
// 验证路径
let full_path = self.validate_path(path)?;
// 创建父目录
if let Some(parent) = full_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
// 写入文件
tokio::fs::write(&full_path, content).await?;
Ok(ToolResult::success(format!(
"Successfully wrote {} bytes to {}",
content.len(),
path
)))
}
}
四、工具注册表 🔗
4.1 注册表实现 🔗
// src/tools/registry.rs
pub struct ToolRegistry {
tools: HashMap<String, Box<dyn Tool>>,
}
impl ToolRegistry {
pub fn new() -> Self {
Self {
tools: HashMap::new(),
}
}
pub fn register(&mut self, tool: Box<dyn Tool>) {
let name = tool.name().to_string();
info!("注册工具: {}", name);
self.tools.insert(name, tool);
}
pub fn get(&self, name: &str) -> Option<&dyn Tool> {
self.tools.get(name).map(|b| b.as_ref())
}
pub fn list_specs(&self) -> Vec<ToolSpec> {
self.tools.values()
.map(|t| t.spec())
.collect()
}
pub fn names(&self) -> Vec<&str> {
self.tools.keys()
.map(|s| s.as_str())
.collect()
}
}
// 初始化默认工具
pub fn create_default_registry(config: &Config) -> ToolRegistry {
let mut registry = ToolRegistry::new();
// 系统工具
registry.register(Box::new(ShellTool::new(&config.autonomy)));
registry.register(Box::new(FileReadTool::new(&config.workspace)));
registry.register(Box::new(FileWriteTool::new(&config.workspace)));
registry.register(Box::new(FileListTool::new(&config.workspace)));
// 开发工具
registry.register(Box::new(GitTool::new()));
// 记忆工具
registry.register(Box::new(MemoryReadTool::new()));
registry.register(Box::new(MemoryWriteTool::new()));
registry
}
4.2 工具权限模型 🔗
// src/tools/policy.rs
pub struct ToolPolicy {
allowed_tools: HashSet<String>,
blocked_tools: HashSet<String>,
require_confirmation: HashSet<String>,
}
impl ToolPolicy {
pub fn check(&self, tool_name: &str) -> PolicyResult {
// 明确禁止的工具
if self.blocked_tools.contains(tool_name) {
return PolicyResult::Blocked("工具被管理员禁用");
}
// 白名单检查(如果配置了白名单)
if !self.allowed_tools.is_empty()
&& !self.allowed_tools.contains(tool_name) {
return PolicyResult::Blocked("工具不在白名单中");
}
// 需要确认的工具
if self.require_confirmation.contains(tool_name) {
return PolicyResult::RequiresConfirmation;
}
PolicyResult::Allowed
}
}
pub enum PolicyResult {
Allowed,
Blocked(&'static str),
RequiresConfirmation,
}
五、执行环境 🔗
5.1 运行时接口 🔗
// src/runtime/mod.rs
#[async_trait]
pub trait Runtime: Send + Sync {
async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult>;
}
pub struct ExecutionResult {
pub stdout: String,
pub stderr: String,
pub exit_code: i32,
pub duration: Duration,
}
// 本地运行时
pub struct NativeRuntime {
workspace: PathBuf,
}
#[async_trait]
impl Runtime for NativeRuntime {
async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult> {
let start = Instant::now();
let output = tokio::time::timeout(
Duration::from_secs(timeout_secs),
tokio::process::Command::new("sh")
.arg("-c")
.arg(command)
.current_dir(&self.workspace)
.output()
).await
.map_err(|_| Error::Timeout)?
.map_err(|e| Error::Execution(e.to_string()))?;
Ok(ExecutionResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
exit_code: output.status.code().unwrap_or(-1),
duration: start.elapsed(),
})
}
}
5.2 Docker 沙箱运行时 🔗
// src/runtime/docker.rs
pub struct DockerRuntime {
image: String,
workspace: PathBuf,
memory_limit: usize, // MB
cpu_limit: f32,
}
#[async_trait]
impl Runtime for DockerRuntime {
async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult> {
let container_name = format!("zeroclaw-{}", uuid::Uuid::new_v4());
let output = tokio::process::Command::new("docker")
.args(&[
"run",
"--rm",
"--name", &container_name,
"--memory", &format!("{}m", self.memory_limit),
"--cpus", &self.cpu_limit.to_string(),
"--read-only",
"--network", "none",
"-v", &format!("{}:/workspace:rw", self.workspace.display()),
"-w", "/workspace",
&self.image,
"sh", "-c", command,
])
.output()
.await?;
Ok(ExecutionResult {
stdout: String::from_utf8_lossy(&output.stdout).to_string(),
stderr: String::from_utf8_lossy(&output.stderr).to_string(),
exit_code: output.status.code().unwrap_or(-1),
duration: Duration::default(),
})
}
}
六、配置示例 🔗
6.1 完整工具配置 🔗
[autonomy]
level = "supervised" # readonly | supervised | full
workspace_only = true
allowed_commands = ["git", "ls", "cat", "grep", "cargo", "npm"]
forbidden_paths = ["/etc", "/root", "/proc", "/sys", "~/.ssh"]
[runtime]
kind = "docker" # native | docker
[runtime.docker]
image = "alpine:3.20"
network = "none"
memory_limit_mb = 512
cpu_limit = 1.0
read_only_rootfs = true
mount_workspace = true
[tools]
enabled = ["shell", "file_read", "file_write", "git", "memory"]
require_confirmation = ["file_write", "shell"]
6.2 添加自定义工具 🔗
// 定义自定义工具
pub struct MyCustomTool;
#[async_trait]
impl Tool for MyCustomTool {
fn name(&self) -> &str {
"my_tool"
}
fn description(&self) -> &str {
"My custom tool description"
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"input": { "type": "string" }
},
"required": ["input"]
})
}
async fn execute(&self, args: Value) -> Result<ToolResult> {
let input = args["input"].as_str().unwrap_or("");
// 实现逻辑
Ok(ToolResult::success(format!("Processed: {}", input)))
}
}
// 注册到注册表
registry.register(Box::new(MyCustomTool));