mirror of
https://github.com/RightNow-AI/openfang.git
synced 2026-04-25 17:25:11 +02:00
fix(scheduler): route schedule_* tools and /api/schedules through kernel cron scheduler (#1069) (#1077)
The schedule_create tool, its sibling schedule_list and schedule_delete, and the matching /api/schedules HTTP routes were all writing to a shared-memory key that no executor ever read. Jobs registered that way silently never fired. Route all three tools and all /api/schedules endpoints through the real cron scheduler in openfang-kernel. Add a one-shot idempotent migration at kernel startup that imports legacy __openfang_schedules entries into the cron scheduler and clears the old key. Tests: - Unit tests for sanitize_schedule_name and sanitize_cron_job_name - Tool wrapper tests using a fake KernelHandle that verify schedule_create/list/delete route into cron_create/list/cancel - Migration tests cover the happy path, idempotency via the marker key, and skipping entries whose target agent is not in the registry Quality gates: cargo check + test + clippy -D warnings + fmt clean on openfang-kernel, openfang-runtime, openfang-api. Made-with: Cursor
This commit is contained in:
@@ -8321,40 +8321,66 @@ pub async fn reload_integrations(State(state): State<Arc<AppState>>) -> impl Int
|
||||
// ---------------------------------------------------------------------------
|
||||
// Scheduled Jobs (cron) endpoints
|
||||
// ---------------------------------------------------------------------------
|
||||
//
|
||||
// Historical note: an earlier implementation of `/api/schedules*` wrote to a
|
||||
// shared-memory key (`__openfang_schedules`) that no executor ever read — so
|
||||
// scheduled jobs registered via this API never actually fired (#1069). These
|
||||
// routes now delegate to the kernel's real cron scheduler, which already
|
||||
// backs `/api/cron/jobs*`. The request/response shape has been preserved as
|
||||
// close as possible to the legacy route for dashboard compatibility.
|
||||
|
||||
/// The well-known shared-memory agent ID used for cross-agent KV storage.
|
||||
/// Must match the value in `openfang-kernel/src/kernel.rs::shared_memory_agent_id()`.
|
||||
fn schedule_shared_agent_id() -> AgentId {
|
||||
AgentId(uuid::Uuid::from_bytes([
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||
0x01,
|
||||
]))
|
||||
/// Convert an internal `CronJob` into the legacy `/api/schedules` response
|
||||
/// shape so existing dashboard code keeps working.
|
||||
fn cron_job_to_schedule_view(
|
||||
kernel: &OpenFangKernel,
|
||||
job: &openfang_types::scheduler::CronJob,
|
||||
) -> serde_json::Value {
|
||||
use openfang_types::scheduler::{CronAction, CronSchedule};
|
||||
|
||||
let cron = match &job.schedule {
|
||||
CronSchedule::Cron { expr, .. } => expr.clone(),
|
||||
CronSchedule::Every { every_secs } => format!("every {every_secs}s"),
|
||||
CronSchedule::At { at } => at.to_rfc3339(),
|
||||
};
|
||||
let message = match &job.action {
|
||||
CronAction::AgentTurn { message, .. } => message.clone(),
|
||||
CronAction::SystemEvent { text } => text.clone(),
|
||||
CronAction::WorkflowRun { workflow_id, .. } => format!("workflow:{workflow_id}"),
|
||||
};
|
||||
let meta = kernel.cron_scheduler.get_meta(job.id);
|
||||
let last_status = meta.as_ref().and_then(|m| m.last_status.clone());
|
||||
|
||||
serde_json::json!({
|
||||
"id": job.id.to_string(),
|
||||
"name": job.name,
|
||||
"cron": cron,
|
||||
"agent_id": job.agent_id.to_string(),
|
||||
"message": message,
|
||||
"enabled": job.enabled,
|
||||
"created_at": job.created_at.to_rfc3339(),
|
||||
"last_run": job.last_run.map(|t| t.to_rfc3339()),
|
||||
"next_run": job.next_run.map(|t| t.to_rfc3339()),
|
||||
"last_status": last_status,
|
||||
})
|
||||
}
|
||||
|
||||
const SCHEDULES_KEY: &str = "__openfang_schedules";
|
||||
|
||||
/// GET /api/schedules — List all cron-based scheduled jobs.
|
||||
/// GET /api/schedules — List all scheduled jobs from the cron scheduler.
|
||||
pub async fn list_schedules(State(state): State<Arc<AppState>>) -> impl IntoResponse {
|
||||
let agent_id = schedule_shared_agent_id();
|
||||
match state.kernel.memory.structured_get(agent_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => {
|
||||
let total = arr.len();
|
||||
Json(serde_json::json!({"schedules": arr, "total": total}))
|
||||
}
|
||||
Ok(_) => Json(serde_json::json!({"schedules": [], "total": 0})),
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to load schedules: {e}");
|
||||
Json(serde_json::json!({"schedules": [], "total": 0, "error": format!("{e}")}))
|
||||
}
|
||||
}
|
||||
let jobs = state.kernel.cron_scheduler.list_all_jobs();
|
||||
let total = jobs.len();
|
||||
let schedules: Vec<serde_json::Value> = jobs
|
||||
.iter()
|
||||
.map(|j| cron_job_to_schedule_view(&state.kernel, j))
|
||||
.collect();
|
||||
Json(serde_json::json!({"schedules": schedules, "total": total}))
|
||||
}
|
||||
|
||||
/// POST /api/schedules — Create a new cron-based scheduled job.
|
||||
/// POST /api/schedules — Create a new scheduled job in the cron scheduler.
|
||||
pub async fn create_schedule(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(req): Json<serde_json::Value>,
|
||||
) -> impl IntoResponse {
|
||||
let name = match req["name"].as_str() {
|
||||
let name_raw = match req["name"].as_str() {
|
||||
Some(n) if !n.is_empty() => n.to_string(),
|
||||
_ => {
|
||||
return (
|
||||
@@ -8374,7 +8400,6 @@ pub async fn create_schedule(
|
||||
}
|
||||
};
|
||||
|
||||
// Validate cron expression: must be 5 space-separated fields
|
||||
let cron_parts: Vec<&str> = cron.split_whitespace().collect();
|
||||
if cron_parts.len() != 5 {
|
||||
return (
|
||||
@@ -8392,166 +8417,183 @@ pub async fn create_schedule(
|
||||
Json(serde_json::json!({"error": "Missing required field: agent_id"})),
|
||||
);
|
||||
}
|
||||
// Validate agent exists (UUID or name lookup)
|
||||
let agent_exists = if let Ok(aid) = agent_id_str.parse::<AgentId>() {
|
||||
state.kernel.registry.get(aid).is_some()
|
||||
let target_agent = if let Ok(aid) = agent_id_str.parse::<AgentId>() {
|
||||
if state.kernel.registry.get(aid).is_some() {
|
||||
Some(aid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
state
|
||||
.kernel
|
||||
.registry
|
||||
.list()
|
||||
.iter()
|
||||
.any(|a| a.name == agent_id_str)
|
||||
.into_iter()
|
||||
.find(|a| a.name == agent_id_str)
|
||||
.map(|a| a.id)
|
||||
};
|
||||
if !agent_exists {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": format!("Agent not found: {agent_id_str}")})),
|
||||
);
|
||||
}
|
||||
let target_agent = match target_agent {
|
||||
Some(a) => a,
|
||||
None => {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": format!("Agent not found: {agent_id_str}")})),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let message = req["message"].as_str().unwrap_or("").to_string();
|
||||
let enabled = req.get("enabled").and_then(|v| v.as_bool()).unwrap_or(true);
|
||||
let job_message = if message.is_empty() {
|
||||
format!("[Scheduled task '{name_raw}']")
|
||||
} else {
|
||||
message.clone()
|
||||
};
|
||||
|
||||
let schedule_id = uuid::Uuid::new_v4().to_string();
|
||||
let entry = serde_json::json!({
|
||||
"id": schedule_id,
|
||||
"name": name,
|
||||
"cron": cron,
|
||||
"agent_id": agent_id_str,
|
||||
"message": message,
|
||||
"enabled": enabled,
|
||||
"created_at": chrono::Utc::now().to_rfc3339(),
|
||||
"last_run": null,
|
||||
"run_count": 0,
|
||||
let job_json = serde_json::json!({
|
||||
"name": sanitize_schedule_job_name(&name_raw),
|
||||
"schedule": { "kind": "cron", "expr": cron, "tz": null },
|
||||
"action": {
|
||||
"kind": "agent_turn",
|
||||
"message": job_message,
|
||||
"model_override": null,
|
||||
"timeout_secs": null,
|
||||
},
|
||||
"delivery": { "kind": "none" },
|
||||
"one_shot": false,
|
||||
});
|
||||
|
||||
let shared_id = schedule_shared_agent_id();
|
||||
let mut schedules: Vec<serde_json::Value> =
|
||||
match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
schedules.push(entry.clone());
|
||||
if let Err(e) = state.kernel.memory.structured_set(
|
||||
shared_id,
|
||||
SCHEDULES_KEY,
|
||||
serde_json::Value::Array(schedules),
|
||||
) {
|
||||
tracing::warn!("Failed to save schedule: {e}");
|
||||
return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to save schedule: {e}")})),
|
||||
);
|
||||
match state
|
||||
.kernel
|
||||
.cron_create(&target_agent.to_string(), job_json)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => {
|
||||
let job_id = serde_json::from_str::<serde_json::Value>(&resp)
|
||||
.ok()
|
||||
.and_then(|v| v["job_id"].as_str().map(str::to_string))
|
||||
.unwrap_or_default();
|
||||
if !enabled {
|
||||
if let Ok(uuid) = uuid::Uuid::parse_str(&job_id) {
|
||||
let cj_id = openfang_types::scheduler::CronJobId(uuid);
|
||||
let _ = state.kernel.cron_scheduler.set_enabled(cj_id, false);
|
||||
let _ = state.kernel.cron_scheduler.persist();
|
||||
}
|
||||
}
|
||||
// Build response in the legacy shape.
|
||||
let body = if let Ok(uuid) = uuid::Uuid::parse_str(&job_id) {
|
||||
let cj_id = openfang_types::scheduler::CronJobId(uuid);
|
||||
match state.kernel.cron_scheduler.get_job(cj_id) {
|
||||
Some(job) => cron_job_to_schedule_view(&state.kernel, &job),
|
||||
None => serde_json::json!({
|
||||
"id": job_id,
|
||||
"name": name_raw,
|
||||
"cron": cron,
|
||||
"agent_id": target_agent.to_string(),
|
||||
"message": message,
|
||||
"enabled": enabled,
|
||||
}),
|
||||
}
|
||||
} else {
|
||||
serde_json::json!({
|
||||
"id": job_id,
|
||||
"name": name_raw,
|
||||
"cron": cron,
|
||||
"agent_id": target_agent.to_string(),
|
||||
"message": message,
|
||||
"enabled": enabled,
|
||||
})
|
||||
};
|
||||
(StatusCode::CREATED, Json(body))
|
||||
}
|
||||
Err(e) => (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": format!("Failed to create schedule: {e}")})),
|
||||
),
|
||||
}
|
||||
|
||||
(StatusCode::CREATED, Json(entry))
|
||||
}
|
||||
|
||||
/// PUT /api/schedules/:id — Update a scheduled job (toggle enabled, edit fields).
|
||||
/// PUT /api/schedules/:id — Update a scheduled job.
|
||||
///
|
||||
/// Supports toggling `enabled`. Other fields are not mutable without a
|
||||
/// delete+recreate (the underlying cron scheduler does not expose in-place
|
||||
/// edits); those are accepted but ignored with a note in the response.
|
||||
pub async fn update_schedule(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
Json(req): Json<serde_json::Value>,
|
||||
) -> impl IntoResponse {
|
||||
let shared_id = schedule_shared_agent_id();
|
||||
let mut schedules: Vec<serde_json::Value> =
|
||||
match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let mut found = false;
|
||||
for s in schedules.iter_mut() {
|
||||
if s["id"].as_str() == Some(&id) {
|
||||
found = true;
|
||||
if let Some(enabled) = req.get("enabled").and_then(|v| v.as_bool()) {
|
||||
s["enabled"] = serde_json::Value::Bool(enabled);
|
||||
}
|
||||
if let Some(name) = req.get("name").and_then(|v| v.as_str()) {
|
||||
s["name"] = serde_json::Value::String(name.to_string());
|
||||
}
|
||||
if let Some(cron) = req.get("cron").and_then(|v| v.as_str()) {
|
||||
let cron_parts: Vec<&str> = cron.split_whitespace().collect();
|
||||
if cron_parts.len() != 5 {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid cron expression"})),
|
||||
);
|
||||
}
|
||||
s["cron"] = serde_json::Value::String(cron.to_string());
|
||||
}
|
||||
if let Some(agent_id) = req.get("agent_id").and_then(|v| v.as_str()) {
|
||||
s["agent_id"] = serde_json::Value::String(agent_id.to_string());
|
||||
}
|
||||
if let Some(message) = req.get("message").and_then(|v| v.as_str()) {
|
||||
s["message"] = serde_json::Value::String(message.to_string());
|
||||
}
|
||||
break;
|
||||
let uuid = match uuid::Uuid::parse_str(&id) {
|
||||
Ok(u) => u,
|
||||
Err(_) => {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid schedule id"})),
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
let cj_id = openfang_types::scheduler::CronJobId(uuid);
|
||||
|
||||
if !found {
|
||||
if state.kernel.cron_scheduler.get_job(cj_id).is_none() {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Schedule not found"})),
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(e) = state.kernel.memory.structured_set(
|
||||
shared_id,
|
||||
SCHEDULES_KEY,
|
||||
serde_json::Value::Array(schedules),
|
||||
) {
|
||||
return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to update schedule: {e}")})),
|
||||
);
|
||||
let mut note: Option<&'static str> = None;
|
||||
if let Some(enabled) = req.get("enabled").and_then(|v| v.as_bool()) {
|
||||
if let Err(e) = state.kernel.cron_scheduler.set_enabled(cj_id, enabled) {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": format!("{e}")})),
|
||||
);
|
||||
}
|
||||
let _ = state.kernel.cron_scheduler.persist();
|
||||
}
|
||||
if req.get("name").is_some()
|
||||
|| req.get("cron").is_some()
|
||||
|| req.get("agent_id").is_some()
|
||||
|| req.get("message").is_some()
|
||||
{
|
||||
note = Some("Only 'enabled' is mutable; delete and recreate to change other fields.");
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({"status": "updated", "schedule_id": id})),
|
||||
)
|
||||
let mut body = serde_json::json!({"status": "updated", "schedule_id": id});
|
||||
if let Some(n) = note {
|
||||
body["note"] = serde_json::Value::String(n.to_string());
|
||||
}
|
||||
(StatusCode::OK, Json(body))
|
||||
}
|
||||
|
||||
/// DELETE /api/schedules/:id — Remove a scheduled job.
|
||||
/// DELETE /api/schedules/:id — Remove a scheduled job from the cron scheduler.
|
||||
pub async fn delete_schedule(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
let shared_id = schedule_shared_agent_id();
|
||||
let mut schedules: Vec<serde_json::Value> =
|
||||
match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let before = schedules.len();
|
||||
schedules.retain(|s| s["id"].as_str() != Some(&id));
|
||||
|
||||
if schedules.len() == before {
|
||||
return (
|
||||
let uuid = match uuid::Uuid::parse_str(&id) {
|
||||
Ok(u) => u,
|
||||
Err(_) => {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid schedule id"})),
|
||||
);
|
||||
}
|
||||
};
|
||||
let cj_id = openfang_types::scheduler::CronJobId(uuid);
|
||||
match state.kernel.cron_scheduler.remove_job(cj_id) {
|
||||
Ok(_) => {
|
||||
let _ = state.kernel.cron_scheduler.persist();
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({"status": "removed", "schedule_id": id})),
|
||||
)
|
||||
}
|
||||
Err(_) => (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Schedule not found"})),
|
||||
);
|
||||
),
|
||||
}
|
||||
|
||||
if let Err(e) = state.kernel.memory.structured_set(
|
||||
shared_id,
|
||||
SCHEDULES_KEY,
|
||||
serde_json::Value::Array(schedules),
|
||||
) {
|
||||
return (
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Json(serde_json::json!({"error": format!("Failed to delete schedule: {e}")})),
|
||||
);
|
||||
}
|
||||
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({"status": "removed", "schedule_id": id})),
|
||||
)
|
||||
}
|
||||
|
||||
/// POST /api/schedules/:id/run — Manually run a scheduled job now.
|
||||
@@ -8559,101 +8601,41 @@ pub async fn run_schedule(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path(id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
let shared_id = schedule_shared_agent_id();
|
||||
let schedules: Vec<serde_json::Value> =
|
||||
match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let schedule = match schedules.iter().find(|s| s["id"].as_str() == Some(&id)) {
|
||||
Some(s) => s.clone(),
|
||||
None => {
|
||||
let uuid = match uuid::Uuid::parse_str(&id) {
|
||||
Ok(u) => u,
|
||||
Err(_) => {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Invalid schedule id"})),
|
||||
);
|
||||
}
|
||||
};
|
||||
let cj_id = openfang_types::scheduler::CronJobId(uuid);
|
||||
let job = match state.kernel.cron_scheduler.try_claim_for_run(cj_id) {
|
||||
Ok(j) => j,
|
||||
Err(openfang_kernel::cron::ClaimError::NotFound) => {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(serde_json::json!({"error": "Schedule not found"})),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let agent_id_str = schedule["agent_id"].as_str().unwrap_or("");
|
||||
let message = schedule["message"]
|
||||
.as_str()
|
||||
.unwrap_or("Scheduled task triggered manually.");
|
||||
let name = schedule["name"].as_str().unwrap_or("(unnamed)");
|
||||
|
||||
// Find the target agent — require explicit agent_id, no silent fallback
|
||||
let target_agent = if !agent_id_str.is_empty() {
|
||||
if let Ok(aid) = agent_id_str.parse::<AgentId>() {
|
||||
if state.kernel.registry.get(aid).is_some() {
|
||||
Some(aid)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
state
|
||||
.kernel
|
||||
.registry
|
||||
.list()
|
||||
.iter()
|
||||
.find(|a| a.name == agent_id_str)
|
||||
.map(|a| a.id)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let target_agent = match target_agent {
|
||||
Some(a) => a,
|
||||
None => {
|
||||
Err(openfang_kernel::cron::ClaimError::Disabled) => {
|
||||
return (
|
||||
StatusCode::NOT_FOUND,
|
||||
Json(
|
||||
serde_json::json!({"error": "No target agent found. Specify an agent_id or start an agent first."}),
|
||||
),
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(serde_json::json!({"error": "Schedule is disabled"})),
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
let run_message = if message.is_empty() {
|
||||
format!("[Scheduled task '{}' triggered manually]", name)
|
||||
} else {
|
||||
message.to_string()
|
||||
};
|
||||
|
||||
// Update last_run and run_count
|
||||
let mut schedules_updated: Vec<serde_json::Value> =
|
||||
match state.kernel.memory.structured_get(shared_id, SCHEDULES_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
for s in schedules_updated.iter_mut() {
|
||||
if s["id"].as_str() == Some(&id) {
|
||||
s["last_run"] = serde_json::Value::String(chrono::Utc::now().to_rfc3339());
|
||||
let count = s["run_count"].as_u64().unwrap_or(0);
|
||||
s["run_count"] = serde_json::json!(count + 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
let _ = state.kernel.memory.structured_set(
|
||||
shared_id,
|
||||
SCHEDULES_KEY,
|
||||
serde_json::Value::Array(schedules_updated),
|
||||
);
|
||||
|
||||
let kernel_handle: Arc<dyn KernelHandle> = state.kernel.clone() as Arc<dyn KernelHandle>;
|
||||
match state
|
||||
.kernel
|
||||
.send_message_with_handle(target_agent, &run_message, Some(kernel_handle), None, None)
|
||||
.await
|
||||
{
|
||||
Ok(result) => (
|
||||
let agent_id_str = job.agent_id.to_string();
|
||||
match state.kernel.cron_run_job(&job).await {
|
||||
Ok(response) => (
|
||||
StatusCode::OK,
|
||||
Json(serde_json::json!({
|
||||
"status": "completed",
|
||||
"schedule_id": id,
|
||||
"agent_id": target_agent.to_string(),
|
||||
"response": result.response,
|
||||
"agent_id": agent_id_str,
|
||||
"response": response,
|
||||
})),
|
||||
),
|
||||
Err(e) => (
|
||||
@@ -8667,6 +8649,26 @@ pub async fn run_schedule(
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitize a user-supplied schedule name into a valid `CronJob.name`.
|
||||
/// Matches the kernel's own sanitizer used by the migration path.
|
||||
fn sanitize_schedule_job_name(raw: &str) -> String {
|
||||
let filtered: String = raw
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' {
|
||||
c
|
||||
} else {
|
||||
'-'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let trimmed = filtered.trim();
|
||||
if trimmed.is_empty() {
|
||||
return "schedule".to_string();
|
||||
}
|
||||
trimmed.chars().take(128).collect()
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Agent Identity endpoint
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -4287,6 +4287,11 @@ impl OpenFangKernel {
|
||||
}
|
||||
}
|
||||
|
||||
// One-shot migration of legacy shared-memory `__openfang_schedules`
|
||||
// entries (from the old broken `schedule_create` path) into the real
|
||||
// cron scheduler. Idempotent via a marker key.
|
||||
self.migrate_shared_memory_schedules();
|
||||
|
||||
// Cron scheduler tick loop — fires due jobs every 15 seconds
|
||||
{
|
||||
let kernel = Arc::clone(self);
|
||||
@@ -4649,6 +4654,192 @@ impl OpenFangKernel {
|
||||
});
|
||||
}
|
||||
|
||||
/// Migrate legacy `__openfang_schedules` shared-memory entries into the
|
||||
/// real cron scheduler.
|
||||
///
|
||||
/// The old `schedule_create` tool and `/api/schedules` POST route wrote
|
||||
/// to a shared-memory key that no executor ever read — so jobs registered
|
||||
/// that way never fired (#1069). This migration runs once at startup, is
|
||||
/// idempotent via a marker key, and leaves an empty array behind so the
|
||||
/// old key is no longer written to.
|
||||
///
|
||||
/// Entries with unresolved target agents are skipped (logged at warn
|
||||
/// level). Successfully migrated entries are added to the cron scheduler
|
||||
/// and the scheduler is persisted.
|
||||
pub(crate) fn migrate_shared_memory_schedules(&self) {
|
||||
const LEGACY_KEY: &str = "__openfang_schedules";
|
||||
const MARKER_KEY: &str = "__openfang_schedules_migrated_v1";
|
||||
|
||||
let shared = shared_memory_agent_id();
|
||||
|
||||
// Idempotency: if marker is already set, don't re-read.
|
||||
if let Ok(Some(serde_json::Value::Bool(true))) =
|
||||
self.memory.structured_get(shared, MARKER_KEY)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
let entries: Vec<serde_json::Value> = match self.memory.structured_get(shared, LEGACY_KEY) {
|
||||
Ok(Some(serde_json::Value::Array(arr))) => arr,
|
||||
Ok(_) => {
|
||||
// No entries ever written. Mark as migrated and exit.
|
||||
let _ =
|
||||
self.memory
|
||||
.structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true));
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Schedule migration: failed to read legacy key: {e}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if entries.is_empty() {
|
||||
let _ = self
|
||||
.memory
|
||||
.structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true));
|
||||
return;
|
||||
}
|
||||
|
||||
let mut migrated = 0usize;
|
||||
let mut skipped = 0usize;
|
||||
|
||||
for entry in &entries {
|
||||
match self.migrate_single_schedule_entry(entry) {
|
||||
Ok(()) => migrated += 1,
|
||||
Err(reason) => {
|
||||
skipped += 1;
|
||||
warn!(
|
||||
reason = %reason,
|
||||
entry = %entry,
|
||||
"Schedule migration: skipping legacy entry"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
migrated,
|
||||
skipped,
|
||||
total = entries.len(),
|
||||
"Migrated legacy __openfang_schedules entries to cron scheduler"
|
||||
);
|
||||
|
||||
// Clear the legacy key (store an empty array) and mark migrated so
|
||||
// the old location is never written to again.
|
||||
if let Err(e) =
|
||||
self.memory
|
||||
.structured_set(shared, LEGACY_KEY, serde_json::Value::Array(Vec::new()))
|
||||
{
|
||||
warn!("Schedule migration: failed to clear legacy key: {e}");
|
||||
}
|
||||
if let Err(e) =
|
||||
self.memory
|
||||
.structured_set(shared, MARKER_KEY, serde_json::Value::Bool(true))
|
||||
{
|
||||
warn!("Schedule migration: failed to set marker: {e}");
|
||||
}
|
||||
|
||||
if migrated > 0 {
|
||||
if let Err(e) = self.cron_scheduler.persist() {
|
||||
warn!("Schedule migration: cron persist failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a single legacy schedule entry into a `CronJob` and add it to
|
||||
/// the cron scheduler. Returns `Err` with a human-readable reason when
|
||||
/// the entry cannot be migrated (so the caller can log and skip).
|
||||
fn migrate_single_schedule_entry(&self, entry: &serde_json::Value) -> Result<(), String> {
|
||||
use openfang_types::scheduler::{
|
||||
CronAction, CronDelivery, CronJob, CronJobId, CronSchedule,
|
||||
};
|
||||
|
||||
let cron_expr = entry["cron"]
|
||||
.as_str()
|
||||
.ok_or_else(|| "missing 'cron' field".to_string())?
|
||||
.trim()
|
||||
.to_string();
|
||||
if cron_expr.is_empty() {
|
||||
return Err("empty cron expression".to_string());
|
||||
}
|
||||
|
||||
// Resolve target agent. Tool-shape uses `agent` (name or UUID);
|
||||
// HTTP-shape uses `agent_id` (UUID or name). Try both.
|
||||
let agent_hint = entry["agent_id"]
|
||||
.as_str()
|
||||
.filter(|s| !s.is_empty())
|
||||
.or_else(|| entry["agent"].as_str())
|
||||
.unwrap_or("")
|
||||
.trim()
|
||||
.to_string();
|
||||
|
||||
let target_agent = if agent_hint.is_empty() {
|
||||
return Err("no target agent specified".to_string());
|
||||
} else if let Ok(uuid) = uuid::Uuid::parse_str(&agent_hint) {
|
||||
let aid = AgentId(uuid);
|
||||
if self.registry.get(aid).is_none() {
|
||||
return Err(format!("agent {agent_hint} not in registry"));
|
||||
}
|
||||
aid
|
||||
} else {
|
||||
let found = self
|
||||
.registry
|
||||
.list()
|
||||
.into_iter()
|
||||
.find(|a| a.name == agent_hint);
|
||||
match found {
|
||||
Some(a) => a.id,
|
||||
None => return Err(format!("agent '{agent_hint}' not found")),
|
||||
}
|
||||
};
|
||||
|
||||
// Message for the agent turn: prefer explicit `message`, fallback to
|
||||
// `description` (tool shape), else a default string.
|
||||
let message = entry["message"]
|
||||
.as_str()
|
||||
.filter(|s| !s.is_empty())
|
||||
.or_else(|| entry["description"].as_str())
|
||||
.unwrap_or("Scheduled task")
|
||||
.to_string();
|
||||
|
||||
// Job name: prefer `name`, else sanitize description, else a default.
|
||||
let raw_name = entry["name"]
|
||||
.as_str()
|
||||
.filter(|s| !s.is_empty())
|
||||
.or_else(|| entry["description"].as_str())
|
||||
.unwrap_or("migrated-schedule")
|
||||
.to_string();
|
||||
let name = sanitize_cron_job_name(&raw_name);
|
||||
|
||||
let enabled = entry["enabled"].as_bool().unwrap_or(true);
|
||||
|
||||
let job = CronJob {
|
||||
id: CronJobId::new(),
|
||||
agent_id: target_agent,
|
||||
name,
|
||||
enabled,
|
||||
schedule: CronSchedule::Cron {
|
||||
expr: cron_expr,
|
||||
tz: None,
|
||||
},
|
||||
action: CronAction::AgentTurn {
|
||||
message,
|
||||
model_override: None,
|
||||
timeout_secs: None,
|
||||
},
|
||||
delivery: CronDelivery::None,
|
||||
created_at: chrono::Utc::now(),
|
||||
last_run: None,
|
||||
next_run: None,
|
||||
};
|
||||
|
||||
self.cron_scheduler
|
||||
.add_job(job, false)
|
||||
.map_err(|e| format!("add_job failed: {e}"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gracefully shutdown the kernel.
|
||||
///
|
||||
/// This cleanly shuts down in-memory state but preserves persistent agent
|
||||
@@ -6018,6 +6209,31 @@ pub fn shared_memory_agent_id() -> AgentId {
|
||||
]))
|
||||
}
|
||||
|
||||
/// Sanitize a human-readable string into a valid `CronJob.name`.
|
||||
///
|
||||
/// `CronJob::validate` requires the name to be 1..=128 chars and composed
|
||||
/// of alphanumeric, space, hyphen, and underscore characters only. This is
|
||||
/// used by the legacy schedule migration path where the source "name" may
|
||||
/// contain punctuation or be too long.
|
||||
fn sanitize_cron_job_name(raw: &str) -> String {
|
||||
let filtered: String = raw
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' {
|
||||
c
|
||||
} else {
|
||||
'-'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let trimmed = filtered.trim();
|
||||
if trimmed.is_empty() {
|
||||
return "migrated-schedule".to_string();
|
||||
}
|
||||
let truncated: String = trimmed.chars().take(128).collect();
|
||||
truncated
|
||||
}
|
||||
|
||||
/// Deliver a cron job's agent response to the configured delivery target.
|
||||
async fn cron_deliver_response(
|
||||
kernel: &OpenFangKernel,
|
||||
@@ -7099,4 +7315,228 @@ mod tests {
|
||||
|
||||
kernel.shutdown();
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// Issue #1069: sanitize_cron_job_name + shared-memory schedule migration
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_cron_job_name_basic() {
|
||||
assert_eq!(super::sanitize_cron_job_name("hello"), "hello");
|
||||
assert_eq!(super::sanitize_cron_job_name("hello world"), "hello world");
|
||||
assert_eq!(super::sanitize_cron_job_name("job_name-1"), "job_name-1");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_cron_job_name_strips_punctuation() {
|
||||
let out = super::sanitize_cron_job_name("Remind me: report!!");
|
||||
assert!(!out.contains(':'));
|
||||
assert!(!out.contains('!'));
|
||||
assert!(out
|
||||
.chars()
|
||||
.all(|c| c.is_alphanumeric() || c == ' ' || c == '-' || c == '_'));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_cron_job_name_empty_fallback() {
|
||||
assert_eq!(super::sanitize_cron_job_name(""), "migrated-schedule");
|
||||
assert_eq!(super::sanitize_cron_job_name(" "), "migrated-schedule");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_cron_job_name_caps_128_chars() {
|
||||
let long = "x".repeat(500);
|
||||
let out = super::sanitize_cron_job_name(&long);
|
||||
assert!(out.chars().count() <= 128);
|
||||
}
|
||||
|
||||
/// Register a minimal test agent in a booted kernel and return its ID.
|
||||
/// Kept local to the tests module to avoid widening the kernel's public
|
||||
/// surface.
|
||||
fn register_test_agent(kernel: &OpenFangKernel, name: &str) -> AgentId {
|
||||
let agent_id = AgentId::new();
|
||||
let entry = AgentEntry {
|
||||
id: agent_id,
|
||||
name: name.to_string(),
|
||||
manifest: test_manifest(name, "migration test", vec![]),
|
||||
state: AgentState::Running,
|
||||
mode: AgentMode::default(),
|
||||
created_at: chrono::Utc::now(),
|
||||
last_active: chrono::Utc::now(),
|
||||
parent: None,
|
||||
children: vec![],
|
||||
session_id: SessionId::new(),
|
||||
tags: vec![],
|
||||
identity: Default::default(),
|
||||
onboarding_completed: false,
|
||||
onboarding_completed_at: None,
|
||||
};
|
||||
kernel.registry.register(entry).unwrap();
|
||||
agent_id
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_migrate_shared_memory_schedules_imports_legacy_entries() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let home_dir = tmp.path().join("openfang-migrate");
|
||||
std::fs::create_dir_all(&home_dir).unwrap();
|
||||
let config = KernelConfig {
|
||||
home_dir: home_dir.clone(),
|
||||
data_dir: home_dir.join("data"),
|
||||
..KernelConfig::default()
|
||||
};
|
||||
|
||||
let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots");
|
||||
|
||||
// Register a target agent the legacy entries can point at.
|
||||
let agent = register_test_agent(&kernel, "report-agent");
|
||||
|
||||
// Pre-populate the legacy shared-memory key with two entries in the
|
||||
// two shapes that actually shipped: (a) tool-shape (description +
|
||||
// agent name) and (b) HTTP-shape (name + agent_id UUID).
|
||||
let shared = super::shared_memory_agent_id();
|
||||
let legacy_entries = serde_json::json!([
|
||||
{
|
||||
"description": "Send the daily report",
|
||||
"cron": "0 9 * * *",
|
||||
"agent": "report-agent",
|
||||
},
|
||||
{
|
||||
"name": "weekly/summary: monday!",
|
||||
"message": "Post the weekly summary",
|
||||
"cron": "0 10 * * 1",
|
||||
"agent_id": agent.0.to_string(),
|
||||
},
|
||||
]);
|
||||
kernel
|
||||
.memory
|
||||
.structured_set(shared, "__openfang_schedules", legacy_entries)
|
||||
.unwrap();
|
||||
|
||||
// Sanity: before migration, the cron scheduler is empty.
|
||||
assert_eq!(kernel.cron_scheduler.total_jobs(), 0);
|
||||
|
||||
kernel.migrate_shared_memory_schedules();
|
||||
|
||||
// Both legacy entries should now live in the cron scheduler.
|
||||
let jobs = kernel.cron_scheduler.list_jobs(agent);
|
||||
assert_eq!(jobs.len(), 2, "both legacy entries should migrate");
|
||||
|
||||
let names: Vec<&str> = jobs.iter().map(|j| j.name.as_str()).collect();
|
||||
assert!(names.iter().any(|n| n.contains("Send the daily report")));
|
||||
// Punctuation in the second entry's name is sanitized to hyphens.
|
||||
assert!(
|
||||
names.iter().any(|n| !n.contains('/') && !n.contains(':')),
|
||||
"sanitized name must not contain '/' or ':' ({names:?})"
|
||||
);
|
||||
|
||||
// The legacy key is cleared and the marker is set so we never read
|
||||
// it again.
|
||||
let remaining = kernel
|
||||
.memory
|
||||
.structured_get(shared, "__openfang_schedules")
|
||||
.unwrap();
|
||||
assert_eq!(remaining, Some(serde_json::Value::Array(vec![])));
|
||||
let marker = kernel
|
||||
.memory
|
||||
.structured_get(shared, "__openfang_schedules_migrated_v1")
|
||||
.unwrap();
|
||||
assert_eq!(marker, Some(serde_json::Value::Bool(true)));
|
||||
|
||||
kernel.shutdown();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_migrate_shared_memory_schedules_is_idempotent() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let home_dir = tmp.path().join("openfang-migrate-idem");
|
||||
std::fs::create_dir_all(&home_dir).unwrap();
|
||||
let config = KernelConfig {
|
||||
home_dir: home_dir.clone(),
|
||||
data_dir: home_dir.join("data"),
|
||||
..KernelConfig::default()
|
||||
};
|
||||
let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots");
|
||||
let agent = register_test_agent(&kernel, "idem-agent");
|
||||
let shared = super::shared_memory_agent_id();
|
||||
|
||||
kernel
|
||||
.memory
|
||||
.structured_set(
|
||||
shared,
|
||||
"__openfang_schedules",
|
||||
serde_json::json!([{
|
||||
"description": "Ping",
|
||||
"cron": "*/5 * * * *",
|
||||
"agent_id": agent.0.to_string(),
|
||||
}]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
kernel.migrate_shared_memory_schedules();
|
||||
assert_eq!(kernel.cron_scheduler.list_jobs(agent).len(), 1);
|
||||
|
||||
// Second call must not re-import anything even if someone re-writes
|
||||
// the legacy key by accident; the marker gates us.
|
||||
kernel
|
||||
.memory
|
||||
.structured_set(
|
||||
shared,
|
||||
"__openfang_schedules",
|
||||
serde_json::json!([{
|
||||
"description": "Ping again",
|
||||
"cron": "*/5 * * * *",
|
||||
"agent_id": agent.0.to_string(),
|
||||
}]),
|
||||
)
|
||||
.unwrap();
|
||||
kernel.migrate_shared_memory_schedules();
|
||||
assert_eq!(
|
||||
kernel.cron_scheduler.list_jobs(agent).len(),
|
||||
1,
|
||||
"migration must be idempotent via the marker key"
|
||||
);
|
||||
|
||||
kernel.shutdown();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_migrate_shared_memory_schedules_skips_unknown_agent() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let home_dir = tmp.path().join("openfang-migrate-skip");
|
||||
std::fs::create_dir_all(&home_dir).unwrap();
|
||||
let config = KernelConfig {
|
||||
home_dir: home_dir.clone(),
|
||||
data_dir: home_dir.join("data"),
|
||||
..KernelConfig::default()
|
||||
};
|
||||
let kernel = OpenFangKernel::boot_with_config(config).expect("kernel boots");
|
||||
let shared = super::shared_memory_agent_id();
|
||||
|
||||
// Entry references an agent that does not exist in the registry.
|
||||
kernel
|
||||
.memory
|
||||
.structured_set(
|
||||
shared,
|
||||
"__openfang_schedules",
|
||||
serde_json::json!([{
|
||||
"description": "Ping",
|
||||
"cron": "*/5 * * * *",
|
||||
"agent": "does-not-exist",
|
||||
}]),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
kernel.migrate_shared_memory_schedules();
|
||||
|
||||
// Nothing migrated, but the marker is still set so we don't retry.
|
||||
assert_eq!(kernel.cron_scheduler.total_jobs(), 0);
|
||||
let marker = kernel
|
||||
.memory
|
||||
.structured_get(shared, "__openfang_schedules_migrated_v1")
|
||||
.unwrap();
|
||||
assert_eq!(marker, Some(serde_json::Value::Bool(true)));
|
||||
|
||||
kernel.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -313,8 +313,8 @@ pub async fn execute_tool(
|
||||
"event_publish" => tool_event_publish(input, kernel).await,
|
||||
|
||||
// Scheduling tools
|
||||
"schedule_create" => tool_schedule_create(input, kernel).await,
|
||||
"schedule_list" => tool_schedule_list(kernel).await,
|
||||
"schedule_create" => tool_schedule_create(input, kernel, caller_agent_id).await,
|
||||
"schedule_list" => tool_schedule_list(kernel, caller_agent_id).await,
|
||||
"schedule_delete" => tool_schedule_delete(input, kernel).await,
|
||||
|
||||
// Knowledge graph tools
|
||||
@@ -2091,11 +2091,65 @@ fn parse_time_to_hour(s: &str) -> Result<u32, String> {
|
||||
Ok(hour)
|
||||
}
|
||||
|
||||
const SCHEDULES_KEY: &str = "__openfang_schedules";
|
||||
/// Sanitize a description into a valid `CronJob.name` (alphanumeric +
|
||||
/// space/hyphen/underscore, 1..=128 chars).
|
||||
fn sanitize_schedule_name(description: &str) -> String {
|
||||
let filtered: String = description
|
||||
.chars()
|
||||
.map(|c| {
|
||||
if c.is_alphanumeric() || c == ' ' || c == '-' || c == '_' {
|
||||
c
|
||||
} else {
|
||||
'-'
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let trimmed = filtered.trim();
|
||||
if trimmed.is_empty() {
|
||||
return "scheduled-task".to_string();
|
||||
}
|
||||
trimmed.chars().take(128).collect()
|
||||
}
|
||||
|
||||
/// Resolve the `agent` field of `schedule_create` into an agent UUID string
|
||||
/// suitable for `KernelHandle::cron_create`.
|
||||
///
|
||||
/// - Empty / "self" → caller's agent ID.
|
||||
/// - Valid UUID → passed through.
|
||||
/// - Non-empty name → looked up via `find_agents`; an exact name match wins,
|
||||
/// a single fuzzy match is accepted, ambiguity is an error.
|
||||
fn resolve_schedule_target(
|
||||
kh: &Arc<dyn KernelHandle>,
|
||||
agent: &str,
|
||||
caller_agent_id: Option<&str>,
|
||||
) -> Result<String, String> {
|
||||
let a = agent.trim();
|
||||
if a.is_empty() || a.eq_ignore_ascii_case("self") {
|
||||
return caller_agent_id.map(|s| s.to_string()).ok_or_else(|| {
|
||||
"No caller agent available; specify 'agent' to target an agent by name or UUID"
|
||||
.to_string()
|
||||
});
|
||||
}
|
||||
if uuid::Uuid::parse_str(a).is_ok() {
|
||||
return Ok(a.to_string());
|
||||
}
|
||||
let matches = kh.find_agents(a);
|
||||
if let Some(m) = matches.iter().find(|m| m.name == a) {
|
||||
return Ok(m.id.clone());
|
||||
}
|
||||
match matches.len() {
|
||||
0 => Err(format!("Agent '{a}' not found")),
|
||||
1 => Ok(matches[0].id.clone()),
|
||||
n => Err(format!(
|
||||
"Agent name '{a}' is ambiguous ({n} matches). Pass the agent UUID."
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn tool_schedule_create(
|
||||
input: &serde_json::Value,
|
||||
kernel: Option<&Arc<dyn KernelHandle>>,
|
||||
caller_agent_id: Option<&str>,
|
||||
) -> Result<String, String> {
|
||||
let kh = require_kernel(kernel)?;
|
||||
let description = input["description"]
|
||||
@@ -2104,58 +2158,77 @@ async fn tool_schedule_create(
|
||||
let schedule_str = input["schedule"]
|
||||
.as_str()
|
||||
.ok_or("Missing 'schedule' parameter")?;
|
||||
let agent = input["agent"].as_str().unwrap_or("");
|
||||
let agent_input = input["agent"].as_str().unwrap_or("");
|
||||
|
||||
let cron_expr = parse_schedule_to_cron(schedule_str)?;
|
||||
let schedule_id = uuid::Uuid::new_v4().to_string();
|
||||
let target_agent_id = resolve_schedule_target(kh, agent_input, caller_agent_id)?;
|
||||
let name = sanitize_schedule_name(description);
|
||||
|
||||
let entry = serde_json::json!({
|
||||
"id": schedule_id,
|
||||
"description": description,
|
||||
"schedule_input": schedule_str,
|
||||
"cron": cron_expr,
|
||||
"agent": agent,
|
||||
"created_at": chrono::Utc::now().to_rfc3339(),
|
||||
"enabled": true,
|
||||
let job_json = serde_json::json!({
|
||||
"name": name,
|
||||
"schedule": { "kind": "cron", "expr": cron_expr, "tz": null },
|
||||
"action": {
|
||||
"kind": "agent_turn",
|
||||
"message": description,
|
||||
"model_override": null,
|
||||
"timeout_secs": null,
|
||||
},
|
||||
"delivery": { "kind": "none" },
|
||||
"one_shot": false,
|
||||
});
|
||||
|
||||
// Load existing schedules from shared memory
|
||||
let mut schedules: Vec<serde_json::Value> = match kh.memory_recall(SCHEDULES_KEY)? {
|
||||
Some(serde_json::Value::Array(arr)) => arr,
|
||||
_ => Vec::new(),
|
||||
let resp = kh.cron_create(&target_agent_id, job_json).await?;
|
||||
// Kernel returns JSON `{ "job_id": "...", "status": "created" }`.
|
||||
let job_id = serde_json::from_str::<serde_json::Value>(&resp)
|
||||
.ok()
|
||||
.and_then(|v| v["job_id"].as_str().map(str::to_string))
|
||||
.unwrap_or_else(|| resp.clone());
|
||||
|
||||
let agent_display = if agent_input.trim().is_empty() {
|
||||
"(self)".to_string()
|
||||
} else {
|
||||
agent_input.to_string()
|
||||
};
|
||||
|
||||
schedules.push(entry);
|
||||
kh.memory_store(SCHEDULES_KEY, serde_json::Value::Array(schedules))?;
|
||||
|
||||
Ok(format!(
|
||||
"Schedule created:\n ID: {schedule_id}\n Description: {description}\n Cron: {cron_expr}\n Original: {schedule_str}"
|
||||
"Schedule created:\n ID: {job_id}\n Description: {description}\n Cron: {cron_expr}\n Original: {schedule_str}\n Agent: {agent_display}"
|
||||
))
|
||||
}
|
||||
|
||||
async fn tool_schedule_list(kernel: Option<&Arc<dyn KernelHandle>>) -> Result<String, String> {
|
||||
async fn tool_schedule_list(
|
||||
kernel: Option<&Arc<dyn KernelHandle>>,
|
||||
caller_agent_id: Option<&str>,
|
||||
) -> Result<String, String> {
|
||||
let kh = require_kernel(kernel)?;
|
||||
let agent_id =
|
||||
caller_agent_id.ok_or("Agent ID required for schedule_list (no caller context)")?;
|
||||
|
||||
let schedules: Vec<serde_json::Value> = match kh.memory_recall(SCHEDULES_KEY)? {
|
||||
Some(serde_json::Value::Array(arr)) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
if schedules.is_empty() {
|
||||
let jobs = kh.cron_list(agent_id).await?;
|
||||
if jobs.is_empty() {
|
||||
return Ok("No scheduled tasks.".to_string());
|
||||
}
|
||||
|
||||
let mut output = format!("Scheduled tasks ({}):\n\n", schedules.len());
|
||||
for s in &schedules {
|
||||
let enabled = s["enabled"].as_bool().unwrap_or(true);
|
||||
let mut output = format!("Scheduled tasks ({}):\n\n", jobs.len());
|
||||
for job in &jobs {
|
||||
let enabled = job["enabled"].as_bool().unwrap_or(true);
|
||||
let status = if enabled { "active" } else { "paused" };
|
||||
let id = job["id"].as_str().unwrap_or("?");
|
||||
let schedule_display = match job["schedule"]["kind"].as_str() {
|
||||
Some("cron") => job["schedule"]["expr"].as_str().unwrap_or("?").to_string(),
|
||||
Some("every") => format!(
|
||||
"every {}s",
|
||||
job["schedule"]["every_secs"].as_u64().unwrap_or(0)
|
||||
),
|
||||
Some("at") => job["schedule"]["at"].as_str().unwrap_or("?").to_string(),
|
||||
_ => "?".to_string(),
|
||||
};
|
||||
let description = job["action"]["message"]
|
||||
.as_str()
|
||||
.or_else(|| job["action"]["text"].as_str())
|
||||
.unwrap_or_else(|| job["name"].as_str().unwrap_or("?"));
|
||||
let created = job["created_at"].as_str().unwrap_or("?");
|
||||
let agent = job["agent_id"].as_str().unwrap_or("(self)");
|
||||
output.push_str(&format!(
|
||||
" [{status}] {} — {}\n Cron: {} | Agent: {}\n Created: {}\n\n",
|
||||
s["id"].as_str().unwrap_or("?"),
|
||||
s["description"].as_str().unwrap_or("?"),
|
||||
s["cron"].as_str().unwrap_or("?"),
|
||||
s["agent"].as_str().unwrap_or("(self)"),
|
||||
s["created_at"].as_str().unwrap_or("?"),
|
||||
" [{status}] {id} — {description}\n Cron: {schedule_display} | Agent: {agent}\n Created: {created}\n\n"
|
||||
));
|
||||
}
|
||||
|
||||
@@ -2168,20 +2241,9 @@ async fn tool_schedule_delete(
|
||||
) -> Result<String, String> {
|
||||
let kh = require_kernel(kernel)?;
|
||||
let id = input["id"].as_str().ok_or("Missing 'id' parameter")?;
|
||||
|
||||
let mut schedules: Vec<serde_json::Value> = match kh.memory_recall(SCHEDULES_KEY)? {
|
||||
Some(serde_json::Value::Array(arr)) => arr,
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
let before = schedules.len();
|
||||
schedules.retain(|s| s["id"].as_str() != Some(id));
|
||||
|
||||
if schedules.len() == before {
|
||||
return Err(format!("Schedule '{id}' not found."));
|
||||
}
|
||||
|
||||
kh.memory_store(SCHEDULES_KEY, serde_json::Value::Array(schedules))?;
|
||||
kh.cron_cancel(id)
|
||||
.await
|
||||
.map_err(|e| format!("Schedule '{id}' not found: {e}"))?;
|
||||
Ok(format!("Schedule '{id}' deleted."))
|
||||
}
|
||||
|
||||
@@ -4191,4 +4253,268 @@ mod tests {
|
||||
assert!(!is_shell_tool("file_read"));
|
||||
assert!(!is_shell_tool("web_fetch"));
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------
|
||||
// Issue #1069: schedule_* tools route through the kernel cron scheduler
|
||||
// ----------------------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_schedule_name_strips_punctuation() {
|
||||
// Colons, commas, dots, and other punctuation are replaced with '-'.
|
||||
let out = sanitize_schedule_name("Remind me: file report, please.");
|
||||
assert!(!out.contains(':'));
|
||||
assert!(!out.contains(','));
|
||||
assert!(!out.contains('.'));
|
||||
// Spaces, hyphens, and underscores survive.
|
||||
assert!(out
|
||||
.chars()
|
||||
.all(|c| c.is_alphanumeric() || c == ' ' || c == '-' || c == '_'));
|
||||
assert!(!out.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_schedule_name_empty_fallback() {
|
||||
assert_eq!(sanitize_schedule_name(""), "scheduled-task");
|
||||
assert_eq!(sanitize_schedule_name(" "), "scheduled-task");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sanitize_schedule_name_caps_length() {
|
||||
let long = "a".repeat(500);
|
||||
let out = sanitize_schedule_name(&long);
|
||||
assert!(out.chars().count() <= 128);
|
||||
}
|
||||
|
||||
// Minimal in-memory KernelHandle used to verify schedule_* tool wiring.
|
||||
// Records every cron_* call so tests can assert what the tool pushed into
|
||||
// the kernel, without booting a real OpenFangKernel.
|
||||
struct FakeKernelHandle {
|
||||
created: std::sync::Mutex<Vec<(String, serde_json::Value)>>,
|
||||
cancelled: std::sync::Mutex<Vec<String>>,
|
||||
jobs: std::sync::Mutex<Vec<serde_json::Value>>,
|
||||
}
|
||||
|
||||
impl FakeKernelHandle {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
created: std::sync::Mutex::new(Vec::new()),
|
||||
cancelled: std::sync::Mutex::new(Vec::new()),
|
||||
jobs: std::sync::Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
fn with_job(self, job: serde_json::Value) -> Self {
|
||||
self.jobs.lock().unwrap().push(job);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl crate::kernel_handle::KernelHandle for FakeKernelHandle {
|
||||
async fn spawn_agent(
|
||||
&self,
|
||||
_manifest_toml: &str,
|
||||
_parent_id: Option<&str>,
|
||||
) -> Result<(String, String), String> {
|
||||
Err("not used".into())
|
||||
}
|
||||
async fn send_to_agent(&self, _agent_id: &str, _message: &str) -> Result<String, String> {
|
||||
Err("not used".into())
|
||||
}
|
||||
fn list_agents(&self) -> Vec<crate::kernel_handle::AgentInfo> {
|
||||
vec![]
|
||||
}
|
||||
fn kill_agent(&self, _agent_id: &str) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
fn memory_store(&self, _key: &str, _value: serde_json::Value) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
fn memory_recall(&self, _key: &str) -> Result<Option<serde_json::Value>, String> {
|
||||
Ok(None)
|
||||
}
|
||||
fn find_agents(&self, _query: &str) -> Vec<crate::kernel_handle::AgentInfo> {
|
||||
vec![]
|
||||
}
|
||||
async fn task_post(
|
||||
&self,
|
||||
_title: &str,
|
||||
_description: &str,
|
||||
_assigned_to: Option<&str>,
|
||||
_created_by: Option<&str>,
|
||||
) -> Result<String, String> {
|
||||
Err("not used".into())
|
||||
}
|
||||
async fn task_claim(&self, _agent_id: &str) -> Result<Option<serde_json::Value>, String> {
|
||||
Ok(None)
|
||||
}
|
||||
async fn task_complete(&self, _task_id: &str, _result: &str) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
async fn task_list(&self, _status: Option<&str>) -> Result<Vec<serde_json::Value>, String> {
|
||||
Ok(vec![])
|
||||
}
|
||||
async fn publish_event(
|
||||
&self,
|
||||
_event_type: &str,
|
||||
_payload: serde_json::Value,
|
||||
) -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
async fn knowledge_add_entity(
|
||||
&self,
|
||||
_entity: openfang_types::memory::Entity,
|
||||
) -> Result<String, String> {
|
||||
Err("not used".into())
|
||||
}
|
||||
async fn knowledge_add_relation(
|
||||
&self,
|
||||
_relation: openfang_types::memory::Relation,
|
||||
) -> Result<String, String> {
|
||||
Err("not used".into())
|
||||
}
|
||||
async fn knowledge_query(
|
||||
&self,
|
||||
_pattern: openfang_types::memory::GraphPattern,
|
||||
) -> Result<Vec<openfang_types::memory::GraphMatch>, String> {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
||||
async fn cron_create(
|
||||
&self,
|
||||
agent_id: &str,
|
||||
job_json: serde_json::Value,
|
||||
) -> Result<String, String> {
|
||||
let id = format!("job-{}", self.created.lock().unwrap().len());
|
||||
self.created
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((agent_id.to_string(), job_json.clone()));
|
||||
// Mirror what the real kernel returns (see cron_create in
|
||||
// openfang-kernel): `{ "job_id": "...", "status": "created" }`.
|
||||
let resp = serde_json::json!({ "job_id": id, "status": "created" });
|
||||
Ok(resp.to_string())
|
||||
}
|
||||
|
||||
async fn cron_list(&self, _agent_id: &str) -> Result<Vec<serde_json::Value>, String> {
|
||||
Ok(self.jobs.lock().unwrap().clone())
|
||||
}
|
||||
|
||||
async fn cron_cancel(&self, job_id: &str) -> Result<(), String> {
|
||||
self.cancelled.lock().unwrap().push(job_id.to_string());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_create_routes_to_cron_scheduler() {
|
||||
let fake = Arc::new(FakeKernelHandle::new());
|
||||
let handle: Arc<dyn crate::kernel_handle::KernelHandle> = fake.clone();
|
||||
let caller = "11111111-1111-1111-1111-111111111111";
|
||||
|
||||
let input = serde_json::json!({
|
||||
"description": "Daily report",
|
||||
"schedule": "daily at 9am",
|
||||
"agent": "self",
|
||||
});
|
||||
|
||||
let out = tool_schedule_create(&input, Some(&handle), Some(caller))
|
||||
.await
|
||||
.expect("tool_schedule_create should succeed with a valid schedule");
|
||||
|
||||
// User-facing response shape is preserved.
|
||||
assert!(out.starts_with("Schedule created:"));
|
||||
assert!(out.contains("Daily report"));
|
||||
assert!(out.contains("Cron: "));
|
||||
|
||||
// The fake kernel received a cron_create for the caller agent with a
|
||||
// well-formed job_json. This is the whole point of #1069: the tool
|
||||
// must call into the cron scheduler, not just write to shared memory.
|
||||
let created = fake.created.lock().unwrap();
|
||||
assert_eq!(created.len(), 1, "cron_create must be called exactly once");
|
||||
assert_eq!(created[0].0, caller, "target agent must be the caller");
|
||||
let job = &created[0].1;
|
||||
assert_eq!(job["schedule"]["kind"], "cron");
|
||||
assert_eq!(job["action"]["kind"], "agent_turn");
|
||||
assert_eq!(job["action"]["message"], "Daily report");
|
||||
assert!(job["schedule"]["expr"].is_string());
|
||||
assert_eq!(job["one_shot"], false);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_create_rejects_missing_description() {
|
||||
let fake = Arc::new(FakeKernelHandle::new());
|
||||
let handle: Arc<dyn crate::kernel_handle::KernelHandle> = fake.clone();
|
||||
let input = serde_json::json!({ "schedule": "every hour" });
|
||||
let err = tool_schedule_create(&input, Some(&handle), Some("aaa"))
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.contains("description"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_list_reads_from_cron_scheduler() {
|
||||
let job = serde_json::json!({
|
||||
"id": "cron-1",
|
||||
"name": "demo",
|
||||
"enabled": true,
|
||||
"schedule": { "kind": "cron", "expr": "0 9 * * *" },
|
||||
"action": { "kind": "agent_turn", "message": "hello" },
|
||||
"created_at": "2026-01-01T00:00:00Z",
|
||||
"agent_id": "aaa",
|
||||
});
|
||||
let fake = Arc::new(FakeKernelHandle::new().with_job(job));
|
||||
let handle: Arc<dyn crate::kernel_handle::KernelHandle> = fake.clone();
|
||||
|
||||
let out = tool_schedule_list(Some(&handle), Some("aaa"))
|
||||
.await
|
||||
.expect("schedule_list should succeed");
|
||||
assert!(out.contains("Scheduled tasks (1)"));
|
||||
assert!(out.contains("0 9 * * *"));
|
||||
assert!(out.contains("hello"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_list_empty() {
|
||||
let fake = Arc::new(FakeKernelHandle::new());
|
||||
let handle: Arc<dyn crate::kernel_handle::KernelHandle> = fake.clone();
|
||||
let out = tool_schedule_list(Some(&handle), Some("aaa"))
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(out, "No scheduled tasks.");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_delete_routes_to_cron_cancel() {
|
||||
let fake = Arc::new(FakeKernelHandle::new());
|
||||
let handle: Arc<dyn crate::kernel_handle::KernelHandle> = fake.clone();
|
||||
let input = serde_json::json!({ "id": "abc-123" });
|
||||
let out = tool_schedule_delete(&input, Some(&handle)).await.unwrap();
|
||||
assert!(out.contains("abc-123"));
|
||||
let cancelled = fake.cancelled.lock().unwrap();
|
||||
assert_eq!(cancelled.len(), 1);
|
||||
assert_eq!(cancelled[0], "abc-123");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_schedule_tools_require_kernel() {
|
||||
// Without a kernel handle, the new tools must fail loudly rather than
|
||||
// silently writing to the old shared-memory key.
|
||||
let err = tool_schedule_create(
|
||||
&serde_json::json!({"description": "x", "schedule": "every hour"}),
|
||||
None,
|
||||
Some("aaa"),
|
||||
)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.to_lowercase().contains("kernel"));
|
||||
|
||||
let err = tool_schedule_list(None, Some("aaa")).await.unwrap_err();
|
||||
assert!(err.to_lowercase().contains("kernel"));
|
||||
|
||||
let err = tool_schedule_delete(&serde_json::json!({"id": "x"}), None)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.to_lowercase().contains("kernel"));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user