fix(energy): atomic rollback on partial pipeline failure; seedError priority in health cascade

This commit is contained in:
Elie Habib
2026-04-08 11:02:00 +04:00
parent 11c0c6e992
commit 1aaa027492
3 changed files with 129 additions and 39 deletions

View File

@@ -421,7 +421,10 @@ export default async function handler(req) {
const size = metaCount ?? (hasData ? 1 : 0);
let status;
if (!hasData) {
if (seedError === true) {
status = 'SEED_ERROR';
warnCount++;
} else if (!hasData) {
if (EMPTY_DATA_OK_KEYS.has(name)) {
if (seedStale === true) {
status = 'STALE_SEED';
@@ -447,9 +450,6 @@ export default async function handler(req) {
status = 'EMPTY_DATA';
critCount++;
}
} else if (seedError === true) {
status = 'SEED_ERROR';
warnCount++;
} else if (seedStale === true) {
status = 'STALE_SEED';
warnCount++;
@@ -511,7 +511,10 @@ export default async function handler(req) {
}
let status;
if (!hasData) {
if (seedError === true) {
status = 'SEED_ERROR';
warnCount++;
} else if (!hasData) {
if (cascadeCovered) {
status = 'OK_CASCADE';
okCount++;
@@ -549,9 +552,6 @@ export default async function handler(req) {
status = 'EMPTY_DATA';
critCount++;
}
} else if (seedError === true) {
status = 'SEED_ERROR';
warnCount++;
} else if (seedStale === true) {
status = 'STALE_SEED';
warnCount++;

View File

@@ -224,25 +224,34 @@ async function redisGet(key) {
return data.result ? JSON.parse(data.result) : null;
}
async function preservePreviousSnapshot(errorMsg) {
async function preservePreviousSnapshot(errorMsg, stashedAllMap = null) {
console.error('[EmberElectricity] Preserving previous snapshot:', errorMsg);
const [existingAll, existingMeta] = await Promise.all([
redisGet(EMBER_ALL_KEY).catch(() => null),
redisGet(EMBER_META_KEY).catch(() => null),
]);
const iso2Keys = existingAll && typeof existingAll === 'object'
? Object.keys(existingAll).map((iso2) => `${EMBER_KEY_PREFIX}${iso2}`)
: [];
const existingMeta = await redisGet(EMBER_META_KEY).catch(() => null);
await extendExistingTtl(
[...iso2Keys, EMBER_ALL_KEY, EMBER_META_KEY],
EMBER_TTL_SECONDS,
);
if (stashedAllMap && typeof stashedAllMap === 'object') {
const restoreCmds = Object.entries(stashedAllMap).map(([iso2, val]) => [
'SET', `${EMBER_KEY_PREFIX}${iso2}`, JSON.stringify(val), 'EX', EMBER_TTL_SECONDS,
]);
restoreCmds.push(['SET', EMBER_ALL_KEY, JSON.stringify(stashedAllMap), 'EX', EMBER_TTL_SECONDS]);
await redisPipeline(restoreCmds).catch((e) =>
console.error('[EmberElectricity] Snapshot restore failed:', e),
);
} else {
const existingAll = await redisGet(EMBER_ALL_KEY).catch(() => null);
const iso2Keys = existingAll && typeof existingAll === 'object'
? Object.keys(existingAll).map((iso2) => `${EMBER_KEY_PREFIX}${iso2}`)
: [];
await extendExistingTtl(
[...iso2Keys, EMBER_ALL_KEY, EMBER_META_KEY],
EMBER_TTL_SECONDS,
);
}
const metaPayload = {
fetchedAt: Date.now(),
recordCount: existingMeta?.recordCount ?? 0, // preserve previous count so drop guard stays active
recordCount: existingMeta?.recordCount ?? 0,
status: 'error',
error: errorMsg,
};
@@ -264,6 +273,8 @@ export async function main() {
return;
}
let oldAllMap = null;
try {
const csvText = await withRetry(
() =>
@@ -305,10 +316,13 @@ export async function main() {
sourceVersion: 'ember-monthly-v1',
};
// Phase A: write all per-country keys
const countryCommands = [];
// Stash old _all for rollback on partial failure
oldAllMap = await redisGet(EMBER_ALL_KEY).catch(() => null);
// Phase A: write all per-country keys + _all in a single pipeline
const dataCommands = [];
for (const [iso2, payload] of countries) {
countryCommands.push([
dataCommands.push([
'SET',
`${EMBER_KEY_PREFIX}${iso2}`,
JSON.stringify(payload),
@@ -316,30 +330,38 @@ export async function main() {
EMBER_TTL_SECONDS,
]);
}
const countryResults = await redisPipeline(countryCommands);
const countryFailures = countryResults.filter((r) => r?.error || r?.result === 'ERR');
if (countryFailures.length > 0) {
dataCommands.push([
'SET',
EMBER_ALL_KEY,
JSON.stringify(allCountriesMap),
'EX',
EMBER_TTL_SECONDS,
]);
const dataResults = await redisPipeline(dataCommands);
const dataFailures = dataResults.filter((r) => r?.error || r?.result === 'ERR');
if (dataFailures.length > 0) {
if (oldAllMap && typeof oldAllMap === 'object') {
const rollbackCmds = Object.entries(oldAllMap).map(([iso2, val]) => [
'SET', `${EMBER_KEY_PREFIX}${iso2}`, JSON.stringify(val), 'EX', EMBER_TTL_SECONDS,
]);
rollbackCmds.push(['SET', EMBER_ALL_KEY, JSON.stringify(oldAllMap), 'EX', EMBER_TTL_SECONDS]);
await redisPipeline(rollbackCmds).catch((e) =>
console.error('[EmberElectricity] Rollback pipeline failed:', e),
);
}
throw new Error(
`Redis pipeline: ${countryFailures.length}/${countryCommands.length} country commands failed`,
`Redis pipeline: ${dataFailures.length}/${dataCommands.length} data commands failed`,
);
}
// Phase A2: write _all only after all per-country writes succeed
// _all is always the last-written key so preservePreviousSnapshot always reads the previous good snapshot
const allResults = await redisPipeline([
['SET', EMBER_ALL_KEY, JSON.stringify(allCountriesMap), 'EX', EMBER_TTL_SECONDS],
]);
if (allResults[0]?.error || allResults[0]?.result === 'ERR') {
throw new Error('Redis pipeline: _all write failed');
}
// Phase B: seed-meta (only after _all is fully written)
// Phase B: seed-meta (only after all data is fully written)
await redisPipeline([['SET', EMBER_META_KEY, JSON.stringify(metaPayload), 'EX', EMBER_TTL_SECONDS]]);
logSeedResult('energy:ember', countries.size, Date.now() - startedAt);
console.log(`[EmberElectricity] Seeded ${countries.size} countries`);
} catch (err) {
await preservePreviousSnapshot(String(err)).catch((e) =>
await preservePreviousSnapshot(String(err), oldAllMap).catch((e) =>
console.error('[EmberElectricity] Failed to preserve snapshot:', e),
);
throw err;

View File

@@ -252,6 +252,74 @@ describe('pipeline failure detection logic', () => {
});
});
describe('rollback command generation on partial pipeline failure', () => {
it('generates correct rollback commands from a stashed _all map', () => {
const oldAllMap = {
US: { dataMonth: '2024-01', fossilShare: 50, renewShare: 37.5 },
DE: { dataMonth: '2024-01', fossilShare: 28.6, renewShare: 71.4 },
};
const rollbackCmds = Object.entries(oldAllMap).map(([iso2, val]) => [
'SET', `${EMBER_KEY_PREFIX}${iso2}`, JSON.stringify(val), 'EX', EMBER_TTL_SECONDS,
]);
rollbackCmds.push(['SET', EMBER_ALL_KEY, JSON.stringify(oldAllMap), 'EX', EMBER_TTL_SECONDS]);
assert.equal(rollbackCmds.length, 3, 'should have 2 per-country + 1 _all command');
assert.equal(rollbackCmds[0][0], 'SET');
assert.equal(rollbackCmds[0][1], `${EMBER_KEY_PREFIX}US`);
assert.equal(rollbackCmds[0][3], 'EX');
assert.equal(rollbackCmds[0][4], EMBER_TTL_SECONDS);
assert.equal(rollbackCmds[2][1], EMBER_ALL_KEY, 'last command should restore _all');
const restoredAll = JSON.parse(rollbackCmds[2][2]);
assert.deepEqual(Object.keys(restoredAll).sort(), ['DE', 'US']);
});
it('does not generate rollback when oldAllMap is null', () => {
const oldAllMap = null;
const shouldRollback = oldAllMap && typeof oldAllMap === 'object';
assert.equal(shouldRollback, null, 'null stash should skip rollback');
});
});
describe('health cascade: seedError priority over hasData', () => {
function resolveStatus({ seedError, hasData, seedStale, size }) {
let status;
if (seedError === true) {
status = 'SEED_ERROR';
} else if (!hasData) {
status = 'EMPTY';
} else if (size === 0) {
status = 'EMPTY_DATA';
} else if (seedStale === true) {
status = 'STALE_SEED';
} else {
status = 'OK';
}
return status;
}
it('returns SEED_ERROR when seedError=true and hasData=false', () => {
const status = resolveStatus({ seedError: true, hasData: false, seedStale: true, size: 0 });
assert.equal(status, 'SEED_ERROR', 'seedError should take priority over !hasData');
});
it('returns SEED_ERROR when seedError=true and hasData=true', () => {
const status = resolveStatus({ seedError: true, hasData: true, seedStale: false, size: 100 });
assert.equal(status, 'SEED_ERROR', 'seedError should take priority even when data exists');
});
it('returns EMPTY when seedError=false and hasData=false', () => {
const status = resolveStatus({ seedError: false, hasData: false, seedStale: false, size: 0 });
assert.equal(status, 'EMPTY', 'no error + no data should be EMPTY');
});
it('returns OK when seedError=false and hasData=true and not stale', () => {
const status = resolveStatus({ seedError: false, hasData: true, seedStale: false, size: 100 });
assert.equal(status, 'OK');
});
});
describe('health endpoint status agreement for error meta', () => {
it('seed-health.js logic emits "error" for meta.status="error"', () => {
// Simulates seed-health.js lines 131-148 logic