ZeroClaw-04-多渠道消息处理深度解析

· 3865字 · 8分钟

ZeroClaw-04-多渠道消息处理深度解析 🔗

深入解析 ZeroClaw 如何统一处理来自 15+ 消息平台的消息,实现真正的"全渠道 AI 助手"。

适合阅读人群:产品经理、渠道对接开发者、架构师


引言:为什么需要多渠道? 🔗

用户分散在各种平台 🔗

  • 开发者在 Discord 上活跃
  • 商务人员在 Slack 上工作
  • 国内用户使用 Telegram 或微信
  • 海外用户习惯 WhatsApp

传统方案的问题

方案 问题
只做一个 Web 界面 用户需要切换应用,体验割裂
每个平台开发一个机器人 维护成本高,功能重复开发
使用第三方集成服务 费用高,功能受限,数据外泄

ZeroClaw 的方案:一套代码,统一接口,同时接入所有主流平台。


一、渠道系统架构 🔗

1.1 支持的平台矩阵 🔗

flowchart TB
    subgraph 即时通讯["💬 即时通讯"]
        TG["Telegram
🟢 完整支持"] DC["Discord
🟢 完整支持"] WA["WhatsApp
🟢 Cloud API"] SG["Signal
🟡 Beta"] IM["iMessage
🟡 Mac only"] MX["Matrix
🟢 含 E2EE"] end subgraph 企业协作["🏢 企业协作"] SL["Slack
🟢 完整支持"] MM["Mattermost
🟢 完整支持"] LK["Lark/飞书
🟡 计划中"] DD["DingTalk
🟡 计划中"] end subgraph 传统通讯["📧 传统通讯"] EMAIL["Email
🟡 POP3/IMAP"] IRC["IRC
🟢 完整支持"] QQ["QQ
🟡 计划中"] end subgraph 自定义["🔌 自定义"] WEB["Webhook
🟢 HTTP 接口"] CLI["CLI
🟢 命令行"] end subgraph 核心["⚙️ ZeroClaw 核心"] CORE["统一消息处理器
Channel Router"] AGENT["Agent 编排器"] end TG --> CORE DC --> CORE WA --> CORE SL --> CORE MM --> CORE MX --> CORE EMAIL --> CORE IRC --> CORE WEB --> CORE CLI --> CORE CORE --> AGENT style CORE fill:#f9f,stroke:#333,stroke-width:4px

1.2 Channel Trait 设计 🔗

// src/channels/traits.rs
#[async_trait]
pub trait Channel: Send + Sync + std::fmt::Debug {
    /// 渠道名称
    fn name(&self) -> &str;
    
    /// 发送消息
    async fn send(&self, message: OutgoingMessage) -> Result<(), ChannelError>;
    
    /// 开始监听消息
    async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError>;
    
    /// 健康检查
    async fn health_check(&self) -> HealthStatus;
    
    /// 发送"正在输入"状态(可选)
    async fn set_typing(&self, chat_id: &str) -> Result<(), ChannelError> {
        Ok(()) // 默认空实现
    }
}

/// 标准化消息格式
#[derive(Debug, Clone)]
pub struct IncomingMessage {
    pub id: String,
    pub text: String,
    pub sender: UserInfo,
    pub chat: ChatInfo,
    pub timestamp: DateTime<Utc>,
    pub attachments: Vec<Attachment>,
    pub reply_to: Option<String>,
    pub raw: serde_json::Value,  // 原始平台数据
}

pub struct OutgoingMessage {
    pub text: String,
    pub chat_id: String,
    pub reply_to: Option<String>,
    pub attachments: Vec<OutgoingAttachment>,
}

pub type MessageHandler = Box<dyn Fn(IncomingMessage) -> BoxFuture<'static, Result<()>> + Send + Sync>;

为什么这样设计?

设计决策 理由
Trait + async_trait 统一接口,支持异步
标准化消息格式 不同平台数据统一
保留 raw 字段 兼容平台特有功能
默认实现 typing 可选功能,不强制实现

二、消息生命周期 🔗

2.1 完整处理流程 🔗

多渠道消息处理分为五个阶段,每个阶段都有明确的职责边界。


阶段 1:渠道接入 🔗

将不同平台的消息格式统一为标准格式。

sequenceDiagram
    actor USER as 用户
    participant CH as 渠道客户端
    participant GW as Gateway

    USER->>CH: 发送消息
    CH->>CH: 接收原始消息
    CH->>CH: 格式规范化
    CH->>GW: 推送标准化消息

格式转换示例

  • Telegram: message.textIncomingMessage.text
  • Discord: msg.contentIncomingMessage.text
  • Slack: event.textIncomingMessage.text

阶段 2:安全验证 🔗

验证请求合法性,防止未授权访问。

sequenceDiagram
    participant GW as Gateway
    participant AUTH as 认证层
    participant ALLOW as 白名单

    GW->>AUTH: Token 验证
    AUTH-->>GW: 验证结果
    
    alt Token 有效
        GW->>ALLOW: 白名单检查
        ALLOW-->>GW: 检查结果
    else Token 无效
        GW-->>GW: 拒绝请求
    end

安全检查点

  • Token 有效性(JWT 验证)
  • 用户白名单(allowed_users
  • 渠道白名单(allowed_channels

阶段 3:路由处理 🔗

将消息路由到合适的 Worker 进行处理。

sequenceDiagram
    participant GW as Gateway
    participant ROUTER as 消息路由器
    participant AGENT as Agent

    GW->>ROUTER: 提交消息
    ROUTER->>ROUTER: 获取 Semaphore 许可
    ROUTER->>ROUTER: 选择空闲 Worker
    ROUTER->>AGENT: 派发给 Agent

路由策略

  • 轮询(Round Robin)
  • 最少连接(Least Connections)
  • 哈希分发(按用户 ID)

阶段 4:AI 处理 🔗

Agent 调用 AI 模型生成响应。

sequenceDiagram
    participant AGENT as Agent
    participant MEM as 记忆系统
    participant AI as AI 模型

    AGENT->>MEM: 查询相关记忆
    MEM-->>AGENT: 返回上下文
    AGENT->>AI: 发送对话请求
    AI-->>AGENT: 返回 AI 响应

处理步骤

  1. 记忆检索(向量 + 关键词)
  2. 提示词组装
  3. AI 调用
  4. 工具执行(如有需要)

阶段 5:响应返回 🔗

将 AI 响应返回给用户。

sequenceDiagram
    participant AGENT as Agent
    participant ROUTER as 消息路由器
    participant GW as Gateway
    participant CH as 渠道客户端
    participant USER as 用户

    AGENT->>ROUTER: 返回响应
    ROUTER->>GW: 路由响应
    GW->>CH: 渠道特定格式
    CH-->>USER: 显示消息

格式还原

  • 将标准格式转换回渠道特定格式
  • 处理富媒体(图片、文件等)
  • 错误处理和重试

完整流程概览 🔗

flowchart LR
    subgraph 输入层["📥 输入层"]
        I["多渠道输入"]
    end
    
    subgraph 安全层["🔐 安全层"]
        S["认证 + 白名单"]
    end
    
    subgraph 处理层["⚙️ 处理层"]
        R["路由"] --> A["Agent"]
        A --> AI["AI 模型"]
    end
    
    subgraph 输出层["📤 输出层"]
        O["多渠道输出"]
    end
    
    I -->|"标准化"| S
    S -->|"验证通过"| R
    AI -->|"响应"| O
    
    style A fill:#e8f5e9,stroke:#333
    style AI fill:#fff3e0,stroke:#333

2.2 渠道管理器实现 🔗

// src/channels/mod.rs
pub struct ChannelManager {
    channels: Arc<RwLock<HashMap<String, Box<dyn Channel>>>>,
    message_tx: mpsc::Sender<(String, IncomingMessage)>,
    config: ChannelsConfig,
}

impl ChannelManager {
    pub async fn register(&mut self, name: &str, channel: Box<dyn Channel>) {
        let mut channels = self.channels.write().await;
        
        // 启动监听
        let tx = self.message_tx.clone();
        let channel_name = name.to_string();
        
        tokio::spawn(async move {
            let handler = Box::new(move |msg: IncomingMessage| {
                let tx = tx.clone();
                let name = channel_name.clone();
                
                async move {
                    tx.send((name, msg)).await
                        .map_err(|e| Error::Channel(e.to_string()))
                }.boxed()
            }) as MessageHandler;
            
            if let Err(e) = channel.listen(handler).await {
                error!("Channel {} 监听出错: {}", name, e);
            }
        });
        
        channels.insert(name.to_string(), channel);
    }
    
    /// 发送消息到指定渠道
    pub async fn send(
        &self,
        channel_name: &str,
        message: OutgoingMessage,
    ) -> Result<()> {
        let channels = self.channels.read().await;
        
        let channel = channels.get(channel_name)
            .ok_or_else(|| Error::ChannelNotFound(channel_name.into()))?;
            
        channel.send(message).await
    }
    
    /// 广播消息到所有渠道
    pub async fn broadcast(&self, message: OutgoingMessage) {
        let channels = self.channels.read().await;
        
        for (name, channel) in channels.iter() {
            if let Err(e) = channel.send(message.clone()).await {
                error!("发送到 {} 失败: {}", name, e);
            }
        }
    }
}

2.3 并发控制 🔗

// src/channels/router.rs
pub struct MessageRouter {
    worker_pool: Arc<Semaphore>,
    message_rx: mpsc::Receiver<(String, IncomingMessage)>,
    agent: Arc<Agent>,
}

impl MessageRouter {
    pub async fn run(mut self) {
        while let Some((channel_name, message)) = self.message_rx.recv().await {
            // 获取信号量许可,限制并发
            let permit = match self.worker_pool.clone().try_acquire_owned() {
                Ok(p) => p,
                Err(_) => {
                    // 资源不足,发送繁忙提示
                    warn!("工作池已满,拒绝新消息");
                    continue;
                }
            };
            
            let agent = self.agent.clone();
            
            tokio::spawn(async move {
                let _permit = permit;
                
                // 白名单检查
                if !Self::check_allowlist(&channel_name, &message).await {
                    return;
                }
                
                // 处理消息
                if let Err(e) = agent.process(message).await {
                    error!("消息处理失败: {}", e);
                }
            });
        }
    }
    
    async fn check_allowlist(channel: &str, message: &IncomingMessage) -> bool {
        // 检查用户是否在白名单中
        // 具体实现...
        true
    }
}

为什么用 Semaphore?

  • 限制并发 Worker 数量(默认 10)
  • 防止 AI API 限流
  • 保护系统资源

三、Telegram 渠道详解 🔗

3.1 接收消息实现 🔗

// src/channels/telegram.rs
pub struct TelegramChannel {
    bot: Bot,
    config: TelegramConfig,
}

#[async_trait]
impl Channel for TelegramChannel {
    fn name(&self) -> &str {
        "telegram"
    }
    
    async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError> {
        let mut stream = self.bot.get_updates().await;
        
        while let Some(update) = stream.next().await {
            if let Some(message) = update.message {
                // 转换为标准格式
                let standard_msg = self.convert_message(message).await?;
                
                // 调用处理器
                if let Err(e) = handler(standard_msg).await {
                    error!("处理 Telegram 消息失败: {}", e);
                }
            }
        }
        
        Ok(())
    }
    
    async fn send(&self, message: OutgoingMessage) -> Result<(), ChannelError> {
        // 解析媒体标记
        let (text, media) = self.parse_media_tags(&message.text);
        
        match media {
            Some(Media::Photo(path)) => {
                self.bot.send_photo(
                    ChatId(message.chat_id.parse::<i64>()?),
                    InputFile::file(path),
                ).caption(text).await?;
            }
            Some(Media::Document(path)) => {
                self.bot.send_document(
                    ChatId(message.chat_id.parse::<i64>()?),
                    InputFile::file(path),
                ).caption(text).await?;
            }
            None => {
                self.bot.send_message(
                    ChatId(message.chat_id.parse::<i64>()?),
                    text,
                ).await?;
            }
        }
        
        Ok(())
    }
}

impl TelegramChannel {
    async fn convert_message(&self, msg: Message) -> Result<IncomingMessage> {
        Ok(IncomingMessage {
            id: msg.id.to_string(),
            text: msg.text().unwrap_or("").to_string(),
            sender: UserInfo {
                id: msg.from.as_ref().map(|u| u.id.to_string()).unwrap_or_default(),
                name: msg.from.as_ref().map(|u| u.username.clone()).flatten(),
            },
            chat: ChatInfo {
                id: msg.chat.id.to_string(),
                type_: match msg.chat.kind {
                    ChatKind::Public(_) => ChatType::Group,
                    ChatKind::Private(_) => ChatType::Private,
                },
            },
            timestamp: msg.date,
            attachments: vec![],
            reply_to: msg.reply_to_message.map(|m| m.id.to_string()),
            raw: serde_json::to_value(msg)?,
        })
    }
    
    /// 解析媒体标记,如 [IMAGE:/path/to/img.png]
    fn parse_media_tags(&self, text: &str) -> (String, Option<Media>) {
        let re = regex::Regex::new(r"\[(IMAGE|DOCUMENT|VIDEO):([^\]]+)\]").unwrap();
        
        let mut clean_text = text.to_string();
        let mut media = None;
        
        for cap in re.captures_iter(text) {
            let media_type = &cap[1];
            let path = &cap[2];
            
            media = Some(match media_type {
                "IMAGE" => Media::Photo(path.into()),
                "DOCUMENT" => Media::Document(path.into()),
                "VIDEO" => Media::Video(path.into()),
                _ => continue,
            });
            
            // 从文本中移除标记
            clean_text = clean_text.replace(&cap[0], "");
        }
        
        (clean_text.trim().to_string(), media)
    }
}

3.2 白名单检查 🔗

impl TelegramChannel {
    async fn check_allowlist(&self, user_id: &str) -> Result<bool> {
        let allowed = &self.config.allowed_users;
        
        // 空列表 = 拒绝所有
        if allowed.is_empty() {
            return Ok(false);
        }
        
        // 通配符 = 允许所有
        if allowed.contains(&"*".to_string()) {
            return Ok(true);
        }
        
        Ok(allowed.contains(&user_id.to_string()))
    }
    
    async fn send_unauthorized_message(&self, chat_id: ChatId, user_id: &str) {
        let msg = format!(
            "⚠️ 未授权访问\n\n\
            您的用户 ID: {}\n\
            请联系管理员将此 ID 添加到白名单。\n\n\
            管理员可以运行:\n\
            zeroclaw channel bind-telegram {}",
            user_id, user_id
        );
        
        let _ = self.bot.send_message(chat_id, msg).await;
    }
}

四、Discord 渠道详解 🔗

4.1 WebSocket 连接管理 🔗

// src/channels/discord.rs
use serenity::{Client, model::gateway::Ready, prelude::*};

pub struct DiscordChannel {
    client: Client,
    config: DiscordConfig,
}

#[async_trait]
impl EventHandler for DiscordHandler {
    async fn message(&self, ctx: Context, msg: Message) {
        // 忽略机器人自己的消息
        if msg.author.bot {
            return;
        }
        
        // 检查是否需要响应
        if !self.should_respond(&msg) {
            return;
        }
        
        // 转换为标准格式
        let standard_msg = self.convert_message(msg).await;
        
        // 设置 typing 状态
        let _ = msg.channel_id.broadcast_typing(&ctx.http).await;
        
        // 调用处理器(通过 channel 发送)
        if let Some(ref tx) = self.message_tx {
            let _ = tx.send(("discord".into(), standard_msg)).await;
        }
    }
    
    async fn ready(&self, _: Context, ready: Ready) {
        info!("Discord 连接就绪: {}", ready.user.name);
    }
}

impl DiscordChannel {
    fn should_respond(&self, msg: &Message) -> bool {
        // 1. 私信总是响应
        if msg.guild_id.is_none() {
            return true;
        }
        
        // 2. @提及响应
        if msg.mentions.iter().any(|u| u.id == self.bot_id) {
            return true;
        }
        
        // 3. 命令前缀响应
        if let Some(ref prefix) = self.config.command_prefix {
            if msg.content.starts_with(prefix) {
                return true;
            }
        }
        
        false
    }
}

4.2 Slash Command 支持 🔗

use serenity::builder::CreateApplicationCommand;

pub fn register_commands(
    commands: &mut CreateApplicationCommand,
) -> &mut CreateApplicationCommand {
    commands
        .name("ask")
        .description("向 AI 提问")
        .create_option(|option| {
            option
                .name("question")
                .description("你的问题")
                .kind(CommandOptionType::String)
                .required(true)
        })
}

async fn handle_slash_command(
    ctx: &Context,
    command: &ApplicationCommandInteraction,
) {
    match command.data.name.as_str() {
        "ask" => {
            let question = command
                .data.options
                .get(0)
                .and_then(|opt| opt.value.as_ref())
                .and_then(|v| v.as_str())
                .unwrap_or("");
                
            // 先响应"正在思考"
            command
                .create_interaction_response(&ctx.http, |response| {
                    response
                        .kind(InteractionResponseType::DeferredChannelMessageWithSource)
                })
                .await
                .unwrap();
                
            // 调用 AI(异步)
            let answer = call_ai(question).await;
            
            // 编辑响应
            command
                .edit_original_interaction_response(&ctx.http, |response| {
                    response.content(answer)
                })
                .await
                .unwrap();
        }
        _ => {}
    }
}

五、Slack 渠道详解 🔗

5.1 Events API 实现 🔗

// src/channels/slack.rs
pub struct SlackChannel {
    client: SlackClient,
    config: SlackConfig,
}

#[async_trait]
impl Channel for SlackChannel {
    async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError> {
        // Slack 使用 HTTP Webhook,不需要长连接
        // 消息通过 Gateway 接收
        Ok(())
    }
}

/// Slack 事件处理器(在 Gateway 中调用)
pub async fn handle_slack_event(
    State(state): State<AppState>,
    headers: HeaderMap,
    Json(body): Json<SlackEvent>,
) -> Result<impl IntoResponse, StatusCode> {
    // 1. 验证签名
    if !verify_slack_signature(&headers, &body, &state.config.signing_secret) {
        return Err(StatusCode::UNAUTHORIZED);
    }
    
    match body {
        SlackEvent::UrlVerification { challenge } => {
            // 配置验证
            Ok((StatusCode::OK, challenge).into_response())
        }
        
        SlackEvent::EventCallback { event } => {
            match event {
                SlackInnerEvent::AppMention { text, user, channel } => {
                    let msg = IncomingMessage {
                        id: uuid::Uuid::new_v4().to_string(),
                        text,
                        sender: UserInfo { id: user, name: None },
                        chat: ChatInfo { id: channel, type_: ChatType::Group },
                        timestamp: Utc::now(),
                        attachments: vec![],
                        reply_to: None,
                        raw: serde_json::to_value(&event).unwrap(),
                    };
                    
                    // 发送到处理器
                    state.channel_tx.send(("slack".into(), msg)).await
                        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
                }
                
                SlackInnerEvent::Message { text, user, channel, .. } => {
                    // 处理私信
                    // ...
                }
                
                _ => {}
            }
            
            Ok(StatusCode::OK.into_response())
        }
        
        _ => Ok(StatusCode::OK.into_response()),
    }
}

fn verify_slack_signature(
    headers: &HeaderMap,
    body: &str,
    signing_secret: &str,
) -> bool {
    let timestamp = headers
        .get("x-slack-request-timestamp")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");
        
    let signature = headers
        .get("x-slack-signature")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("");
        
    // 检查时间戳(5分钟内)
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
        
    if let Ok(ts) = timestamp.parse::<u64>() {
        if now - ts > 300 {
            return false;
        }
    }
    
    // 计算签名
    let base_string = format!("v0:{}:{}", timestamp, body);
    let mut mac = HmacSha256::new_from_slice(signing_secret.as_bytes()).unwrap();
    mac.update(base_string.as_bytes());
    let computed = format!("v0={}", hex::encode(mac.finalize().into_bytes()));
    
    // 比较签名
    signature == computed
}

六、渠道对比与选型 🔗

6.1 特性对比矩阵 🔗

渠道 实时性 富媒体 企业特性 实现复杂度
Telegram 🟢 实时 🟢 全支持 🟡 有限
Discord 🟢 实时 🟢 全支持 🟢 良好
Slack 🟢 实时 🟢 全支持 🟢 优秀
WhatsApp 🟢 实时 🟢 全支持 🟢 优秀
Matrix 🟢 实时 🟡 部分 🟡 有限
Email 🟡 非实时 🔴 仅附件 🟢 标准

6.2 选型决策树 🔗

flowchart TD
    START["选择消息渠道"] --> Q1{"目标用户群体?"}
    
    Q1 -->|"技术社区"| Q2{"需要语音/视频?"}
    Q1 -->|"企业团队"| Q3["Slack/Mattermost
🟢 推荐"] Q1 -->|"普通大众"| Q4{"WhatsApp 普及?"} Q1 -->|"开发者"| Q5["Telegram/Discord
🟢 推荐"] Q1 -->|"隐私敏感"| Q6["Signal/Matrix
🟢 推荐"] Q2 -->|"是"| Q7["Discord
🟢 推荐"] Q2 -->|"否"| Q8["Telegram
🟢 推荐"] Q4 -->|"是"| Q9["WhatsApp
🟢 推荐"] Q4 -->|"否"| Q10["Telegram
🟢 推荐"] style Q3 fill:#4A154B,color:#fff style Q7 fill:#5865F2,color:#fff style Q9 fill:#25D366

七、配置示例 🔗

7.1 多渠道配置 🔗

# Telegram 配置
[channels_config.telegram]
bot_token = "${TELEGRAM_BOT_TOKEN}"
allowed_users = ["your_telegram_user_id"]
polling_interval = 30

# Discord 配置
[channels_config.discord]
bot_token = "${DISCORD_BOT_TOKEN}"
allowed_users = ["your_discord_user_id"]
command_prefix = "!ai"

# Slack 配置
[channels_config.slack]
bot_token = "xoxb-your-bot-token"
app_token = "xapp-your-app-token"
signing_secret = "your-signing-secret"
allowed_users = ["U12345678"]

# WhatsApp 配置
[channels_config.whatsapp]
access_token = "EAAB..."
phone_number_id = "123456789012345"
verify_token = "your-verify-token"
allowed_numbers = ["+1234567890"]

# Matrix 配置
[channels_config.matrix]
homeserver = "https://matrix.org"
username = "@bot:matrix.org"
password = "${MATRIX_PASSWORD}"
allowed_users = ["@you:matrix.org"]

7.2 动态添加渠道 🔗

// 运行时添加新渠道
async fn add_channel(
    manager: &mut ChannelManager,
    config: ChannelConfig,
) -> Result<()> {
    match config.type_.as_str() {
        "telegram" => {
            let channel = TelegramChannel::new(config).await?;
            manager.register("telegram", Box::new(channel)).await;
        }
        "discord" => {
            let channel = DiscordChannel::new(config).await?;
            manager.register("discord", Box::new(channel)).await;
        }
        // ...
        _ => bail!("未知渠道类型"),
    }
    
    Ok(())
}