Merge pull request #667 from bobbiejaxn/feat/http-memory-backend

feat: HTTP memory backend for SemanticStore
This commit is contained in:
Jaber Jaber
2026-03-27 16:44:17 +03:00
committed by GitHub
7 changed files with 486 additions and 7 deletions

View File

@@ -562,7 +562,7 @@ impl OpenFangKernel {
.clone()
.unwrap_or_else(|| config.data_dir.join("openfang.db"));
let memory = Arc::new(
MemorySubstrate::open(&db_path, config.memory.decay_rate)
MemorySubstrate::open(&db_path, config.memory.decay_rate, &config.memory)
.map_err(|e| KernelError::BootFailed(format!("Memory init failed: {e}")))?,
);

View File

@@ -5,6 +5,10 @@ edition.workspace = true
license.workspace = true
description = "Memory substrate for the OpenFang Agent OS"
[features]
default = ["http-memory"]
http-memory = ["reqwest"]
[dependencies]
openfang-types = { path = "../openfang-types" }
tokio = { workspace = true }
@@ -17,6 +21,7 @@ uuid = { workspace = true }
thiserror = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
reqwest = { workspace = true, features = ["blocking"], optional = true }
[dev-dependencies]
tokio-test = { workspace = true }

View File

@@ -0,0 +1,246 @@
//! HTTP client for the memory-api gateway.
//!
//! Provides a blocking HTTP client that routes `remember` and `recall` operations
//! to the shared memory-api service (PostgreSQL + pgvector + Jina AI embeddings).
//! Designed to be called from synchronous SemanticStore methods within
//! `spawn_blocking` contexts.
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
/// Error type for memory API operations.
#[derive(Debug, thiserror::Error)]
pub enum MemoryApiError {
    /// Transport-level failure (connection refused, timeout, client build).
    #[error("HTTP error: {0}")]
    Http(String),
    /// The server answered with a non-success status; `message` is the raw response body.
    #[error("API error (status {status}): {message}")]
    Api { status: u16, message: String },
    /// The response body could not be deserialized into the expected shape.
    #[error("Parse error: {0}")]
    Parse(String),
    /// Required configuration was absent.
    #[error("Missing config: {0}")]
    Config(String),
}
/// HTTP client for the memory-api gateway service.
#[derive(Clone)]
pub struct MemoryApiClient {
    /// Base URL of the gateway with any trailing `/` stripped.
    base_url: String,
    /// Bearer token; an empty string means requests are sent unauthenticated.
    token: String,
    /// Blocking reqwest client configured with a 30s timeout.
    client: reqwest::blocking::Client,
}
// -- Request/Response types matching memory-api endpoints --

/// Body for POST /memory/store. Borrowed fields avoid copies; `None` fields
/// are omitted from the serialized JSON entirely (`skip_serializing_if`).
#[derive(Serialize)]
struct StoreRequest<'a> {
    content: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    category: Option<&'a str>,
    /// Serialized as camelCase `agentId` to match the API contract.
    #[serde(rename = "agentId", skip_serializing_if = "Option::is_none")]
    agent_id: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    source: Option<&'a str>,
    #[serde(skip_serializing_if = "Option::is_none")]
    importance: Option<u8>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tags: Option<Vec<String>>,
}
/// Response from POST /memory/store.
#[derive(Deserialize, Debug)]
pub struct StoreResponse {
    /// Server-assigned id, kept as raw JSON — presumably a string or number;
    /// confirm against the memory-api schema.
    pub id: serde_json::Value,
    /// True when the server matched an existing memory instead of inserting.
    #[serde(default)]
    pub deduplicated: bool,
}
/// Body for POST /memory/search; `None` fields are omitted from the JSON.
#[derive(Serialize)]
struct SearchRequest<'a> {
    query: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    limit: Option<usize>,
    #[serde(skip_serializing_if = "Option::is_none")]
    category: Option<&'a str>,
}
/// Envelope returned by POST /memory/search.
#[derive(Deserialize, Debug)]
pub struct SearchResponse {
    pub results: Vec<SearchResult>,
    /// Result count as reported by the server (used here for logging only).
    pub count: usize,
}
/// A single hit returned by POST /memory/search.
#[derive(Deserialize, Debug, Clone)]
pub struct SearchResult {
    /// Server-side id, kept as raw JSON (string or numeric form).
    pub id: serde_json::Value,
    pub content: String,
    /// Remote "category"; callers map it onto the local "scope" concept.
    #[serde(default)]
    pub category: Option<String>,
    /// Relevance score; defaults to 0.0 when the server omits it.
    #[serde(default)]
    pub score: f64,
    /// Creation time in epoch milliseconds — presumably UTC; confirm with the API.
    #[serde(rename = "createdAt", default)]
    pub created_at: Option<f64>,
}
/// Body of GET /health; only the status string is inspected.
#[derive(Deserialize, Debug)]
struct HealthResponse {
    pub status: String,
}
impl MemoryApiClient {
    /// Create a new memory-api HTTP client.
    ///
    /// `base_url`: the base URL of the memory-api service
    /// (e.g., "http://127.0.0.1:5500"); any trailing `/` is stripped.
    /// `token_env`: name of the environment variable holding the bearer token.
    /// An empty name, or an unset variable (which logs a warning), yields
    /// unauthenticated requests.
    ///
    /// # Errors
    /// Returns `MemoryApiError::Http` if the underlying HTTP client cannot be built.
    pub fn new(base_url: &str, token_env: &str) -> Result<Self, MemoryApiError> {
        let token = if token_env.is_empty() {
            String::new()
        } else {
            std::env::var(token_env).unwrap_or_else(|_| {
                warn!(env = token_env, "Memory API token env var not set");
                String::new()
            })
        };
        let client = reqwest::blocking::Client::builder()
            .timeout(std::time::Duration::from_secs(30))
            .user_agent("openfang-memory/0.4")
            .build()
            .map_err(|e| MemoryApiError::Http(e.to_string()))?;
        Ok(Self {
            base_url: base_url.trim_end_matches('/').to_string(),
            token,
            client,
        })
    }

