ZeroClaw-04-多渠道消息处理深度解析 🔗
深入解析 ZeroClaw 如何统一处理来自 15+ 消息平台的消息,实现真正的"全渠道 AI 助手"。
适合阅读人群:产品经理、渠道对接开发者、架构师
引言:为什么需要多渠道? 🔗
用户分散在各种平台 🔗
- 开发者在 Discord 上活跃
- 商务人员在 Slack 上工作
- 国内用户使用 Telegram 或微信
- 海外用户习惯 WhatsApp
传统方案的问题:
| 方案 | 问题 |
|---|---|
| 只做一个 Web 界面 | 用户需要切换应用,体验割裂 |
| 每个平台开发一个机器人 | 维护成本高,功能重复开发 |
| 使用第三方集成服务 | 费用高,功能受限,数据外泄 |
ZeroClaw 的方案:一套代码,统一接口,同时接入所有主流平台。
一、渠道系统架构 🔗
1.1 支持的平台矩阵 🔗
flowchart TB
subgraph 即时通讯["💬 即时通讯"]
TG["Telegram
🟢 完整支持"]
DC["Discord
🟢 完整支持"]
WA["WhatsApp
🟢 Cloud API"]
SG["Signal
🟡 Beta"]
IM["iMessage
🟡 Mac only"]
MX["Matrix
🟢 含 E2EE"]
end
subgraph 企业协作["🏢 企业协作"]
SL["Slack
🟢 完整支持"]
MM["Mattermost
🟢 完整支持"]
LK["Lark/飞书
🟡 计划中"]
DD["DingTalk
🟡 计划中"]
end
subgraph 传统通讯["📧 传统通讯"]
EMAIL["Email
🟡 POP3/IMAP"]
IRC["IRC
🟢 完整支持"]
QQ["QQ
🟡 计划中"]
end
subgraph 自定义["🔌 自定义"]
WEB["Webhook
🟢 HTTP 接口"]
CLI["CLI
🟢 命令行"]
end
subgraph 核心["⚙️ ZeroClaw 核心"]
CORE["统一消息处理器
Channel Router"]
AGENT["Agent 编排器"]
end
TG --> CORE
DC --> CORE
WA --> CORE
SL --> CORE
MM --> CORE
MX --> CORE
EMAIL --> CORE
IRC --> CORE
WEB --> CORE
CLI --> CORE
CORE --> AGENT
style CORE fill:#f9f,stroke:#333,stroke-width:4px
1.2 Channel Trait 设计 🔗
// src/channels/traits.rs
#[async_trait]
pub trait Channel: Send + Sync + std::fmt::Debug {
/// 渠道名称
fn name(&self) -> &str;
/// 发送消息
async fn send(&self, message: OutgoingMessage) -> Result<(), ChannelError>;
/// 开始监听消息
async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError>;
/// 健康检查
async fn health_check(&self) -> HealthStatus;
/// 发送"正在输入"状态(可选)
async fn set_typing(&self, chat_id: &str) -> Result<(), ChannelError> {
Ok(()) // 默认空实现
}
}
/// 标准化消息格式
#[derive(Debug, Clone)]
pub struct IncomingMessage {
pub id: String,
pub text: String,
pub sender: UserInfo,
pub chat: ChatInfo,
pub timestamp: DateTime<Utc>,
pub attachments: Vec<Attachment>,
pub reply_to: Option<String>,
pub raw: serde_json::Value, // 原始平台数据
}
pub struct OutgoingMessage {
pub text: String,
pub chat_id: String,
pub reply_to: Option<String>,
pub attachments: Vec<OutgoingAttachment>,
}
pub type MessageHandler = Box<dyn Fn(IncomingMessage) -> BoxFuture<'static, Result<()>> + Send + Sync>;
为什么这样设计?
| 设计决策 | 理由 |
|---|---|
| Trait + async_trait | 统一接口,支持异步 |
| 标准化消息格式 | 不同平台数据统一 |
| 保留 raw 字段 | 兼容平台特有功能 |
| 默认实现 typing | 可选功能,不强制实现 |
二、消息生命周期 🔗
2.1 完整处理流程 🔗
多渠道消息处理分为五个阶段,每个阶段都有明确的职责边界。
阶段 1:渠道接入 🔗
将不同平台的消息格式统一为标准格式。
sequenceDiagram
actor USER as 用户
participant CH as 渠道客户端
participant GW as Gateway
USER->>CH: 发送消息
CH->>CH: 接收原始消息
CH->>CH: 格式规范化
CH->>GW: 推送标准化消息
格式转换示例:
- Telegram:
message.text→IncomingMessage.text - Discord:
msg.content→IncomingMessage.text - Slack:
event.text→IncomingMessage.text
阶段 2:安全验证 🔗
验证请求合法性,防止未授权访问。
sequenceDiagram
participant GW as Gateway
participant AUTH as 认证层
participant ALLOW as 白名单
GW->>AUTH: Token 验证
AUTH-->>GW: 验证结果
alt Token 有效
GW->>ALLOW: 白名单检查
ALLOW-->>GW: 检查结果
else Token 无效
GW-->>GW: 拒绝请求
end
安全检查点:
- Token 有效性(JWT 验证)
- 用户白名单(
allowed_users) - 渠道白名单(
allowed_channels)
阶段 3:路由处理 🔗
将消息路由到合适的 Worker 进行处理。
sequenceDiagram
participant GW as Gateway
participant ROUTER as 消息路由器
participant AGENT as Agent
GW->>ROUTER: 提交消息
ROUTER->>ROUTER: 获取 Semaphore 许可
ROUTER->>ROUTER: 选择空闲 Worker
ROUTER->>AGENT: 派发给 Agent
路由策略:
- 轮询(Round Robin)
- 最少连接(Least Connections)
- 哈希分发(按用户 ID)
阶段 4:AI 处理 🔗
Agent 调用 AI 模型生成响应。
sequenceDiagram
participant AGENT as Agent
participant MEM as 记忆系统
participant AI as AI 模型
AGENT->>MEM: 查询相关记忆
MEM-->>AGENT: 返回上下文
AGENT->>AI: 发送对话请求
AI-->>AGENT: 返回 AI 响应
处理步骤:
- 记忆检索(向量 + 关键词)
- 提示词组装
- AI 调用
- 工具执行(如有需要)
阶段 5:响应返回 🔗
将 AI 响应返回给用户。
sequenceDiagram
participant AGENT as Agent
participant ROUTER as 消息路由器
participant GW as Gateway
participant CH as 渠道客户端
participant USER as 用户
AGENT->>ROUTER: 返回响应
ROUTER->>GW: 路由响应
GW->>CH: 渠道特定格式
CH-->>USER: 显示消息
格式还原:
- 将标准格式转换回渠道特定格式
- 处理富媒体(图片、文件等)
- 错误处理和重试
完整流程概览 🔗
flowchart LR
subgraph 输入层["📥 输入层"]
I["多渠道输入"]
end
subgraph 安全层["🔐 安全层"]
S["认证 + 白名单"]
end
subgraph 处理层["⚙️ 处理层"]
R["路由"] --> A["Agent"]
A --> AI["AI 模型"]
end
subgraph 输出层["📤 输出层"]
O["多渠道输出"]
end
I -->|"标准化"| S
S -->|"验证通过"| R
AI -->|"响应"| O
style A fill:#e8f5e9,stroke:#333
style AI fill:#fff3e0,stroke:#333
2.2 渠道管理器实现 🔗
// src/channels/mod.rs
pub struct ChannelManager {
channels: Arc<RwLock<HashMap<String, Box<dyn Channel>>>>,
message_tx: mpsc::Sender<(String, IncomingMessage)>,
config: ChannelsConfig,
}
impl ChannelManager {
pub async fn register(&mut self, name: &str, channel: Box<dyn Channel>) {
let mut channels = self.channels.write().await;
// 启动监听
let tx = self.message_tx.clone();
let channel_name = name.to_string();
tokio::spawn(async move {
let handler = Box::new(move |msg: IncomingMessage| {
let tx = tx.clone();
let name = channel_name.clone();
async move {
tx.send((name, msg)).await
.map_err(|e| Error::Channel(e.to_string()))
}.boxed()
}) as MessageHandler;
if let Err(e) = channel.listen(handler).await {
error!("Channel {} 监听出错: {}", name, e);
}
});
channels.insert(name.to_string(), channel);
}
/// 发送消息到指定渠道
pub async fn send(
&self,
channel_name: &str,
message: OutgoingMessage,
) -> Result<()> {
let channels = self.channels.read().await;
let channel = channels.get(channel_name)
.ok_or_else(|| Error::ChannelNotFound(channel_name.into()))?;
channel.send(message).await
}
/// 广播消息到所有渠道
pub async fn broadcast(&self, message: OutgoingMessage) {
let channels = self.channels.read().await;
for (name, channel) in channels.iter() {
if let Err(e) = channel.send(message.clone()).await {
error!("发送到 {} 失败: {}", name, e);
}
}
}
}
2.3 并发控制 🔗
// src/channels/router.rs
pub struct MessageRouter {
worker_pool: Arc<Semaphore>,
message_rx: mpsc::Receiver<(String, IncomingMessage)>,
agent: Arc<Agent>,
}
impl MessageRouter {
pub async fn run(mut self) {
while let Some((channel_name, message)) = self.message_rx.recv().await {
// 获取信号量许可,限制并发
let permit = match self.worker_pool.clone().try_acquire_owned() {
Ok(p) => p,
Err(_) => {
// 资源不足,发送繁忙提示
warn!("工作池已满,拒绝新消息");
continue;
}
};
let agent = self.agent.clone();
tokio::spawn(async move {
let _permit = permit;
// 白名单检查
if !Self::check_allowlist(&channel_name, &message).await {
return;
}
// 处理消息
if let Err(e) = agent.process(message).await {
error!("消息处理失败: {}", e);
}
});
}
}
async fn check_allowlist(channel: &str, message: &IncomingMessage) -> bool {
// 检查用户是否在白名单中
// 具体实现...
true
}
}
为什么用 Semaphore?
- 限制并发 Worker 数量(默认 10)
- 防止 AI API 限流
- 保护系统资源
三、Telegram 渠道详解 🔗
3.1 接收消息实现 🔗
// src/channels/telegram.rs
pub struct TelegramChannel {
bot: Bot,
config: TelegramConfig,
}
#[async_trait]
impl Channel for TelegramChannel {
fn name(&self) -> &str {
"telegram"
}
async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError> {
let mut stream = self.bot.get_updates().await;
while let Some(update) = stream.next().await {
if let Some(message) = update.message {
// 转换为标准格式
let standard_msg = self.convert_message(message).await?;
// 调用处理器
if let Err(e) = handler(standard_msg).await {
error!("处理 Telegram 消息失败: {}", e);
}
}
}
Ok(())
}
async fn send(&self, message: OutgoingMessage) -> Result<(), ChannelError> {
// 解析媒体标记
let (text, media) = self.parse_media_tags(&message.text);
match media {
Some(Media::Photo(path)) => {
self.bot.send_photo(
ChatId(message.chat_id.parse::<i64>()?),
InputFile::file(path),
).caption(text).await?;
}
Some(Media::Document(path)) => {
self.bot.send_document(
ChatId(message.chat_id.parse::<i64>()?),
InputFile::file(path),
).caption(text).await?;
}
None => {
self.bot.send_message(
ChatId(message.chat_id.parse::<i64>()?),
text,
).await?;
}
}
Ok(())
}
}
impl TelegramChannel {
async fn convert_message(&self, msg: Message) -> Result<IncomingMessage> {
Ok(IncomingMessage {
id: msg.id.to_string(),
text: msg.text().unwrap_or("").to_string(),
sender: UserInfo {
id: msg.from.as_ref().map(|u| u.id.to_string()).unwrap_or_default(),
name: msg.from.as_ref().map(|u| u.username.clone()).flatten(),
},
chat: ChatInfo {
id: msg.chat.id.to_string(),
type_: match msg.chat.kind {
ChatKind::Public(_) => ChatType::Group,
ChatKind::Private(_) => ChatType::Private,
},
},
timestamp: msg.date,
attachments: vec![],
reply_to: msg.reply_to_message.map(|m| m.id.to_string()),
raw: serde_json::to_value(msg)?,
})
}
/// 解析媒体标记,如 [IMAGE:/path/to/img.png]
fn parse_media_tags(&self, text: &str) -> (String, Option<Media>) {
let re = regex::Regex::new(r"\[(IMAGE|DOCUMENT|VIDEO):([^\]]+)\]").unwrap();
let mut clean_text = text.to_string();
let mut media = None;
for cap in re.captures_iter(text) {
let media_type = &cap[1];
let path = &cap[2];
media = Some(match media_type {
"IMAGE" => Media::Photo(path.into()),
"DOCUMENT" => Media::Document(path.into()),
"VIDEO" => Media::Video(path.into()),
_ => continue,
});
// 从文本中移除标记
clean_text = clean_text.replace(&cap[0], "");
}
(clean_text.trim().to_string(), media)
}
}
3.2 白名单检查 🔗
impl TelegramChannel {
async fn check_allowlist(&self, user_id: &str) -> Result<bool> {
let allowed = &self.config.allowed_users;
// 空列表 = 拒绝所有
if allowed.is_empty() {
return Ok(false);
}
// 通配符 = 允许所有
if allowed.contains(&"*".to_string()) {
return Ok(true);
}
Ok(allowed.contains(&user_id.to_string()))
}
async fn send_unauthorized_message(&self, chat_id: ChatId, user_id: &str) {
let msg = format!(
"⚠️ 未授权访问\n\n\
您的用户 ID: {}\n\
请联系管理员将此 ID 添加到白名单。\n\n\
管理员可以运行:\n\
zeroclaw channel bind-telegram {}",
user_id, user_id
);
let _ = self.bot.send_message(chat_id, msg).await;
}
}
四、Discord 渠道详解 🔗
4.1 WebSocket 连接管理 🔗
// src/channels/discord.rs
use serenity::{Client, model::gateway::Ready, prelude::*};
pub struct DiscordChannel {
client: Client,
config: DiscordConfig,
}
#[async_trait]
impl EventHandler for DiscordHandler {
async fn message(&self, ctx: Context, msg: Message) {
// 忽略机器人自己的消息
if msg.author.bot {
return;
}
// 检查是否需要响应
if !self.should_respond(&msg) {
return;
}
// 转换为标准格式
let standard_msg = self.convert_message(msg).await;
// 设置 typing 状态
let _ = msg.channel_id.broadcast_typing(&ctx.http).await;
// 调用处理器(通过 channel 发送)
if let Some(ref tx) = self.message_tx {
let _ = tx.send(("discord".into(), standard_msg)).await;
}
}
async fn ready(&self, _: Context, ready: Ready) {
info!("Discord 连接就绪: {}", ready.user.name);
}
}
impl DiscordChannel {
fn should_respond(&self, msg: &Message) -> bool {
// 1. 私信总是响应
if msg.guild_id.is_none() {
return true;
}
// 2. @提及响应
if msg.mentions.iter().any(|u| u.id == self.bot_id) {
return true;
}
// 3. 命令前缀响应
if let Some(ref prefix) = self.config.command_prefix {
if msg.content.starts_with(prefix) {
return true;
}
}
false
}
}
4.2 Slash Command 支持 🔗
use serenity::builder::CreateApplicationCommand;
pub fn register_commands(
commands: &mut CreateApplicationCommand,
) -> &mut CreateApplicationCommand {
commands
.name("ask")
.description("向 AI 提问")
.create_option(|option| {
option
.name("question")
.description("你的问题")
.kind(CommandOptionType::String)
.required(true)
})
}
async fn handle_slash_command(
ctx: &Context,
command: &ApplicationCommandInteraction,
) {
match command.data.name.as_str() {
"ask" => {
let question = command
.data.options
.get(0)
.and_then(|opt| opt.value.as_ref())
.and_then(|v| v.as_str())
.unwrap_or("");
// 先响应"正在思考"
command
.create_interaction_response(&ctx.http, |response| {
response
.kind(InteractionResponseType::DeferredChannelMessageWithSource)
})
.await
.unwrap();
// 调用 AI(异步)
let answer = call_ai(question).await;
// 编辑响应
command
.edit_original_interaction_response(&ctx.http, |response| {
response.content(answer)
})
.await
.unwrap();
}
_ => {}
}
}
五、Slack 渠道详解 🔗
5.1 Events API 实现 🔗
// src/channels/slack.rs
pub struct SlackChannel {
client: SlackClient,
config: SlackConfig,
}
#[async_trait]
impl Channel for SlackChannel {
async fn listen(&self, handler: MessageHandler) -> Result<(), ChannelError> {
// Slack 使用 HTTP Webhook,不需要长连接
// 消息通过 Gateway 接收
Ok(())
}
}
/// Slack 事件处理器(在 Gateway 中调用)
pub async fn handle_slack_event(
State(state): State<AppState>,
headers: HeaderMap,
Json(body): Json<SlackEvent>,
) -> Result<impl IntoResponse, StatusCode> {
// 1. 验证签名
if !verify_slack_signature(&headers, &body, &state.config.signing_secret) {
return Err(StatusCode::UNAUTHORIZED);
}
match body {
SlackEvent::UrlVerification { challenge } => {
// 配置验证
Ok((StatusCode::OK, challenge).into_response())
}
SlackEvent::EventCallback { event } => {
match event {
SlackInnerEvent::AppMention { text, user, channel } => {
let msg = IncomingMessage {
id: uuid::Uuid::new_v4().to_string(),
text,
sender: UserInfo { id: user, name: None },
chat: ChatInfo { id: channel, type_: ChatType::Group },
timestamp: Utc::now(),
attachments: vec![],
reply_to: None,
raw: serde_json::to_value(&event).unwrap(),
};
// 发送到处理器
state.channel_tx.send(("slack".into(), msg)).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
SlackInnerEvent::Message { text, user, channel, .. } => {
// 处理私信
// ...
}
_ => {}
}
Ok(StatusCode::OK.into_response())
}
_ => Ok(StatusCode::OK.into_response()),
}
}
fn verify_slack_signature(
headers: &HeaderMap,
body: &str,
signing_secret: &str,
) -> bool {
let timestamp = headers
.get("x-slack-request-timestamp")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
let signature = headers
.get("x-slack-signature")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
// 检查时间戳(5分钟内)
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
if let Ok(ts) = timestamp.parse::<u64>() {
if now - ts > 300 {
return false;
}
}
// 计算签名
let base_string = format!("v0:{}:{}", timestamp, body);
let mut mac = HmacSha256::new_from_slice(signing_secret.as_bytes()).unwrap();
mac.update(base_string.as_bytes());
let computed = format!("v0={}", hex::encode(mac.finalize().into_bytes()));
// 比较签名
signature == computed
}
六、渠道对比与选型 🔗
6.1 特性对比矩阵 🔗
| 渠道 | 实时性 | 富媒体 | 企业特性 | 实现复杂度 |
|---|---|---|---|---|
| Telegram | 🟢 实时 | 🟢 全支持 | 🟡 有限 | 低 |
| Discord | 🟢 实时 | 🟢 全支持 | 🟢 良好 | 中 |
| Slack | 🟢 实时 | 🟢 全支持 | 🟢 优秀 | 中 |
| 🟢 实时 | 🟢 全支持 | 🟢 优秀 | 高 | |
| Matrix | 🟢 实时 | 🟡 部分 | 🟡 有限 | 中 |
| 🟡 非实时 | 🔴 仅附件 | 🟢 标准 | 低 |
6.2 选型决策树 🔗
flowchart TD
START["选择消息渠道"] --> Q1{"目标用户群体?"}
Q1 -->|"技术社区"| Q2{"需要语音/视频?"}
Q1 -->|"企业团队"| Q3["Slack/Mattermost
🟢 推荐"]
Q1 -->|"普通大众"| Q4{"WhatsApp 普及?"}
Q1 -->|"开发者"| Q5["Telegram/Discord
🟢 推荐"]
Q1 -->|"隐私敏感"| Q6["Signal/Matrix
🟢 推荐"]
Q2 -->|"是"| Q7["Discord
🟢 推荐"]
Q2 -->|"否"| Q8["Telegram
🟢 推荐"]
Q4 -->|"是"| Q9["WhatsApp
🟢 推荐"]
Q4 -->|"否"| Q10["Telegram
🟢 推荐"]
style Q3 fill:#4A154B,color:#fff
style Q7 fill:#5865F2,color:#fff
style Q9 fill:#25D366
七、配置示例 🔗
7.1 多渠道配置 🔗
# Telegram 配置
[channels_config.telegram]
bot_token = "${TELEGRAM_BOT_TOKEN}"
allowed_users = ["your_telegram_user_id"]
polling_interval = 30
# Discord 配置
[channels_config.discord]
bot_token = "${DISCORD_BOT_TOKEN}"
allowed_users = ["your_discord_user_id"]
command_prefix = "!ai"
# Slack 配置
[channels_config.slack]
bot_token = "xoxb-your-bot-token"
app_token = "xapp-your-app-token"
signing_secret = "your-signing-secret"
allowed_users = ["U12345678"]
# WhatsApp 配置
[channels_config.whatsapp]
access_token = "EAAB..."
phone_number_id = "123456789012345"
verify_token = "your-verify-token"
allowed_numbers = ["+1234567890"]
# Matrix 配置
[channels_config.matrix]
homeserver = "https://matrix.org"
username = "@bot:matrix.org"
password = "${MATRIX_PASSWORD}"
allowed_users = ["@you:matrix.org"]
7.2 动态添加渠道 🔗
// 运行时添加新渠道
async fn add_channel(
manager: &mut ChannelManager,
config: ChannelConfig,
) -> Result<()> {
match config.type_.as_str() {
"telegram" => {
let channel = TelegramChannel::new(config).await?;
manager.register("telegram", Box::new(channel)).await;
}
"discord" => {
let channel = DiscordChannel::new(config).await?;
manager.register("discord", Box::new(channel)).await;
}
// ...
_ => bail!("未知渠道类型"),
}
Ok(())
}