diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 6876bbf2..952fc932 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -8321,40 +8321,66 @@ pub async fn reload_integrations(State(state): State>) -> impl Int // --------------------------------------------------------------------------- // Scheduled Jobs (cron) endpoints // --------------------------------------------------------------------------- +// +// Historical note: an earlier implementation of `/api/schedules*` wrote to a +// shared-memory key (`__openfang_schedules`) that no executor ever read — so +// scheduled jobs registered via this API never actually fired (#1069). These +// routes now delegate to the kernel's real cron scheduler, which already +// backs `/api/cron/jobs*`. The request/response shape has been preserved as +// close as possible to the legacy route for dashboard compatibility. -/// The well-known shared-memory agent ID used for cross-agent KV storage. -/// Must match the value in `openfang-kernel/src/kernel.rs::shared_memory_agent_id()`. -fn schedule_shared_agent_id() -> AgentId { - AgentId(uuid::Uuid::from_bytes([ - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x01, - ])) +/// Convert an internal `CronJob` into the legacy `/api/schedules` response +/// shape so existing dashboard code keeps working. +fn cron_job_to_schedule_view( + kernel: &OpenFangKernel, + job: &openfang_types::scheduler::CronJob, +) -> serde_json::Value { + use openfang_types::scheduler::{CronAction, CronSchedule}; + + let cron = match &job.schedule { + CronSchedule::Cron { expr, .. } => expr.clone(), + CronSchedule::Every { every_secs } => format!("every {every_secs}s"), + CronSchedule::At { at } => at.to_rfc3339(), + }; + let message = match &job.action { + CronAction::AgentTurn { message, .. } => message.clone(), + CronAction::SystemEvent { text } => text.clone(), + CronAction::WorkflowRun { workflow_id, .. } => format!("workflow:{workflow_id}"), + }; + let meta = kernel.cron_scheduler.get_meta(job.id); + let last_status = meta.as_ref().and_then(|m| m.last_status.clone()); + + serde_json::json!({ + "id": job.id.to_string(), + "name": job.name, + "cron": cron, + "agent_id": job.agent_id.to_string(), + "message": message, + "enabled": job.enabled, + "created_at": job.created_at.to_rfc3339(), + "last_run": job.last_run.map(|t| t.to_rfc3339()), + "next_run": job.next_run.map(|t| t.to_rfc3339()), + "last_status": last_status, + }) } -const SCHEDULES_KEY: &str = "__openfang_schedules"; - -/// GET /api/schedules — List all cron-based scheduled jobs. +/// GET /api/schedules — List all scheduled jobs from the cron scheduler. pub async fn list_schedules(State(state): State>) -> impl IntoResponse { - let agent_id = schedule_shared_agent_id(); - match state.kernel.memory.structured_get(agent_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => { - let total = arr.len(); - Json(serde_json::json!({"schedules": arr, "total": total})) - } - Ok(_) => Json(serde_json::json!({"schedules": [], "total": 0})), - Err(e) => { - tracing::warn!("Failed to load schedules: {e}"); - Json(serde_json::json!({"schedules": [], "total": 0, "error": format!("{e}")})) - } - } + let jobs = state.kernel.cron_scheduler.list_all_jobs(); + let total = jobs.len(); + let schedules: Vec = jobs + .iter() + .map(|j| cron_job_to_schedule_view(&state.kernel, j)) + .collect(); + Json(serde_json::json!({"schedules": schedules, "total": total})) } -/// POST /api/schedules — Create a new cron-based scheduled job. +/// POST /api/schedules — Create a new scheduled job in the cron scheduler. pub async fn create_schedule( State(state): State>, Json(req): Json, ) -> impl IntoResponse { - let name = match req["name"].as_str() { + let name_raw = match req["name"].as_str() { Some(n) if !n.is_empty() => n.to_string(), _ => { return ( @@ -8374,7 +8400,6 @@ pub async fn create_schedule( } }; - // Validate cron expression: must be 5 space-separated fields let cron_parts: Vec<&str> = cron.split_whitespace().collect(); if cron_parts.len() != 5 { return ( @@ -8392,166 +8417,183 @@ pub async fn create_schedule( Json(serde_json::json!({"error": "Missing required field: agent_id"})), ); } - // Validate agent exists (UUID or name lookup) - let agent_exists = if let Ok(aid) = agent_id_str.parse::() { - state.kernel.registry.get(aid).is_some() + let target_agent = if let Ok(aid) = agent_id_str.parse::() { + if state.kernel.registry.get(aid).is_some() { + Some(aid) + } else { + None + } } else { state .kernel .registry .list() - .iter() - .any(|a| a.name == agent_id_str) + .into_iter() + .find(|a| a.name == agent_id_str) + .map(|a| a.id) }; - if !agent_exists { - return ( - StatusCode::NOT_FOUND, - Json(serde_json::json!({"error": format!("Agent not found: {agent_id_str}")})), - ); - } + let target_agent = match target_agent { + Some(a) => a, + None => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Agent not found: {agent_id_str}")})), + ); + } + }; + let message = req["message"].as_str().unwrap_or("").to_string(); let enabled = req.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true); + let job_message = if message.is_empty() { + format!("[Scheduled task '{name_raw}']") + } else { + message.clone() + }; - let schedule_id = uuid::Uuid::new_v4().to_string(); - let entry = serde_json::json!({ - "id": schedule_id, - "name": name, - "cron": cron, - "agent_id": agent_id_str, - "message": message, - "enabled": enabled, - "created_at": chrono::Utc::now().to_rfc3339(), - "last_run": null, - "run_count": 0, + let job_json = serde_json::json!({ + "name": sanitize_schedule_job_name(&name_raw), + "schedule": { "kind": "cron", "expr": cron, "tz": null }, + "action": { + "kind": "agent_turn", + "message": job_message, + "model_override": null, + "timeout_secs": null, + }, + "delivery": { "kind": "none" }, + "one_shot": false, }); - let shared_id = schedule_shared_agent_id(); - let mut schedules: Vec = - match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => arr, - _ => Vec::new(), - }; - - schedules.push(entry.clone()); - if let Err(e) = state.kernel.memory.structured_set( - shared_id, - SCHEDULES_KEY, - serde_json::Value::Array(schedules), - ) { - tracing::warn!("Failed to save schedule: {e}"); - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": format!("Failed to save schedule: {e}")})), - ); + match state + .kernel + .cron_create(&target_agent.to_string(), job_json) + .await + { + Ok(resp) => { + let job_id = serde_json::from_str::(&resp) + .ok() + .and_then(|v| v["job_id"].as_str().map(str::to_string)) + .unwrap_or_default(); + if !enabled { + if let Ok(uuid) = uuid::Uuid::parse_str(&job_id) { + let cj_id = openfang_types::scheduler::CronJobId(uuid); + let _ = state.kernel.cron_scheduler.set_enabled(cj_id, false); + let _ = state.kernel.cron_scheduler.persist(); + } + } + // Build response in the legacy shape. + let body = if let Ok(uuid) = uuid::Uuid::parse_str(&job_id) { + let cj_id = openfang_types::scheduler::CronJobId(uuid); + match state.kernel.cron_scheduler.get_job(cj_id) { + Some(job) => cron_job_to_schedule_view(&state.kernel, &job), + None => serde_json::json!({ + "id": job_id, + "name": name_raw, + "cron": cron, + "agent_id": target_agent.to_string(), + "message": message, + "enabled": enabled, + }), + } + } else { + serde_json::json!({ + "id": job_id, + "name": name_raw, + "cron": cron, + "agent_id": target_agent.to_string(), + "message": message, + "enabled": enabled, + }) + }; + (StatusCode::CREATED, Json(body)) + } + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": format!("Failed to create schedule: {e}")})), + ), } - - (StatusCode::CREATED, Json(entry)) } -/// PUT /api/schedules/:id — Update a scheduled job (toggle enabled, edit fields). +/// PUT /api/schedules/:id — Update a scheduled job. +/// +/// Supports toggling `enabled`. Other fields are not mutable without a +/// delete+recreate (the underlying cron scheduler does not expose in-place +/// edits); those are accepted but ignored with a note in the response. pub async fn update_schedule( State(state): State>, Path(id): Path, Json(req): Json, ) -> impl IntoResponse { - let shared_id = schedule_shared_agent_id(); - let mut schedules: Vec = - match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => arr, - _ => Vec::new(), - }; - - let mut found = false; - for s in schedules.iter_mut() { - if s["id"].as_str() == Some(&id) { - found = true; - if let Some(enabled) = req.get("enabled").and_then(|v| v.as_bool()) { - s["enabled"] = serde_json::Value::Bool(enabled); - } - if let Some(name) = req.get("name").and_then(|v| v.as_str()) { - s["name"] = serde_json::Value::String(name.to_string()); - } - if let Some(cron) = req.get("cron").and_then(|v| v.as_str()) { - let cron_parts: Vec<&str> = cron.split_whitespace().collect(); - if cron_parts.len() != 5 { - return ( - StatusCode::BAD_REQUEST, - Json(serde_json::json!({"error": "Invalid cron expression"})), - ); - } - s["cron"] = serde_json::Value::String(cron.to_string()); - } - if let Some(agent_id) = req.get("agent_id").and_then(|v| v.as_str()) { - s["agent_id"] = serde_json::Value::String(agent_id.to_string()); - } - if let Some(message) = req.get("message").and_then(|v| v.as_str()) { - s["message"] = serde_json::Value::String(message.to_string()); - } - break; + let uuid = match uuid::Uuid::parse_str(&id) { + Ok(u) => u, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid schedule id"})), + ); } - } + }; + let cj_id = openfang_types::scheduler::CronJobId(uuid); - if !found { + if state.kernel.cron_scheduler.get_job(cj_id).is_none() { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Schedule not found"})), ); } - if let Err(e) = state.kernel.memory.structured_set( - shared_id, - SCHEDULES_KEY, - serde_json::Value::Array(schedules), - ) { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": format!("Failed to update schedule: {e}")})), - ); + let mut note: Option<&'static str> = None; + if let Some(enabled) = req.get("enabled").and_then(|v| v.as_bool()) { + if let Err(e) = state.kernel.cron_scheduler.set_enabled(cj_id, enabled) { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("{e}")})), + ); + } + let _ = state.kernel.cron_scheduler.persist(); + } + if req.get("name").is_some() + || req.get("cron").is_some() + || req.get("agent_id").is_some() + || req.get("message").is_some() + { + note = Some("Only 'enabled' is mutable; delete and recreate to change other fields."); } - ( - StatusCode::OK, - Json(serde_json::json!({"status": "updated", "schedule_id": id})), - ) + let mut body = serde_json::json!({"status": "updated", "schedule_id": id}); + if let Some(n) = note { + body["note"] = serde_json::Value::String(n.to_string()); + } + (StatusCode::OK, Json(body)) } -/// DELETE /api/schedules/:id — Remove a scheduled job. +/// DELETE /api/schedules/:id — Remove a scheduled job from the cron scheduler. pub async fn delete_schedule( State(state): State>, Path(id): Path, ) -> impl IntoResponse { - let shared_id = schedule_shared_agent_id(); - let mut schedules: Vec = - match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => arr, - _ => Vec::new(), - }; - - let before = schedules.len(); - schedules.retain(|s| s["id"].as_str() != Some(&id)); - - if schedules.len() == before { - return ( + let uuid = match uuid::Uuid::parse_str(&id) { + Ok(u) => u, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid schedule id"})), + ); + } + }; + let cj_id = openfang_types::scheduler::CronJobId(uuid); + match state.kernel.cron_scheduler.remove_job(cj_id) { + Ok(_) => { + let _ = state.kernel.cron_scheduler.persist(); + ( + StatusCode::OK, + Json(serde_json::json!({"status": "removed", "schedule_id": id})), + ) + } + Err(_) => ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Schedule not found"})), - ); + ), } - - if let Err(e) = state.kernel.memory.structured_set( - shared_id, - SCHEDULES_KEY, - serde_json::Value::Array(schedules), - ) { - return ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(serde_json::json!({"error": format!("Failed to delete schedule: {e}")})), - ); - } - - ( - StatusCode::OK, - Json(serde_json::json!({"status": "removed", "schedule_id": id})), - ) } /// POST /api/schedules/:id/run — Manually run a scheduled job now. @@ -8559,101 +8601,41 @@ pub async fn run_schedule( State(state): State>, Path(id): Path, ) -> impl IntoResponse { - let shared_id = schedule_shared_agent_id(); - let schedules: Vec = - match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => arr, - _ => Vec::new(), - }; - - let schedule = match schedules.iter().find(|s| s["id"].as_str() == Some(&id)) { - Some(s) => s.clone(), - None => { + let uuid = match uuid::Uuid::parse_str(&id) { + Ok(u) => u, + Err(_) => { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Invalid schedule id"})), + ); + } + }; + let cj_id = openfang_types::scheduler::CronJobId(uuid); + let job = match state.kernel.cron_scheduler.try_claim_for_run(cj_id) { + Ok(j) => j, + Err(openfang_kernel::cron::ClaimError::NotFound) => { return ( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Schedule not found"})), ); } - }; - - let agent_id_str = schedule["agent_id"].as_str().unwrap_or(""); - let message = schedule["message"] - .as_str() - .unwrap_or("Scheduled task triggered manually."); - let name = schedule["name"].as_str().unwrap_or("(unnamed)"); - - // Find the target agent — require explicit agent_id, no silent fallback - let target_agent = if !agent_id_str.is_empty() { - if let Ok(aid) = agent_id_str.parse::() { - if state.kernel.registry.get(aid).is_some() { - Some(aid) - } else { - None - } - } else { - state - .kernel - .registry - .list() - .iter() - .find(|a| a.name == agent_id_str) - .map(|a| a.id) - } - } else { - None - }; - - let target_agent = match target_agent { - Some(a) => a, - None => { + Err(openfang_kernel::cron::ClaimError::Disabled) => { return ( - StatusCode::NOT_FOUND, - Json( - serde_json::json!({"error": "No target agent found. Specify an agent_id or start an agent first."}), - ), + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": "Schedule is disabled"})), ); } }; - let run_message = if message.is_empty() { - format!("[Scheduled task '{}' triggered manually]", name) - } else { - message.to_string() - }; - - // Update last_run and run_count - let mut schedules_updated: Vec = - match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) { - Ok(Some(serde_json::Value::Array(arr))) => arr, - _ => Vec::new(), - }; - for s in schedules_updated.iter_mut() { - if s["id"].as_str() == Some(&id) { - s["last_run"] = serde_json::Value::String(chrono::Utc::now().to_rfc3339()); - let count = s["run_count"].as_u64().unwrap_or(0); - s["run_count"] = serde_json::json!(count + 1); - break; - } - } - let _ = state.kernel.memory.structured_set( - shared_id, - SCHEDULES_KEY, - serde_json::Value::Array(schedules_updated), - ); - - let kernel_handle: Arc = state.kernel.clone() as Arc; - match state - .kernel - .send_message_with_handle(target_agent, &run_message, Some(kernel_handle), None, None) - .await - { - Ok(result) => ( + let agent_id_str = job.agent_id.to_string(); + match state.kernel.cron_run_job(&job).await { + Ok(response) => ( StatusCode::OK, Json(serde_json::json!({ "status": "completed", "schedule_id": id, - "agent_id": target_agent.to_string(), - "response": result.response, + "agent_id": agent_id_str, + "response": response, })), ), Err(e) => ( @@ -8667,6 +8649,26 @@ pub async fn run_schedule( } } +/// Sanitize a user-supplied schedule name into a valid `CronJob.name`. +/// Matches the kernel's own sanitizer used by the migration path. +fn sanitize_schedule_job_name(raw: &str) -> String { + let filtered: String = raw + .chars() + .map(|c| { + if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect(); + let trimmed = filtered.trim(); + if trimmed.is_empty() { + return "schedule".to_string(); + } + trimmed.chars().take(128).collect() +} + // --------------------------------------------------------------------------- // Agent Identity endpoint // --------------------------------------------------------------------------- diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index e8e89e94..fc6c77f0 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -4287,6 +4287,11 @@ impl OpenFangKernel { } } + // One-shot migration of legacy shared-memory `__openfang_schedules` + // entries (from the old broken `schedule_create` path) into the real + // cron scheduler. Idempotent via a marker key. + self.migrate_shared_memory_schedules(); + // Cron scheduler tick loop — fires due jobs every 15 seconds { let kernel = Arc::clone(self); @@ -4649,6 +4654,192 @@ impl OpenFangKernel { }); } + /// Migrate legacy `__openfang_schedules` shared-memory entries into the + /// real cron scheduler. + /// + /// The old `schedule_create` tool and `/api/schedules` POST route wrote + /// to a shared-memory key that no executor ever read — so jobs registered + /// that way never fired (#1069). This migration runs once at startup, is + /// idempotent via a marker key, and leaves an empty array behind so the + /// old key is no longer written to. + /// + /// Entries with unresolved target agents are skipped (logged at warn + /// level). Successfully migrated entries are added to the cron scheduler + /// and the scheduler is persisted. + pub(crate) fn migrate_shared_memory_schedules(&self) { + const LEGACY_KEY: &str = "__openfang_schedules"; + const MARKER_KEY: &str = "__openfang_schedules_migrated_v1"; + + let shared = shared_memory_agent_id(); + + // Idempotency: if marker is already set, don't re-read. + if let Ok(Some(serde_json::Value::Bool(true))) = + self.memory.structured_get(shared, MARKER_KEY) + { + return; + } + + let entries: Vec = match self.memory.structured_get(shared, LEGACY_KEY) { + Ok(Some(serde_json::Value::Array(arr))) => arr, + Ok(_) => { + // No entries ever written. Mark as migrated and exit. + let _ = + self.memory + .structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true)); + return; + } + Err(e) => { + warn!("Schedule migration: failed to read legacy key: {e}"); + return; + } + }; + + if entries.is_empty() { + let _ = self + .memory + .structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true)); + return; + } + + let mut migrated = 0usize; + let mut skipped = 0usize; + + for entry in &entries { + match self.migrate_single_schedule_entry(entry) { + Ok(()) => migrated += 1, + Err(reason) => { + skipped += 1; + warn!( + reason = %reason, + entry = %entry, + "Schedule migration: skipping legacy entry" + ); + } + } + } + + info!( + migrated, + skipped, + total = entries.len(), + "Migrated legacy __openfang_schedules entries to cron scheduler" + ); + + // Clear the legacy key (store an empty array) and mark migrated so + // the old location is never written to again. + if let Err(e) = + self.memory + .structured_set(shared, LEGACY_KEY, serde_json::Value::Array(Vec::new())) + { + warn!("Schedule migration: failed to clear legacy key: {e}"); + } + if let Err(e) = + self.memory + .structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true)) + { + warn!("Schedule migration: failed to set marker: {e}"); + } + + if migrated > 0 { + if let Err(e) = self.cron_scheduler.persist() { + warn!("Schedule migration: cron persist failed: {e}"); + } + } + } + + /// Convert a single legacy schedule entry into a `CronJob` and add it to + /// the cron scheduler. Returns `Err` with a human-readable reason when + /// the entry cannot be migrated (so the caller can log and skip). + fn migrate_single_schedule_entry(&self, entry: &serde_json::Value) -> Result<(), String> { + use openfang_types::scheduler::{ + CronAction, CronDelivery, CronJob, CronJobId, CronSchedule, + }; + + let cron_expr = entry["cron"] + .as_str() + .ok_or_else(|| "missing 'cron' field".to_string())? + .trim() + .to_string(); + if cron_expr.is_empty() { + return Err("empty cron expression".to_string()); + } + + // Resolve target agent. Tool-shape uses `agent` (name or UUID); + // HTTP-shape uses `agent_id` (UUID or name). Try both. + let agent_hint = entry["agent_id"] + .as_str() + .filter(|s| !s.is_empty()) + .or_else(|| entry["agent"].as_str()) + .unwrap_or("") + .trim() + .to_string(); + + let target_agent = if agent_hint.is_empty() { + return Err("no target agent specified".to_string()); + } else if let Ok(uuid) = uuid::Uuid::parse_str(&agent_hint) { + let aid = AgentId(uuid); + if self.registry.get(aid).is_none() { + return Err(format!("agent {agent_hint} not in registry")); + } + aid + } else { + let found = self + .registry + .list() + .into_iter() + .find(|a| a.name == agent_hint); + match found { + Some(a) => a.id, + None => return Err(format!("agent '{agent_hint}' not found")), + } + }; + + // Message for the agent turn: prefer explicit `message`, fallback to + // `description` (tool shape), else a default string. + let message = entry["message"] + .as_str() + .filter(|s| !s.is_empty()) + .or_else(|| entry["description"].as_str()) + .unwrap_or("Scheduled task") + .to_string(); + + // Job name: prefer `name`, else sanitize description, else a default. + let raw_name = entry["name"] + .as_str() + .filter(|s| !s.is_empty()) + .or_else(|| entry["description"].as_str()) + .unwrap_or("migrated-schedule") + .to_string(); + let name = sanitize_cron_job_name(&raw_name); + + let enabled = entry["enabled"].as_bool().unwrap_or(true); + + let job = CronJob { + id: CronJobId::new(), + agent_id: target_agent, + name, + enabled, + schedule: CronSchedule::Cron { + expr: cron_expr, + tz: None, + }, + action: CronAction::AgentTurn { + message, + model_override: None, + timeout_secs: None, + }, + delivery: CronDelivery::None, + created_at: chrono::Utc::now(), + last_run: None, + next_run: None, + }; + + self.cron_scheduler + .add_job(job, false) + .map_err(|e| format!("add_job failed: {e}"))?; + Ok(()) + } + /// Gracefully shutdown the kernel. /// /// This cleanly shuts down in-memory state but preserves persistent agent @@ -6018,6 +6209,31 @@ pub fn shared_memory_agent_id() -> AgentId { ])) } +/// Sanitize a human-readable string into a valid `CronJob.name`. +/// +/// `CronJob::validate` requires the name to be 1..=128 chars and composed +/// of alphanumeric, space, hyphen, and underscore characters only. This is +/// used by the legacy schedule migration path where the source "name" may +/// contain punctuation or be too long. +fn sanitize_cron_job_name(raw: &str) -> String { + let filtered: String = raw + .chars() + .map(|c| { + if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect(); + let trimmed = filtered.trim(); + if trimmed.is_empty() { + return "migrated-schedule".to_string(); + } + let truncated: String = trimmed.chars().take(128).collect(); + truncated +} + /// Deliver a cron job's agent response to the configured delivery target. async fn cron_deliver_response( kernel: &OpenFangKernel, @@ -7099,4 +7315,228 @@ mod tests { kernel.shutdown(); } + + // ---------------------------------------------------------------------- + // Issue #1069: sanitize_cron_job_name + shared-memory schedule migration + // ---------------------------------------------------------------------- + + #[test] + fn test_sanitize_cron_job_name_basic() { + assert_eq!(super::sanitize_cron_job_name("hello"), "hello"); + assert_eq!(super::sanitize_cron_job_name("hello world"), "hello world"); + assert_eq!(super::sanitize_cron_job_name("job_name-1"), "job_name-1"); + } + + #[test] + fn test_sanitize_cron_job_name_strips_punctuation() { + let out = super::sanitize_cron_job_name("Remind me: report!!"); + assert!(!out.contains(':')); + assert!(!out.contains('!')); + assert!(out + .chars() + .all(|c| c.is_alphanumeric() || c == ' ' || c == '-' || c == '_')); + } + + #[test] + fn test_sanitize_cron_job_name_empty_fallback() { + assert_eq!(super::sanitize_cron_job_name(""), "migrated-schedule"); + assert_eq!(super::sanitize_cron_job_name(" "), "migrated-schedule"); + } + + #[test] + fn test_sanitize_cron_job_name_caps_128_chars() { + let long = "x".repeat(500); + let out = super::sanitize_cron_job_name(&long); + assert!(out.chars().count() <= 128); + } + + /// Register a minimal test agent in a booted kernel and return its ID. + /// Kept local to the tests module to avoid widening the kernel's public + /// surface. + fn register_test_agent(kernel: &OpenFangKernel, name: &str) -> AgentId { + let agent_id = AgentId::new(); + let entry = AgentEntry { + id: agent_id, + name: name.to_string(), + manifest: test_manifest(name, "migration test", vec![]), + state: AgentState::Running, + mode: AgentMode::default(), + created_at: chrono::Utc::now(), + last_active: chrono::Utc::now(), + parent: None, + children: vec![], + session_id: SessionId::new(), + tags: vec![], + identity: Default::default(), + onboarding_completed: false, + onboarding_completed_at: None, + }; + kernel.registry.register(entry).unwrap(); + agent_id + } + + #[test] + fn test_migrate_shared_memory_schedules_imports_legacy_entries() { + let tmp = tempfile::tempdir().unwrap(); + let home_dir = tmp.path().join("openfang-migrate"); + std::fs::create_dir_all(&home_dir).unwrap(); + let config = KernelConfig { + home_dir: home_dir.clone(), + data_dir: home_dir.join("data"), + ..KernelConfig::default() + }; + + let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots"); + + // Register a target agent the legacy entries can point at. + let agent = register_test_agent(&kernel, "report-agent"); + + // Pre-populate the legacy shared-memory key with two entries in the + // two shapes that actually shipped: (a) tool-shape (description + + // agent name) and (b) HTTP-shape (name + agent_id UUID). + let shared = super::shared_memory_agent_id(); + let legacy_entries = serde_json::json!([ + { + "description": "Send the daily report", + "cron": "0 9 * * *", + "agent": "report-agent", + }, + { + "name": "weekly/summary: monday!", + "message": "Post the weekly summary", + "cron": "0 10 * * 1", + "agent_id": agent.0.to_string(), + }, + ]); + kernel + .memory + .structured_set(shared, "__openfang_schedules", legacy_entries) + .unwrap(); + + // Sanity: before migration, the cron scheduler is empty. + assert_eq!(kernel.cron_scheduler.total_jobs(), 0); + + kernel.migrate_shared_memory_schedules(); + + // Both legacy entries should now live in the cron scheduler. + let jobs = kernel.cron_scheduler.list_jobs(agent); + assert_eq!(jobs.len(), 2, "both legacy entries should migrate"); + + let names: Vec<&str> = jobs.iter().map(|j| j.name.as_str()).collect(); + assert!(names.iter().any(|n| n.contains("Send the daily report"))); + // Punctuation in the second entry's name is sanitized to hyphens. + assert!( + names.iter().any(|n| !n.contains('/') && !n.contains(':')), + "sanitized name must not contain '/' or ':' ({names:?})" + ); + + // The legacy key is cleared and the marker is set so we never read + // it again. + let remaining = kernel + .memory + .structured_get(shared, "__openfang_schedules") + .unwrap(); + assert_eq!(remaining, Some(serde_json::Value::Array(vec![]))); + let marker = kernel + .memory + .structured_get(shared, "__openfang_schedules_migrated_v1") + .unwrap(); + assert_eq!(marker, Some(serde_json::Value::Bool(true))); + + kernel.shutdown(); + } + + #[test] + fn test_migrate_shared_memory_schedules_is_idempotent() { + let tmp = tempfile::tempdir().unwrap(); + let home_dir = tmp.path().join("openfang-migrate-idem"); + std::fs::create_dir_all(&home_dir).unwrap(); + let config = KernelConfig { + home_dir: home_dir.clone(), + data_dir: home_dir.join("data"), + ..KernelConfig::default() + }; + let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots"); + let agent = register_test_agent(&kernel, "idem-agent"); + let shared = super::shared_memory_agent_id(); + + kernel + .memory + .structured_set( + shared, + "__openfang_schedules", + serde_json::json!([{ + "description": "Ping", + "cron": "*/5 * * * *", + "agent_id": agent.0.to_string(), + }]), + ) + .unwrap(); + + kernel.migrate_shared_memory_schedules(); + assert_eq!(kernel.cron_scheduler.list_jobs(agent).len(), 1); + + // Second call must not re-import anything even if someone re-writes + // the legacy key by accident; the marker gates us. + kernel + .memory + .structured_set( + shared, + "__openfang_schedules", + serde_json::json!([{ + "description": "Ping again", + "cron": "*/5 * * * *", + "agent_id": agent.0.to_string(), + }]), + ) + .unwrap(); + kernel.migrate_shared_memory_schedules(); + assert_eq!( + kernel.cron_scheduler.list_jobs(agent).len(), + 1, + "migration must be idempotent via the marker key" + ); + + kernel.shutdown(); + } + + #[test] + fn test_migrate_shared_memory_schedules_skips_unknown_agent() { + let tmp = tempfile::tempdir().unwrap(); + let home_dir = tmp.path().join("openfang-migrate-skip"); + std::fs::create_dir_all(&home_dir).unwrap(); + let config = KernelConfig { + home_dir: home_dir.clone(), + data_dir: home_dir.join("data"), + ..KernelConfig::default() + }; + let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots"); + let shared = super::shared_memory_agent_id(); + + // Entry references an agent that does not exist in the registry. + kernel + .memory + .structured_set( + shared, + "__openfang_schedules", + serde_json::json!([{ + "description": "Ping", + "cron": "*/5 * * * *", + "agent": "does-not-exist", + }]), + ) + .unwrap(); + + kernel.migrate_shared_memory_schedules(); + + // Nothing migrated, but the marker is still set so we don't retry. + assert_eq!(kernel.cron_scheduler.total_jobs(), 0); + let marker = kernel + .memory + .structured_get(shared, "__openfang_schedules_migrated_v1") + .unwrap(); + assert_eq!(marker, Some(serde_json::Value::Bool(true))); + + kernel.shutdown(); + } } diff --git a/crates/openfang-runtime/src/tool_runner.rs b/crates/openfang-runtime/src/tool_runner.rs index da3e54e5..426637a7 100644 --- a/crates/openfang-runtime/src/tool_runner.rs +++ b/crates/openfang-runtime/src/tool_runner.rs @@ -313,8 +313,8 @@ pub async fn execute_tool( "event_publish" => tool_event_publish(input, kernel).await, // Scheduling tools - "schedule_create" => tool_schedule_create(input, kernel).await, - "schedule_list" => tool_schedule_list(kernel).await, + "schedule_create" => tool_schedule_create(input, kernel, caller_agent_id).await, + "schedule_list" => tool_schedule_list(kernel, caller_agent_id).await, "schedule_delete" => tool_schedule_delete(input, kernel).await, // Knowledge graph tools @@ -2091,11 +2091,65 @@ fn parse_time_to_hour(s: &str) -> Result { Ok(hour) } -const SCHEDULES_KEY: &str = "__openfang_schedules"; +/// Sanitize a description into a valid `CronJob.name` (alphanumeric + +/// space/hyphen/underscore, 1..=128 chars). +fn sanitize_schedule_name(description: &str) -> String { + let filtered: String = description + .chars() + .map(|c| { + if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' { + c + } else { + '-' + } + }) + .collect(); + let trimmed = filtered.trim(); + if trimmed.is_empty() { + return "scheduled-task".to_string(); + } + trimmed.chars().take(128).collect() +} + +/// Resolve the `agent` field of `schedule_create` into an agent UUID string +/// suitable for `KernelHandle::cron_create`. +/// +/// - Empty / "self" → caller's agent ID. +/// - Valid UUID → passed through. +/// - Non-empty name → looked up via `find_agents`; an exact name match wins, +/// a single fuzzy match is accepted, ambiguity is an error. +fn resolve_schedule_target( + kh: &Arc, + agent: &str, + caller_agent_id: Option<&str>, +) -> Result { + let a = agent.trim(); + if a.is_empty() || a.eq_ignore_ascii_case("self") { + return caller_agent_id.map(|s| s.to_string()).ok_or_else(|| { + "No caller agent available; specify 'agent' to target an agent by name or UUID" + .to_string() + }); + } + if uuid::Uuid::parse_str(a).is_ok() { + return Ok(a.to_string()); + } + let matches = kh.find_agents(a); + if let Some(m) = matches.iter().find(|m| m.name == a) { + return Ok(m.id.clone()); + } + match matches.len() { + 0 => Err(format!("Agent '{a}' not found")), + 1 => Ok(matches[0].id.clone()), + n => Err(format!( + "Agent name '{a}' is ambiguous ({n} matches). Pass the agent UUID." + )), + } +} async fn tool_schedule_create( input: &serde_json::Value, kernel: Option<&Arc>, + caller_agent_id: Option<&str>, ) -> Result { let kh = require_kernel(kernel)?; let description = input["description"] @@ -2104,58 +2158,77 @@ async fn tool_schedule_create( let schedule_str = input["schedule"] .as_str() .ok_or("Missing 'schedule' parameter")?; - let agent = input["agent"].as_str().unwrap_or(""); + let agent_input = input["agent"].as_str().unwrap_or(""); let cron_expr = parse_schedule_to_cron(schedule_str)?; - let schedule_id = uuid::Uuid::new_v4().to_string(); + let target_agent_id = resolve_schedule_target(kh, agent_input, caller_agent_id)?; + let name = sanitize_schedule_name(description); - let entry = serde_json::json!({ - "id": schedule_id, - "description": description, - "schedule_input": schedule_str, - "cron": cron_expr, - "agent": agent, - "created_at": chrono::Utc::now().to_rfc3339(), - "enabled": true, + let job_json = serde_json::json!({ + "name": name, + "schedule": { "kind": "cron", "expr": cron_expr, "tz": null }, + "action": { + "kind": "agent_turn", + "message": description, + "model_override": null, + "timeout_secs": null, + }, + "delivery": { "kind": "none" }, + "one_shot": false, }); - // Load existing schedules from shared memory - let mut schedules: Vec = match kh.memory_recall(SCHEDULES_KEY)? { - Some(serde_json::Value::Array(arr)) => arr, - _ => Vec::new(), + let resp = kh.cron_create(&target_agent_id, job_json).await?; + // Kernel returns JSON `{ "job_id": "...", "status": "created" }`. + let job_id = serde_json::from_str::(&resp) + .ok() + .and_then(|v| v["job_id"].as_str().map(str::to_string)) + .unwrap_or_else(|| resp.clone()); + + let agent_display = if agent_input.trim().is_empty() { + "(self)".to_string() + } else { + agent_input.to_string() }; - - schedules.push(entry); - kh.memory_store(SCHEDULES_KEY, serde_json::Value::Array(schedules))?; - Ok(format!( - "Schedule created:\n ID: {schedule_id}\n Description: {description}\n Cron: {cron_expr}\n Original: {schedule_str}" + "Schedule created:\n ID: {job_id}\n Description: {description}\n Cron: {cron_expr}\n Original: {schedule_str}\n Agent: {agent_display}" )) } -async fn tool_schedule_list(kernel: Option<&Arc>) -> Result { +async fn tool_schedule_list( + kernel: Option<&Arc>, + caller_agent_id: Option<&str>, +) -> Result { let kh = require_kernel(kernel)?; + let agent_id = + caller_agent_id.ok_or("Agent ID required for schedule_list (no caller context)")?; - let schedules: Vec = match kh.memory_recall(SCHEDULES_KEY)? { - Some(serde_json::Value::Array(arr)) => arr, - _ => Vec::new(), - }; - - if schedules.is_empty() { + let jobs = kh.cron_list(agent_id).await?; + if jobs.is_empty() { return Ok("No scheduled tasks.".to_string()); } - let mut output = format!("Scheduled tasks ({}):\n\n", schedules.len()); - for s in &schedules { - let enabled = s["enabled"].as_bool().unwrap_or(true); + let mut output = format!("Scheduled tasks ({}):\n\n", jobs.len()); + for job in &jobs { + let enabled = job["enabled"].as_bool().unwrap_or(true); let status = if enabled { "active" } else { "paused" }; + let id = job["id"].as_str().unwrap_or("?"); + let schedule_display = match job["schedule"]["kind"].as_str() { + Some("cron") => job["schedule"]["expr"].as_str().unwrap_or("?").to_string(), + Some("every") => format!( + "every {}s", + job["schedule"]["every_secs"].as_u64().unwrap_or(0) + ), + Some("at") => job["schedule"]["at"].as_str().unwrap_or("?").to_string(), + _ => "?".to_string(), + }; + let description = job["action"]["message"] + .as_str() + .or_else(|| job["action"]["text"].as_str()) + .unwrap_or_else(|| job["name"].as_str().unwrap_or("?")); + let created = job["created_at"].as_str().unwrap_or("?"); + let agent = job["agent_id"].as_str().unwrap_or("(self)"); output.push_str(&format!( - " [{status}] {} — {}\n Cron: {} | Agent: {}\n Created: {}\n\n", - s["id"].as_str().unwrap_or("?"), - s["description"].as_str().unwrap_or("?"), - s["cron"].as_str().unwrap_or("?"), - s["agent"].as_str().unwrap_or("(self)"), - s["created_at"].as_str().unwrap_or("?"), + " [{status}] {id} — {description}\n Cron: {schedule_display} | Agent: {agent}\n Created: {created}\n\n" )); } @@ -2168,20 +2241,9 @@ async fn tool_schedule_delete( ) -> Result { let kh = require_kernel(kernel)?; let id = input["id"].as_str().ok_or("Missing 'id' parameter")?; - - let mut schedules: Vec = match kh.memory_recall(SCHEDULES_KEY)? { - Some(serde_json::Value::Array(arr)) => arr, - _ => Vec::new(), - }; - - let before = schedules.len(); - schedules.retain(|s| s["id"].as_str() != Some(id)); - - if schedules.len() == before { - return Err(format!("Schedule '{id}' not found.")); - } - - kh.memory_store(SCHEDULES_KEY, serde_json::Value::Array(schedules))?; + kh.cron_cancel(id) + .await + .map_err(|e| format!("Schedule '{id}' not found: {e}"))?; Ok(format!("Schedule '{id}' deleted.")) } @@ -4191,4 +4253,268 @@ mod tests { assert!(!is_shell_tool("file_read")); assert!(!is_shell_tool("web_fetch")); } + + // ---------------------------------------------------------------------- + // Issue #1069: schedule_* tools route through the kernel cron scheduler + // ---------------------------------------------------------------------- + + #[test] + fn test_sanitize_schedule_name_strips_punctuation() { + // Colons, commas, dots, and other punctuation are replaced with '-'. + let out = sanitize_schedule_name("Remind me: file report, please."); + assert!(!out.contains(':')); + assert!(!out.contains(',')); + assert!(!out.contains('.')); + // Spaces, hyphens, and underscores survive. + assert!(out + .chars() + .all(|c| c.is_alphanumeric() || c == ' ' || c == '-' || c == '_')); + assert!(!out.is_empty()); + } + + #[test] + fn test_sanitize_schedule_name_empty_fallback() { + assert_eq!(sanitize_schedule_name(""), "scheduled-task"); + assert_eq!(sanitize_schedule_name(" "), "scheduled-task"); + } + + #[test] + fn test_sanitize_schedule_name_caps_length() { + let long = "a".repeat(500); + let out = sanitize_schedule_name(&long); + assert!(out.chars().count() <= 128); + } + + // Minimal in-memory KernelHandle used to verify schedule_* tool wiring. + // Records every cron_* call so tests can assert what the tool pushed into + // the kernel, without booting a real OpenFangKernel. + struct FakeKernelHandle { + created: std::sync::Mutex>, + cancelled: std::sync::Mutex>, + jobs: std::sync::Mutex>, + } + + impl FakeKernelHandle { + fn new() -> Self { + Self { + created: std::sync::Mutex::new(Vec::new()), + cancelled: std::sync::Mutex::new(Vec::new()), + jobs: std::sync::Mutex::new(Vec::new()), + } + } + + fn with_job(self, job: serde_json::Value) -> Self { + self.jobs.lock().unwrap().push(job); + self + } + } + + #[async_trait::async_trait] + impl crate::kernel_handle::KernelHandle for FakeKernelHandle { + async fn spawn_agent( + &self, + _manifest_toml: &str, + _parent_id: Option<&str>, + ) -> Result<(String, String), String> { + Err("not used".into()) + } + async fn send_to_agent(&self, _agent_id: &str, _message: &str) -> Result { + Err("not used".into()) + } + fn list_agents(&self) -> Vec { + vec![] + } + fn kill_agent(&self, _agent_id: &str) -> Result<(), String> { + Ok(()) + } + fn memory_store(&self, _key: &str, _value: serde_json::Value) -> Result<(), String> { + Ok(()) + } + fn memory_recall(&self, _key: &str) -> Result, String> { + Ok(None) + } + fn find_agents(&self, _query: &str) -> Vec { + vec![] + } + async fn task_post( + &self, + _title: &str, + _description: &str, + _assigned_to: Option<&str>, + _created_by: Option<&str>, + ) -> Result { + Err("not used".into()) + } + async fn task_claim(&self, _agent_id: &str) -> Result, String> { + Ok(None) + } + async fn task_complete(&self, _task_id: &str, _result: &str) -> Result<(), String> { + Ok(()) + } + async fn task_list(&self, _status: Option<&str>) -> Result, String> { + Ok(vec![]) + } + async fn publish_event( + &self, + _event_type: &str, + _payload: serde_json::Value, + ) -> Result<(), String> { + Ok(()) + } + async fn knowledge_add_entity( + &self, + _entity: openfang_types::memory::Entity, + ) -> Result { + Err("not used".into()) + } + async fn knowledge_add_relation( + &self, + _relation: openfang_types::memory::Relation, + ) -> Result { + Err("not used".into()) + } + async fn knowledge_query( + &self, + _pattern: openfang_types::memory::GraphPattern, + ) -> Result, String> { + Ok(vec![]) + } + + async fn cron_create( + &self, + agent_id: &str, + job_json: serde_json::Value, + ) -> Result { + let id = format!("job-{}", self.created.lock().unwrap().len()); + self.created + .lock() + .unwrap() + .push((agent_id.to_string(), job_json.clone())); + // Mirror what the real kernel returns (see cron_create in + // openfang-kernel): `{ "job_id": "...", "status": "created" }`. + let resp = serde_json::json!({ "job_id": id, "status": "created" }); + Ok(resp.to_string()) + } + + async fn cron_list(&self, _agent_id: &str) -> Result, String> { + Ok(self.jobs.lock().unwrap().clone()) + } + + async fn cron_cancel(&self, job_id: &str) -> Result<(), String> { + self.cancelled.lock().unwrap().push(job_id.to_string()); + Ok(()) + } + } + + #[tokio::test] + async fn test_schedule_create_routes_to_cron_scheduler() { + let fake = Arc::new(FakeKernelHandle::new()); + let handle: Arc = fake.clone(); + let caller = "11111111-1111-1111-1111-111111111111"; + + let input = serde_json::json!({ + "description": "Daily report", + "schedule": "daily at 9am", + "agent": "self", + }); + + let out = tool_schedule_create(&input, Some(&handle), Some(caller)) + .await + .expect("tool_schedule_create should succeed with a valid schedule"); + + // User-facing response shape is preserved. + assert!(out.starts_with("Schedule created:")); + assert!(out.contains("Daily report")); + assert!(out.contains("Cron: ")); + + // The fake kernel received a cron_create for the caller agent with a + // well-formed job_json. This is the whole point of #1069: the tool + // must call into the cron scheduler, not just write to shared memory. + let created = fake.created.lock().unwrap(); + assert_eq!(created.len(), 1, "cron_create must be called exactly once"); + assert_eq!(created[0].0, caller, "target agent must be the caller"); + let job = &created[0].1; + assert_eq!(job["schedule"]["kind"], "cron"); + assert_eq!(job["action"]["kind"], "agent_turn"); + assert_eq!(job["action"]["message"], "Daily report"); + assert!(job["schedule"]["expr"].is_string()); + assert_eq!(job["one_shot"], false); + } + + #[tokio::test] + async fn test_schedule_create_rejects_missing_description() { + let fake = Arc::new(FakeKernelHandle::new()); + let handle: Arc = fake.clone(); + let input = serde_json::json!({ "schedule": "every hour" }); + let err = tool_schedule_create(&input, Some(&handle), Some("aaa")) + .await + .unwrap_err(); + assert!(err.contains("description")); + } + + #[tokio::test] + async fn test_schedule_list_reads_from_cron_scheduler() { + let job = serde_json::json!({ + "id": "cron-1", + "name": "demo", + "enabled": true, + "schedule": { "kind": "cron", "expr": "0 9 * * *" }, + "action": { "kind": "agent_turn", "message": "hello" }, + "created_at": "2026-01-01T00:00:00Z", + "agent_id": "aaa", + }); + let fake = Arc::new(FakeKernelHandle::new().with_job(job)); + let handle: Arc = fake.clone(); + + let out = tool_schedule_list(Some(&handle), Some("aaa")) + .await + .expect("schedule_list should succeed"); + assert!(out.contains("Scheduled tasks (1)")); + assert!(out.contains("0 9 * * *")); + assert!(out.contains("hello")); + } + + #[tokio::test] + async fn test_schedule_list_empty() { + let fake = Arc::new(FakeKernelHandle::new()); + let handle: Arc = fake.clone(); + let out = tool_schedule_list(Some(&handle), Some("aaa")) + .await + .unwrap(); + assert_eq!(out, "No scheduled tasks."); + } + + #[tokio::test] + async fn test_schedule_delete_routes_to_cron_cancel() { + let fake = Arc::new(FakeKernelHandle::new()); + let handle: Arc = fake.clone(); + let input = serde_json::json!({ "id": "abc-123" }); + let out = tool_schedule_delete(&input, Some(&handle)).await.unwrap(); + assert!(out.contains("abc-123")); + let cancelled = fake.cancelled.lock().unwrap(); + assert_eq!(cancelled.len(), 1); + assert_eq!(cancelled[0], "abc-123"); + } + + #[tokio::test] + async fn test_schedule_tools_require_kernel() { + // Without a kernel handle, the new tools must fail loudly rather than + // silently writing to the old shared-memory key. + let err = tool_schedule_create( + &serde_json::json!({"description": "x", "schedule": "every hour"}), + None, + Some("aaa"), + ) + .await + .unwrap_err(); + assert!(err.to_lowercase().contains("kernel")); + + let err = tool_schedule_list(None, Some("aaa")).await.unwrap_err(); + assert!(err.to_lowercase().contains("kernel")); + + let err = tool_schedule_delete(&serde_json::json!({"id": "x"}), None) + .await + .unwrap_err(); + assert!(err.to_lowercase().contains("kernel")); + } }