    /// Attach the bearer token to a request builder when one is configured.
    fn with_auth(
        &self,
        req: reqwest::blocking::RequestBuilder,
    ) -> reqwest::blocking::RequestBuilder {
        if self.token.is_empty() {
            req
        } else {
            req.header("Authorization", format!("Bearer {}", self.token))
        }
    }

    /// Build an `Api` error from a non-success response, consuming its body.
    fn api_error(resp: reqwest::blocking::Response) -> MemoryApiError {
        MemoryApiError::Api {
            status: resp.status().as_u16(),
            message: resp.text().unwrap_or_default(),
        }
    }

    /// Check if memory-api is reachable via GET /health.
    ///
    /// Sent without authentication (the health endpoint takes no token).
    ///
    /// # Errors
    /// `Http` on transport failure, `Api` on a non-2xx status or a body whose
    /// `status` field is not `"ok"`, `Parse` on malformed JSON.
    pub fn health_check(&self) -> Result<(), MemoryApiError> {
        let url = format!("{}/health", self.base_url);
        let resp = self
            .client
            .get(&url)
            .send()
            .map_err(|e| MemoryApiError::Http(e.to_string()))?;
        if !resp.status().is_success() {
            return Err(Self::api_error(resp));
        }
        let body: HealthResponse = resp
            .json()
            .map_err(|e| MemoryApiError::Parse(e.to_string()))?;
        if body.status != "ok" {
            return Err(MemoryApiError::Api {
                status: 503,
                message: format!("memory-api status: {}", body.status),
            });
        }
        debug!("memory-api health check passed");
        Ok(())
    }

