mirror of
https://github.com/different-ai/openwork
synced 2026-04-25 17:15:34 +02:00
fix(den): reuse a shared Daytona volume for workers (#1392)
Co-authored-by: src-opn <src-opn@users.noreply.github.com>
This commit is contained in:
@@ -68,6 +68,7 @@ const EnvSchema = z.object({
|
||||
DAYTONA_SIGNED_PREVIEW_EXPIRES_SECONDS: z.string().optional(),
|
||||
DAYTONA_WORKER_PROXY_BASE_URL: z.string().optional(),
|
||||
DAYTONA_SANDBOX_NAME_PREFIX: z.string().optional(),
|
||||
DAYTONA_SHARED_VOLUME_NAME: z.string().optional(),
|
||||
DAYTONA_VOLUME_NAME_PREFIX: z.string().optional(),
|
||||
DAYTONA_WORKSPACE_MOUNT_PATH: z.string().optional(),
|
||||
DAYTONA_DATA_MOUNT_PATH: z.string().optional(),
|
||||
@@ -240,8 +241,10 @@ export const env = {
|
||||
optionalString(parsed.DAYTONA_WORKER_PROXY_BASE_URL) ?? "https://workers.den.openworklabs",
|
||||
sandboxNamePrefix:
|
||||
optionalString(parsed.DAYTONA_SANDBOX_NAME_PREFIX) ?? "den-daytona-worker",
|
||||
volumeNamePrefix:
|
||||
optionalString(parsed.DAYTONA_VOLUME_NAME_PREFIX) ?? "den-daytona-worker",
|
||||
sharedVolumeName:
|
||||
optionalString(parsed.DAYTONA_SHARED_VOLUME_NAME) ??
|
||||
optionalString(parsed.DAYTONA_VOLUME_NAME_PREFIX) ??
|
||||
"den-daytona-workers",
|
||||
workspaceMountPath:
|
||||
optionalString(parsed.DAYTONA_WORKSPACE_MOUNT_PATH) ?? "/workspace",
|
||||
dataMountPath:
|
||||
|
||||
@@ -90,12 +90,35 @@ function sandboxName(input: ProvisionInput) {
|
||||
).slice(0, 63)
|
||||
}
|
||||
|
||||
function workspaceVolumeName(workerId: WorkerId) {
|
||||
return slug(`${env.daytona.volumeNamePrefix}-${workerHint(workerId)}-workspace`).slice(0, 63)
|
||||
function sharedVolumeName() {
|
||||
return slug(env.daytona.sharedVolumeName).slice(0, 63)
|
||||
}
|
||||
|
||||
function dataVolumeName(workerId: WorkerId) {
|
||||
return slug(`${env.daytona.volumeNamePrefix}-${workerHint(workerId)}-data`).slice(0, 63)
|
||||
function workerVolumeRootSubpath(workerId: WorkerId) {
|
||||
return `workers/${workerId}`
|
||||
}
|
||||
|
||||
function workspaceVolumeSubpath(workerId: WorkerId) {
|
||||
return `${workerVolumeRootSubpath(workerId)}/workspace`
|
||||
}
|
||||
|
||||
function dataVolumeSubpath(workerId: WorkerId) {
|
||||
return `${workerVolumeRootSubpath(workerId)}/data`
|
||||
}
|
||||
|
||||
function sharedVolumeMounts(workerId: WorkerId, volumeId: string) {
|
||||
return [
|
||||
{
|
||||
volumeId,
|
||||
mountPath: env.daytona.workspaceMountPath,
|
||||
subpath: workspaceVolumeSubpath(workerId),
|
||||
},
|
||||
{
|
||||
volumeId,
|
||||
mountPath: env.daytona.dataMountPath,
|
||||
subpath: dataVolumeSubpath(workerId),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
function buildOpenWorkStartCommand(input: ProvisionInput) {
|
||||
@@ -174,6 +197,90 @@ async function waitForVolumeReady(daytona: Daytona, name: string, timeoutMs: num
|
||||
throw new Error(`Timed out waiting for Daytona volume ${name} to become ready`)
|
||||
}
|
||||
|
||||
function buildVolumeCleanupCommand(workerId: WorkerId) {
|
||||
return [
|
||||
"node -e",
|
||||
shellQuote(
|
||||
[
|
||||
'const fs = require("node:fs")',
|
||||
'const path = require("node:path")',
|
||||
'for (const dir of process.argv.slice(1)) {',
|
||||
' fs.mkdirSync(dir, { recursive: true })',
|
||||
' for (const entry of fs.readdirSync(dir)) {',
|
||||
' fs.rmSync(path.join(dir, entry), { recursive: true, force: true })',
|
||||
' }',
|
||||
'}',
|
||||
].join("; "),
|
||||
),
|
||||
shellQuote(env.daytona.workspaceMountPath),
|
||||
shellQuote(env.daytona.dataMountPath),
|
||||
].join(" ")
|
||||
}
|
||||
|
||||
async function cleanupWorkerDataOnDaytona(daytona: Daytona, workerId: WorkerId) {
|
||||
let sharedVolume
|
||||
|
||||
try {
|
||||
sharedVolume = await waitForVolumeReady(
|
||||
daytona,
|
||||
sharedVolumeName(),
|
||||
env.daytona.createTimeoutSeconds * 1000,
|
||||
)
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "unknown_error"
|
||||
console.warn(`[provisioner] failed to resolve shared Daytona volume for ${workerId}: ${message}`)
|
||||
return
|
||||
}
|
||||
|
||||
let cleanupSandbox: Awaited<ReturnType<typeof daytona.create>> | null = null
|
||||
|
||||
try {
|
||||
cleanupSandbox = await daytona.create(
|
||||
{
|
||||
name: slug(`den-daytona-cleanup-${workerHint(workerId)}`).slice(0, 63),
|
||||
image: env.daytona.image,
|
||||
public: false,
|
||||
autoStopInterval: 0,
|
||||
autoArchiveInterval: 0,
|
||||
autoDeleteInterval: 0,
|
||||
ephemeral: true,
|
||||
envVars: {
|
||||
DEN_RUNTIME_PROVIDER: "daytona-cleanup",
|
||||
DEN_WORKER_ID: workerId,
|
||||
},
|
||||
resources: {
|
||||
cpu: 1,
|
||||
memory: 1,
|
||||
disk: 4,
|
||||
},
|
||||
volumes: sharedVolumeMounts(workerId, sharedVolume.id),
|
||||
},
|
||||
{ timeout: env.daytona.createTimeoutSeconds },
|
||||
)
|
||||
|
||||
const result = await cleanupSandbox.process.executeCommand(
|
||||
buildVolumeCleanupCommand(workerId),
|
||||
undefined,
|
||||
undefined,
|
||||
env.daytona.deleteTimeoutSeconds,
|
||||
)
|
||||
|
||||
if (result.exitCode !== 0) {
|
||||
throw new Error(result.result?.trim() || `cleanup command exited with ${result.exitCode}`)
|
||||
}
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "unknown_error"
|
||||
console.warn(`[provisioner] failed to cleanup Daytona worker data for ${workerId}: ${message}`)
|
||||
} finally {
|
||||
if (cleanupSandbox) {
|
||||
await cleanupSandbox.delete(env.daytona.deleteTimeoutSeconds).catch((error) => {
|
||||
const message = error instanceof Error ? error.message : "unknown_error"
|
||||
console.warn(`[provisioner] failed to delete Daytona cleanup sandbox for ${workerId}: ${message}`)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function waitForHealth(url: string, timeoutMs: number, sandbox: Sandbox, sessionId: string, commandId: string) {
|
||||
const startedAt = Date.now()
|
||||
|
||||
@@ -330,18 +437,11 @@ export async function provisionWorkerOnDaytona(
|
||||
|
||||
const daytona = createDaytonaClient()
|
||||
const labels = sandboxLabels(input.workerId)
|
||||
const workspaceVolumeNameValue = workspaceVolumeName(input.workerId)
|
||||
const dataVolumeNameValue = dataVolumeName(input.workerId)
|
||||
await daytona.volume.get(workspaceVolumeNameValue, true)
|
||||
await daytona.volume.get(dataVolumeNameValue, true)
|
||||
const workspaceVolume = await waitForVolumeReady(
|
||||
const sharedVolumeNameValue = sharedVolumeName()
|
||||
await daytona.volume.get(sharedVolumeNameValue, true)
|
||||
const sharedVolume = await waitForVolumeReady(
|
||||
daytona,
|
||||
workspaceVolumeNameValue,
|
||||
env.daytona.createTimeoutSeconds * 1000,
|
||||
)
|
||||
const dataVolume = await waitForVolumeReady(
|
||||
daytona,
|
||||
dataVolumeNameValue,
|
||||
sharedVolumeNameValue,
|
||||
env.daytona.createTimeoutSeconds * 1000,
|
||||
)
|
||||
let sandbox: Awaited<ReturnType<typeof daytona.create>> | null = null
|
||||
@@ -361,16 +461,7 @@ export async function provisionWorkerOnDaytona(
|
||||
DEN_WORKER_ID: input.workerId,
|
||||
DEN_RUNTIME_PROVIDER: "daytona",
|
||||
},
|
||||
volumes: [
|
||||
{
|
||||
volumeId: workspaceVolume.id,
|
||||
mountPath: env.daytona.workspaceMountPath,
|
||||
},
|
||||
{
|
||||
volumeId: dataVolume.id,
|
||||
mountPath: env.daytona.dataMountPath,
|
||||
},
|
||||
],
|
||||
volumes: sharedVolumeMounts(input.workerId, sharedVolume.id),
|
||||
},
|
||||
{ timeout: env.daytona.createTimeoutSeconds },
|
||||
)
|
||||
@@ -392,16 +483,7 @@ export async function provisionWorkerOnDaytona(
|
||||
memory: env.daytona.resources.memory,
|
||||
disk: env.daytona.resources.disk,
|
||||
},
|
||||
volumes: [
|
||||
{
|
||||
volumeId: workspaceVolume.id,
|
||||
mountPath: env.daytona.workspaceMountPath,
|
||||
},
|
||||
{
|
||||
volumeId: dataVolume.id,
|
||||
mountPath: env.daytona.dataMountPath,
|
||||
},
|
||||
],
|
||||
volumes: sharedVolumeMounts(input.workerId, sharedVolume.id),
|
||||
},
|
||||
{ timeout: env.daytona.createTimeoutSeconds },
|
||||
)
|
||||
@@ -423,8 +505,8 @@ export async function provisionWorkerOnDaytona(
|
||||
await upsertDaytonaSandbox({
|
||||
workerId: input.workerId,
|
||||
sandboxId: sandbox.id,
|
||||
workspaceVolumeId: workspaceVolume.id,
|
||||
dataVolumeId: dataVolume.id,
|
||||
workspaceVolumeId: sharedVolume.id,
|
||||
dataVolumeId: sharedVolume.id,
|
||||
signedPreviewUrl: preview.url,
|
||||
signedPreviewUrlExpiresAt: signedPreviewRefreshAt(expiresInSeconds),
|
||||
region: sandbox.target ?? null,
|
||||
@@ -440,8 +522,6 @@ export async function provisionWorkerOnDaytona(
|
||||
if (sandbox) {
|
||||
await sandbox.delete(env.daytona.deleteTimeoutSeconds).catch(() => {})
|
||||
}
|
||||
await daytona.volume.delete(workspaceVolume).catch(() => {})
|
||||
await daytona.volume.delete(dataVolume).catch(() => {})
|
||||
throw error
|
||||
}
|
||||
}
|
||||
@@ -461,17 +541,7 @@ export async function deprovisionWorkerOnDaytona(workerId: WorkerId) {
|
||||
console.warn(`[provisioner] failed to delete Daytona sandbox ${record.sandbox_id}: ${message}`)
|
||||
}
|
||||
|
||||
const volumes = await daytona.volume.list().catch(() => [])
|
||||
for (const volumeId of [record.workspace_volume_id, record.data_volume_id]) {
|
||||
const volume = volumes.find((entry) => entry.id === volumeId)
|
||||
if (!volume) {
|
||||
continue
|
||||
}
|
||||
await daytona.volume.delete(volume).catch((error) => {
|
||||
const message = error instanceof Error ? error.message : "unknown_error"
|
||||
console.warn(`[provisioner] failed to delete Daytona volume ${volumeId}: ${message}`)
|
||||
})
|
||||
}
|
||||
await cleanupWorkerDataOnDaytona(daytona, workerId)
|
||||
|
||||
return
|
||||
}
|
||||
@@ -485,15 +555,5 @@ export async function deprovisionWorkerOnDaytona(workerId: WorkerId) {
|
||||
})
|
||||
}
|
||||
|
||||
const volumes = await daytona.volume.list()
|
||||
for (const name of [workspaceVolumeName(workerId), dataVolumeName(workerId)]) {
|
||||
const volume = volumes.find((entry) => entry.name === name)
|
||||
if (!volume) {
|
||||
continue
|
||||
}
|
||||
await daytona.volume.delete(volume).catch((error) => {
|
||||
const message = error instanceof Error ? error.message : "unknown_error"
|
||||
console.warn(`[provisioner] failed to delete Daytona volume ${name}: ${message}`)
|
||||
})
|
||||
}
|
||||
await cleanupWorkerDataOnDaytona(daytona, workerId)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user