mirror of
https://github.com/Mintplex-Labs/anything-llm
synced 2026-04-25 17:15:37 +02:00
* implement native embedder job queue
* persist embedding progress across renders
* add development worker timeouts
* change to static method
* native reranker
* remove useless return
* lint
* simplify
* make embedding worker timeout value configurable by admin
* add event emission for missing data
* lint
* remove onProgress callback argument
* rename rerank to rerankDirect
* persist progress state across app reloads
* remove chunk-level progress reporting
* remove unused variable
* make NATIVE_RERANKING_WORKER_TIMEOUT user configurable
* remove dead code
* scope embedding progress per-user and clear stale state on SSE reconnect
* lint
* revert vector databases and embedding engines to call their original methods
* simplify rerank
* simplify progress fetching by removing updateProgressFromApi
* remove duplicate jsdoc
* replace sessionStorage persistence with server-side history replay for embedding progress
* fix old comment
* fix: ignore premature SSE all_complete when embedding hasn't started yet

  The SSE connection opens before the embedding API call fires, so the server sees no buffered history and immediately sends all_complete. Firefox dispatches this eagerly enough that it closes the EventSource before real progress events arrive, causing the progress UI to clear and fall back to the loading spinner. Chrome's EventSource timing masks the race.

  Track slugs where startEmbedding was called but no real progress event has arrived yet via awaitingProgressRef. Ignore the first all_complete for those slugs and keep the connection open for the real events.
* reduce duplication with progress emissions
* remove dead code
* refactor: streamline embedding progress handling

  Removed unnecessary tracking of slugs for premature all_complete events in the EmbeddingProgressProvider. Updated the server-side logic to avoid sending all_complete when no embedding is in progress, allowing the connection to remain open for real events. Adjusted the embedding initiation flow to ensure the server processes the job before the SSE connection opens, improving the reliability of progress updates.
* fix stale comment
* remove unused function
* fix event emissions for document creation failure
* refactor: move Reranking Worker Idle Timeout input to LanceDBOptions component

  Extracted the Reranking Worker Idle Timeout input from GeneralEmbeddingPreference and integrated it into the LanceDBOptions component. This change enhances modularity and maintains a cleaner structure for the settings interface.
* lint
* remove unused hadHistory vars
* refactor workspace directory by hoisting component and converting into functions
* move EmbeddingProgressProvider to wrap Document Manager Modal
* refactor embed progress SSE connection to use fetchEventSource instead of the native EventSource API
* refactor message handling into a function and reduce duplication
* refactor: utilize writeResponseChunk for event emissions in document embedding progress SSE
* refactor: explicit in-proc embedding and rerank methods that are called by workers instead of process.send checks
* abstract EmbeddingProgressBus and Worker Queue into modules
* remove error and toast messages on embed process result
* use safeJsonParse
* add chunk-level progress events with per-document progress bar in UI
* remove unused parameter
* rename all worker timeout references to use ttl | remove ttl updating from UI
* refactor: pass embedding context through job payload instead of global state
* lint
* add graceful shutdown for workers
* apply figma styles
* refactor embedding worker to use bree
* use existing WorkerQueue class as the management layer for jobs
* lint
* revert all reranking worker changes back to master state

  Removes the reranking worker queue, rerankViaWorker/rerankInProcess renames, and NATIVE_RERANKING_WORKER_TTL config so this branch only contains the embedding worker job queue feature.
* remove breeManaged flag: WorkerQueue always spawns via Bree
* fix prompt embedding bug
* have embedTextInput call embedChunksInProcess
* add message field to `process.send()`
* remove nullish check and error throw
* remove bespoke graceful shutdown logic
* add spawnWorker method and abstract redundant flows into helper methods
* remove unneeded comment
* remove recomputation of TTL value
* frontend cleanup and refactor
* wip on backend refactor
* backend overhaul
* small lint
* second pass
* add logging, update endpoint
* simple refactor
* add reporting to all embedder providers
* fix styles

---------

Co-authored-by: Timothy Carambat <rambat1010@gmail.com>
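The server-side part of the premature `all_complete` fix described above can be sketched as a small replay routine: on SSE connect, buffered history for the workspace is replayed, and when there is no history at all the connection simply stays open instead of eagerly terminating. This is a minimal illustrative sketch only; the names `replayOnConnect`, `historyFor`, and `send` are assumptions, not the repository's actual API.

```javascript
// Sketch of the replay-on-connect behavior described in the commit message.
// historyFor(slug) is a hypothetical accessor for the buffered progress
// events of a workspace; send(event) is a hypothetical SSE writer.
function replayOnConnect(slug, send, historyFor) {
  const history = historyFor(slug) || [];
  for (const event of history) send(event);

  // No buffered history means the embedding job has not started yet:
  // keep the connection open rather than sending a premature all_complete.
  if (history.length === 0) return "open";

  // If the run already finished, the replayed all_complete ends the stream.
  const last = history[history.length - 1];
  return last.type === "all_complete" ? "closed" : "open";
}
```

A client connecting before the embedding API call fires would hit the empty-history branch and wait, which is the behavior the fix introduced.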
365 lines
11 KiB
JavaScript
const { v4: uuidv4 } = require("uuid");
const { getVectorDbClass } = require("../utils/helpers");
const prisma = require("../utils/prisma");
const { Telemetry } = require("./telemetry");
const { EventLogs } = require("./eventLogs");
const { safeJsonParse } = require("../utils/http");
const { getModelTag } = require("../endpoints/utils");

const Document = {
  writable: ["pinned", "watched", "lastUpdatedAt"],

  /**
   * @param {import("@prisma/client").workspace_documents} document - Document PrismaRecord
   * @returns {{
   *  metadata: (null|object),
   *  type: import("./documentSyncQueue.js").validFileType,
   *  source: string
   * }}
   */
  parseDocumentTypeAndSource: function (document) {
    const metadata = safeJsonParse(document.metadata, null);
    if (!metadata) return { metadata: null, type: null, source: null };

    // Parse the correct type of source and its original source path.
    const idx = metadata.chunkSource.indexOf("://");
    const [type, source] = [
      metadata.chunkSource.slice(0, idx),
      metadata.chunkSource.slice(idx + 3),
    ];
    return { metadata, type, source: this._stripSource(source, type) };
  },

  forWorkspace: async function (workspaceId = null) {
    if (!workspaceId) return [];
    return await prisma.workspace_documents.findMany({
      where: { workspaceId },
    });
  },

  delete: async function (clause = {}) {
    try {
      await prisma.workspace_documents.deleteMany({ where: clause });
      return true;
    } catch (error) {
      console.error(error.message);
      return false;
    }
  },

  get: async function (clause = {}) {
    try {
      const document = await prisma.workspace_documents.findFirst({
        where: clause,
      });
      return document || null;
    } catch (error) {
      console.error(error.message);
      return null;
    }
  },

  where: async function (
    clause = {},
    limit = null,
    orderBy = null,
    include = null,
    select = null
  ) {
    try {
      const results = await prisma.workspace_documents.findMany({
        where: clause,
        ...(limit !== null ? { take: limit } : {}),
        ...(orderBy !== null ? { orderBy } : {}),
        ...(include !== null ? { include } : {}),
        ...(select !== null ? { select: { ...select } } : {}),
      });
      return results;
    } catch (error) {
      console.error(error.message);
      return [];
    }
  },

  addDocuments: async function (workspace, additions = [], userId = null) {
    const VectorDb = getVectorDbClass();
    if (additions.length === 0) return { failed: [], embedded: [] };
    const { fileData } = require("../utils/files");
    const { emitProgress } = require("../utils/EmbeddingWorkerManager");
    const embedded = [];
    const failedToEmbed = [];
    const errors = new Set();

    emitProgress(workspace.slug, {
      type: "batch_starting",
      workspaceSlug: workspace.slug,
      userId,
      filenames: additions,
      totalDocs: additions.length,
    });

    for (const [index, path] of additions.entries()) {
      const docProgress = {
        workspaceSlug: workspace.slug,
        userId,
        filename: path,
        docIndex: index,
        totalDocs: additions.length,
      };

      const data = await fileData(path);
      if (!data) {
        emitProgress(workspace.slug, {
          type: "doc_failed",
          ...docProgress,
          error: "Failed to load file data",
        });
        continue;
      }

      const docId = uuidv4();
      const { pageContent: _pageContent, ...metadata } = data;
      const newDoc = {
        docId,
        filename: path.split("/")[1],
        docpath: path,
        workspaceId: workspace.id,
        metadata: JSON.stringify(metadata),
      };

      emitProgress(workspace.slug, { type: "doc_starting", ...docProgress });

      global.__embeddingProgress = {
        workspaceSlug: workspace.slug,
        filename: path,
        userId,
      };

      const { vectorized, error } = await VectorDb.addDocumentToNamespace(
        workspace.slug,
        { ...data, docId },
        path
      );

      if (!vectorized) {
        console.error(
          "Failed to vectorize",
          metadata?.title || newDoc.filename
        );
        failedToEmbed.push(metadata?.title || newDoc.filename);
        errors.add(error);
        emitProgress(workspace.slug, {
          type: "doc_failed",
          ...docProgress,
          error: error || "Unknown error",
        });
        continue;
      }

      try {
        await prisma.workspace_documents.create({ data: newDoc });
        embedded.push(path);
        emitProgress(workspace.slug, {
          type: "doc_complete",
          ...docProgress,
        });
      } catch (error) {
        console.error(error.message);
        emitProgress(workspace.slug, {
          type: "doc_failed",
          ...docProgress,
          error: "Failed to save document record",
        });
      }
    }

    global.__embeddingProgress = null;

    emitProgress(workspace.slug, {
      type: "all_complete",
      workspaceSlug: workspace.slug,
      userId,
      totalDocs: additions.length,
      embedded: embedded.length,
      failed: failedToEmbed.length,
    });

    await Telemetry.sendTelemetry("documents_embedded_in_workspace", {
      LLMSelection: process.env.LLM_PROVIDER || "openai",
      Embedder: process.env.EMBEDDING_ENGINE || "inherit",
      VectorDbSelection: process.env.VECTOR_DB || "lancedb",
      TTSSelection: process.env.TTS_PROVIDER || "native",
      LLMModel: getModelTag(),
    });
    await EventLogs.logEvent(
      "workspace_documents_added",
      {
        workspaceName: workspace?.name || "Unknown Workspace",
        numberOfDocumentsAdded: additions.length,
      },
      userId
    );
    return { failedToEmbed, errors: Array.from(errors), embedded };
  },

  removeDocuments: async function (workspace, removals = [], userId = null) {
    const VectorDb = getVectorDbClass();
    if (removals.length === 0) return;

    for (const path of removals) {
      const document = await this.get({
        docpath: path,
        workspaceId: workspace.id,
      });
      if (!document) continue;
      await VectorDb.deleteDocumentFromNamespace(
        workspace.slug,
        document.docId
      );

      try {
        await prisma.workspace_documents.delete({
          where: { id: document.id, workspaceId: workspace.id },
        });
        await prisma.document_vectors.deleteMany({
          where: { docId: document.docId },
        });
      } catch (error) {
        console.error(error.message);
      }
    }

    await EventLogs.logEvent(
      "workspace_documents_removed",
      {
        workspaceName: workspace?.name || "Unknown Workspace",
        numberOfDocuments: removals.length,
      },
      userId
    );
    return true;
  },

  count: async function (clause = {}, limit = null) {
    try {
      const count = await prisma.workspace_documents.count({
        where: clause,
        ...(limit !== null ? { take: limit } : {}),
      });
      return count;
    } catch (error) {
      console.error("FAILED TO COUNT DOCUMENTS.", error.message);
      return 0;
    }
  },
  update: async function (id = null, data = {}) {
    if (!id) throw new Error("No workspace document id provided for update");

    const validKeys = Object.keys(data).filter((key) =>
      this.writable.includes(key)
    );
    if (validKeys.length === 0)
      return { document: { id }, message: "No valid fields to update!" };

    try {
      const document = await prisma.workspace_documents.update({
        where: { id },
        data,
      });
      return { document, message: null };
    } catch (error) {
      console.error(error.message);
      return { document: null, message: error.message };
    }
  },
  _updateAll: async function (clause = {}, data = {}) {
    try {
      await prisma.workspace_documents.updateMany({
        where: clause,
        data,
      });
      return true;
    } catch (error) {
      console.error(error.message);
      return false;
    }
  },
  content: async function (docId) {
    if (!docId) throw new Error("No workspace docId provided!");
    const document = await this.get({ docId: String(docId) });
    if (!document) throw new Error(`Could not find a document by id ${docId}`);

    const { fileData } = require("../utils/files");
    const data = await fileData(document.docpath);
    return { title: data.title, content: data.pageContent };
  },
  contentByDocPath: async function (docPath) {
    const { fileData } = require("../utils/files");
    const data = await fileData(docPath);
    return { title: data.title, content: data.pageContent };
  },

  // Some data sources have encoded params in them we don't want to log - so strip those details.
  _stripSource: function (sourceString, type) {
    if (["confluence", "github"].includes(type)) {
      const _src = new URL(sourceString);
      _src.search = ""; // remove all search params that are encoded for resync.
      return _src.toString();
    }

    return sourceString;
  },

  /**
   * Functions for the backend API endpoints - not to be used by the frontend or elsewhere.
   * @namespace api
   */
  api: {
    /**
     * Process a document upload from the API and upsert it into the database. This
     * functionality should only be used by the backend /v1/documents/upload endpoints for post-upload embedding.
     * @param {string} wsSlugs - The slugs of the workspaces to embed the document into, will be a comma-separated list of workspace slugs
     * @param {string} docLocation - The location/path of the document that was uploaded
     * @returns {Promise<boolean>} - True if the document was uploaded successfully, false otherwise
     */
    uploadToWorkspace: async function (wsSlugs = "", docLocation = null) {
      if (!docLocation)
        return console.log(
          "No document location provided for embedding",
          docLocation
        );

      const slugs = wsSlugs
        .split(",")
        .map((slug) => String(slug)?.trim()?.toLowerCase());
      if (slugs.length === 0)
        return console.log(`No workspaces provided got: ${wsSlugs}`);

      const { Workspace } = require("./workspace");
      const workspaces = await Workspace.where({ slug: { in: slugs } });
      if (workspaces.length === 0)
        return console.log("No valid workspaces found for slugs: ", slugs);

      // Upsert the document into each workspace - do this sequentially
      // because the document may be large and we don't want to overwhelm the embedder, plus on the first
      // upsert we will then have the cache of the document - making n+1 embeds faster. If we parallelize this
      // we will have to do a lot of extra work to ensure that the document is not embedded more than once.
      for (const workspace of workspaces) {
        const { failedToEmbed = [], errors = [] } = await Document.addDocuments(
          workspace,
          [docLocation]
        );
        if (failedToEmbed.length > 0)
          return console.log(
            `Failed to embed document into workspace ${workspace.slug}`,
            errors
          );
        console.log(`Document embedded into workspace ${workspace.slug}...`);
      }

      return true;
    },
  },
};

module.exports = { Document };
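The progress events that `addDocuments` emits via `emitProgress` (`batch_starting`, `doc_starting`, `doc_complete`, `doc_failed`, `all_complete`) lend themselves to a simple client-side reducer. The sketch below is illustrative only: it mirrors the payload fields used in this file, but the reducer itself and the sample event sequence are not part of the repository.

```javascript
// Minimal sketch of a reducer over the progress events this model emits.
// Field names (type, totalDocs, filename, embedded, failed) match the
// payloads built in addDocuments above; everything else is hypothetical.
function progressReducer(state, event) {
  switch (event.type) {
    case "batch_starting":
      return { totalDocs: event.totalDocs, embedded: 0, failed: 0, current: null, done: false };
    case "doc_starting":
      return { ...state, current: event.filename };
    case "doc_complete":
      return { ...state, embedded: state.embedded + 1, current: null };
    case "doc_failed":
      return { ...state, failed: state.failed + 1, current: null };
    case "all_complete":
      return { ...state, embedded: event.embedded, failed: event.failed, current: null, done: true };
    default:
      return state;
  }
}

// Replay the kind of sequence addDocuments would emit for two files,
// one of which fails to load.
const events = [
  { type: "batch_starting", totalDocs: 2 },
  { type: "doc_starting", filename: "a.txt" },
  { type: "doc_complete", filename: "a.txt" },
  { type: "doc_starting", filename: "b.txt" },
  { type: "doc_failed", filename: "b.txt", error: "Failed to load file data" },
  { type: "all_complete", totalDocs: 2, embedded: 1, failed: 1 },
];
const final = events.reduce(progressReducer, {});
// final: { totalDocs: 2, embedded: 1, failed: 1, current: null, done: true }
```

Note that the terminal `all_complete` carries authoritative counts from the server, so the reducer prefers those over its own tallies.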