    /// Store a memory via POST /memory/store.
    ///
    /// The memory-api handles embedding generation (Jina AI) and deduplication.
    ///
    /// # Errors
    /// `Http` on transport failure, `Api` when the server returns anything other
    /// than 200/201, `Parse` on malformed JSON.
    pub fn store(
        &self,
        content: &str,
        category: Option<&str>,
        agent_id: Option<&str>,
        source: Option<&str>,
        importance: Option<u8>,
        tags: Option<Vec<String>>,
    ) -> Result<StoreResponse, MemoryApiError> {
        let url = format!("{}/memory/store", self.base_url);
        let body = StoreRequest {
            content,
            category,
            agent_id,
            source,
            importance,
            tags,
        };
        let resp = self
            .with_auth(self.client.post(&url).json(&body))
            .send()
            .map_err(|e| MemoryApiError::Http(e.to_string()))?;
        // 200 (deduplicated hit) and 201 (fresh insert) are both success.
        let status = resp.status().as_u16();
        if status != 200 && status != 201 {
            return Err(Self::api_error(resp));
        }
        let result: StoreResponse = resp
            .json()
            .map_err(|e| MemoryApiError::Parse(e.to_string()))?;
        debug!(
            id = %result.id,
            deduplicated = result.deduplicated,
            "Stored memory via HTTP"
        );
        Ok(result)
    }

    /// Search memories via POST /memory/search.
    ///
    /// The memory-api handles embedding the query (Jina AI) and hybrid vector+BM25 search.
    ///
    /// # Errors
    /// `Http` on transport failure, `Api` on any non-200 status, `Parse` on
    /// malformed JSON.
    pub fn search(
        &self,
        query: &str,
        limit: usize,
        category: Option<&str>,
    ) -> Result<Vec<SearchResult>, MemoryApiError> {
        let url = format!("{}/memory/search", self.base_url);
        let body = SearchRequest {
            query,
            limit: Some(limit),
            category,
        };
        let resp = self
            .with_auth(self.client.post(&url).json(&body))
            .send()
            .map_err(|e| MemoryApiError::Http(e.to_string()))?;
        if resp.status().as_u16() != 200 {
            return Err(Self::api_error(resp));
        }
        let result: SearchResponse = resp
            .json()
            .map_err(|e| MemoryApiError::Parse(e.to_string()))?;
        debug!(count = result.count, "Searched memories via HTTP");
        Ok(result.results)
    }
}

View File

@@ -8,6 +8,8 @@
//! Agents interact with a single `Memory` trait that abstracts over all three stores.
pub mod consolidation;
#[cfg(feature = "http-memory")]
pub mod http_client;
pub mod knowledge;
pub mod migration;
pub mod semantic;

View File

