mirror of
https://github.com/RightNow-AI/openfang.git
synced 2026-04-25 17:25:11 +02:00
v0.5.7: multi-instance hands + 8 critical fixes
## Headline feature

- Multi-instance Hands via optional `instance_name` (customer ask + #878). Web UI, CLI (`--name` / `-n`), API, kernel, and registry are all threaded through. Two instances (clip-youtube + clip-tiktok) now coexist. Backward compatible when `instance_name` is omitted.

## Critical bug fixes

- #919 [SECURITY] rm bypass closed. The `process_start` tool now validates against the `exec_policy` allowlist and rejects shell metacharacters in both command and args. Added 5 regression tests.
- #1013 `session_repair` phase ordering — dedup now runs BEFORE synthetic result insertion, fixing Moonshot's non-unique `tool_call_id` format (`function_name:index`). Added regression test.
- #1003 Global `[[fallback_providers]]` is now actually used at runtime. `resolve_driver` wraps the primary in a `FallbackDriver` with the global fallback chain. Network errors escalate to the fallback instead of retrying forever.
- #937 Discord gateway heartbeat. Spawns an interval task, tracks the sequence, handles ACKs, detects zombie connections, and force-closes on a missing ACK. Credits @hello-world-bfree (PR #938) for the diagnosis.
- #935 System prompt leak in Web UI. `get_agent_session` now filters `Role::System` by default (`?include_system=true` for debug). Defense-in-depth: a client-side filter was added too.
- #984 Custom hands persistence. `install_from_path` copies to `~/.openfang/hands/`. The kernel loads them on startup.
- #884 Workspace version bump 0.5.5 -> 0.5.7. Binaries now correctly report `--version` as 0.5.7 instead of the stale 0.5.5.

## Cleanup

- rmcp 1.3 builder API adopted (credits @jefflower PR #986) for `StreamableHttpClientTransportConfig`. Drops an unused `Arc` import.

## Stats

- 22 files changed, all workspace tests passing (1800+)
- Live-tested with daemon: v0.5.7 reported, multi-instance hands verified end-to-end, Groq round-trip PONG confirmed
This commit is contained in:
29
Cargo.lock
generated
29
Cargo.lock
generated
@@ -3977,7 +3977,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-api"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"argon2",
|
||||
"async-trait",
|
||||
@@ -4019,7 +4019,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-channels"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"aes",
|
||||
"async-trait",
|
||||
@@ -4058,7 +4058,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-cli"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"clap",
|
||||
"clap_complete",
|
||||
@@ -4086,7 +4086,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-desktop"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"axum",
|
||||
"open",
|
||||
@@ -4112,7 +4112,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-extensions"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"argon2",
|
||||
@@ -4140,10 +4140,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-hands"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"dashmap",
|
||||
"dirs 6.0.0",
|
||||
"openfang-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -4157,7 +4158,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-kernel"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -4195,7 +4196,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-memory"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -4215,7 +4216,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-migrate"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"dirs 6.0.0",
|
||||
@@ -4234,7 +4235,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-runtime"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
@@ -4270,7 +4271,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-skills"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"hex",
|
||||
@@ -4293,7 +4294,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-types"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -4312,7 +4313,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "openfang-wire"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
@@ -9368,7 +9369,7 @@ checksum = "b9cc00251562a284751c9973bace760d86c0276c471b4be569fe6b068ee97a56"
|
||||
|
||||
[[package]]
|
||||
name = "xtask"
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
|
||||
[[package]]
|
||||
name = "yoke"
|
||||
|
||||
@@ -18,7 +18,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "0.5.5"
|
||||
version = "0.5.7"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0 OR MIT"
|
||||
repository = "https://github.com/RightNow-AI/openfang"
|
||||
|
||||
@@ -436,9 +436,16 @@ pub async fn send_message(
|
||||
}
|
||||
|
||||
/// GET /api/agents/:id/session — Get agent session (conversation history).
|
||||
///
|
||||
/// Query parameters:
|
||||
/// - `include_system` — when `true`, system-role messages are included in the
|
||||
/// response (intended for debugging only). Defaults to `false` so the
|
||||
/// internal system prompt is never leaked into the Web UI conversation
|
||||
/// history (issue #935).
|
||||
pub async fn get_agent_session(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Query(params): Query<HashMap<String, String>>,
|
||||
) -> impl IntoResponse {
|
||||
let agent_id: AgentId = match id.parse() {
|
||||
Ok(id) => id,
|
||||
@@ -450,6 +457,14 @@ pub async fn get_agent_session(
|
||||
}
|
||||
};
|
||||
|
||||
// SECURITY (#935): Default to filtering out system-role messages so the
|
||||
// internal system prompt is never exposed in the Web UI conversation
|
||||
// history. Callers can opt-in via `?include_system=true` for debugging.
|
||||
let include_system = params
|
||||
.get("include_system")
|
||||
.map(|v| matches!(v.as_str(), "1" | "true" | "yes" | "TRUE" | "True"))
|
||||
.unwrap_or(false);
|
||||
|
||||
let entry = match state.kernel.registry.get(agent_id) {
|
||||
Some(e) => e,
|
||||
None => {
|
||||
@@ -462,6 +477,18 @@ pub async fn get_agent_session(
|
||||
|
||||
match state.kernel.memory.get_session(entry.session_id) {
|
||||
Ok(Some(session)) => {
|
||||
// Filter out system-role messages BEFORE any rendering / truncation
|
||||
// logic so the system prompt cannot leak into the response. The
|
||||
// raw message count is preserved separately for the API consumer.
|
||||
let raw_message_count = session.messages.len();
|
||||
let filtered_messages: Vec<&openfang_types::message::Message> = session
|
||||
.messages
|
||||
.iter()
|
||||
.filter(|m| {
|
||||
include_system || m.role != openfang_types::message::Role::System
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Two-pass approach: ToolUse blocks live in Assistant messages while
|
||||
// ToolResult blocks arrive in subsequent User messages. Pass 1
|
||||
// collects all tool_use entries keyed by id; pass 2 attaches results.
|
||||
@@ -472,7 +499,8 @@ pub async fn get_agent_session(
|
||||
let mut tool_use_index: std::collections::HashMap<String, (usize, usize)> =
|
||||
std::collections::HashMap::new();
|
||||
|
||||
for m in &session.messages {
|
||||
for m in &filtered_messages {
|
||||
let m = *m;
|
||||
let mut tools: Vec<serde_json::Value> = Vec::new();
|
||||
let mut msg_images: Vec<serde_json::Value> = Vec::new();
|
||||
let content = match &m.content {
|
||||
@@ -562,8 +590,8 @@ pub async fn get_agent_session(
|
||||
built_messages.push(msg);
|
||||
}
|
||||
|
||||
// Pass 2: walk messages again and attach ToolResult to the correct tool
|
||||
for m in &session.messages {
|
||||
// Pass 2: walk filtered messages again and attach ToolResult to the correct tool
|
||||
for m in &filtered_messages {
|
||||
if let openfang_types::message::MessageContent::Blocks(blocks) = &m.content {
|
||||
for b in blocks {
|
||||
if let openfang_types::message::ContentBlock::ToolResult {
|
||||
@@ -593,12 +621,16 @@ pub async fn get_agent_session(
|
||||
}
|
||||
|
||||
let messages = built_messages;
|
||||
// `message_count` reflects what the API actually returns (system
|
||||
// messages excluded by default). `raw_message_count` is exposed
|
||||
// for callers that need to know the underlying total.
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({
|
||||
"session_id": session.id.0.to_string(),
|
||||
"agent_id": session.agent_id.0.to_string(),
|
||||
"message_count": session.messages.len(),
|
||||
"message_count": messages.len(),
|
||||
"raw_message_count": raw_message_count,
|
||||
"context_window_tokens": session.context_window_tokens,
|
||||
"label": session.label,
|
||||
"messages": messages,
|
||||
@@ -4038,12 +4070,18 @@ pub async fn list_active_hands(State(state): State<Arc<AppState>>) -> impl IntoR
|
||||
let items: Vec<serde_json::Value> = instances
|
||||
.iter()
|
||||
.map(|i| {
|
||||
// Effective agent name: custom instance_name takes priority, otherwise HAND.toml default.
|
||||
let effective_agent_name = i
|
||||
.instance_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| i.agent_name.clone());
|
||||
serde_json::json!({
|
||||
"instance_id": i.instance_id,
|
||||
"hand_id": i.hand_id,
|
||||
"instance_name": i.instance_name,
|
||||
"status": format!("{}", i.status),
|
||||
"agent_id": i.agent_id.map(|a| a.to_string()),
|
||||
"agent_name": i.agent_name,
|
||||
"agent_name": effective_agent_name,
|
||||
"activated_at": i.activated_at.to_rfc3339(),
|
||||
"updated_at": i.updated_at.to_rfc3339(),
|
||||
})
|
||||
@@ -4507,9 +4545,12 @@ pub async fn activate_hand(
|
||||
Path(hand_id): Path<String>,
|
||||
body: Option<Json<openfang_hands::ActivateHandRequest>>,
|
||||
) -> impl IntoResponse {
|
||||
let config = body.map(|b| b.0.config).unwrap_or_default();
|
||||
let (config, instance_name) = match body.map(|b| b.0) {
|
||||
Some(r) => (r.config, r.instance_name),
|
||||
None => (std::collections::HashMap::new(), None),
|
||||
};
|
||||
|
||||
match state.kernel.activate_hand(&hand_id, config) {
|
||||
match state.kernel.activate_hand(&hand_id, config, instance_name) {
|
||||
Ok(instance) => {
|
||||
// If the hand agent has a non-reactive schedule (autonomous hands),
|
||||
// start its background loop so it begins running immediately.
|
||||
@@ -4533,14 +4574,19 @@ pub async fn activate_hand(
|
||||
}
|
||||
}
|
||||
}
|
||||
let effective_agent_name = instance
|
||||
.instance_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| instance.agent_name.clone());
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({
|
||||
"instance_id": instance.instance_id,
|
||||
"hand_id": instance.hand_id,
|
||||
"instance_name": instance.instance_name,
|
||||
"status": format!("{}", instance.status),
|
||||
"agent_id": instance.agent_id.map(|a| a.to_string()),
|
||||
"agent_name": instance.agent_name,
|
||||
"agent_name": effective_agent_name,
|
||||
"activated_at": instance.activated_at.to_rfc3339(),
|
||||
})),
|
||||
)
|
||||
|
||||
@@ -2892,6 +2892,14 @@ args = ["-y", "@modelcontextprotocol/server-filesystem", "/path"]</pre>
|
||||
|
||||
<!-- ═══ Step 2: Configure ═══ -->
|
||||
<div class="hand-wizard-body" x-show="setupStep === 2">
|
||||
<!-- Optional instance name (for running multiple instances of the same hand).
     The visible caption is a plain <div>, not a <label>, so the input carries
     an aria-label to give assistive technology an accessible name. -->
<div class="mb-4">
  <div class="text-xs text-dim mb-1" style="letter-spacing:0.5px;text-transform:uppercase">
    Instance name <span style="text-transform:none;letter-spacing:0;color:#888">(optional)</span>
  </div>
  <div class="text-xs text-dim mb-2">Leave empty for a single instance of this hand. Set a name to run multiple instances in parallel.</div>
  <input type="text" class="form-input" x-model="setupWizard.instanceName" aria-label="Instance name (optional)" placeholder="e.g. clip-youtube, lead-q4-outreach" style="width:100%">
</div>
|
||||
<template x-if="!setupHasSettings">
|
||||
<div class="text-sm text-dim" style="text-align:center;padding:20px 0">No configuration needed for this hand. Click Next to continue.</div>
|
||||
</template>
|
||||
|
||||
@@ -519,7 +519,14 @@ function chatPage() {
|
||||
try {
|
||||
var data = await OpenFangAPI.get('/api/agents/' + agentId + '/session');
|
||||
if (data.messages && data.messages.length) {
|
||||
self.messages = data.messages.map(function(m) {
|
||||
// Defense-in-depth (#935): never render system-role messages in the
|
||||
// conversation history view, even if the backend somehow returns
|
||||
// one. The server already filters these out by default, but we
|
||||
// guard here too so a regression cannot leak the system prompt.
|
||||
var visible = data.messages.filter(function(m) {
|
||||
return m && m.role !== 'System' && m.role !== 'system';
|
||||
});
|
||||
self.messages = visible.map(function(m) {
|
||||
var role = m.role === 'User' ? 'user' : (m.role === 'System' ? 'system' : 'agent');
|
||||
var text = typeof m.content === 'string' ? m.content : JSON.stringify(m.content);
|
||||
// Sanitize any raw function-call text from history
|
||||
|
||||
@@ -122,6 +122,8 @@ function handsPage() {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Initialize optional instance name (for multi-instance hands).
|
||||
data.instanceName = '';
|
||||
this.setupWizard = data;
|
||||
// Skip deps step if no requirements
|
||||
var hasReqs = data.requirements && data.requirements.length > 0;
|
||||
@@ -408,8 +410,14 @@ function handsPage() {
|
||||
}
|
||||
this.activatingId = handId;
|
||||
try {
|
||||
var data = await OpenFangAPI.post('/api/hands/' + handId + '/activate', { config: config });
|
||||
this.showToast('Hand "' + handId + '" activated as ' + (data.agent_name || data.instance_id));
|
||||
var payload = { config: config };
|
||||
var name = (this.setupWizard.instanceName || '').trim();
|
||||
if (name) {
|
||||
payload.instance_name = name;
|
||||
}
|
||||
var data = await OpenFangAPI.post('/api/hands/' + handId + '/activate', payload);
|
||||
var label = data.instance_name || data.agent_name || data.instance_id;
|
||||
this.showToast('Hand "' + handId + '" activated as ' + label);
|
||||
this.closeSetupWizard();
|
||||
await this.loadActive();
|
||||
this.tab = 'active';
|
||||
|
||||
@@ -314,6 +314,115 @@ async fn test_agent_session_empty() {
|
||||
assert_eq!(body["messages"].as_array().unwrap().len(), 0);
|
||||
}
|
||||
|
||||
/// Regression test for #935: the GET /api/agents/:id/session endpoint
|
||||
/// must NOT expose internal system-prompt messages to the Web UI.
|
||||
///
|
||||
/// We construct a session containing a System message + a User message + an
|
||||
/// Assistant message, persist it via the kernel's memory store, then call the
|
||||
/// HTTP endpoint and assert:
|
||||
/// 1. The default response excludes the system message entirely.
|
||||
/// 2. `message_count` reflects only the visible (user + assistant) messages.
|
||||
/// 3. `raw_message_count` exposes the underlying total.
|
||||
/// 4. With `?include_system=true`, the system message IS returned (debug
|
||||
/// mode opt-in).
|
||||
#[tokio::test]
|
||||
async fn test_agent_session_filters_system_messages() {
|
||||
use openfang_types::message::{Message, Role};
|
||||
|
||||
let server = start_test_server().await;
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
// Spawn agent
|
||||
let resp = client
|
||||
.post(format!("{}/api/agents", server.base_url))
|
||||
.json(&serde_json::json!({"manifest_toml": TEST_MANIFEST}))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let body: serde_json::Value = resp.json().await.unwrap();
|
||||
let agent_id_str = body["agent_id"].as_str().unwrap().to_string();
|
||||
|
||||
// Look up the agent's session id and inject a forged history that
|
||||
// contains a system-role message (simulating what an OpenAI-compat
|
||||
// client could push, or what a future regression might persist).
|
||||
let agent_id: openfang_types::agent::AgentId = agent_id_str.parse().unwrap();
|
||||
let entry = server.state.kernel.registry.get(agent_id).unwrap();
|
||||
let session_id = entry.session_id;
|
||||
let mut session = server
|
||||
.state
|
||||
.kernel
|
||||
.memory
|
||||
.get_session(session_id)
|
||||
.unwrap()
|
||||
.expect("session should exist after spawn");
|
||||
|
||||
session.messages = vec![
|
||||
Message {
|
||||
role: Role::System,
|
||||
content: openfang_types::message::MessageContent::Text(
|
||||
"INTERNAL SYSTEM PROMPT — must not leak to UI".to_string(),
|
||||
),
|
||||
},
|
||||
Message::user("hello"),
|
||||
Message::assistant("hi there"),
|
||||
];
|
||||
server.state.kernel.memory.save_session(&session).unwrap();
|
||||
|
||||
// --- Default request: system message must be filtered out ---
|
||||
let resp = client
|
||||
.get(format!(
|
||||
"{}/api/agents/{}/session",
|
||||
server.base_url, agent_id_str
|
||||
))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), 200);
|
||||
let body: serde_json::Value = resp.json().await.unwrap();
|
||||
|
||||
let messages = body["messages"].as_array().unwrap();
|
||||
assert_eq!(messages.len(), 2, "should only see user + assistant");
|
||||
assert_eq!(body["message_count"], 2);
|
||||
assert_eq!(body["raw_message_count"], 3);
|
||||
|
||||
// No message in the response should carry the System role label, and
|
||||
// the system prompt text MUST NOT appear anywhere in the payload.
|
||||
for m in messages {
|
||||
let role = m["role"].as_str().unwrap_or("");
|
||||
assert_ne!(role, "System", "system role leaked into UI history");
|
||||
assert_ne!(role, "system", "system role leaked into UI history");
|
||||
}
|
||||
let body_str = serde_json::to_string(&body).unwrap();
|
||||
assert!(
|
||||
!body_str.contains("INTERNAL SYSTEM PROMPT"),
|
||||
"system prompt content leaked into session response: {body_str}"
|
||||
);
|
||||
|
||||
// Verify the visible roles are exactly what we expect.
|
||||
assert_eq!(messages[0]["role"], "User");
|
||||
assert_eq!(messages[0]["content"], "hello");
|
||||
assert_eq!(messages[1]["role"], "Assistant");
|
||||
assert_eq!(messages[1]["content"], "hi there");
|
||||
|
||||
// --- Opt-in debug mode: ?include_system=true returns it ---
|
||||
let resp = client
|
||||
.get(format!(
|
||||
"{}/api/agents/{}/session?include_system=true",
|
||||
server.base_url, agent_id_str
|
||||
))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.status(), 200);
|
||||
let body: serde_json::Value = resp.json().await.unwrap();
|
||||
let messages = body["messages"].as_array().unwrap();
|
||||
assert_eq!(messages.len(), 3, "include_system=true should return all 3");
|
||||
assert_eq!(messages[0]["role"], "System");
|
||||
assert_eq!(messages[0]["content"], "INTERNAL SYSTEM PROMPT — must not leak to UI");
|
||||
assert_eq!(body["message_count"], 3);
|
||||
assert_eq!(body["raw_message_count"], 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_send_message_with_llm() {
|
||||
if std::env::var("GROQ_API_KEY").is_err() {
|
||||
|
||||
@@ -10,9 +10,11 @@ use async_trait::async_trait;
|
||||
use futures::{SinkExt, Stream, StreamExt};
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::{mpsc, watch, RwLock};
|
||||
use tokio::sync::{mpsc, watch, Mutex, RwLock};
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use zeroize::Zeroizing;
|
||||
|
||||
@@ -33,6 +35,18 @@ mod opcode {
|
||||
pub const HEARTBEAT_ACK: u64 = 11;
|
||||
}
|
||||
|
||||
/// Build a Discord gateway heartbeat (opcode 1) payload.
|
||||
///
|
||||
/// Per the Discord gateway spec, the payload `d` field is the last received
|
||||
/// dispatch sequence number, or `null` if no dispatch has been received yet.
|
||||
/// See: <https://discord.com/developers/docs/topics/gateway#sending-heartbeats>
|
||||
fn build_heartbeat_payload(last_sequence: Option<u64>) -> serde_json::Value {
|
||||
serde_json::json!({
|
||||
"op": opcode::HEARTBEAT,
|
||||
"d": last_sequence,
|
||||
})
|
||||
}
|
||||
|
||||
/// Discord Gateway adapter using WebSocket.
|
||||
pub struct DiscordAdapter {
|
||||
/// SECURITY: Bot token is zeroized on drop to prevent memory disclosure.
|
||||
@@ -191,8 +205,15 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
backoff = INITIAL_BACKOFF;
|
||||
info!("Discord gateway connected");
|
||||
|
||||
let (mut ws_tx, mut ws_rx) = ws_stream.split();
|
||||
let mut _heartbeat_interval: Option<u64> = None;
|
||||
let (ws_tx_raw, mut ws_rx) = ws_stream.split();
|
||||
// Wrap the sink so the periodic heartbeat task and the inner
|
||||
// loop can both write to it.
|
||||
let ws_tx = Arc::new(Mutex::new(ws_tx_raw));
|
||||
let mut heartbeat_handle: Option<JoinHandle<()>> = None;
|
||||
// Tracks whether the most recent heartbeat we sent has been
|
||||
// ACKed (opcode 11). Initialized to `true` so the first
|
||||
// heartbeat is always allowed to fire.
|
||||
let heartbeat_acked = Arc::new(AtomicBool::new(true));
|
||||
|
||||
// Inner message loop — returns true if we should reconnect
|
||||
let should_reconnect = 'inner: loop {
|
||||
@@ -201,7 +222,10 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
_ = shutdown.changed() => {
|
||||
if *shutdown.borrow() {
|
||||
info!("Discord shutdown requested");
|
||||
let _ = ws_tx.close().await;
|
||||
if let Some(h) = heartbeat_handle.take() {
|
||||
h.abort();
|
||||
}
|
||||
let _ = ws_tx.lock().await.close().await;
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
@@ -239,7 +263,8 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
|
||||
let op = payload["op"].as_u64().unwrap_or(999);
|
||||
|
||||
// Update sequence number
|
||||
// Update sequence number from any payload that carries one
|
||||
// (typically dispatch events, opcode 0).
|
||||
if let Some(s) = payload["s"].as_u64() {
|
||||
*sequence.write().await = Some(s);
|
||||
}
|
||||
@@ -248,9 +273,72 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
opcode::HELLO => {
|
||||
let interval =
|
||||
payload["d"]["heartbeat_interval"].as_u64().unwrap_or(45000);
|
||||
_heartbeat_interval = Some(interval);
|
||||
debug!("Discord HELLO: heartbeat_interval={interval}ms");
|
||||
|
||||
// Spawn the periodic heartbeat task BEFORE we send
|
||||
// IDENTIFY/RESUME, per the Discord gateway flow.
|
||||
// Abort any stale handle from a previous attempt
|
||||
// first (defensive — should normally be None here).
|
||||
if let Some(h) = heartbeat_handle.take() {
|
||||
h.abort();
|
||||
}
|
||||
heartbeat_acked.store(true, Ordering::Relaxed);
|
||||
let hb_sink = ws_tx.clone();
|
||||
let hb_seq = sequence.clone();
|
||||
let hb_acked = heartbeat_acked.clone();
|
||||
let mut hb_shutdown = shutdown.clone();
|
||||
heartbeat_handle = Some(tokio::spawn(async move {
|
||||
let mut ticker =
|
||||
tokio::time::interval(Duration::from_millis(interval));
|
||||
// Skip the immediate first tick — we want to
|
||||
// wait one full interval before the first beat.
|
||||
ticker.tick().await;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {}
|
||||
_ = hb_shutdown.changed() => {
|
||||
if *hb_shutdown.borrow() {
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// If the previous heartbeat was never
|
||||
// ACKed, the connection is zombied — close
|
||||
// the sink so the read loop sees EOF and
|
||||
// triggers a reconnect (Discord spec).
|
||||
if !hb_acked.swap(false, Ordering::Relaxed) {
|
||||
warn!(
|
||||
"Discord: previous heartbeat not ACKed, \
|
||||
forcing reconnect"
|
||||
);
|
||||
let _ = hb_sink.lock().await.close().await;
|
||||
return;
|
||||
}
|
||||
|
||||
let seq = *hb_seq.read().await;
|
||||
let payload = build_heartbeat_payload(seq);
|
||||
let text = match serde_json::to_string(&payload) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("Discord: failed to serialize heartbeat: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let send_res = hb_sink
|
||||
.lock()
|
||||
.await
|
||||
.send(tokio_tungstenite::tungstenite::Message::Text(text))
|
||||
.await;
|
||||
if let Err(e) = send_res {
|
||||
warn!("Discord: failed to send heartbeat: {e}");
|
||||
return;
|
||||
}
|
||||
debug!("Discord heartbeat sent (seq={:?})", seq);
|
||||
}
|
||||
}));
|
||||
|
||||
// Try RESUME if we have a session, otherwise IDENTIFY
|
||||
let has_session = session_id_store.read().await.is_some();
|
||||
let has_seq = sequence.read().await.is_some();
|
||||
@@ -284,6 +372,8 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
};
|
||||
|
||||
if let Err(e) = ws_tx
|
||||
.lock()
|
||||
.await
|
||||
.send(tokio_tungstenite::tungstenite::Message::Text(
|
||||
serde_json::to_string(&gateway_msg).unwrap(),
|
||||
))
|
||||
@@ -350,16 +440,23 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
opcode::HEARTBEAT => {
|
||||
// Server requests immediate heartbeat
|
||||
let seq = *sequence.read().await;
|
||||
let hb = serde_json::json!({ "op": opcode::HEARTBEAT, "d": seq });
|
||||
let hb = build_heartbeat_payload(seq);
|
||||
let _ = ws_tx
|
||||
.lock()
|
||||
.await
|
||||
.send(tokio_tungstenite::tungstenite::Message::Text(
|
||||
serde_json::to_string(&hb).unwrap(),
|
||||
))
|
||||
.await;
|
||||
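// The server-requested heartbeat counts as a fresh beat: mark it as
// awaiting an ACK (flag = false) until opcode 11 arrives.
// NOTE(review): if the periodic tick fires before that ACK lands, the
// heartbeat task will see `false` and force a reconnect — confirm this
// is intended.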
|
||||
heartbeat_acked.store(false, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
opcode::HEARTBEAT_ACK => {
|
||||
debug!("Discord heartbeat ACK received");
|
||||
heartbeat_acked.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
opcode::RECONNECT => {
|
||||
@@ -385,6 +482,12 @@ impl ChannelAdapter for DiscordAdapter {
|
||||
}
|
||||
};
|
||||
|
||||
// Tear down the heartbeat task before we either exit or
|
||||
// reconnect, so it doesn't outlive its WebSocket sink.
|
||||
if let Some(h) = heartbeat_handle.take() {
|
||||
h.abort();
|
||||
}
|
||||
|
||||
if !should_reconnect || *shutdown.borrow() {
|
||||
break;
|
||||
}
|
||||
@@ -889,6 +992,28 @@ mod tests {
|
||||
assert!(!msg.is_group);
|
||||
}
|
||||
|
||||
/// A heartbeat built from a known sequence number carries opcode 1 and that
/// number in `d`, both in memory and after a serialize/parse round trip.
#[test]
fn test_build_heartbeat_payload_with_sequence() {
    let heartbeat = build_heartbeat_payload(Some(42));
    assert_eq!(heartbeat["op"], 1);
    assert_eq!(heartbeat["d"], 42);

    // Compare parsed values rather than raw strings so the check does not
    // depend on the order in which the keys are serialized.
    let encoded = serde_json::to_string(&heartbeat).unwrap();
    let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    let expected = serde_json::json!({"op": 1, "d": 42});
    assert_eq!(decoded, expected);
}
|
||||
|
||||
/// With no sequence number yet, the heartbeat still carries opcode 1 and its
/// `d` field is JSON `null`, both in memory and after a round trip.
#[test]
fn test_build_heartbeat_payload_without_sequence() {
    let heartbeat = build_heartbeat_payload(None);
    assert_eq!(heartbeat["op"], 1);
    assert!(heartbeat["d"].is_null());

    let encoded = serde_json::to_string(&heartbeat).unwrap();
    let decoded: serde_json::Value = serde_json::from_str(&encoded).unwrap();
    let expected = serde_json::json!({"op": 1, "d": serde_json::Value::Null});
    assert_eq!(decoded, expected);
}
|
||||
|
||||
#[test]
|
||||
fn test_discord_adapter_creation() {
|
||||
let adapter = DiscordAdapter::new(
|
||||
|
||||
@@ -406,6 +406,9 @@ enum HandCommands {
|
||||
Activate {
|
||||
/// Hand ID (e.g. "clip", "lead", "researcher").
|
||||
id: String,
|
||||
/// Optional instance name. Required to run multiple instances of the same hand.
|
||||
#[arg(long, short = 'n')]
|
||||
name: Option<String>,
|
||||
},
|
||||
/// Deactivate an active hand instance.
|
||||
Deactivate {
|
||||
@@ -1000,7 +1003,7 @@ fn main() {
|
||||
HandCommands::List => cmd_hand_list(),
|
||||
HandCommands::Active => cmd_hand_active(),
|
||||
HandCommands::Install { path } => cmd_hand_install(&path),
|
||||
HandCommands::Activate { id } => cmd_hand_activate(&id),
|
||||
HandCommands::Activate { id, name } => cmd_hand_activate(&id, name),
|
||||
HandCommands::Deactivate { id } => cmd_hand_deactivate(&id),
|
||||
HandCommands::Info { id } => cmd_hand_info(&id),
|
||||
HandCommands::CheckDeps { id } => cmd_hand_check_deps(&id),
|
||||
@@ -4368,23 +4371,37 @@ fn cmd_hand_active() {
|
||||
}
|
||||
}
|
||||
|
||||
fn cmd_hand_activate(id: &str) {
|
||||
fn cmd_hand_activate(id: &str, name: Option<String>) {
|
||||
let base = require_daemon("hand activate");
|
||||
let client = daemon_client();
|
||||
let request_body = match &name {
|
||||
Some(n) => serde_json::json!({ "instance_name": n }).to_string(),
|
||||
None => "{}".to_string(),
|
||||
};
|
||||
let body = daemon_json(
|
||||
client
|
||||
.post(format!("{base}/api/hands/{id}/activate"))
|
||||
.header("content-type", "application/json")
|
||||
.body("{}")
|
||||
.body(request_body)
|
||||
.send(),
|
||||
);
|
||||
if body.get("instance_id").is_some() {
|
||||
println!(
|
||||
"Hand '{}' activated (instance: {}, agent: {})",
|
||||
id,
|
||||
body["instance_id"].as_str().unwrap_or("?"),
|
||||
body["agent_name"].as_str().unwrap_or("?"),
|
||||
);
|
||||
if let Some(n) = &name {
|
||||
println!(
|
||||
"Hand '{}' activated (instance: {}, name: {}, agent: {})",
|
||||
id,
|
||||
body["instance_id"].as_str().unwrap_or("?"),
|
||||
n,
|
||||
body["agent_name"].as_str().unwrap_or("?"),
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"Hand '{}' activated (instance: {}, agent: {})",
|
||||
id,
|
||||
body["instance_id"].as_str().unwrap_or("?"),
|
||||
body["agent_name"].as_str().unwrap_or("?"),
|
||||
);
|
||||
}
|
||||
} else {
|
||||
eprintln!(
|
||||
"Failed to activate hand '{}': {}",
|
||||
|
||||
@@ -2226,13 +2226,22 @@ pub fn spawn_fetch_active_hands(backend: BackendRef, tx: mpsc::Sender<AppEvent>)
|
||||
}
|
||||
|
||||
/// Activate a hand.
|
||||
pub fn spawn_activate_hand(backend: BackendRef, hand_id: String, tx: mpsc::Sender<AppEvent>) {
|
||||
pub fn spawn_activate_hand(
|
||||
backend: BackendRef,
|
||||
hand_id: String,
|
||||
instance_name: Option<String>,
|
||||
tx: mpsc::Sender<AppEvent>,
|
||||
) {
|
||||
std::thread::spawn(move || match backend {
|
||||
BackendRef::Daemon(base_url) => {
|
||||
let client = daemon_client();
|
||||
let payload = match &instance_name {
|
||||
Some(n) => serde_json::json!({ "instance_name": n }),
|
||||
None => serde_json::json!({}),
|
||||
};
|
||||
match client
|
||||
.post(format!("{base_url}/api/hands/{hand_id}/activate"))
|
||||
.json(&serde_json::json!({}))
|
||||
.json(&payload)
|
||||
.send()
|
||||
{
|
||||
Ok(resp) if resp.status().is_success() => {
|
||||
@@ -2252,7 +2261,7 @@ pub fn spawn_activate_hand(backend: BackendRef, hand_id: String, tx: mpsc::Sende
|
||||
}
|
||||
}
|
||||
BackendRef::InProcess(kernel) => {
|
||||
match kernel.activate_hand(&hand_id, std::collections::HashMap::new()) {
|
||||
match kernel.activate_hand(&hand_id, std::collections::HashMap::new(), instance_name) {
|
||||
Ok(_) => {
|
||||
let _ = tx.send(AppEvent::HandActivated(hand_id));
|
||||
}
|
||||
|
||||
@@ -1598,9 +1598,14 @@ impl App {
|
||||
event::spawn_fetch_active_hands(backend, self.event_tx.clone());
|
||||
}
|
||||
}
|
||||
hands::HandsAction::ActivateHand(hand_id) => {
|
||||
hands::HandsAction::ActivateHand(hand_id, instance_name) => {
|
||||
if let Some(backend) = self.backend.to_ref() {
|
||||
event::spawn_activate_hand(backend, hand_id, self.event_tx.clone());
|
||||
event::spawn_activate_hand(
|
||||
backend,
|
||||
hand_id,
|
||||
instance_name,
|
||||
self.event_tx.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
hands::HandsAction::DeactivateHand(instance_id) => {
|
||||
|
||||
@@ -55,7 +55,9 @@ pub enum HandsAction {
|
||||
Continue,
|
||||
RefreshDefinitions,
|
||||
RefreshActive,
|
||||
ActivateHand(String),
|
||||
/// Activate a hand. Second field is the optional instance name.
|
||||
/// TODO: add text-input modal for custom instance names (#878 follow-up).
|
||||
ActivateHand(String, Option<String>),
|
||||
DeactivateHand(String),
|
||||
PauseHand(String),
|
||||
ResumeHand(String),
|
||||
@@ -124,7 +126,11 @@ impl HandsState {
|
||||
KeyCode::Enter | KeyCode::Char('a') => {
|
||||
if let Some(sel) = self.marketplace_list.selected() {
|
||||
if sel < self.definitions.len() {
|
||||
return HandsAction::ActivateHand(self.definitions[sel].id.clone());
|
||||
// TODO: add text-input modal for custom instance names (#878 follow-up)
|
||||
return HandsAction::ActivateHand(
|
||||
self.definitions[sel].id.clone(),
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"$schema": "https://schema.tauri.app/config/2",
|
||||
"productName": "OpenFang",
|
||||
"version": "0.5.5",
|
||||
"version": "0.5.7",
|
||||
"identifier": "ai.openfang.desktop",
|
||||
"build": {},
|
||||
"app": {
|
||||
|
||||
@@ -15,6 +15,7 @@ tracing = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
dashmap = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-test = { workspace = true }
|
||||
|
||||
@@ -330,6 +330,31 @@ pub fn parse_hand_toml(content: &str) -> Result<HandDefinition, toml::de::Error>
|
||||
Ok(wrapper.hand)
|
||||
}
|
||||
|
||||
/// Copy the directory tree rooted at `src` into `dst`.
///
/// `dst` (and any missing parents) is created first; every entry of `src` is
/// then copied across, descending into sub-directories. Entries that are not
/// directories are copied with `std::fs::copy`.
///
/// `HandRegistry::install_from_path` relies on this to persist a custom
/// hand's source directory under `~/.openfang/hands/<hand_id>/` so that
/// installed hands are still present after a daemon restart (issue #984).
///
/// # Errors
///
/// Returns the first I/O error encountered. Anything copied before the
/// failure is left in place.
pub(crate) fn copy_dir_all(
    src: impl AsRef<std::path::Path>,
    dst: impl AsRef<std::path::Path>,
) -> std::io::Result<()> {
    let (source_root, target_root) = (src.as_ref(), dst.as_ref());

    // The destination is created before the source is read, so it exists
    // even when listing `src` fails.
    std::fs::create_dir_all(target_root)?;

    std::fs::read_dir(source_root)?.try_for_each(|item| {
        let item = item?;
        let kind = item.file_type()?;
        let target = target_root.join(item.file_name());
        if kind.is_dir() {
            copy_dir_all(item.path(), &target)
        } else {
            std::fs::copy(item.path(), &target).map(|_| ())
        }
    })
}
|
||||
|
||||
/// Complete Hand definition — parsed from HAND.toml.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HandDefinition {
|
||||
@@ -396,6 +421,12 @@ pub struct HandInstance {
|
||||
pub instance_id: Uuid,
|
||||
/// Which hand definition this is an instance of.
|
||||
pub hand_id: String,
|
||||
/// Optional user-supplied instance label. When set, multiple instances of
|
||||
/// the same hand can coexist as long as each (hand_id, instance_name) pair
|
||||
/// is unique. When `None`, the legacy single-instance-per-hand rule
|
||||
/// applies.
|
||||
#[serde(default)]
|
||||
pub instance_name: Option<String>,
|
||||
/// Current status.
|
||||
pub status: HandStatus,
|
||||
/// The agent that was spawned for this hand.
|
||||
@@ -416,11 +447,13 @@ impl HandInstance {
|
||||
hand_id: &str,
|
||||
agent_name: &str,
|
||||
config: HashMap<String, serde_json::Value>,
|
||||
instance_name: Option<String>,
|
||||
) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
instance_id: Uuid::new_v4(),
|
||||
hand_id: hand_id.to_string(),
|
||||
instance_name,
|
||||
status: HandStatus::Active,
|
||||
agent_id: None,
|
||||
agent_name: agent_name.to_string(),
|
||||
@@ -437,6 +470,10 @@ pub struct ActivateHandRequest {
|
||||
/// Optional configuration overrides.
|
||||
#[serde(default)]
|
||||
pub config: HashMap<String, serde_json::Value>,
|
||||
/// Optional unique instance label. Allows multiple instances of the same
|
||||
/// hand to coexist as long as each name is distinct.
|
||||
#[serde(default)]
|
||||
pub instance_name: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -462,11 +499,12 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn hand_instance_new() {
|
||||
let instance = HandInstance::new("clip", "clip-hand", HashMap::new());
|
||||
let instance = HandInstance::new("clip", "clip-hand", HashMap::new(), None);
|
||||
assert_eq!(instance.hand_id, "clip");
|
||||
assert_eq!(instance.agent_name, "clip-hand");
|
||||
assert_eq!(instance.status, HandStatus::Active);
|
||||
assert!(instance.agent_id.is_none());
|
||||
assert!(instance.instance_name.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -124,6 +124,67 @@ impl HandRegistry {
|
||||
count
|
||||
}
|
||||
|
||||
/// Scan a directory for custom hand definitions and load them into the
|
||||
/// registry. Mirrors `SkillRegistry::load_workspace_skills` — each
|
||||
/// subdirectory containing a `HAND.toml` is treated as a hand, with an
|
||||
/// optional sibling `SKILL.md` attached as the skill content.
|
||||
///
|
||||
/// Parse failures on individual hands are logged and skipped so a single
|
||||
/// bad manifest cannot take down the whole registry.
|
||||
///
|
||||
/// Returns the number of hands successfully loaded. A non-existent
|
||||
/// `hands_dir` returns `Ok(0)` — this is the normal case on a fresh
|
||||
/// install where the user has not run `openfang hand install` yet.
|
||||
///
|
||||
/// Added for issue #984 — custom hands installed via `openfang hand
|
||||
/// install <path>` were only held in memory and lost on daemon restart.
|
||||
pub fn load_workspace_hands(&self, hands_dir: &std::path::Path) -> HandResult<usize> {
|
||||
if !hands_dir.exists() {
|
||||
return Ok(0);
|
||||
}
|
||||
let mut count = 0;
|
||||
let entries = std::fs::read_dir(hands_dir)
|
||||
.map_err(|e| HandError::Config(format!("read_dir {}: {e}", hands_dir.display())))?;
|
||||
for entry in entries {
|
||||
let entry = match entry {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
warn!(error = %e, "Failed to read hands dir entry, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let path = entry.path();
|
||||
if !path.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let toml_path = path.join("HAND.toml");
|
||||
if !toml_path.exists() {
|
||||
continue;
|
||||
}
|
||||
let contents = match std::fs::read_to_string(&toml_path) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
warn!(path = %toml_path.display(), error = %e, "Failed to read HAND.toml, skipping");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let skill_path = path.join("SKILL.md");
|
||||
let skill_content = std::fs::read_to_string(&skill_path).unwrap_or_default();
|
||||
match bundled::parse_bundled("custom", &contents, &skill_content) {
|
||||
Ok(def) => {
|
||||
let hand_id = def.id.clone();
|
||||
info!(hand = %hand_id, path = %path.display(), "Loaded workspace hand");
|
||||
self.definitions.insert(hand_id, def);
|
||||
count += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(path = %toml_path.display(), error = %e, "Invalid HAND.toml, skipping");
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Install a hand from a directory containing HAND.toml (and optional SKILL.md).
|
||||
pub fn install_from_path(&self, path: &std::path::Path) -> HandResult<HandDefinition> {
|
||||
let toml_path = path.join("HAND.toml");
|
||||
@@ -145,6 +206,33 @@ impl HandRegistry {
|
||||
|
||||
info!(hand = %def.id, name = %def.name, path = %path.display(), "Installed hand from path");
|
||||
self.definitions.insert(def.id.clone(), def.clone());
|
||||
|
||||
// Persist the hand to the user's data dir so it survives daemon
|
||||
// restart (issue #984). Best-effort: failures are logged but do not
|
||||
// abort the install, because the hand is already registered in
|
||||
// memory and the user gets a working install for the current
|
||||
// session. On next restart, `load_workspace_hands` will pick it up
|
||||
// from disk.
|
||||
if let Some(home) = dirs::home_dir() {
|
||||
let dest_dir = home.join(".openfang").join("hands").join(&def.id);
|
||||
// Canonicalize both paths before comparing so we don't re-copy a
|
||||
// hand that is already being installed from its persistent
|
||||
// location (e.g. `openfang hand install ~/.openfang/hands/foo`).
|
||||
let same_path = match (path.canonicalize(), dest_dir.canonicalize()) {
|
||||
(Ok(a), Ok(b)) => a == b,
|
||||
_ => path == dest_dir,
|
||||
};
|
||||
if !same_path {
|
||||
if let Err(e) = std::fs::create_dir_all(&dest_dir) {
|
||||
warn!(error = %e, dest = %dest_dir.display(), "Failed to create hands persistence dir");
|
||||
} else if let Err(e) = crate::copy_dir_all(path, &dest_dir) {
|
||||
warn!(error = %e, dest = %dest_dir.display(), "Failed to persist hand");
|
||||
} else {
|
||||
info!(hand = %def.id, dest = %dest_dir.display(), "Persisted hand to workspace");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(def)
|
||||
}
|
||||
|
||||
@@ -200,27 +288,42 @@ impl HandRegistry {
|
||||
}
|
||||
|
||||
/// Activate a hand — creates an instance (agent spawning is done by kernel).
|
||||
///
|
||||
/// `instance_name` is an optional user-supplied label. When set, multiple
|
||||
/// instances of the same hand can coexist as long as each
|
||||
/// (hand_id, instance_name) pair is unique. When `None`, the legacy
|
||||
/// single-instance-per-hand rule applies — a second unnamed activation of
|
||||
/// the same hand is rejected.
|
||||
pub fn activate(
|
||||
&self,
|
||||
hand_id: &str,
|
||||
config: HashMap<String, serde_json::Value>,
|
||||
instance_name: Option<String>,
|
||||
) -> HandResult<HandInstance> {
|
||||
let def = self
|
||||
.definitions
|
||||
.get(hand_id)
|
||||
.ok_or_else(|| HandError::NotFound(hand_id.to_string()))?;
|
||||
|
||||
// Check if already active
|
||||
// Reject only when the exact same (hand_id, instance_name) is already active.
|
||||
// This lets multiple uniquely-named instances of the same hand coexist.
|
||||
for entry in self.instances.iter() {
|
||||
if entry.hand_id == hand_id && entry.status == HandStatus::Active {
|
||||
return Err(HandError::AlreadyActive(hand_id.to_string()));
|
||||
if entry.hand_id == hand_id
|
||||
&& entry.instance_name == instance_name
|
||||
&& entry.status == HandStatus::Active
|
||||
{
|
||||
let label = match &instance_name {
|
||||
Some(name) => format!("{hand_id} (instance: {name})"),
|
||||
None => hand_id.to_string(),
|
||||
};
|
||||
return Err(HandError::AlreadyActive(label));
|
||||
}
|
||||
}
|
||||
|
||||
let instance = HandInstance::new(hand_id, &def.agent.name, config);
|
||||
let instance = HandInstance::new(hand_id, &def.agent.name, config, instance_name.clone());
|
||||
let id = instance.instance_id;
|
||||
self.instances.insert(id, instance.clone());
|
||||
info!(hand = %hand_id, instance = %id, "Hand activated");
|
||||
info!(hand = %hand_id, instance = %id, instance_name = ?instance_name, "Hand activated");
|
||||
Ok(instance)
|
||||
}
|
||||
|
||||
@@ -673,7 +776,7 @@ mod tests {
|
||||
let reg = HandRegistry::new();
|
||||
reg.load_bundled();
|
||||
|
||||
let instance = reg.activate("clip", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("clip", HashMap::new(), None).unwrap();
|
||||
assert_eq!(instance.hand_id, "clip");
|
||||
assert_eq!(instance.status, HandStatus::Active);
|
||||
|
||||
@@ -681,7 +784,7 @@ mod tests {
|
||||
assert_eq!(instances.len(), 1);
|
||||
|
||||
// Can't activate again while active
|
||||
let err = reg.activate("clip", HashMap::new());
|
||||
let err = reg.activate("clip", HashMap::new(), None);
|
||||
assert!(err.is_err());
|
||||
|
||||
// Deactivate
|
||||
@@ -695,7 +798,7 @@ mod tests {
|
||||
let reg = HandRegistry::new();
|
||||
reg.load_bundled();
|
||||
|
||||
let instance = reg.activate("clip", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("clip", HashMap::new(), None).unwrap();
|
||||
let id = instance.instance_id;
|
||||
|
||||
reg.pause(id).unwrap();
|
||||
@@ -714,7 +817,7 @@ mod tests {
|
||||
let reg = HandRegistry::new();
|
||||
reg.load_bundled();
|
||||
|
||||
let instance = reg.activate("clip", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("clip", HashMap::new(), None).unwrap();
|
||||
let id = instance.instance_id;
|
||||
let agent_id = AgentId::new();
|
||||
|
||||
@@ -745,7 +848,7 @@ mod tests {
|
||||
fn not_found_errors() {
|
||||
let reg = HandRegistry::new();
|
||||
assert!(reg.get_definition("nonexistent").is_none());
|
||||
assert!(reg.activate("nonexistent", HashMap::new()).is_err());
|
||||
assert!(reg.activate("nonexistent", HashMap::new(), None).is_err());
|
||||
assert!(reg.check_requirements("nonexistent").is_err());
|
||||
assert!(reg.deactivate(Uuid::new_v4()).is_err());
|
||||
assert!(reg.pause(Uuid::new_v4()).is_err());
|
||||
@@ -757,7 +860,7 @@ mod tests {
|
||||
let reg = HandRegistry::new();
|
||||
reg.load_bundled();
|
||||
|
||||
let instance = reg.activate("clip", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("clip", HashMap::new(), None).unwrap();
|
||||
let id = instance.instance_id;
|
||||
|
||||
reg.set_error(id, "something broke".to_string()).unwrap();
|
||||
@@ -830,7 +933,7 @@ mod tests {
|
||||
reg.load_bundled();
|
||||
|
||||
// Lead hand has no requirements — activate it
|
||||
let instance = reg.activate("lead", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("lead", HashMap::new(), None).unwrap();
|
||||
let r = reg.readiness("lead").unwrap();
|
||||
assert!(r.requirements_met);
|
||||
assert!(r.active);
|
||||
@@ -847,7 +950,7 @@ mod tests {
|
||||
// Browser hand requires python3 (non-optional) + chromium (optional).
|
||||
// requirements_met only reflects non-optional requirements.
|
||||
// degraded = active + any requirement (including optional) unsatisfied.
|
||||
let instance = reg.activate("browser", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("browser", HashMap::new(), None).unwrap();
|
||||
let r = reg.readiness("browser").unwrap();
|
||||
assert!(r.active);
|
||||
|
||||
@@ -874,7 +977,7 @@ mod tests {
|
||||
let reg = HandRegistry::new();
|
||||
reg.load_bundled();
|
||||
|
||||
let instance = reg.activate("lead", HashMap::new()).unwrap();
|
||||
let instance = reg.activate("lead", HashMap::new(), None).unwrap();
|
||||
reg.pause(instance.instance_id).unwrap();
|
||||
|
||||
let r = reg.readiness("lead").unwrap();
|
||||
@@ -897,4 +1000,150 @@ mod tests {
|
||||
};
|
||||
assert!(!req.optional);
|
||||
}
|
||||
|
||||
#[test]
fn test_load_workspace_hands_from_directory() {
    // One subdirectory with a complete manifest must yield exactly one
    // loaded definition, retrievable by its id.
    let workspace = tempfile::tempdir().unwrap();
    let custom_dir = workspace.path().join("test-custom-hand");
    std::fs::create_dir_all(&custom_dir).unwrap();

    let manifest = r#"
id = "test-custom-hand"
name = "Test Custom Hand"
description = "A custom hand loaded from the workspace directory"
category = "other"
version = "0.1.0"
author = "tester"

[agent]
name = "test-agent"
description = "A test agent"
module = "builtin:chat"
provider = "anthropic"
model = "claude-sonnet-4-20250514"
system_prompt = "You are a test agent."
"#;
    std::fs::write(custom_dir.join("HAND.toml"), manifest).unwrap();

    let reg = HandRegistry::new();
    let loaded = reg.load_workspace_hands(workspace.path()).unwrap();
    assert_eq!(loaded, 1);

    assert!(reg.get_definition("test-custom-hand").is_some());
    let definition = reg.get_definition("test-custom-hand").unwrap();
    assert_eq!(definition.name, "Test Custom Hand");
}
|
||||
|
||||
#[test]
fn test_load_workspace_hands_missing_dir_returns_zero() {
    // A hands directory that was never created is not an error: nothing is
    // loaded and the call reports zero.
    let workspace = tempfile::tempdir().unwrap();
    let absent = workspace.path().join("does-not-exist");

    let loaded = HandRegistry::new().load_workspace_hands(&absent).unwrap();

    assert_eq!(loaded, 0);
}
|
||||
|
||||
#[test]
fn test_load_workspace_hands_skips_invalid_toml() {
    let workspace = tempfile::tempdir().unwrap();
    let root = workspace.path();

    // A well-formed hand that must load.
    let valid_dir = root.join("good-hand");
    std::fs::create_dir_all(&valid_dir).unwrap();
    std::fs::write(
        valid_dir.join("HAND.toml"),
        r#"
id = "good-hand"
name = "Good Hand"
description = "..."
category = "other"

[agent]
name = "good-agent"
description = "..."
system_prompt = "You are good."
"#,
    )
    .unwrap();

    // A hand whose manifest is not parseable TOML — must be skipped.
    let broken_dir = root.join("bad-hand");
    std::fs::create_dir_all(&broken_dir).unwrap();
    std::fs::write(broken_dir.join("HAND.toml"), "not valid toml {[[[").unwrap();

    // A directory with no HAND.toml at all — must be ignored.
    std::fs::create_dir_all(root.join("empty-dir")).unwrap();

    let reg = HandRegistry::new();
    let loaded = reg.load_workspace_hands(root).unwrap();

    assert_eq!(loaded, 1, "only the valid hand should load");
    assert!(reg.get_definition("good-hand").is_some());
    assert!(reg.get_definition("bad-hand").is_none());
}
|
||||
|
||||
/// Build a `HandRegistry` pre-populated with a single dummy hand
|
||||
/// definition that has no requirements — used by the multi-instance
|
||||
/// activation tests below.
|
||||
fn test_registry_with_dummy_hand(hand_id: &str) -> HandRegistry {
|
||||
let toml_str = format!(
|
||||
r#"
|
||||
id = "{hand_id}"
|
||||
name = "Dummy Hand"
|
||||
description = "A dummy hand for tests"
|
||||
category = "other"
|
||||
tools = []
|
||||
|
||||
[agent]
|
||||
name = "dummy-agent"
|
||||
description = "dummy"
|
||||
system_prompt = "you are a dummy."
|
||||
|
||||
[dashboard]
|
||||
metrics = []
|
||||
"#
|
||||
);
|
||||
let def = crate::bundled::parse_bundled("dummy", &toml_str, "").unwrap();
|
||||
let reg = HandRegistry::new();
|
||||
reg.definitions.insert(def.id.clone(), def);
|
||||
reg
|
||||
}
|
||||
|
||||
#[test]
fn test_activate_same_hand_twice_with_different_instance_names_succeeds() {
    // Two activations of one hand under distinct instance names must both
    // succeed and both remain active.
    let registry = test_registry_with_dummy_hand("test-hand");

    let first = registry
        .activate(
            "test-hand",
            HashMap::new(),
            Some("instance-a".to_string()),
        )
        .unwrap();
    let second = registry
        .activate(
            "test-hand",
            HashMap::new(),
            Some("instance-b".to_string()),
        )
        .unwrap();

    assert_ne!(first.instance_id, second.instance_id);
    assert_eq!(first.instance_name.as_deref(), Some("instance-a"));
    assert_eq!(second.instance_name.as_deref(), Some("instance-b"));

    let active_count = registry
        .list_instances()
        .into_iter()
        .filter(|inst| inst.status == HandStatus::Active)
        .count();
    assert_eq!(active_count, 2);
}
|
||||
|
||||
#[test]
fn test_activate_same_hand_same_instance_name_rejects() {
    // Re-using an instance name for the same hand while the first instance
    // is still active must be refused with `AlreadyActive`.
    let registry = test_registry_with_dummy_hand("test-hand");

    registry
        .activate("test-hand", HashMap::new(), Some("same".to_string()))
        .unwrap();

    let duplicate = registry.activate("test-hand", HashMap::new(), Some("same".to_string()));
    assert!(matches!(duplicate, Err(HandError::AlreadyActive(_))));
}
|
||||
|
||||
#[test]
fn test_activate_same_hand_unnamed_twice_still_rejects() {
    // Without an instance name the single-instance-per-hand rule applies:
    // the second unnamed activation must be refused with `AlreadyActive`.
    let registry = test_registry_with_dummy_hand("test-hand");

    registry
        .activate("test-hand", HashMap::new(), None)
        .unwrap();

    let duplicate = registry.activate("test-hand", HashMap::new(), None);
    assert!(matches!(duplicate, Err(HandError::AlreadyActive(_))));
}
|
||||
}
|
||||
|
||||
@@ -790,6 +790,23 @@ impl OpenFangKernel {
|
||||
info!("Loaded {hand_count} bundled hand(s)");
|
||||
}
|
||||
|
||||
// Load custom hands from the user's workspace (issue #984).
|
||||
// Hands installed via `openfang hand install <path>` are persisted to
|
||||
// `<home>/hands/<hand_id>/` so they survive daemon restarts.
|
||||
let workspace_hands_dir = config.home_dir.join("hands");
|
||||
match hand_registry.load_workspace_hands(&workspace_hands_dir) {
|
||||
Ok(n) if n > 0 => {
|
||||
info!(
|
||||
"Loaded {n} workspace hand(s) from {}",
|
||||
workspace_hands_dir.display()
|
||||
);
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
warn!("Failed to load workspace hands: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize extension/integration registry
|
||||
let mut extension_registry =
|
||||
openfang_extensions::registry::IntegrationRegistry::new(&config.home_dir);
|
||||
@@ -3317,6 +3334,7 @@ impl OpenFangKernel {
|
||||
&self,
|
||||
hand_id: &str,
|
||||
config: std::collections::HashMap<String, serde_json::Value>,
|
||||
instance_name: Option<String>,
|
||||
) -> KernelResult<openfang_hands::HandInstance> {
|
||||
use openfang_hands::HandError;
|
||||
|
||||
@@ -3333,7 +3351,7 @@ impl OpenFangKernel {
|
||||
// Create the instance in the registry
|
||||
let instance = self
|
||||
.hand_registry
|
||||
.activate(hand_id, config)
|
||||
.activate(hand_id, config, instance_name.clone())
|
||||
.map_err(|e| match e {
|
||||
HandError::AlreadyActive(id) => KernelError::OpenFang(OpenFangError::Internal(
|
||||
format!("Hand already active: {id}"),
|
||||
@@ -3354,8 +3372,15 @@ impl OpenFangKernel {
|
||||
def.agent.model.clone()
|
||||
};
|
||||
|
||||
// When a custom instance_name is provided, use it as the agent name so multiple
|
||||
// instances of the same hand type can coexist. Falls back to the HAND.toml name
|
||||
// for backward compatibility (single-instance mode).
|
||||
let agent_name = instance_name
|
||||
.clone()
|
||||
.unwrap_or_else(|| def.agent.name.clone());
|
||||
|
||||
let mut manifest = AgentManifest {
|
||||
name: def.agent.name.clone(),
|
||||
name: agent_name.clone(),
|
||||
description: def.agent.description.clone(),
|
||||
module: def.agent.module.clone(),
|
||||
model: ModelConfig {
|
||||
@@ -3460,7 +3485,7 @@ impl OpenFangKernel {
|
||||
.registry
|
||||
.list()
|
||||
.into_iter()
|
||||
.find(|e| e.name == def.agent.name);
|
||||
.find(|e| e.name == agent_name);
|
||||
let old_agent_id = existing.as_ref().map(|e| e.id);
|
||||
let saved_triggers = old_agent_id
|
||||
.map(|id| self.triggers.take_agent_triggers(id))
|
||||
@@ -3472,7 +3497,14 @@ impl OpenFangKernel {
|
||||
|
||||
// Spawn the agent with a fixed ID based on hand_id for stable identity across restarts.
|
||||
// This ensures triggers and cron jobs continue to work after daemon restart.
|
||||
let fixed_agent_id = AgentId::from_string(hand_id);
|
||||
// Named instances derive the UUID from instance_id so each coexists with a
|
||||
// unique stable agent id. Unnamed instances keep the legacy "derive from
|
||||
// hand_id" behavior for backward compatibility.
|
||||
let fixed_agent_id = if instance_name.is_some() {
|
||||
AgentId::from_string(&format!("hand_instance_{}", instance.instance_id))
|
||||
} else {
|
||||
AgentId::from_string(hand_id)
|
||||
};
|
||||
let agent_id = self.spawn_agent_with_parent(manifest, None, Some(fixed_agent_id))?;
|
||||
|
||||
// Restore triggers from the old agent under the new agent ID (#519).
|
||||
@@ -3884,7 +3916,7 @@ impl OpenFangKernel {
|
||||
if !saved_hands.is_empty() {
|
||||
info!("Restoring {} persisted hand(s)", saved_hands.len());
|
||||
for (hand_id, config, old_agent_id) in saved_hands {
|
||||
match self.activate_hand(&hand_id, config) {
|
||||
match self.activate_hand(&hand_id, config, None) {
|
||||
Ok(inst) => {
|
||||
info!(hand = %hand_id, instance = %inst.instance_id, "Hand restored");
|
||||
// Reassign cron jobs and triggers from the pre-restart
|
||||
@@ -4678,62 +4710,108 @@ impl OpenFangKernel {
|
||||
}
|
||||
};
|
||||
|
||||
// If fallback models are configured, wrap in FallbackDriver
|
||||
if !manifest.fallback_models.is_empty() {
|
||||
// Primary driver uses the agent's own model name (already set in request)
|
||||
let mut chain: Vec<(
|
||||
std::sync::Arc<dyn openfang_runtime::llm_driver::LlmDriver>,
|
||||
String,
|
||||
)> = vec![(primary.clone(), String::new())];
|
||||
for fb in &manifest.fallback_models {
|
||||
// Resolve "default" provider/model to the kernel's configured defaults,
|
||||
// mirroring the overlay logic for the primary model.
|
||||
let dm = &self.config.default_model;
|
||||
let fb_provider = if fb.provider.is_empty() || fb.provider == "default" {
|
||||
dm.provider.clone()
|
||||
} else {
|
||||
fb.provider.clone()
|
||||
};
|
||||
let fb_model_name = if fb.model.is_empty() || fb.model == "default" {
|
||||
dm.model.clone()
|
||||
} else {
|
||||
fb.model.clone()
|
||||
};
|
||||
let _ = &fb_model_name; // used below in strip_provider_prefix
|
||||
// Build the complete fallback chain:
|
||||
// 1. Primary driver (from the agent manifest)
|
||||
// 2. Per-agent `manifest.fallback_models` (#845)
|
||||
// 3. Global `config.fallback_providers` (#1003) — applied to *every* agent
|
||||
//
|
||||
// Wrap in FallbackDriver whenever the chain has more than one entry. This
|
||||
// ensures that when a local provider (e.g. LM Studio) goes offline at
|
||||
// runtime, the agent loop transparently fails over to the next provider
|
||||
// instead of retrying the unreachable primary forever.
|
||||
//
|
||||
// Primary driver uses an empty model name so the request's `model` field
|
||||
// (which is the agent's own model) is used as-is.
|
||||
let mut chain: Vec<(
|
||||
std::sync::Arc<dyn openfang_runtime::llm_driver::LlmDriver>,
|
||||
String,
|
||||
)> = vec![(primary.clone(), String::new())];
|
||||
|
||||
let fb_api_key = if let Some(env) = &fb.api_key_env {
|
||||
std::env::var(env).ok()
|
||||
} else if fb_provider == dm.provider && !dm.api_key_env.is_empty() {
|
||||
std::env::var(&dm.api_key_env).ok()
|
||||
} else {
|
||||
// Resolve using provider_api_keys / convention for custom providers
|
||||
let env_var = self.config.resolve_api_key_env(&fb_provider);
|
||||
std::env::var(&env_var).ok()
|
||||
};
|
||||
let config = DriverConfig {
|
||||
provider: fb_provider.clone(),
|
||||
api_key: fb_api_key,
|
||||
base_url: fb
|
||||
.base_url
|
||||
.clone()
|
||||
.or_else(|| dm.base_url.clone())
|
||||
.or_else(|| self.lookup_provider_url(&fb_provider)),
|
||||
skip_permissions: true,
|
||||
};
|
||||
match drivers::create_driver(&config) {
|
||||
Ok(d) => chain.push((d, strip_provider_prefix(&fb_model_name, &fb_provider))),
|
||||
Err(e) => {
|
||||
warn!("Fallback driver '{}' failed to init: {e}", fb_provider);
|
||||
}
|
||||
// 2. Per-agent fallback models from the manifest.
|
||||
for fb in &manifest.fallback_models {
|
||||
// Resolve "default" provider/model to the kernel's configured defaults,
|
||||
// mirroring the overlay logic for the primary model.
|
||||
let dm = &self.config.default_model;
|
||||
let fb_provider = if fb.provider.is_empty() || fb.provider == "default" {
|
||||
dm.provider.clone()
|
||||
} else {
|
||||
fb.provider.clone()
|
||||
};
|
||||
let fb_model_name = if fb.model.is_empty() || fb.model == "default" {
|
||||
dm.model.clone()
|
||||
} else {
|
||||
fb.model.clone()
|
||||
};
|
||||
|
||||
let fb_api_key = if let Some(env) = &fb.api_key_env {
|
||||
self.resolve_credential(env)
|
||||
} else if fb_provider == dm.provider && !dm.api_key_env.is_empty() {
|
||||
self.resolve_credential(&dm.api_key_env)
|
||||
} else {
|
||||
// Resolve using provider_api_keys / convention for custom providers
|
||||
let env_var = self.config.resolve_api_key_env(&fb_provider);
|
||||
self.resolve_credential(&env_var)
|
||||
};
|
||||
let config = DriverConfig {
|
||||
provider: fb_provider.clone(),
|
||||
api_key: fb_api_key,
|
||||
base_url: fb
|
||||
.base_url
|
||||
.clone()
|
||||
.or_else(|| dm.base_url.clone())
|
||||
.or_else(|| self.lookup_provider_url(&fb_provider)),
|
||||
skip_permissions: true,
|
||||
};
|
||||
match drivers::create_driver(&config) {
|
||||
Ok(d) => chain.push((d, strip_provider_prefix(&fb_model_name, &fb_provider))),
|
||||
Err(e) => {
|
||||
warn!("Fallback driver '{}' failed to init: {e}", fb_provider);
|
||||
}
|
||||
}
|
||||
if chain.len() > 1 {
|
||||
return Ok(Arc::new(
|
||||
openfang_runtime::drivers::fallback::FallbackDriver::with_models(chain),
|
||||
));
|
||||
}
|
||||
|
||||
// 3. Global fallback providers from config.toml — `[[fallback_providers]]`.
|
||||
// These apply to every agent so that when the primary provider becomes
|
||||
// unreachable at runtime (network failure, daemon shutdown, etc.) the
|
||||
// agent loop fails over to the next provider in the chain. (#1003)
|
||||
for fb in &self.config.fallback_providers {
|
||||
let fb_api_key = {
|
||||
let env_var = if !fb.api_key_env.is_empty() {
|
||||
fb.api_key_env.clone()
|
||||
} else {
|
||||
self.config.resolve_api_key_env(&fb.provider)
|
||||
};
|
||||
self.resolve_credential(&env_var)
|
||||
};
|
||||
let fb_config = DriverConfig {
|
||||
provider: fb.provider.clone(),
|
||||
api_key: fb_api_key,
|
||||
base_url: fb
|
||||
.base_url
|
||||
.clone()
|
||||
.or_else(|| self.lookup_provider_url(&fb.provider)),
|
||||
skip_permissions: true,
|
||||
};
|
||||
match drivers::create_driver(&fb_config) {
|
||||
Ok(d) => {
|
||||
chain.push((d, strip_provider_prefix(&fb.model, &fb.provider)));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
provider = %fb.provider,
|
||||
error = %e,
|
||||
"Global fallback provider init failed — skipped"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if chain.len() > 1 {
|
||||
return Ok(Arc::new(
|
||||
openfang_runtime::drivers::fallback::FallbackDriver::with_models(chain),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(primary)
|
||||
}
|
||||
|
||||
@@ -6234,7 +6312,7 @@ impl KernelHandle for OpenFangKernel {
|
||||
config: std::collections::HashMap<String, serde_json::Value>,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let instance = self
|
||||
.activate_hand(hand_id, config)
|
||||
.activate_hand(hand_id, config, None)
|
||||
.map_err(|e| format!("{e}"))?;
|
||||
|
||||
Ok(serde_json::json!({
|
||||
@@ -6865,7 +6943,7 @@ mod tests {
|
||||
|
||||
let kernel = OpenFangKernel::boot_with_config(config).expect("Kernel should boot");
|
||||
let instance = kernel
|
||||
.activate_hand("browser", HashMap::new())
|
||||
.activate_hand("browser", HashMap::new(), None)
|
||||
.expect("browser hand should activate");
|
||||
let agent_id = instance.agent_id.expect("browser hand agent id");
|
||||
let entry = kernel
|
||||
|
||||
@@ -245,4 +245,75 @@ mod tests {
|
||||
// All drivers rate-limited — error should bubble up
|
||||
assert!(matches!(result, Err(LlmError::RateLimited { .. })));
|
||||
}
|
||||
|
||||
/// Regression test for #1003: when the primary driver returns a network /
|
||||
/// connection error (e.g. LM Studio shut down → reqwest connection refused),
|
||||
/// the FallbackDriver MUST escalate to the next driver in the chain instead
|
||||
/// of bubbling the error up to the agent loop (which would then retry the
|
||||
/// dead primary forever).
|
||||
#[tokio::test]
|
||||
async fn test_network_error_falls_through_to_secondary() {
|
||||
struct NetworkFailDriver;
|
||||
|
||||
#[async_trait]
|
||||
impl LlmDriver for NetworkFailDriver {
|
||||
async fn complete(
|
||||
&self,
|
||||
_req: CompletionRequest,
|
||||
) -> Result<CompletionResponse, LlmError> {
|
||||
// Simulates `reqwest::Error` from a connection refused — exactly
|
||||
// what an offline LM Studio looks like in production.
|
||||
Err(LlmError::Http(
|
||||
"error sending request: connection refused (os error 10061)".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
let driver = FallbackDriver::new(vec![
|
||||
Arc::new(NetworkFailDriver) as Arc<dyn LlmDriver>,
|
||||
Arc::new(OkDriver) as Arc<dyn LlmDriver>,
|
||||
]);
|
||||
let result = driver.complete(test_request()).await;
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"FallbackDriver should escalate network errors to the next driver"
|
||||
);
|
||||
assert_eq!(result.unwrap().text(), "OK");
|
||||
}
|
||||
|
||||
/// Same as above but for streaming. The streaming path is what the agent
|
||||
/// loop hits in practice for LM Studio etc., so it must also fall through.
|
||||
#[tokio::test]
|
||||
async fn test_network_error_falls_through_streaming() {
|
||||
struct NetworkFailDriver;
|
||||
|
||||
#[async_trait]
|
||||
impl LlmDriver for NetworkFailDriver {
|
||||
async fn complete(
|
||||
&self,
|
||||
_req: CompletionRequest,
|
||||
) -> Result<CompletionResponse, LlmError> {
|
||||
Err(LlmError::Http("connection refused".to_string()))
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&self,
|
||||
_req: CompletionRequest,
|
||||
_tx: tokio::sync::mpsc::Sender<StreamEvent>,
|
||||
) -> Result<CompletionResponse, LlmError> {
|
||||
Err(LlmError::Http("connection refused".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
let driver = FallbackDriver::new(vec![
|
||||
Arc::new(NetworkFailDriver) as Arc<dyn LlmDriver>,
|
||||
Arc::new(OkDriver) as Arc<dyn LlmDriver>,
|
||||
]);
|
||||
let (tx, _rx) = tokio::sync::mpsc::channel(16);
|
||||
let result = driver.stream(test_request(), tx).await;
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"FallbackDriver::stream should also escalate network errors"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@ use rmcp::service::RunningService;
|
||||
use rmcp::{RoleClient, ServiceExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, info};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -307,11 +306,10 @@ impl McpConnection {
|
||||
}
|
||||
}
|
||||
|
||||
let config = StreamableHttpClientTransportConfig {
|
||||
uri: Arc::from(url),
|
||||
custom_headers,
|
||||
..Default::default()
|
||||
};
|
||||
// rmcp 1.3+ marks StreamableHttpClientTransportConfig as #[non_exhaustive].
|
||||
// Use the official builder API (credit: @jefflower, PR #986).
|
||||
let config = StreamableHttpClientTransportConfig::with_uri(url)
|
||||
.custom_headers(custom_headers);
|
||||
|
||||
let transport = StreamableHttpClientTransport::from_config(config);
|
||||
|
||||
|
||||
@@ -38,9 +38,17 @@ pub struct RepairStats {
|
||||
/// 1. Drops orphaned ToolResult blocks that have no matching ToolUse
|
||||
/// 2. Drops empty messages
|
||||
/// - 2b. Reorders misplaced ToolResults to follow their matching ToolUse
|
||||
/// - 2c. Inserts synthetic error results for unmatched ToolUse blocks
|
||||
/// - 2d. Deduplicates ToolResults with the same tool_use_id
|
||||
/// - 2c. Deduplicates ToolResults with the same tool_use_id
|
||||
/// - 2d. Inserts synthetic error results for unmatched ToolUse blocks
|
||||
/// 3. Merges consecutive same-role messages
|
||||
///
|
||||
/// Note: dedup MUST run before synthetic insertion. Some providers (e.g., Moonshot)
|
||||
/// reuse `tool_use_id` values across turns (`function_name:index` format). After
|
||||
/// compaction, multiple ToolUse blocks may share the same id with only one matching
|
||||
/// ToolResult. If synthetic insertion ran first it would see the id as "matched" and
|
||||
/// skip it; dedup would then leave one ToolUse orphaned. By deduping first, we
|
||||
/// guarantee that each unique id has at most one result, and synthetic insertion
|
||||
/// can correctly count uses vs. results to top up missing pairings.
|
||||
pub fn validate_and_repair(messages: &[Message]) -> Vec<Message> {
|
||||
validate_and_repair_with_stats(messages).0
|
||||
}
|
||||
@@ -117,14 +125,25 @@ pub fn validate_and_repair_with_stats(messages: &[Message]) -> (Vec<Message>, Re
|
||||
let reordered_count = reorder_tool_results(&mut cleaned);
|
||||
stats.results_reordered = reordered_count;
|
||||
|
||||
// Phase 2c: Insert synthetic error results for unmatched ToolUse blocks
|
||||
let synthetic_count = insert_synthetic_results(&mut cleaned);
|
||||
stats.synthetic_results_inserted = synthetic_count;
|
||||
|
||||
// Phase 2d: Deduplicate ToolResults
|
||||
// Phase 2c: Deduplicate ToolResults FIRST.
|
||||
//
|
||||
// This must run before synthetic insertion (issue #1013). Providers like
|
||||
// Moonshot reuse tool_use_ids across turns in `function_name:index` form
|
||||
// (e.g. "memory_store:0"). After compaction we may have multiple ToolUse
|
||||
// blocks sharing the same id with multiple ToolResult blocks for the same
|
||||
// id. If synthetic insertion ran first, it would see the id as "matched"
|
||||
// and not insert any synthetic. Dedup would then strip the duplicate
|
||||
// result, leaving a ToolUse orphaned and producing an API 400.
|
||||
// Dedup only removes duplicate ToolResult blocks; ToolUse blocks are
|
||||
// untouched, so the next phase can pair any leftover orphaned ToolUses
|
||||
// with synthetic results.
|
||||
let dedup_count = deduplicate_tool_results(&mut cleaned);
|
||||
stats.duplicates_removed = dedup_count;
|
||||
|
||||
// Phase 2d: Insert synthetic error results for unmatched ToolUse blocks.
|
||||
let synthetic_count = insert_synthetic_results(&mut cleaned);
|
||||
stats.synthetic_results_inserted = synthetic_count;
|
||||
|
||||
// Phase 2e: Skip aborted/errored assistant messages
|
||||
// An assistant message with no content blocks (or only empty text) followed by
|
||||
// a user message containing ToolResults indicates an interrupted tool-use.
|
||||
@@ -319,35 +338,44 @@ fn reorder_tool_results(messages: &mut Vec<Message>) -> usize {
|
||||
reorder_count
|
||||
}
|
||||
|
||||
/// Phase 2c: Insert synthetic error results for unmatched ToolUse blocks.
|
||||
/// Phase 2d: Insert synthetic error results for unmatched ToolUse blocks.
|
||||
///
|
||||
/// If an assistant message contains a ToolUse block but there is no matching
|
||||
/// ToolResult anywhere in the history, a synthetic error result is inserted
|
||||
/// immediately after the assistant message to prevent API validation errors.
|
||||
///
|
||||
/// This counts ToolUse and ToolResult occurrences per id (not just presence)
|
||||
/// so it correctly handles providers like Moonshot that reuse tool_use_ids
|
||||
/// across turns (e.g. "memory_store:0" called multiple times). If two ToolUses
|
||||
/// share an id but only one ToolResult exists, one synthetic will be inserted
|
||||
/// for the still-orphaned use.
|
||||
fn insert_synthetic_results(messages: &mut Vec<Message>) -> usize {
|
||||
// Collect all existing ToolResult IDs
|
||||
let existing_result_ids: HashSet<String> = messages
|
||||
.iter()
|
||||
.flat_map(|m| match &m.content {
|
||||
MessageContent::Blocks(blocks) => blocks
|
||||
.iter()
|
||||
.filter_map(|b| match b {
|
||||
ContentBlock::ToolResult { tool_use_id, .. } => Some(tool_use_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
_ => vec![],
|
||||
})
|
||||
.collect();
|
||||
// Count existing ToolResult IDs (occurrences, not just presence).
|
||||
let mut available_result_counts: HashMap<String, usize> = HashMap::new();
|
||||
for msg in messages.iter() {
|
||||
if let MessageContent::Blocks(blocks) = &msg.content {
|
||||
for b in blocks {
|
||||
if let ContentBlock::ToolResult { tool_use_id, .. } = b {
|
||||
*available_result_counts
|
||||
.entry(tool_use_id.clone())
|
||||
.or_insert(0) += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Find ToolUse blocks without matching results
|
||||
// Walk ToolUse blocks in order; consume one available result per id and
|
||||
// mark any leftover ToolUses as orphaned.
|
||||
let mut orphaned_uses: Vec<(usize, String)> = Vec::new(); // (assistant_msg_idx, tool_use_id)
|
||||
for (idx, msg) in messages.iter().enumerate() {
|
||||
if msg.role == Role::Assistant {
|
||||
if let MessageContent::Blocks(blocks) = &msg.content {
|
||||
for block in blocks {
|
||||
if let ContentBlock::ToolUse { id, .. } = block {
|
||||
if !existing_result_ids.contains(id) {
|
||||
let remaining = available_result_counts.entry(id.clone()).or_insert(0);
|
||||
if *remaining > 0 {
|
||||
*remaining -= 1;
|
||||
} else {
|
||||
orphaned_uses.push((idx, id.clone()));
|
||||
}
|
||||
}
|
||||
@@ -409,10 +437,15 @@ fn insert_synthetic_results(messages: &mut Vec<Message>) -> usize {
|
||||
count
|
||||
}
|
||||
|
||||
/// Phase 2d: Drop duplicate ToolResults for the same tool_use_id.
|
||||
/// Phase 2c: Drop duplicate ToolResults for the same tool_use_id.
|
||||
///
|
||||
/// If multiple ToolResult blocks exist for the same tool_use_id across the
|
||||
/// message history, only the first one is kept. Returns the count of duplicates removed.
|
||||
///
|
||||
/// Note: this only removes duplicate ToolResult blocks. ToolUse blocks are
|
||||
/// untouched, so the subsequent synthetic-insertion phase can pair any
|
||||
/// orphaned ToolUses (e.g. from Moonshot's repeated tool_use_ids) with
|
||||
/// synthetic results.
|
||||
fn deduplicate_tool_results(messages: &mut Vec<Message>) -> usize {
|
||||
let mut seen_ids: HashSet<String> = HashSet::new();
|
||||
let mut removed = 0usize;
|
||||
@@ -928,6 +961,148 @@ mod tests {
|
||||
assert_eq!(result_count, 1, "Should keep only the first ToolResult");
|
||||
}
|
||||
|
||||
#[test]
fn test_moonshot_duplicate_tool_ids_gets_synthetic_after_dedup_1013() {
    // Regression for issue #1013.
    //
    // Moonshot emits tool_use_ids as `function_name:index` (e.g.
    // "memory_store:0"), so the same id shows up again whenever the same tool
    // is called in a later turn. After compaction several such turns can
    // survive side by side. With the old phase order (synthetic insertion
    // before dedup) the shared id looked "matched", dedup then stripped a
    // result, and an orphaned ToolUse produced an API 400.
    //
    // With dedup first, synthetic insertion counts ToolUse vs ToolResult
    // occurrences per id and tops up the missing pairing.

    /// Assistant turn calling `memory_store` under the reused id.
    fn store_call(key: &str, value: &str) -> Message {
        Message {
            role: Role::Assistant,
            content: MessageContent::Blocks(vec![ContentBlock::ToolUse {
                id: "memory_store:0".to_string(),
                name: "memory_store".to_string(),
                input: serde_json::json!({"key": key, "value": value}),
                provider_metadata: None,
            }]),
        }
    }

    /// Every content block of every block-structured message, in order.
    fn blocks_of(messages: &[Message]) -> impl Iterator<Item = &ContentBlock> {
        messages
            .iter()
            .filter_map(|m| match &m.content {
                MessageContent::Blocks(blocks) => Some(blocks),
                _ => None,
            })
            .flatten()
    }

    let messages = vec![
        Message::user("Remember this fact"),
        // First turn: the call and its matching result.
        store_call("fact1", "hello"),
        Message {
            role: Role::User,
            content: MessageContent::Blocks(vec![ContentBlock::ToolResult {
                tool_use_id: "memory_store:0".to_string(),
                tool_name: "memory_store".to_string(),
                content: "stored".to_string(),
                is_error: false,
            }]),
        },
        // Second turn: same id again, and this time no ToolResult follows
        // (e.g. lost during compaction or interrupted mid-execution).
        store_call("fact2", "world"),
        Message::user("Did it work?"),
    ];

    let (repaired, stats) = validate_and_repair_with_stats(&messages);

    // Tally uses and results carrying the reused id.
    let tool_use_count = blocks_of(&repaired)
        .filter(|b| matches!(b, ContentBlock::ToolUse { id, .. } if id == "memory_store:0"))
        .count();
    let tool_result_count = blocks_of(&repaired)
        .filter(|b| {
            matches!(
                b,
                ContentBlock::ToolResult { tool_use_id, .. } if tool_use_id == "memory_store:0"
            )
        })
        .count();

    // Both ToolUses must be preserved.
    assert_eq!(
        tool_use_count, 2,
        "Both ToolUse blocks should be preserved after repair"
    );
    // Every ToolUse must have a corresponding ToolResult.
    assert_eq!(
        tool_result_count, tool_use_count,
        "Every ToolUse should have exactly one corresponding ToolResult \
         (uses={tool_use_count}, results={tool_result_count})"
    );

    // A synthetic result must have been inserted for the orphaned use.
    assert_eq!(
        stats.synthetic_results_inserted, 1,
        "Exactly one synthetic result should be inserted for the orphaned ToolUse"
    );

    // No ToolResult may be left without a ToolUse carrying its id.
    let tool_use_ids: HashSet<&str> = blocks_of(&repaired)
        .filter_map(|b| match b {
            ContentBlock::ToolUse { id, .. } => Some(id.as_str()),
            _ => None,
        })
        .collect();
    for block in blocks_of(&repaired) {
        if let ContentBlock::ToolResult { tool_use_id, .. } = block {
            assert!(
                tool_use_ids.contains(tool_use_id.as_str()),
                "ToolResult {tool_use_id} has no matching ToolUse"
            );
        }
    }

    // The synthetic result must be flagged as an error and carry the
    // interrupted-tool message.
    let synthetic_present = blocks_of(&repaired).any(|b| {
        matches!(
            b,
            ContentBlock::ToolResult { tool_use_id, is_error, content, .. }
                if tool_use_id == "memory_store:0" && *is_error && content.contains("interrupted")
        )
    });
    assert!(
        synthetic_present,
        "A synthetic error ToolResult for memory_store:0 should be present"
    );
}
|
||||
|
||||
#[test]
|
||||
fn test_strip_tool_result_details() {
|
||||
let short = "Normal tool output";
|
||||
|
||||
@@ -893,6 +893,41 @@ mod tests {
|
||||
assert!(validate_command_allowlist(&cjk_cmd, &policy).is_err());
|
||||
}
|
||||
|
||||
/// Regression test for GitHub issue #919.
///
/// The report: `rm /home/jcl/test/test.txt` succeeded in Allowlist mode even
/// though `rm` was NOT in `allowed_commands`. The actual bypass was the
/// `process_start` tool, which spawned subprocesses without consulting
/// `exec_policy` at all (fixed in tool_runner.rs).
///
/// This test pins the contract of the validator itself: with an allowlist of
/// only `ls` and `echo`, an `rm` command MUST be rejected with
/// "not in the exec allowlist", so any future tool path that spawns
/// subprocesses can call the validator and get a correct answer.
#[test]
fn test_issue_919_rm_blocked_when_not_in_allowlist() {
    let allowed_commands: Vec<String> = vec!["ls", "echo"]
        .into_iter()
        .map(String::from)
        .collect();
    let policy = ExecPolicy {
        mode: ExecSecurityMode::Allowlist,
        allowed_commands,
        ..ExecPolicy::default()
    };

    // Same shape as the reported command: `rm` followed by a file path.
    let verdict = validate_command_allowlist("rm /tmp/test.txt", &policy);
    assert!(
        verdict.is_err(),
        "rm must be blocked when not in allowed_commands (issue #919)"
    );

    let err = verdict.unwrap_err();
    assert!(
        err.contains("not in the exec allowlist"),
        "Error message must indicate allowlist rejection, got: {err}"
    );
    assert!(
        err.contains("rm"),
        "Error message must name the rejected command, got: {err}"
    );
}
|
||||
|
||||
#[test]
|
||||
fn test_extract_all_commands_cjk_separators() {
|
||||
// Ensure extract_all_commands handles CJK content between separators
|
||||
|
||||
@@ -21,8 +21,11 @@ const MAX_AGENT_CALL_DEPTH: u32 = 5;
|
||||
/// Check if a tool name refers to a shell execution tool.
///
/// Used to determine whether exec_policy settings should bypass the approval gate.
///
/// SECURITY (#919): `process_start` is also a shell execution path — it spawns
/// arbitrary subprocesses via the persistent process manager. It must be gated
/// by the same approval rules as `shell_exec`.
///
/// Returns `true` only for an exact match on `"shell_exec"` or
/// `"process_start"`; every other name (including the empty string) is `false`.
fn is_shell_tool(name: &str) -> bool {
    matches!(name, "shell_exec" | "process_start")
}
|
||||
|
||||
/// Check if a shell command should be blocked by taint tracking.
|
||||
@@ -353,7 +356,9 @@ pub async fn execute_tool(
|
||||
"channel_send" => tool_channel_send(input, kernel, workspace_root).await,
|
||||
|
||||
// Persistent process tools
|
||||
"process_start" => tool_process_start(input, process_manager, caller_agent_id).await,
|
||||
"process_start" => {
|
||||
tool_process_start(input, process_manager, caller_agent_id, exec_policy).await
|
||||
}
|
||||
"process_poll" => tool_process_poll(input, process_manager).await,
|
||||
"process_write" => tool_process_write(input, process_manager).await,
|
||||
"process_kill" => tool_process_kill(input, process_manager).await,
|
||||
@@ -3085,10 +3090,18 @@ async fn tool_docker_exec(
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Start a long-running process (REPL, server, watcher).
|
||||
///
|
||||
/// SECURITY (#919): process_start previously spawned subprocesses with NO
|
||||
/// exec policy enforcement, allowing an LLM in Allowlist mode to bypass
|
||||
/// allowed_commands entirely. For example, process_start with command="rm"
|
||||
/// args=["/some/file"] would delete the file even though "rm" was not
|
||||
/// in the allowlist. This function now performs the same checks as
|
||||
/// shell_exec: metacharacter rejection plus exec_policy validation.
|
||||
async fn tool_process_start(
|
||||
input: &serde_json::Value,
|
||||
pm: Option<&crate::process_manager::ProcessManager>,
|
||||
caller_agent_id: Option<&str>,
|
||||
exec_policy: Option<&openfang_types::config::ExecPolicy>,
|
||||
) -> Result<String, String> {
|
||||
let pm = pm.ok_or("Process manager not available")?;
|
||||
let agent_id = caller_agent_id.unwrap_or("default");
|
||||
@@ -3104,6 +3117,42 @@ async fn tool_process_start(
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
// SECURITY: Reject shell metacharacters in the command name itself.
|
||||
// The command field must be a single binary token.
|
||||
if let Some(reason) = crate::subprocess_sandbox::contains_shell_metacharacters(command) {
|
||||
return Err(format!(
|
||||
"process_start blocked: command contains {reason}. \
|
||||
Shell metacharacters are never allowed in the command field."
|
||||
));
|
||||
}
|
||||
// Also reject metacharacters anywhere in the arguments. While direct
|
||||
// spawn does not interpret these, blocking them prevents an LLM from
|
||||
// smuggling a chained command past the allowlist via an argument.
|
||||
for arg in &args {
|
||||
if let Some(reason) = crate::subprocess_sandbox::contains_shell_metacharacters(arg) {
|
||||
return Err(format!(
|
||||
"process_start blocked: argument contains {reason}. \
|
||||
Shell metacharacters are not allowed in process arguments."
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// SECURITY (#919): Enforce exec policy against the base command. The
|
||||
// shared validate_command_allowlist handles Deny / Full / Allowlist and
|
||||
// falls through to allow commands listed in safe_bins or allowed_commands.
|
||||
if let Some(policy) = exec_policy {
|
||||
if let Err(reason) =
|
||||
crate::subprocess_sandbox::validate_command_allowlist(command, policy)
|
||||
{
|
||||
return Err(format!(
|
||||
"process_start blocked: {reason}. Current exec_policy.mode = '{:?}'. \
|
||||
To allow this command, add it to exec_policy.allowed_commands or \
|
||||
set exec_policy.mode = 'full'.",
|
||||
policy.mode
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let proc_id = pm.start(agent_id, command, &args).await?;
|
||||
Ok(serde_json::json!({
|
||||
"process_id": proc_id,
|
||||
@@ -4017,4 +4066,130 @@ mod tests {
|
||||
// Cleanup
|
||||
let _ = std::fs::remove_dir_all(&tmp);
|
||||
}
|
||||
|
||||
// ── Regression: GitHub issue #919 — rm bypass via process_start ──────
|
||||
//
|
||||
// Before the fix, an LLM in Allowlist mode could call process_start
|
||||
// with command="rm" and args=["/some/file"] to delete files even though
|
||||
// "rm" was not in exec_policy.allowed_commands. tool_process_start
|
||||
// spawned the subprocess directly without ever consulting exec_policy.
|
||||
//
|
||||
// These tests pin down the new contract:
|
||||
// 1. process_start with a non-allowlisted binary returns Err.
|
||||
// 2. The Err message identifies allowlist rejection (so callers and
|
||||
// logs can distinguish it from a generic spawn failure).
|
||||
// 3. process_start with an allowlisted binary still works (stated as contract only; no test in this group spawns a process).
|
||||
// 4. is_shell_tool() now reports process_start as a shell tool so
|
||||
// the approval-gate path treats it the same as shell_exec.
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_issue_919_process_start_rm_blocked_in_allowlist() {
|
||||
use openfang_types::config::{ExecPolicy, ExecSecurityMode};
|
||||
|
||||
let pm = crate::process_manager::ProcessManager::new(5);
|
||||
let policy = ExecPolicy {
|
||||
mode: ExecSecurityMode::Allowlist,
|
||||
allowed_commands: vec!["ls".to_string(), "echo".to_string()],
|
||||
..ExecPolicy::default()
|
||||
};
|
||||
let input = serde_json::json!({
|
||||
"command": "rm",
|
||||
"args": ["/tmp/openfang_test_should_not_be_deleted.txt"],
|
||||
});
|
||||
|
||||
let result = tool_process_start(&input, Some(&pm), Some("test-agent"), Some(&policy)).await;
|
||||
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"process_start must reject 'rm' when not in allowlist (issue #919). Got: {:?}",
|
||||
result
|
||||
);
|
||||
let err = result.unwrap_err();
|
||||
assert!(
|
||||
err.contains("not in the exec allowlist"),
|
||||
"Error must indicate allowlist rejection, got: {err}"
|
||||
);
|
||||
assert!(
|
||||
err.contains("process_start blocked"),
|
||||
"Error must identify process_start as the blocking tool, got: {err}"
|
||||
);
|
||||
assert_eq!(
|
||||
pm.count(),
|
||||
0,
|
||||
"No process must have been spawned when allowlist rejects the command"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_issue_919_process_start_metachar_in_command_blocked() {
|
||||
use openfang_types::config::{ExecPolicy, ExecSecurityMode};
|
||||
|
||||
let pm = crate::process_manager::ProcessManager::new(5);
|
||||
let policy = ExecPolicy {
|
||||
mode: ExecSecurityMode::Full,
|
||||
..ExecPolicy::default()
|
||||
};
|
||||
// Even in Full mode, smuggling shell metacharacters into the command
|
||||
// field must be rejected — process_start does direct exec, not shell.
|
||||
let input = serde_json::json!({
|
||||
"command": "rm; cat /etc/passwd",
|
||||
"args": [],
|
||||
});
|
||||
let result = tool_process_start(&input, Some(&pm), Some("test-agent"), Some(&policy)).await;
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("metacharacter") || pm.count() == 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_issue_919_process_start_metachar_in_arg_blocked() {
|
||||
use openfang_types::config::{ExecPolicy, ExecSecurityMode};
|
||||
|
||||
let pm = crate::process_manager::ProcessManager::new(5);
|
||||
let policy = ExecPolicy {
|
||||
mode: ExecSecurityMode::Allowlist,
|
||||
allowed_commands: vec!["echo".to_string()],
|
||||
..ExecPolicy::default()
|
||||
};
|
||||
// Smuggling a chained command via an argument: echo "$(rm -rf /)"
|
||||
let input = serde_json::json!({
|
||||
"command": "echo",
|
||||
"args": ["$(rm -rf /)"],
|
||||
});
|
||||
let result = tool_process_start(&input, Some(&pm), Some("test-agent"), Some(&policy)).await;
|
||||
assert!(
|
||||
result.is_err(),
|
||||
"process_start must reject metacharacters in args"
|
||||
);
|
||||
assert_eq!(pm.count(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_issue_919_process_start_deny_mode_blocks_everything() {
|
||||
use openfang_types::config::{ExecPolicy, ExecSecurityMode};
|
||||
|
||||
let pm = crate::process_manager::ProcessManager::new(5);
|
||||
let policy = ExecPolicy {
|
||||
mode: ExecSecurityMode::Deny,
|
||||
..ExecPolicy::default()
|
||||
};
|
||||
let input = serde_json::json!({
|
||||
"command": "echo",
|
||||
"args": ["hello"],
|
||||
});
|
||||
let result = tool_process_start(&input, Some(&pm), Some("test-agent"), Some(&policy)).await;
|
||||
assert!(result.is_err(), "Deny mode must block process_start");
|
||||
assert!(result.unwrap_err().to_lowercase().contains("disabled"));
|
||||
assert_eq!(pm.count(), 0);
|
||||
}
|
||||
|
||||
#[test]
fn test_issue_919_is_shell_tool_includes_process_start() {
    // The approval gate must classify process_start as a shell tool so that
    // #772 (full-mode approval bypass) and #919 (allowlist enforcement) both
    // apply to it the same way they apply to shell_exec.
    for shell_tool in ["shell_exec", "process_start"] {
        assert!(is_shell_tool(shell_tool));
    }
    for other_tool in ["file_read", "web_fetch"] {
        assert!(!is_shell_tool(other_tool));
    }
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user