ZeroClaw-05-工具执行生命周期深度解析

· 3627字 · 8分钟

ZeroClaw-05-工具执行生命周期深度解析 🔗

深入解析 ZeroClaw 的工具系统,理解 AI 如何从"聊天"升级为"行动"。

适合阅读人群:技术负责人、工具开发者、架构师


引言:AI 的行动能力 🔗

从"聊天"到"行动" 🔗

传统 AI:
用户:帮我查看文件内容
AI:抱歉,我无法访问您的文件系统

ZeroClaw:
用户:帮我查看文件内容
ZeroClaw:好的,我来读取文件
[执行 file_read 工具]
文件内容如下:...

这种能力的转变,让 AI 从"顾问"变成了"助手"。


一、工具系统架构 🔗

1.1 工具生态地图 🔗

flowchart TB
    subgraph 系统工具["⚙️ 系统工具"]
        SHELL["shell
执行命令"] FILE_R["file_read
读取文件"] FILE_W["file_write
写入文件"] FILE_L["file_list
列出目录"] end subgraph 开发工具["💻 开发工具"] GIT["git
版本控制"] CODE["code_search
代码搜索"] LINT["lint
代码检查"] end subgraph 网络工具["🌐 网络工具"] HTTP["http_request
HTTP 请求"] BROWSER["browser_open
浏览器操作"] SCREEN["screenshot
截图"] end subgraph 自动化工具["🤖 自动化工具"] CRON["cron
定时任务"] NOTIFY["notify
通知推送"] DELEGATE["delegate
任务委托"] end subgraph 记忆工具["💾 记忆工具"] MEM_R["memory_read
读取记忆"] MEM_W["memory_write
写入记忆"] MEM_S["memory_search
搜索记忆"] end subgraph 硬件工具["🔌 硬件工具"] GPIO["gpio
GPIO 控制"] I2C["i2c
传感器读取"] USB["usb
设备发现"] end subgraph 核心["🦀 ZeroClaw 核心"] AGENT["Agent 编排器"] REGISTRY["工具注册表"] SANDBOX["执行沙箱"] end SHELL & FILE_R & FILE_W & FILE_L --> REGISTRY GIT & CODE & LINT --> REGISTRY HTTP & BROWSER & SCREEN --> REGISTRY CRON & NOTIFY & DELEGATE --> REGISTRY MEM_R & MEM_W & MEM_S --> REGISTRY GPIO & I2C & USB --> REGISTRY REGISTRY --> AGENT AGENT --> SANDBOX style AGENT fill:#f9f,stroke:#333,stroke-width:4px style REGISTRY fill:#bbf,stroke:#333,stroke-width:2px

1.2 Tool Trait 设计 🔗

// src/tools/traits.rs
#[async_trait]
pub trait Tool: Send + Sync {
    /// 工具名称(用于 LLM 识别)
    fn name(&self) -> &str;
    
    /// 工具描述(LLM 决定何时使用)
    fn description(&self) -> &str;
    
    /// JSON Schema 定义参数
    fn parameters_schema(&self) -> Value;
    
    /// 执行工具
    async fn execute(&self, args: Value) -> Result<ToolResult>;
    
    /// 生成工具定义(给 LLM 的格式)
    fn spec(&self) -> ToolSpec {
        ToolSpec {
            name: self.name().to_string(),
            description: self.description().to_string(),
            parameters: self.parameters_schema(),
        }
    }
}

/// 工具执行结果
#[derive(Debug, Clone)]
pub struct ToolResult {
    pub success: bool,
    pub output: Option<String>,
    pub error: Option<String>,
    pub metadata: HashMap<String, String>,
}

impl ToolResult {
    pub fn success(output: impl Into<String>) -> Self {
        Self {
            success: true,
            output: Some(output.into()),
            error: None,
            metadata: HashMap::new(),
        }
    }
    
    pub fn error(error: impl Into<String>) -> Self {
        Self {
            success: false,
            output: None,
            error: Some(error.into()),
            metadata: HashMap::new(),
        }
    }
}

为什么这样设计?

设计决策 理由
JSON Schema 参数 OpenAI/Anthropic Function Calling 标准
async execute 工具可能涉及网络/IO
ToolResult 封装 统一错误处理
metadata 扩展 支持工具特有数据

二、工具调用完整流程 🔗

2.1 AI 驱动的工具调用循环 🔗

工具调用是一个迭代过程,AI 根据工具返回结果决定下一步操作,直到获得最终答案或达到最大迭代次数。


第 1 轮:理解意图 🔗

AI 分析用户请求,决定需要调用哪些工具。

sequenceDiagram
    actor USER as 用户
    participant AGENT as Agent
    participant AI as AI 模型

    USER->>AGENT: "帮我分析这个日志文件"
    AGENT->>AI: 用户请求 + 可用工具列表
    AI->>AI: 分析意图:需要读取文件
    AI-->>AGENT: 工具调用请求
file_read(path="/var/log/app.log")

AI 决策依据

  • 用户意图识别
  • 工具描述匹配
  • 参数提取(路径、命令等)

工具执行阶段 🔗

Agent 执行 AI 请求的工具调用。

sequenceDiagram
    participant AGENT as Agent
    participant TOOL as 工具系统
    participant EXEC as 执行环境

    AGENT->>TOOL: 调用 file_read
    TOOL->>TOOL: 参数验证
    TOOL->>TOOL: 权限检查(工作区限制)
    TOOL->>EXEC: 读取文件
    EXEC-->>TOOL: 返回文件内容
    TOOL-->>AGENT: ToolResult
success + 内容

安全检查

  • 参数类型验证(JSON Schema)
  • 路径检查(防止目录遍历)
  • 权限检查(白名单机制)

第 2 轮:分析内容 🔗

AI 根据工具返回结果,决定是否需要进一步操作。

sequenceDiagram
    participant AGENT as Agent
    participant AI as AI 模型

    AGENT->>AI: 用户请求 + 文件内容
    AI->>AI: 分析日志发现错误模式
    AI-->>AGENT: 工具调用请求
shell(command="grep ERROR | wc -l")

迭代条件

  • 需要更多信息
  • 需要执行命令分析
  • 需要调用其他工具

第 N 轮:生成最终答案 🔗

当 AI 获得足够信息后,生成最终响应。

sequenceDiagram
    actor USER as 用户
    participant AGENT as Agent
    participant AI as AI 模型

    AGENT->>AI: 完整上下文
原始请求 + 所有工具结果 AI->>AI: 综合分析生成答案 AI-->>AGENT: 自然语言响应
"发现3个错误,原因可能是..." AGENT-->>USER: 返回最终答案

终止条件

  • AI 返回自然语言响应(无工具调用)
  • 达到最大迭代次数(默认 10 次)
  • 工具执行失败且无法恢复

完整循环概览 🔗

flowchart LR
    subgraph 输入["📥 输入"]
        U["用户请求"]
    end
    
    subgraph 迭代循环["🔄 迭代循环"]
        direction TB
        AI1["AI 决策"] --> TOOL["工具执行"]
        TOOL --> AI1
    end
    
    subgraph 输出["📤 输出"]
        R["最终响应"]
    end
    
    U -->|"第1轮"| AI1
    AI1 -->|"完成"| R
    
    style AI1 fill:#e8f5e9,stroke:#333
    style TOOL fill:#fff3e0,stroke:#333

### 2.2 工具调用状态机

```rust
// src/agent/tool_loop.rs
pub struct ToolExecutor {
    registry: Arc,
    max_iterations: usize,
    timeout: Duration,
}

impl ToolExecutor {
    pub async fn execute_loop(
        &self,
        initial_prompt: String,
        provider: Arc,
    ) -> Result {
        let mut iteration = 0;
        let mut messages = vec![
            Message::system("You are a helpful assistant with access to tools."),
            Message::user(initial_prompt),
        ];
        
        loop {
            if iteration >= self.max_iterations {
                return Err(Error::MaxIterationsReached);
            }
            
            // 获取工具定义
            let tools = self.registry.list_specs();
            
            // 调用 AI
            let response = provider.complete_with_tools(
                CompletionRequest {
                    messages: messages.clone(),
                    tools: Some(tools),
                    ..Default::default()
                }
            ).await?;
            
            // 检查是否有工具调用
            if let Some(tool_calls) = response.tool_calls {
                // 执行所有工具调用
                let mut tool_results = Vec::new();
                
                for call in tool_calls {
                    info!("执行工具: {}({})", call.name, call.arguments);
                    
                    let result = self.execute_tool(&call).await;
                    
                    tool_results.push(ToolMessage {
                        role: "tool".into(),
                        tool_call_id: call.id,
                        content: format_result(&result),
                    });
                }
                
                // 添加助手消息(工具调用请求)
                messages.push(Message::assistant_with_tools(
                    response.content,
                    tool_calls,
                ));
                
                // 添加工具结果
                for result in tool_results {
                    messages.push(Message::from(result));
                }
                
                iteration += 1;
            } else {
                // 没有工具调用,得到最终答案
                return Ok(response.content);
            }
        }
    }
    
    async fn execute_tool(&self, call: &ToolCall) -> Result {
        // 查找工具
        let tool = self.registry.get(&call.name)
            .ok_or_else(|| Error::ToolNotFound(call.name.clone()))?;
            
        // 解析参数
        let args: Value = serde_json::from_str(&call.arguments)
            .map_err(|e| Error::InvalidToolArguments(e.to_string()))?;
            
        // 执行(带超时)
        let result = tokio::time::timeout(
            self.timeout,
            tool.execute(args)
        ).await
            .map_err(|_| Error::ToolTimeout)?
            .map_err(|e| Error::ToolExecution(e.to_string()))?;
            
        Ok(result)
    }
}

2.3 为什么限制迭代次数? 🔗

max_iterations = 10

防止无限循环

  • AI 调用工具 A
  • 工具 A 返回结果
  • AI 认为需要工具 B
  • 工具 B 返回结果
  • AI 认为需要工具 A(循环)

成本控制

  • 每次迭代消耗 Token
  • 防止意外的高额费用

用户体验

  • 快速响应
  • 避免"思考太久"

三、核心工具详解 🔗

3.1 Shell 工具实现 🔗

// src/tools/shell.rs
pub struct ShellTool {
    allowed_commands: HashSet<String>,
    forbidden_patterns: Vec<Regex>,
    runtime: Arc<dyn Runtime>,
}

#[async_trait]
impl Tool for ShellTool {
    fn name(&self) -> &str {
        "shell"
    }
    
    fn description(&self) -> &str {
        "Execute shell commands. Use for git, file operations, build tools, etc."
    }
    
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The shell command to execute"
                },
                "timeout": {
                    "type": "integer",
                    "description": "Timeout in seconds (default: 60)",
                    "default": 60
                }
            },
            "required": ["command"]
        })
    }
    
    async fn execute(&self, args: Value) -> Result<ToolResult> {
        let command = args["command"].as_str()
            .ok_or_else(|| Error::MissingArgument("command"))?;
            
        let timeout = args["timeout"].as_u64().unwrap_or(60);
        
        // 1. 解析命令
        let parsed = self.parse_command(command)?;
        
        // 2. 安全检查
        self.validate_command(&parsed)?;
        
        // 3. 执行
        let result = self.runtime.execute(&parsed.full_command, timeout).await?;
        
        // 4. 格式化结果
        let output = format!(
            "Exit code: {}\nStdout:\n{}\nStderr:\n{}",
            result.exit_code,
            truncate(&result.stdout, 10000),
            truncate(&result.stderr, 5000)
        );
        
        if result.exit_code == 0 {
            Ok(ToolResult::success(output))
        } else {
            Ok(ToolResult::error(output))
        }
    }
}

impl ShellTool {
    fn parse_command(&self, input: &str) -> Result<ParsedCommand> {
        // 使用 shlex 安全解析,防止注入
        let args = shlex::split(input)
            .ok_or_else(|| Error::InvalidCommand)?;
            
        if args.is_empty() {
            bail!(Error::EmptyCommand);
        }
        
        Ok(ParsedCommand {
            name: args[0].clone(),
            args: args[1..].to_vec(),
            full_command: input.to_string(),
        })
    }
    
    fn validate_command(&self, cmd: &ParsedCommand) -> Result<()> {
        // 检查命令在白名单
        if !self.allowed_commands.contains(&cmd.name) {
            bail!(Error::CommandNotAllowed {
                command: cmd.name.clone(),
                allowed: self.allowed_commands.iter().cloned().collect(),
            });
        }
        
        // 检查危险模式
        for pattern in &self.forbidden_patterns {
            if pattern.is_match(&cmd.full_command) {
                bail!(Error::DangerousPattern);
            }
        }
        
        Ok(())
    }
}

// 默认允许的命令
const DEFAULT_ALLOWED_COMMANDS: &[&str] = &[
    "git", "ls", "cat", "grep", "find",
    "cargo", "npm", "pnpm", "yarn",
    "python", "python3", "node",
    "head", "tail", "wc", "sort", "uniq",
    "pwd", "echo", "mkdir", "touch",
];

// 禁止的危险模式
const FORBIDDEN_PATTERNS: &[&str] = &[
    r"rm\s+-rf\s+/",
    r">\s*/dev/sda",
    r"dd\s+if=/dev/zero",
    r"mkfs\.?",
    r":\(\)\{\s*:\|:&\};:",  // Fork 炸弹
];

3.2 文件工具实现 🔗

// src/tools/file_read.rs
pub struct FileReadTool {
    workspace: PathBuf,
}

#[async_trait]
impl Tool for FileReadTool {
    fn name(&self) -> &str {
        "file_read"
    }
    
    fn description(&self) -> &str {
        "Read the contents of a file. The file must be within the workspace."
    }
    
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file (relative to workspace)"
                },
                "limit": {
                    "type": "integer",
                    "description": "Maximum number of lines to read",
                    "default": 1000
                }
            },
            "required": ["path"]
        })
    }
    
    async fn execute(&self, args: Value) -> Result<ToolResult> {
        let relative_path = args["path"].as_str()
            .ok_or_else(|| Error::MissingArgument("path"))?;
            
        let limit = args["limit"].as_usize().unwrap_or(1000);
        
        // 构建完整路径并验证
        let full_path = self.workspace.join(relative_path);
        let canonical = full_path.canonicalize()
            .map_err(|_| Error::FileNotFound(relative_path.into()))?;
            
        // 安全检查:必须在 workspace 内
        if !canonical.starts_with(&self.workspace) {
            bail!(Error::PathOutsideWorkspace);
        }
        
        // 读取文件
        let content = tokio::fs::read_to_string(&canonical).await
            .map_err(|e| Error::FileRead(e.to_string()))?;
            
        // 限制行数
        let lines: Vec<&str> = content.lines().collect();
        let truncated = if lines.len() > limit {
            format!(
                "{}\n\n... (truncated, showing {}/{} lines)",
                lines[..limit].join("\n"),
                limit,
                lines.len()
            )
        } else {
            content
        };
        
        Ok(ToolResult::success(truncated))
    }
}

3.3 文件写入工具 🔗

// src/tools/file_write.rs
pub struct FileWriteTool {
    workspace: PathBuf,
}

#[async_trait]
impl Tool for FileWriteTool {
    fn name(&self) -> &str {
        "file_write"
    }
    
    fn description(&self) -> &str {
        "Write content to a file. Creates parent directories if needed."
    }
    
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file"
                },
                "content": {
                    "type": "string",
                    "description": "Content to write"
                }
            },
            "required": ["path", "content"]
        })
    }
    
    async fn execute(&self, args: Value) -> Result<ToolResult> {
        let path = args["path"].as_str()
            .ok_or_else(|| Error::MissingArgument("path"))?;
        let content = args["content"].as_str()
            .ok_or_else(|| Error::MissingArgument("content"))?;
            
        // 验证路径
        let full_path = self.validate_path(path)?;
        
        // 创建父目录
        if let Some(parent) = full_path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        
        // 写入文件
        tokio::fs::write(&full_path, content).await?;
        
        Ok(ToolResult::success(format!(
            "Successfully wrote {} bytes to {}",
            content.len(),
            path
        )))
    }
}

四、工具注册表 🔗

4.1 注册表实现 🔗

// src/tools/registry.rs
pub struct ToolRegistry {
    tools: HashMap<String, Box<dyn Tool>>,
}

impl ToolRegistry {
    pub fn new() -> Self {
        Self {
            tools: HashMap::new(),
        }
    }
    
    pub fn register(&mut self, tool: Box<dyn Tool>) {
        let name = tool.name().to_string();
        info!("注册工具: {}", name);
        self.tools.insert(name, tool);
    }
    
    pub fn get(&self, name: &str) -> Option<&dyn Tool> {
        self.tools.get(name).map(|b| b.as_ref())
    }
    
    pub fn list_specs(&self) -> Vec<ToolSpec> {
        self.tools.values()
            .map(|t| t.spec())
            .collect()
    }
    
    pub fn names(&self) -> Vec<&str> {
        self.tools.keys()
            .map(|s| s.as_str())
            .collect()
    }
}

// 初始化默认工具
pub fn create_default_registry(config: &Config) -> ToolRegistry {
    let mut registry = ToolRegistry::new();
    
    // 系统工具
    registry.register(Box::new(ShellTool::new(&config.autonomy)));
    registry.register(Box::new(FileReadTool::new(&config.workspace)));
    registry.register(Box::new(FileWriteTool::new(&config.workspace)));
    registry.register(Box::new(FileListTool::new(&config.workspace)));
    
    // 开发工具
    registry.register(Box::new(GitTool::new()));
    
    // 记忆工具
    registry.register(Box::new(MemoryReadTool::new()));
    registry.register(Box::new(MemoryWriteTool::new()));
    
    registry
}

4.2 工具权限模型 🔗

// src/tools/policy.rs
pub struct ToolPolicy {
    allowed_tools: HashSet<String>,
    blocked_tools: HashSet<String>,
    require_confirmation: HashSet<String>,
}

impl ToolPolicy {
    pub fn check(&self, tool_name: &str) -> PolicyResult {
        // 明确禁止的工具
        if self.blocked_tools.contains(tool_name) {
            return PolicyResult::Blocked("工具被管理员禁用");
        }
        
        // 白名单检查(如果配置了白名单)
        if !self.allowed_tools.is_empty() 
            && !self.allowed_tools.contains(tool_name) {
            return PolicyResult::Blocked("工具不在白名单中");
        }
        
        // 需要确认的工具
        if self.require_confirmation.contains(tool_name) {
            return PolicyResult::RequiresConfirmation;
        }
        
        PolicyResult::Allowed
    }
}

pub enum PolicyResult {
    Allowed,
    Blocked(&'static str),
    RequiresConfirmation,
}

五、执行环境 🔗

5.1 运行时接口 🔗

// src/runtime/mod.rs
#[async_trait]
pub trait Runtime: Send + Sync {
    async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult>;
}

pub struct ExecutionResult {
    pub stdout: String,
    pub stderr: String,
    pub exit_code: i32,
    pub duration: Duration,
}

// 本地运行时
pub struct NativeRuntime {
    workspace: PathBuf,
}

#[async_trait]
impl Runtime for NativeRuntime {
    async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult> {
        let start = Instant::now();
        
        let output = tokio::time::timeout(
            Duration::from_secs(timeout_secs),
            tokio::process::Command::new("sh")
                .arg("-c")
                .arg(command)
                .current_dir(&self.workspace)
                .output()
        ).await
            .map_err(|_| Error::Timeout)?
            .map_err(|e| Error::Execution(e.to_string()))?;
            
        Ok(ExecutionResult {
            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
            exit_code: output.status.code().unwrap_or(-1),
            duration: start.elapsed(),
        })
    }
}

5.2 Docker 沙箱运行时 🔗

// src/runtime/docker.rs
pub struct DockerRuntime {
    image: String,
    workspace: PathBuf,
    memory_limit: usize,  // MB
    cpu_limit: f32,
}

#[async_trait]
impl Runtime for DockerRuntime {
    async fn execute(&self, command: &str, timeout_secs: u64) -> Result<ExecutionResult> {
        let container_name = format!("zeroclaw-{}", uuid::Uuid::new_v4());
        
        let output = tokio::process::Command::new("docker")
            .args(&[
                "run",
                "--rm",
                "--name", &container_name,
                "--memory", &format!("{}m", self.memory_limit),
                "--cpus", &self.cpu_limit.to_string(),
                "--read-only",
                "--network", "none",
                "-v", &format!("{}:/workspace:rw", self.workspace.display()),
                "-w", "/workspace",
                &self.image,
                "sh", "-c", command,
            ])
            .output()
            .await?;
            
        Ok(ExecutionResult {
            stdout: String::from_utf8_lossy(&output.stdout).to_string(),
            stderr: String::from_utf8_lossy(&output.stderr).to_string(),
            exit_code: output.status.code().unwrap_or(-1),
            duration: Duration::default(),
        })
    }
}

六、配置示例 🔗

6.1 完整工具配置 🔗

[autonomy]
level = "supervised"  # readonly | supervised | full
workspace_only = true
allowed_commands = ["git", "ls", "cat", "grep", "cargo", "npm"]
forbidden_paths = ["/etc", "/root", "/proc", "/sys", "~/.ssh"]

[runtime]
kind = "docker"  # native | docker

[runtime.docker]
image = "alpine:3.20"
network = "none"
memory_limit_mb = 512
cpu_limit = 1.0
read_only_rootfs = true
mount_workspace = true

[tools]
enabled = ["shell", "file_read", "file_write", "git", "memory"]
require_confirmation = ["file_write", "shell"]

6.2 添加自定义工具 🔗

// 定义自定义工具
pub struct MyCustomTool;

#[async_trait]
impl Tool for MyCustomTool {
    fn name(&self) -> &str {
        "my_tool"
    }
    
    fn description(&self) -> &str {
        "My custom tool description"
    }
    
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "input": { "type": "string" }
            },
            "required": ["input"]
        })
    }
    
    async fn execute(&self, args: Value) -> Result<ToolResult> {
        let input = args["input"].as_str().unwrap_or("");
        // 实现逻辑
        Ok(ToolResult::success(format!("Processed: {}", input)))
    }
}

// 注册到注册表
registry.register(Box::new(MyCustomTool));