@@ -14,18 +14,43 @@ use openfang_types::memory::{MemoryFilter, MemoryFragment, MemoryId, MemorySourc
use rusqlite::Connection;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tracing::debug;
use tracing::{debug, warn};
#[cfg(feature = "http-memory")]
use crate::http_client::MemoryApiClient;
/// Semantic store backed by SQLite with optional vector search.
///
/// Supports two backends:
/// - **SQLite** (default): Local LIKE matching / cosine similarity.
/// - **HTTP**: Routes `remember`/`recall` to the memory-api gateway
///   (PostgreSQL + pgvector + Jina AI embeddings).
#[derive(Clone)]
pub struct SemanticStore {
    /// Shared SQLite connection; also the fallback path when HTTP calls fail.
    conn: Arc<Mutex<Connection>>,
    /// When `Some`, remember/recall are routed through the memory-api gateway.
    #[cfg(feature = "http-memory")]
    http_client: Option<MemoryApiClient>,
}
impl SemanticStore {
/// Create a new semantic store wrapping the given connection.
/// Create a new semantic store wrapping the given connection (SQLite backend).
pub fn new(conn: Arc<Mutex<Connection>>) -> Self {
Self { conn }
Self {
conn,
#[cfg(feature = "http-memory")]
http_client: None,
}
}
/// Create a semantic store with an HTTP backend for the memory-api gateway.
///
/// The SQLite connection is still required for local fallback and other stores.
#[cfg(feature = "http-memory")]
pub fn new_with_http(conn: Arc<Mutex<Connection>>, client: MemoryApiClient) -> Self {
    Self {
        conn,
        // Presence of a client is what switches remember/recall to HTTP.
        http_client: Some(client),
    }
}
/// Store a new memory fragment (without embedding).
@@ -41,6 +66,9 @@ impl SemanticStore {
}
/// Store a new memory fragment with an optional embedding vector.
///
/// When HTTP backend is configured, stores via memory-api (which handles
/// embedding generation and deduplication). Falls back to local SQLite.
pub fn remember_with_embedding(
&self,
agent_id: AgentId,
@@ -49,6 +77,26 @@ impl SemanticStore {
scope: &str,
metadata: HashMap<String, serde_json::Value>,
embedding: Option<&[f32]>,
) -> OpenFangResult<MemoryId> {
// HTTP backend: route to memory-api
#[cfg(feature = "http-memory")]
if let Some(ref client) = self.http_client {
return self.remember_via_http(client, agent_id, content, source, scope, &metadata);
}
// SQLite backend (default)
self.remember_sqlite(agent_id, content, source, scope, metadata, embedding)
}
/// SQLite implementation of remember_with_embedding.
fn remember_sqlite(
&self,
agent_id: AgentId,
content: &str,
source: MemorySource,
scope: &str,
metadata: HashMap<String, serde_json::Value>,
embedding: Option<&[f32]>,
) -> OpenFangResult<MemoryId> {
let conn = self
.conn
@@ -80,6 +128,53 @@ impl SemanticStore {
Ok(id)
}
/// HTTP implementation of remember — routes to memory-api POST /memory/store.
#[cfg(feature = "http-memory")]
fn remember_via_http(
    &self,
    client: &MemoryApiClient,
    agent_id: AgentId,
    content: &str,
    source: MemorySource,
    scope: &str,
    metadata: &HashMap<String, serde_json::Value>,
) -> OpenFangResult<MemoryId> {
    // Debug-format the source enum and lowercase it to produce the API's
    // string form (e.g. `MemorySource::System` -> "system").
    let source_str = format!("{:?}", source).to_lowercase();
    // Importance: optional metadata override, capped at 10, defaulting to 5.
    let importance = metadata
        .get("importance")
        .and_then(|v| v.as_u64())
        .map(|v| v.min(10) as u8)
        .unwrap_or(5);
    // Tags: best-effort deserialization; malformed values are silently dropped.
    let tags: Option<Vec<String>> = metadata
        .get("tags")
        .and_then(|v| serde_json::from_value(v.clone()).ok());
    match client.store(
        content,
        Some(scope), // local "scope" maps to the remote "category"
        Some(&agent_id.0.to_string()),
        Some(&source_str),
        Some(importance),
        tags,
    ) {
        Ok(resp) => {
            debug!(id = %resp.id, "Stored memory via HTTP backend");
            // NOTE(review): the remote id (resp.id) is discarded and a fresh
            // local MemoryId is returned — confirm callers never use this id
            // to address the remotely stored record (e.g. via forget()).
            Ok(MemoryId::new())
        }
        Err(e) => {
            // Degrade gracefully: persist locally if the gateway is unreachable.
            warn!(error = %e, "HTTP memory store failed, falling back to SQLite");
            self.remember_sqlite(
                agent_id,
                content,
                source,
                scope,
                metadata.clone(),
                None, // no embedding is available in this fallback path
            )
        }
    }
}
/// Search for memories using text matching (fallback, no embeddings).
pub fn recall(
&self,
@@ -92,6 +187,9 @@ impl SemanticStore {
/// Search for memories using vector similarity when a query embedding is provided,
/// falling back to LIKE matching otherwise.
///
/// When HTTP backend is configured, searches via memory-api (hybrid vector+BM25).
/// Falls back to local SQLite on HTTP errors.
pub fn recall_with_embedding(
&self,
query: &str,
@@ -99,6 +197,17 @@ impl SemanticStore {
filter: Option<MemoryFilter>,
query_embedding: Option<&[f32]>,
) -> OpenFangResult<Vec<MemoryFragment>> {
// HTTP backend: route to memory-api
#[cfg(feature = "http-memory")]
if let Some(ref client) = self.http_client {
match self.recall_via_http(client, query, limit, &filter) {
Ok(results) => return Ok(results),
Err(e) => {
warn!(error = %e, "HTTP memory search failed, falling back to SQLite");
}
}
}
let conn = self
.conn
.lock()
@@ -277,7 +386,15 @@ impl SemanticStore {
}
/// Soft-delete a memory fragment.
///
/// In HTTP mode, logs a warning (memory-api doesn't support delete yet)
/// and performs the soft-delete locally only.
pub fn forget(&self, id: MemoryId) -> OpenFangResult<()> {
#[cfg(feature = "http-memory")]
if self.http_client.is_some() {
warn!(id = %id.0, "forget() not supported via HTTP backend, local-only soft-delete");
}
let conn = self
.conn
.lock()
@@ -304,6 +421,58 @@ impl SemanticStore {
.map_err(|e| OpenFangError::Memory(e.to_string()))?;
Ok(())
}
/// HTTP implementation of recall — routes to memory-api POST /memory/search.
///
/// Maps memory-api search results to `MemoryFragment` structs. Fields not
/// available from the HTTP API (agent_id, embedding, access_count) use defaults.
///
/// # Errors
/// Returns `OpenFangError::Memory` when the HTTP search itself fails; the
/// caller is expected to fall back to the SQLite path in that case.
#[cfg(feature = "http-memory")]
fn recall_via_http(
    &self,
    client: &MemoryApiClient,
    query: &str,
    limit: usize,
    filter: &Option<MemoryFilter>,
) -> OpenFangResult<Vec<MemoryFragment>> {
    // The filter's local "scope" is sent as the remote "category".
    let category = filter.as_ref().and_then(|f| f.scope.as_deref());
    let results = client
        .search(query, limit, category)
        .map_err(|e| OpenFangError::Memory(format!("HTTP search failed: {e}")))?;
    let fragments: Vec<MemoryFragment> = results
        .into_iter()
        .map(|r| {
            // `createdAt` arrives as epoch milliseconds (f64); fall back to now
            // when absent or outside chrono's representable range.
            let created_at = r
                .created_at
                .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64))
                .unwrap_or_else(Utc::now);
            MemoryFragment {
                // NOTE(review): the remote id (r.id) is discarded and a fresh
                // local id synthesized, so forget() cannot target the remote
                // record — confirm this is acceptable.
                id: MemoryId::new(),
                // No agent attribution comes back over HTTP; reuse the filter's
                // agent when present, otherwise synthesize one.
                agent_id: filter
                    .as_ref()
                    .and_then(|f| f.agent_id)
                    .unwrap_or_else(AgentId::new),
                content: r.content,
                embedding: None,
                metadata: HashMap::new(),
                source: MemorySource::System,
                // Hybrid-search relevance score is repurposed as confidence.
                confidence: r.score as f32,
                created_at,
                accessed_at: Utc::now(),
                access_count: 0,
                scope: r.category.unwrap_or_else(|| "general".to_string()),
            }
        })
        .collect();
    debug!(count = fragments.len(), "Recalled memories via HTTP backend");
    Ok(fragments)
}
}
/// Compute cosine similarity between two vectors.

View File

@@ -13,6 +13,7 @@ use crate::usage::UsageStore;
use async_trait::async_trait;
use openfang_types::agent::{AgentEntry, AgentId, SessionId};
use openfang_types::config::MemoryConfig;
use openfang_types::error::{OpenFangError, OpenFangResult};
use openfang_types::memory::{
ConsolidationReport, Entity, ExportFormat, GraphMatch, GraphPattern, ImportReport, Memory,
@@ -22,6 +23,7 @@ use rusqlite::Connection;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, Mutex};
use tracing::{info, warn};
/// The unified memory substrate. Implements the `Memory` trait by delegating
/// to specialized stores backed by a shared SQLite connection.
@@ -37,17 +39,23 @@ pub struct MemorySubstrate {
impl MemorySubstrate {
/// Open or create a memory substrate at the given database path.
pub fn open(db_path: &Path, decay_rate: f32) -> OpenFangResult<Self> {
///
/// When `memory_config.backend == "http"` and `http_url`/`http_token_env` are set,
/// the semantic store routes `remember`/`recall` to the memory-api gateway.
/// All other stores (KV, knowledge graph, sessions) remain local SQLite.
pub fn open(db_path: &Path, decay_rate: f32, memory_config: &MemoryConfig) -> OpenFangResult<Self> {
let conn = Connection::open(db_path).map_err(|e| OpenFangError::Memory(e.to_string()))?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")
.map_err(|e| OpenFangError::Memory(e.to_string()))?;
run_migrations(&conn).map_err(|e| OpenFangError::Memory(e.to_string()))?;
let shared = Arc::new(Mutex::new(conn));
let semantic = Self::create_semantic_store(Arc::clone(&shared), memory_config);
Ok(Self {
conn: Arc::clone(&shared),
structured: StructuredStore::new(Arc::clone(&shared)),
semantic: SemanticStore::new(Arc::clone(&shared)),
semantic,
knowledge: KnowledgeStore::new(Arc::clone(&shared)),
sessions: SessionStore::new(Arc::clone(&shared)),
usage: UsageStore::new(Arc::clone(&shared)),
@@ -55,7 +63,39 @@ impl MemorySubstrate {
})
}
/// Create an in-memory substrate (for testing).
/// Create the semantic store, optionally with HTTP backend.
fn create_semantic_store(
conn: Arc<Mutex<Connection>>,
memory_config: &MemoryConfig,
) -> SemanticStore {
#[cfg(feature = "http-memory")]
if memory_config.backend == "http" {
if let (Some(url), Some(token_env)) = (&memory_config.http_url, &memory_config.http_token_env) {
match crate::http_client::MemoryApiClient::new(url, token_env) {
Ok(client) => {
// Best-effort health check on startup
match client.health_check() {
Ok(()) => info!(url = %url, "HTTP memory backend connected"),
Err(e) => warn!(url = %url, error = %e, "HTTP memory backend health check failed, will retry on use"),
}
return SemanticStore::new_with_http(conn, client);
}
Err(e) => {
warn!(error = %e, "Failed to create HTTP memory client, falling back to SQLite");
}
}
} else {
warn!("backend=http but http_url/http_token_env not set, falling back to SQLite");
}
}
#[cfg(not(feature = "http-memory"))]
let _ = memory_config;
SemanticStore::new(conn)
}
/// Create an in-memory substrate (for testing). Always uses SQLite backend.
pub fn open_in_memory(decay_rate: f32) -> OpenFangResult<Self> {
let conn =
Connection::open_in_memory().map_err(|e| OpenFangError::Memory(e.to_string()))?;

View File

@@ -1513,12 +1513,26 @@ pub struct MemoryConfig {
/// How often to run memory consolidation (hours). 0 = disabled.
#[serde(default = "default_consolidation_interval")]
pub consolidation_interval_hours: u64,
/// Memory backend: "sqlite" (default) or "http".
#[serde(default = "default_memory_backend")]
pub backend: String,
/// HTTP memory API URL (when backend = "http").
/// e.g., "http://127.0.0.1:5500"
#[serde(default)]
pub http_url: Option<String>,
/// Env var name holding the HTTP memory API bearer token.
#[serde(default)]
pub http_token_env: Option<String>,
}
/// Serde default for `MemoryConfig::consolidation_interval_hours`: run daily.
fn default_consolidation_interval() -> u64 {
    24
}
/// Serde default for `MemoryConfig::backend`: the local SQLite store.
fn default_memory_backend() -> String {
    String::from("sqlite")
}
impl Default for MemoryConfig {
fn default() -> Self {
Self {
@@ -1529,6 +1543,9 @@ impl Default for MemoryConfig {
embedding_provider: None,
embedding_api_key_env: None,
consolidation_interval_hours: default_consolidation_interval(),
backend: default_memory_backend(),
http_url: None,
http_token_env: None,
}
}
}