revert: Make Wait node fully durable by removing in-memory execution path (#28538)

This commit is contained in:
Declan Carroll
2026-04-16 12:42:22 +01:00
committed by GitHub
parent 56f36a6d19
commit bb9bec3ba4
14 changed files with 350 additions and 807 deletions

View File

@@ -1,54 +0,0 @@
import type { DatabaseConfig } from '@n8n/config';
import type { DataSource } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { ClockRepository } from '../clock.repository';
describe('ClockRepository', () => {
const databaseConfig = mock<DatabaseConfig>({ type: 'sqlite' });
const dataSource = mock<DataSource>();
let clockRepository: ClockRepository;
beforeEach(() => {
clockRepository = new ClockRepository(dataSource, databaseConfig);
});
afterEach(() => {
jest.restoreAllMocks();
});
describe('getDbTime()', () => {
it('parses SQLite ISO timestamp string into a UTC Date', async () => {
databaseConfig.type = 'sqlite';
dataSource.query.mockResolvedValueOnce([{ now: '2024-01-15T12:30:45.123Z' }]);
const result = await clockRepository.getDbTime();
expect(result).toBeInstanceOf(Date);
expect(result.getUTCFullYear()).toBe(2024);
expect(result.getUTCMonth()).toBe(0); // January
expect(result.getUTCDate()).toBe(15);
expect(result.getUTCHours()).toBe(12);
expect(result.getUTCMinutes()).toBe(30);
});
it('returns the PostgreSQL Date directly', async () => {
databaseConfig.type = 'postgresdb';
const pgNow = new Date('2024-06-01T10:20:30.456Z');
dataSource.query.mockResolvedValueOnce([{ now: pgNow }]);
const result = await clockRepository.getDbTime();
expect(result).toBeInstanceOf(Date);
expect(result).toBe(pgNow);
});
it('throws UnexpectedError when SQLite returns an unparseable string', async () => {
databaseConfig.type = 'sqlite';
dataSource.query.mockResolvedValueOnce([{ now: 'not-a-date' }]);
await expect(clockRepository.getDbTime()).rejects.toThrow('Invalid DB server time');
});
});
});

View File

@@ -1,8 +1,6 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { GlobalConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { In, LessThan, And, Not, type SelectQueryBuilder } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { In, LessThan, And, Not } from '@n8n/typeorm';
import type { IExecutionResponse } from 'entities/types-db';
@@ -409,61 +407,6 @@ describe('ExecutionRepository', () => {
});
});
describe('getWaitingExecutions()', () => {
const globalConfig = Container.get(GlobalConfig);
const originalDbType = globalConfig.database.type;
let queryBuilder: jest.Mocked<SelectQueryBuilder<ExecutionEntity>>;
beforeEach(() => {
queryBuilder = mock<SelectQueryBuilder<ExecutionEntity>>();
queryBuilder.select.mockReturnThis();
queryBuilder.where.mockReturnThis();
queryBuilder.andWhere.mockReturnThis();
queryBuilder.orderBy.mockReturnThis();
queryBuilder.getMany.mockResolvedValue([]);
jest.spyOn(executionRepository, 'createQueryBuilder').mockReturnValue(queryBuilder);
});
afterEach(() => {
globalConfig.database.type = originalDbType;
});
it('should filter by status = waiting', async () => {
await executionRepository.getWaitingExecutions();
expect(queryBuilder.andWhere).toHaveBeenCalledWith('e.status = :status', {
status: 'waiting',
});
});
it('should use a DB-clock lookahead condition for PostgreSQL', async () => {
globalConfig.database.type = 'postgresdb';
await executionRepository.getWaitingExecutions();
expect(queryBuilder.where).toHaveBeenCalledWith(
expect.stringContaining("NOW() + INTERVAL '15 seconds'"),
);
});
it('should use a DB-clock lookahead condition for SQLite', async () => {
globalConfig.database.type = 'sqlite';
await executionRepository.getWaitingExecutions();
expect(queryBuilder.where).toHaveBeenCalledWith(
expect.stringContaining("datetime('now', '+15 seconds')"),
);
});
it('should order results by waitTill ASC', async () => {
await executionRepository.getWaitingExecutions();
expect(queryBuilder.orderBy).toHaveBeenCalledWith('e.waitTill', 'ASC');
});
});
describe('setRunning', () => {
beforeEach(() => {
entityManager.transaction.mockImplementation(async (fn: unknown) => {

View File

@@ -1,32 +0,0 @@
import { DatabaseConfig } from '@n8n/config';
import { Service } from '@n8n/di';
import { DataSource } from '@n8n/typeorm';
import { UnexpectedError } from 'n8n-workflow';
/** Provides access to the database server's clock for time-sensitive scheduling. */
@Service()
export class ClockRepository {
constructor(
private readonly dataSource: DataSource,
private readonly databaseConfig: DatabaseConfig,
) {}
async getDbTime(): Promise<Date> {
if (this.databaseConfig.type === 'postgresdb') {
const [{ now }] = await this.dataSource.query<[{ now: Date }]>(
'SELECT CURRENT_TIMESTAMP(3) AS now',
);
return now;
}
// SQLite: use ISO-friendly format directly to avoid JS-side string manipulation
const [{ now }] = await this.dataSource.query<[{ now: string }]>(
"SELECT STRFTIME('%Y-%m-%dT%H:%M:%fZ', 'NOW') AS now",
);
const date = new Date(now);
if (Number.isNaN(date.getTime())) {
throw new UnexpectedError(`Invalid DB server time: ${now}`);
}
return date;
}
}

View File

@@ -602,21 +602,28 @@ export class ExecutionRepository extends Repository<ExecutionEntity> {
return await this.delete({ id: In(executionIds) });
}
async getWaitingExecutions(): Promise<Array<Pick<ExecutionEntity, 'id' | 'waitTill'>>> {
// DB-clock lookahead: 5s poll + 10s buffer = 15s window.
async getWaitingExecutions() {
// Find all the executions which should be triggered in the next 70 seconds
const waitTill = new Date(Date.now() + 70000);
const where: FindOptionsWhere<ExecutionEntity> = {
waitTill: LessThanOrEqual(waitTill),
status: Not('crashed'),
};
const dbType = this.globalConfig.database.type;
if (dbType === 'sqlite') {
// This is needed because of issue in TypeORM <> SQLite:
// https://github.com/typeorm/typeorm/issues/2286
where.waitTill = LessThanOrEqual(DateUtils.mixedDateToUtcDatetimeString(waitTill));
}
const lookaheadCondition =
dbType === 'postgresdb'
? "e.waitTill <= NOW() + INTERVAL '15 seconds'"
: "e.waitTill <= datetime('now', '+15 seconds')";
return await this.createQueryBuilder('e')
.select(['e.id', 'e.waitTill'])
.where(lookaheadCondition)
.andWhere('e.status = :status', { status: 'waiting' })
.orderBy('e.waitTill', 'ASC')
.getMany();
return await this.findMultipleExecutions({
select: ['id', 'waitTill'],
where,
order: {
waitTill: 'ASC',
},
});
}
async getExecutionsCountForPublicApi(params: {

View File

@@ -4,7 +4,6 @@ export { ApiKeyRepository } from './api-key.repository';
export { AuthIdentityRepository } from './auth-identity.repository';
export { AuthProviderSyncHistoryRepository } from './auth-provider-sync-history.repository';
export { BinaryDataRepository } from './binary-data.repository';
export { ClockRepository } from './clock.repository';
export { CredentialsRepository } from './credentials.repository';
export { CredentialDependencyRepository } from './credential-dependency.repository';
export { DeploymentKeyRepository } from './deployment-key.repository';

View File

@@ -1,77 +0,0 @@
import type { ClockRepository } from '@n8n/db';
import { mock } from 'jest-mock-extended';
import { DbClock } from '@/services/db-clock.service';
jest.useFakeTimers({ advanceTimers: true });
describe('DbClock', () => {
const clockRepository = mock<ClockRepository>();
let dbClock: DbClock;
beforeEach(() => {
dbClock = new DbClock(clockRepository);
});
afterEach(() => {
jest.clearAllMocks();
jest.clearAllTimers();
});
it('should fetch DB time from the repository on first call', async () => {
const dbTime = new Date();
clockRepository.getDbTime.mockResolvedValue(dbTime);
const result = await dbClock.getApproximateDbTime();
expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1);
expect(result.getTime()).toBeCloseTo(dbTime.getTime(), -1);
});
it('should reuse cached DB time within 60s TTL', async () => {
clockRepository.getDbTime.mockResolvedValue(new Date());
await dbClock.getApproximateDbTime();
await dbClock.getApproximateDbTime();
expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1);
});
it('should refresh DB time after 60s TTL expires', async () => {
clockRepository.getDbTime.mockResolvedValue(new Date());
await dbClock.getApproximateDbTime();
jest.advanceTimersByTime(60_001);
await dbClock.getApproximateDbTime();
expect(clockRepository.getDbTime).toHaveBeenCalledTimes(2);
});
it('should interpolate DB time between cache refreshes', async () => {
const dbTimeAtFetch = new Date(Date.now() - 5_000);
clockRepository.getDbTime.mockResolvedValue(dbTimeAtFetch);
await dbClock.getApproximateDbTime();
jest.advanceTimersByTime(10_000);
const result = await dbClock.getApproximateDbTime();
// Approximate = dbTimeAtFetch + 10s elapsed (RTT is ~0 with mocks)
const expected = dbTimeAtFetch.getTime() + 10_000;
expect(result.getTime()).toBe(expected);
expect(clockRepository.getDbTime).toHaveBeenCalledTimes(1);
});
it('should clear cache on resetCache()', async () => {
clockRepository.getDbTime.mockResolvedValue(new Date());
await dbClock.getApproximateDbTime();
dbClock.resetCache();
await dbClock.getApproximateDbTime();
expect(clockRepository.getDbTime).toHaveBeenCalledTimes(2);
});
});

View File

@@ -1,14 +1,13 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { mockLogger } from '@n8n/backend-test-utils';
import type { Project, IExecutionResponse, ExecutionRepository, ExecutionEntity } from '@n8n/db';
import type { Project, IExecutionResponse, ExecutionRepository } from '@n8n/db';
import { mock, captor } from 'jest-mock-extended';
import type { ErrorReporter, InstanceSettings } from 'n8n-core';
import type { InstanceSettings } from 'n8n-core';
import type { IWorkflowBase, IRun, INode, IExecuteData, ITaskData } from 'n8n-workflow';
import { createDeferredPromise, createRunExecutionData, WAIT_INDEFINITELY } from 'n8n-workflow';
import type { ActiveExecutions } from '@/active-executions';
import type { MultiMainSetup } from '@/scaling/multi-main-setup.ee';
import type { DbClock } from '@/services/db-clock.service';
import type { OwnershipService } from '@/services/ownership.service';
import { WaitTracker } from '@/wait-tracker';
import type { WorkflowRunner } from '@/workflow-runner';
@@ -20,8 +19,6 @@ describe('WaitTracker', () => {
const ownershipService = mock<OwnershipService>();
const workflowRunner = mock<WorkflowRunner>();
const executionRepository = mock<ExecutionRepository>();
const dbClock = mock<DbClock>();
const errorReporter = mock<ErrorReporter>();
const multiMainSetup = mock<MultiMainSetup>();
const instanceSettings = mock<InstanceSettings>({ isLeader: true, isMultiMain: false });
@@ -38,12 +35,9 @@ describe('WaitTracker', () => {
startedAt: undefined,
});
execution.workflowData = mock<IWorkflowBase>({ id: 'abcd' });
// Minimal ExecutionEntity for getWaitingExecutions — only id and waitTill are used by WaitTracker
const waitingEntity = mock<ExecutionEntity>({ id: execution.id, waitTill: execution.waitTill });
let waitTracker: WaitTracker;
beforeEach(() => {
dbClock.getApproximateDbTime.mockResolvedValue(new Date());
waitTracker = new WaitTracker(
mockLogger(),
executionRepository,
@@ -51,20 +45,17 @@ describe('WaitTracker', () => {
activeExecutions,
workflowRunner,
instanceSettings,
dbClock,
errorReporter,
);
multiMainSetup.on.mockReturnThis();
});
afterEach(() => {
jest.clearAllMocks();
jest.clearAllTimers();
});
describe('init()', () => {
it('should query DB for waiting executions if leader', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([waitingEntity]);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
waitTracker.init();
@@ -94,7 +85,7 @@ describe('WaitTracker', () => {
executionRepository.findSingleExecution
.calledWith(execution.id)
.mockResolvedValue(execution);
executionRepository.getWaitingExecutions.mockResolvedValue([waitingEntity]);
executionRepository.getWaitingExecutions.mockResolvedValue([execution]);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
startExecutionSpy = jest
@@ -569,224 +560,6 @@ describe('WaitTracker', () => {
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
});
it('should poll every 5 seconds', () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
const setIntervalSpy = jest.spyOn(global, 'setInterval');
setIntervalSpy.mockClear(); // ensure no prior calls are counted
waitTracker.init();
expect(setIntervalSpy).toHaveBeenCalledTimes(1);
expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 5000);
});
});
describe('getWaitingExecutions()', () => {
it('should use server time for triggerTime calculation', async () => {
// Server clock is 10s behind local clock
const serverTime = new Date(Date.now() - 10_000);
dbClock.getApproximateDbTime.mockResolvedValue(serverTime);
const waitTill = new Date(Date.now() + 5_000);
const delayedExecution = mock<ExecutionEntity>({ id: 'delayed-exec', waitTill });
executionRepository.getWaitingExecutions.mockResolvedValue([delayedExecution]);
const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
await waitTracker.getWaitingExecutions();
// triggerTime = waitTill - serverTime = ~15s (not ~5s from Date.now())
const expectedDelay = waitTill.getTime() - serverTime.getTime();
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), expectedDelay);
});
it('should fire immediately for past-due executions', async () => {
// Server clock 5s ahead — waitTill is already past from the DB's perspective
const serverTime = new Date(Date.now() + 5_000);
dbClock.getApproximateDbTime.mockResolvedValue(serverTime);
const waitTill = new Date(Date.now() + 2_000);
const pastDueExecution = mock<ExecutionEntity>({ id: 'past-due-exec', waitTill });
executionRepository.getWaitingExecutions.mockResolvedValue([pastDueExecution]);
const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
setTimeoutSpy.mockClear();
const startExecutionSpy = jest
.spyOn(waitTracker, 'startExecution')
.mockImplementation(async () => {});
await waitTracker.getWaitingExecutions();
// Math.max(triggerTime, 0) clamps a negative delay to 0 — fires immediately
expect(setTimeoutSpy).toHaveBeenCalledWith(expect.any(Function), 0);
jest.advanceTimersByTime(0);
expect(startExecutionSpy).toHaveBeenCalledWith('past-due-exec');
});
it('should warn on clock skew exceeding 2 seconds', async () => {
// mockLogger().scoped() returns a new inner mock — make scoped() return itself
// so that warn calls on the scoped logger are captured on our mock
const logger = mockLogger();
(logger.scoped as jest.Mock).mockReturnValue(logger);
const skewedWaitTracker = new WaitTracker(
logger,
executionRepository,
ownershipService,
activeExecutions,
workflowRunner,
instanceSettings,
dbClock,
errorReporter,
);
// Server clock is 3s ahead — exceeds 2s threshold
dbClock.getApproximateDbTime.mockResolvedValue(new Date(Date.now() + 3_000));
executionRepository.getWaitingExecutions.mockResolvedValue([]);
await skewedWaitTracker.getWaitingExecutions();
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('Clock skew detected'));
});
it('should delegate DB time to DbClock', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
await waitTracker.getWaitingExecutions();
expect(dbClock.getApproximateDbTime).toHaveBeenCalledTimes(1);
});
});
describe('race condition guards', () => {
describe('overlapping poll guard', () => {
it('should skip poll when previous poll is still in progress', async () => {
// Make getWaitingExecutions hang on the first call by returning a never-resolving promise
let resolveFirstPoll!: (value: unknown[]) => void;
const slowPoll = new Promise<unknown[]>((resolve) => {
resolveFirstPoll = resolve;
});
executionRepository.getWaitingExecutions.mockReturnValueOnce(slowPoll as Promise<never>);
// Start first poll — it will block on the slow DB query
const firstPoll = waitTracker.getWaitingExecutions();
// Second poll should bail out immediately
executionRepository.getWaitingExecutions.mockResolvedValue([]);
await waitTracker.getWaitingExecutions();
// Only the first call to getWaitingExecutions should have reached the DB
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(1);
// Unblock and clean up
resolveFirstPoll([]);
await firstPoll;
});
it('should allow polling again after previous poll completes', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
await waitTracker.getWaitingExecutions();
await waitTracker.getWaitingExecutions();
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(2);
});
it('should allow polling again after previous poll errors', async () => {
executionRepository.getWaitingExecutions.mockRejectedValueOnce(
new Error('DB connection lost'),
);
await expect(waitTracker.getWaitingExecutions()).rejects.toThrow('DB connection lost');
executionRepository.getWaitingExecutions.mockResolvedValue([]);
await waitTracker.getWaitingExecutions();
expect(executionRepository.getWaitingExecutions).toHaveBeenCalledTimes(2);
});
});
describe('execution stays guarded until workflowRunner.run() settles', () => {
const raceExecId = 'race-exec';
const raceExecution = mock<IExecutionResponse>({
id: raceExecId,
finished: false,
waitTill: new Date(Date.now() + 1_000),
mode: 'manual',
data: mock({ pushRef: 'push_ref', parentExecution: undefined }),
startedAt: undefined,
});
raceExecution.workflowData = mock<IWorkflowBase>({ id: 'race-wf' });
it('should keep execution in waitingExecutions during async startExecution work', async () => {
const waitTill = new Date(Date.now() + 1_000);
const entity = mock<ExecutionEntity>({ id: raceExecId, waitTill });
executionRepository.getWaitingExecutions.mockResolvedValue([entity]);
dbClock.getApproximateDbTime.mockResolvedValue(new Date());
// Set up the timer via poll
await waitTracker.getWaitingExecutions();
expect(waitTracker.has(raceExecId)).toBe(true);
// Make workflowRunner.run() hang so we can observe the guard
let resolveRun!: (value: string) => void;
const runPromise = new Promise<string>((resolve) => {
resolveRun = resolve;
});
workflowRunner.run.mockReturnValueOnce(runPromise);
executionRepository.findSingleExecution
.calledWith(raceExecId)
.mockResolvedValue(raceExecution);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
// Fire the timer — startExecution begins but blocks on workflowRunner.run()
const startPromise = waitTracker.startExecution(raceExecId);
// Execution should STILL be in waitingExecutions (guard active)
expect(waitTracker.has(raceExecId)).toBe(true);
// A second poll should skip this execution because the guard is still active
executionRepository.getWaitingExecutions.mockResolvedValue([entity]);
const setTimeoutSpy = jest.spyOn(global, 'setTimeout');
const callCountBefore = setTimeoutSpy.mock.calls.length;
await waitTracker.getWaitingExecutions();
// No new timer should have been set for 'race-exec'
const newTimerCalls = setTimeoutSpy.mock.calls.slice(callCountBefore);
expect(newTimerCalls).toHaveLength(0);
// Clean up
resolveRun('exec-id');
await startPromise;
// Now the guard should be released
expect(waitTracker.has(raceExecId)).toBe(false);
});
it('should release guard even when workflowRunner.run() throws', async () => {
executionRepository.getWaitingExecutions.mockResolvedValue([]);
dbClock.getApproximateDbTime.mockResolvedValue(new Date());
// Set up a waiting execution via poll
const waitTill = new Date(Date.now() + 1_000);
const entity = mock<ExecutionEntity>({ id: raceExecId, waitTill });
executionRepository.getWaitingExecutions.mockResolvedValue([entity]);
await waitTracker.getWaitingExecutions();
executionRepository.findSingleExecution
.calledWith(raceExecId)
.mockResolvedValue(raceExecution);
ownershipService.getWorkflowProjectCached.mockResolvedValue(project);
workflowRunner.run.mockRejectedValueOnce(new Error('Runner crashed'));
await expect(waitTracker.startExecution(raceExecId)).rejects.toThrow('Runner crashed');
// Guard should be released so future polls can re-schedule it
expect(waitTracker.has(raceExecId)).toBe(false);
});
});
});
describe('multi-main setup', () => {
@@ -806,8 +579,6 @@ describe('WaitTracker', () => {
activeExecutions,
workflowRunner,
mock<InstanceSettings>({ isLeader: false, isMultiMain: false }),
dbClock,
errorReporter,
);
executionRepository.getWaitingExecutions.mockResolvedValue([]);

View File

@@ -5,6 +5,7 @@ import type { IExecutionResponse } from '@n8n/db';
import { ExecutionEntity, ExecutionRepository } from '@n8n/db';
import { Container } from '@n8n/di';
import type { SelectQueryBuilder } from '@n8n/typeorm';
import { Not, LessThanOrEqual } from '@n8n/typeorm';
import { mock } from 'jest-mock-extended';
import { BinaryDataService } from 'n8n-core';
import type { IRunExecutionData, IWorkflowBase } from 'n8n-workflow';
@@ -33,33 +34,22 @@ describe('ExecutionRepository', () => {
'on %s, should be called with expected args',
async (dbType) => {
globalConfig.database.type = dbType;
const qb = {
select: jest.fn().mockReturnThis(),
where: jest.fn().mockReturnThis(),
andWhere: jest.fn().mockReturnThis(),
orderBy: jest.fn().mockReturnThis(),
getMany: jest.fn().mockResolvedValue([]),
};
jest
.spyOn(executionRepository, 'createQueryBuilder')
.mockReturnValue(qb as unknown as SelectQueryBuilder<ExecutionEntity>);
entityManager.find.mockResolvedValueOnce([]);
await executionRepository.getWaitingExecutions();
expect(executionRepository.createQueryBuilder).toHaveBeenCalledWith('e');
expect(qb.select).toHaveBeenCalledWith(['e.id', 'e.waitTill']);
const expectedCondition =
dbType === 'postgresdb'
? "e.waitTill <= NOW() + INTERVAL '15 seconds'"
: "e.waitTill <= datetime('now', '+15 seconds')";
expect(qb.where).toHaveBeenCalledWith(expectedCondition);
expect(qb.andWhere).toHaveBeenCalledWith('e.status = :status', {
status: 'waiting',
expect(entityManager.find).toHaveBeenCalledWith(ExecutionEntity, {
order: { waitTill: 'ASC' },
select: ['id', 'waitTill'],
where: {
status: Not('crashed'),
waitTill: LessThanOrEqual(
dbType === 'sqlite'
? '2023-12-28 12:36:06.789'
: new Date('2023-12-28T12:36:06.789Z'),
),
},
});
expect(qb.orderBy).toHaveBeenCalledWith('e.waitTill', 'ASC');
expect(qb.getMany).toHaveBeenCalled();
},
);
});

View File

@@ -1,42 +0,0 @@
import { ClockRepository } from '@n8n/db';
import { Service } from '@n8n/di';
/**
* Provides an approximation of the DB server's current time.
*
* Fetches from the DB at most once every 60s and approximates intermediate
* values by adding elapsed local wall-clock time since the last query.
* Compensates for query round-trip time using NTP-style half-RTT offset.
*/
@Service()
export class DbClock {
private cache: { dbTime: Date; localTimeAtQuery: number } | null = null;
constructor(private readonly clockRepository: ClockRepository) {}
async getApproximateDbTime(): Promise<Date> {
const nowMs = Date.now();
if (!this.isCacheStale(nowMs)) {
const elapsed = nowMs - this.cache!.localTimeAtQuery;
return new Date(this.cache!.dbTime.getTime() + elapsed);
}
const beforeMs = Date.now();
const dbTime = await this.clockRepository.getDbTime();
const afterMs = Date.now();
const halfRtt = (afterMs - beforeMs) / 2;
this.setCache(new Date(dbTime.getTime() + halfRtt), afterMs);
return this.cache!.dbTime;
}
resetCache() {
this.cache = null;
}
private isCacheStale(nowMs: number): boolean {
return this.cache === null || nowMs - this.cache.localTimeAtQuery >= 60_000;
}
private setCache(dbTime: Date, localTimeAtQuery: number) {
this.cache = { dbTime, localTimeAtQuery };
}
}

View File

@@ -2,12 +2,11 @@ import { Logger } from '@n8n/backend-common';
import { ExecutionRepository } from '@n8n/db';
import { OnLeaderStepdown, OnLeaderTakeover } from '@n8n/decorators';
import { Service } from '@n8n/di';
import { ErrorReporter, InstanceSettings } from 'n8n-core';
import { InstanceSettings } from 'n8n-core';
import { UnexpectedError, type IWorkflowExecutionDataProcess } from 'n8n-workflow';
import { ActiveExecutions } from '@/active-executions';
import { ExecutionAlreadyResumingError } from '@/errors/execution-already-resuming.error';
import { DbClock } from '@/services/db-clock.service';
import { OwnershipService } from '@/services/ownership.service';
import { WorkflowRunner } from '@/workflow-runner';
import {
@@ -26,9 +25,6 @@ export class WaitTracker {
mainTimer: NodeJS.Timeout;
/** Guards against overlapping poll invocations when DB queries take longer than the poll interval. */
private isPolling = false;
constructor(
private readonly logger: Logger,
private readonly executionRepository: ExecutionRepository,
@@ -36,8 +32,6 @@ export class WaitTracker {
private readonly activeExecutions: ActiveExecutions,
private readonly workflowRunner: WorkflowRunner,
private readonly instanceSettings: InstanceSettings,
private readonly dbClock: DbClock,
private readonly errorReporter: ErrorReporter,
) {
this.logger = this.logger.scoped('waiting-executions');
}
@@ -52,9 +46,10 @@ export class WaitTracker {
@OnLeaderTakeover()
private startTracking() {
// Poll every 60 seconds a list of upcoming executions
this.mainTimer = setInterval(() => {
void this.getWaitingExecutions();
}, 5000);
}, 60000);
void this.getWaitingExecutions();
@@ -62,64 +57,32 @@ export class WaitTracker {
}
async getWaitingExecutions() {
if (this.isPolling) {
this.logger.debug('Skipping poll — previous poll still in progress');
this.logger.debug('Querying database for waiting executions');
const executions = await this.executionRepository.getWaitingExecutions();
if (executions.length === 0) {
return;
}
this.isPolling = true;
try {
const [executions, dbTime] = await Promise.all([
this.executionRepository.getWaitingExecutions(),
this.dbClock.getApproximateDbTime(),
]);
const executionIds = executions.map((execution) => execution.id).join(', ');
this.logger.debug(
`Found ${executions.length} executions. Setting timer for IDs: ${executionIds}`,
);
const skewMs = dbTime.getTime() - Date.now();
if (Math.abs(skewMs) > 2000) {
this.logger.warn(
`Clock skew detected: this instance is ${Math.abs(skewMs)}ms ${skewMs > 0 ? 'behind' : 'ahead of'} the database server`,
);
}
// Add timers for each waiting execution that they get started at the correct time
if (executions.length === 0) {
return;
}
const newExecutions = executions.filter((e) => this.waitingExecutions[e.id] === undefined);
if (newExecutions.length > 0) {
const executionIds = newExecutions.map((e) => e.id).join(', ');
this.logger.debug(
`Found ${newExecutions.length} new waiting execution(s). Setting timer for IDs: ${executionIds}`,
);
}
for (const execution of newExecutions) {
const executionId = execution.id;
if (execution.waitTill === null || execution.waitTill === undefined) {
this.errorReporter.error(
new UnexpectedError(
'Polling returned an execution without waitTill — this should never happen',
{ extra: { executionId } },
),
{ level: 'fatal' },
);
continue;
}
const triggerTime = execution.waitTill.getTime() - dbTime.getTime();
for (const execution of executions) {
const executionId = execution.id;
if (this.waitingExecutions[executionId] === undefined) {
const triggerTime = execution.waitTill!.getTime() - new Date().getTime();
this.waitingExecutions[executionId] = {
executionId,
timer: setTimeout(
() => {
void this.startExecution(executionId);
},
Math.max(triggerTime, 0),
),
timer: setTimeout(() => {
void this.startExecution(executionId);
}, triggerTime),
};
}
} finally {
this.isPolling = false;
}
}
@@ -133,71 +96,74 @@ export class WaitTracker {
async startExecution(executionId: string) {
this.logger.debug(`Resuming execution ${executionId}`, { executionId });
delete this.waitingExecutions[executionId];
// Get the data to execute
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
throw new UnexpectedError('Execution does not exist.', { extra: { executionId } });
}
if (fullExecutionData.finished) {
throw new UnexpectedError('The execution did succeed and can so not be started again.');
}
if (!fullExecutionData.workflowData.id) {
throw new UnexpectedError('Only saved workflows can be resumed.');
}
const workflowId = fullExecutionData.workflowData.id;
const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
const data: IWorkflowExecutionDataProcess = {
executionMode: fullExecutionData.mode,
executionData: fullExecutionData.data,
workflowData: fullExecutionData.workflowData,
projectId: project.id,
pushRef: fullExecutionData.data.pushRef,
startedAt: fullExecutionData.startedAt,
};
// Start the execution again
try {
const fullExecutionData = await this.executionRepository.findSingleExecution(executionId, {
includeData: true,
unflattenData: true,
});
if (!fullExecutionData) {
throw new UnexpectedError('Execution does not exist.', { extra: { executionId } });
}
if (fullExecutionData.finished) {
throw new UnexpectedError('The execution did succeed and can so not be started again.');
await this.workflowRunner.run(data, false, false, executionId);
} catch (error) {
if (error instanceof ExecutionAlreadyResumingError) {
// This execution is already being resumed by another child execution
// This is expected in "run once for each item" mode when multiple children complete
this.logger.debug(
`Execution ${executionId} is already being resumed, skipping duplicate resume`,
{ executionId },
);
return;
}
// Rethrow any other errors
throw error;
}
if (!fullExecutionData.workflowData.id) {
throw new UnexpectedError('Only saved workflows can be resumed.');
}
const workflowId = fullExecutionData.workflowData.id;
const project = await this.ownershipService.getWorkflowProjectCached(workflowId);
const data: IWorkflowExecutionDataProcess = {
executionMode: fullExecutionData.mode,
executionData: fullExecutionData.data,
workflowData: fullExecutionData.workflowData,
projectId: project.id,
pushRef: fullExecutionData.data.pushRef,
startedAt: fullExecutionData.startedAt,
};
try {
await this.workflowRunner.run(data, false, false, executionId);
} catch (error) {
if (error instanceof ExecutionAlreadyResumingError) {
this.logger.debug(
`Execution ${executionId} is already being resumed, skipping duplicate resume`,
{ executionId },
const { parentExecution } = fullExecutionData.data;
if (shouldRestartParentExecution(parentExecution)) {
// on child execution completion, resume parent execution
void this.activeExecutions
.getPostExecutePromise(executionId)
.then(async (subworkflowResults) => {
if (!subworkflowResults) return;
if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing.
await updateParentExecutionWithChildResults(
this.executionRepository,
parentExecution.executionId,
subworkflowResults,
);
return;
}
throw error;
}
const { parentExecution } = fullExecutionData.data;
if (shouldRestartParentExecution(parentExecution)) {
void this.activeExecutions
.getPostExecutePromise(executionId)
.then(async (subworkflowResults) => {
if (!subworkflowResults) return;
if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing.
await updateParentExecutionWithChildResults(
this.executionRepository,
parentExecution.executionId,
subworkflowResults,
);
return subworkflowResults;
})
.then((subworkflowResults) => {
if (!subworkflowResults) return;
if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing.
void this.startExecution(parentExecution.executionId);
});
}
} finally {
delete this.waitingExecutions[executionId];
return subworkflowResults;
})
.then((subworkflowResults) => {
if (!subworkflowResults) return;
if (subworkflowResults.status === 'waiting') return; // The child execution is waiting, not completing.
void this.startExecution(parentExecution.executionId);
});
}
}

View File

@@ -61,132 +61,6 @@ describe('ExecutionRepository', () => {
});
});
});
describe('getWaitingExecutions', () => {
it('should return waiting executions within the 15s lookahead window', async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow();
// waitTill in the past — should be returned
await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: new Date(Date.now() - 5_000),
createdAt: new Date(),
});
// waitTill 10s from now — within 15s lookahead, should be returned
await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: new Date(Date.now() + 10_000),
createdAt: new Date(),
});
const results = await executionRepo.getWaitingExecutions();
expect(results).toHaveLength(2);
});
it('should exclude waiting executions beyond the 15s lookahead window', async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow();
// waitTill 1 hour from now — well outside the 15s lookahead
await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: new Date(Date.now() + 3_600_000),
createdAt: new Date(),
});
const results = await executionRepo.getWaitingExecutions();
expect(results).toHaveLength(0);
});
it('should exclude non-waiting executions even if waitTill is in range', async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow();
await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'success',
finished: true,
waitTill: new Date(),
createdAt: new Date(),
});
const results = await executionRepo.getWaitingExecutions();
expect(results).toHaveLength(0);
});
it('should order results by waitTill ascending', async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow();
const laterWaitTill = new Date(Date.now() + 5_000);
const earlierWaitTill = new Date(Date.now() - 5_000);
const { identifiers: laterIds } = await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: laterWaitTill,
createdAt: new Date(),
});
const { identifiers: earlierIds } = await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: earlierWaitTill,
createdAt: new Date(),
});
const results = await executionRepo.getWaitingExecutions();
expect(results).toHaveLength(2);
expect(String(results[0].id)).toBe(String(earlierIds[0].id));
expect(String(results[1].id)).toBe(String(laterIds[0].id));
});
it('should only return id and waitTill fields', async () => {
const executionRepo = Container.get(ExecutionRepository);
const workflow = await createWorkflow();
await executionRepo.insert({
workflowId: workflow.id,
mode: 'manual',
startedAt: new Date(),
status: 'waiting',
finished: false,
waitTill: new Date(),
createdAt: new Date(),
});
const results = await executionRepo.getWaitingExecutions();
expect(results).toHaveLength(1);
expect(Object.keys(results[0]).sort()).toEqual(['id', 'waitTill']);
});
});
describe('findByStopExecutionsFilter', () => {
it('should find executions by status', async () => {
const executionRepo = Container.get(ExecutionRepository);

View File

@@ -591,6 +591,21 @@ export class Wait extends Webhook {
}
}
const waitValue = Math.max(waitTill.getTime() - new Date().getTime(), 0);
if (waitValue < 65000) {
// If wait time is shorter than 65 seconds leave execution active because
// we just check the database every 60 seconds.
return await new Promise((resolve, _reject) => {
const timer = setTimeout(() => resolve([context.getInputData()]), waitValue);
context.onExecutionCancellation(() => {
clearTimeout(timer);
resolve([context.getInputData()]);
});
});
}
// If longer than 65 seconds put execution to wait
return await this.putToWait(context, waitTill);
}

View File

@@ -6,13 +6,17 @@ import { NodeOperationError, type IExecuteFunctions } from 'n8n-workflow';
import { Wait } from '../Wait.node';
describe('Execute Wait Node', () => {
let timer: NodeJS.Timeout;
const { clearInterval, setInterval } = global;
const nextDay = DateTime.now().startOf('day').plus({ days: 1 });
beforeAll(() => {
timer = setInterval(() => jest.advanceTimersByTime(1000), 10);
jest.useFakeTimers().setSystemTime(new Date('2025-01-01'));
});
afterAll(() => {
clearInterval(timer);
jest.useRealTimers();
});
@@ -63,36 +67,45 @@ describe('Execute Wait Node', () => {
},
);
test('should resolve with input data if canceled', async () => {
const putExecutionToWaitSpy = jest.fn();
const waitNode = new Wait();
let cancelSignal: (() => void) | null = null;
const inputData = [{ json: { test: 'data' } }];
const executeFunctionsMock = mock<IExecuteFunctions>({
getNodeParameter: jest.fn().mockImplementation((paramName: string) => {
if (paramName === 'resume') return 'timeInterval';
if (paramName === 'unit') return 'seconds';
if (paramName === 'amount') return 60;
}),
getTimezone: jest.fn().mockReturnValue('UTC'),
putExecutionToWait: putExecutionToWaitSpy,
getInputData: jest.fn(() => inputData),
getNode: jest.fn(),
onExecutionCancellation: (handler) => {
cancelSignal = handler;
},
});
const waitPromise = waitNode.execute(executeFunctionsMock);
for (let index = 0; index < 20; index++) {
await new Promise((r) => setTimeout(r, 10));
if (cancelSignal !== null) break;
}
expect(cancelSignal).not.toBeNull();
cancelSignal!();
await expect(waitPromise).resolves.toEqual([inputData]);
});
describe('Validation', () => {
describe('Time interval', () => {
it.each([
// Previously in-memory path (< 65s) — now all go through putToWait
{
unit: 'seconds',
amount: 5,
expectedWaitTill: () => DateTime.now().plus({ seconds: 5 }).toJSDate(),
},
{
unit: 'seconds',
amount: 30,
expectedWaitTill: () => DateTime.now().plus({ seconds: 30 }).toJSDate(),
},
{
unit: 'seconds',
amount: 60,
expectedWaitTill: () => DateTime.now().plus({ seconds: 60 }).toJSDate(),
},
{
unit: 'seconds',
amount: 64,
expectedWaitTill: () => DateTime.now().plus({ seconds: 64 }).toJSDate(),
},
// DB-persisted path (>= 65s)
{
unit: 'seconds',
amount: 66,
expectedWaitTill: () => DateTime.now().plus({ seconds: 66 }).toJSDate(),
},
{
unit: 'seconds',
amount: 300,
@@ -116,6 +129,7 @@ describe('Execute Wait Node', () => {
{
unit: 'seconds',
amount: 0,
mode: 'timeout',
expectedWaitTill: () => DateTime.now().toJSDate(),
},
{
@@ -135,7 +149,7 @@ describe('Execute Wait Node', () => {
},
])(
'Validate wait unit: $unit, amount: $amount',
async ({ unit, amount, expectedWaitTill, error }) => {
async ({ unit, amount, expectedWaitTill, error, mode }) => {
const putExecutionToWaitSpy = jest.fn();
const waitNode = new Wait();
const inputData = [{ json: { inputData: true } }];
@@ -152,9 +166,16 @@ describe('Execute Wait Node', () => {
});
if (!error) {
// All time-based waits are now persisted to DB via putToWait
await expect(waitNode.execute(executeFunctionsMock)).resolves.not.toThrow();
expect(putExecutionToWaitSpy).toHaveBeenCalledWith(expectedWaitTill?.());
if (mode === 'timeout') {
// for short wait times (<65s) a simple timeout is used
const resultPromise = waitNode.execute(executeFunctionsMock);
jest.runAllTimers();
await expect(resultPromise).resolves.toEqual([inputData]);
} else {
// for longer wait times (>=65s) the execution is put to wait
await expect(waitNode.execute(executeFunctionsMock)).resolves.not.toThrow();
expect(putExecutionToWaitSpy).toHaveBeenCalledWith(expectedWaitTill?.());
}
} else {
await expect(waitNode.execute(executeFunctionsMock)).rejects.toThrowError(error);
}

View File

@@ -0,0 +1,162 @@
{
"name": "[Unit Test] Wait Node",
"nodes": [
{
"parameters": {},
"id": "76e5dcfd-fdc7-472f-8832-bccc0eb122c0",
"name": "When clicking \"Execute Workflow\"",
"type": "n8n-nodes-base.manualTrigger",
"typeVersion": 1,
"position": [120, 420]
},
{
"parameters": {
"amount": 42,
"unit": "seconds"
},
"id": "37f2c758-6fb2-43ce-86ae-ca11ec957cbd",
"name": "Wait",
"type": "n8n-nodes-base.wait",
"typeVersion": 1,
"position": [560, 420],
"webhookId": "35edc971-c3e4-48cf-835d-4d73a4fd1fd8"
},
{
"parameters": {
"conditions": {
"number": [
{
"value1": "={{ parseInt($json.afterTimestamp) }}",
"operation": "largerEqual",
"value2": "={{ parseInt($json.startTimestamp) + 42 }}"
}
]
}
},
"id": "c5c53934-2677-4adf-a4df-b32f3b0642a2",
"name": "IF",
"type": "n8n-nodes-base.if",
"typeVersion": 1,
"position": [960, 420]
},
{
"parameters": {
"keepOnlySet": true,
"values": {
"boolean": [
{
"name": "success",
"value": true
}
]
},
"options": {}
},
"id": "a78417b6-d3f5-4bbc-916a-d4b9d46961cc",
"name": "Set1",
"type": "n8n-nodes-base.set",
"typeVersion": 1,
"position": [1180, 400]
},
{
"parameters": {
"value": "={{ $now }}",
"dataPropertyName": "afterTimestamp",
"toFormat": "X",
"options": {}
},
"id": "94f042ea-49d5-44ea-9ccf-93dac8d27d4a",
"name": "After",
"type": "n8n-nodes-base.dateTime",
"typeVersion": 1,
"position": [760, 420]
},
{
"parameters": {
"value": "={{ $now }}",
"dataPropertyName": "startTimestamp",
"toFormat": "X",
"options": {}
},
"id": "43f8a396-1bf7-484e-962c-120f677dfa51",
"name": "Before",
"type": "n8n-nodes-base.dateTime",
"typeVersion": 1,
"position": [360, 420]
}
],
"pinData": {
"Set1": [
{
"json": {
"success": true
}
}
]
},
"connections": {
"When clicking \"Execute Workflow\"": {
"main": [
[
{
"node": "Before",
"type": "main",
"index": 0
}
]
]
},
"Wait": {
"main": [
[
{
"node": "After",
"type": "main",
"index": 0
}
]
]
},
"IF": {
"main": [
[
{
"node": "Set1",
"type": "main",
"index": 0
}
]
]
},
"After": {
"main": [
[
{
"node": "IF",
"type": "main",
"index": 0
}
]
]
},
"Before": {
"main": [
[
{
"node": "Wait",
"type": "main",
"index": 0
}
]
]
}
},
"active": false,
"settings": {},
"versionId": "8ed794a0-5c04-4b8a-9a49-02c2c7f8003f",
"id": "500",
"meta": {
"instanceId": "8c8c5237b8e37b006a7adce87f4369350c58e41f3ca9de16196d3197f69eabcd"
},
"tags": []
}