mirror of
https://github.com/koala73/worldmonitor.git
synced 2026-04-25 17:14:57 +02:00
fix(resilience-static): preserve existing Redis fao data when FSIN fetch fails (#2748)
* fix(resilience-static): preserve existing Redis fao data when FSIN fetch fails
* fix(resilience-static): block publish when Redis recovery reads fail; add recoverFailedDatasets tests
This commit is contained in:
@@ -777,6 +777,51 @@ async function fetchAllDatasetMaps() {
|
||||
return { datasetMaps, failedDatasets };
|
||||
}
|
||||
|
||||
// Exported for testing. When a dataset fetch fails, reads the prior Redis snapshot and
// injects the existing per-country values for that field back into datasetMaps so the
// new publish does not overwrite good data with null.
// Throws if the Redis recovery reads themselves fail — the caller must then call
// preservePreviousSnapshotOnFailure and abort, rather than publishing corrupt data.
//
// @param {Object<string, Map>} datasetMaps - per-dataset Maps of iso2 -> value; mutated in place.
// @param {string[]} failedDatasets - dataset keys whose fetch failed this run.
// @param {{readIndex: Function, readPipeline: Function}} deps - injected Redis readers (testable).
// @throws {Error} when the Redis index or pipeline read fails while datasets are failed.
export async function recoverFailedDatasets(datasetMaps, failedDatasets, { readIndex, readPipeline }) {
  if (failedDatasets.length === 0) return;

  let existingIndex;
  try {
    existingIndex = await readIndex();
  } catch (err) {
    throw new Error(`Dataset(s) (${failedDatasets.join(', ')}) failed and Redis index read also failed: ${err.message}`);
  }

  const existingCountries = existingIndex?.countries ?? [];
  if (existingCountries.length === 0) {
    // First run (or empty index): nothing to recover, but that is not an error.
    console.warn(` [fallback] dataset(s) failed (${failedDatasets.join(', ')}) — no prior snapshot to recover from`);
    return;
  }

  let pipelineResults;
  try {
    pipelineResults = await readPipeline(existingCountries.map((iso2) => ['GET', countryRedisKey(iso2)]));
  } catch (err) {
    throw new Error(`Dataset(s) (${failedDatasets.join(', ')}) failed and Redis pipeline read also failed: ${err.message}`);
  }

  // Count only values actually injected from the prior snapshot, so the summary log
  // below does not conflate fresh partial-fetch data with recovered records.
  const recoveredCounts = Object.fromEntries(failedDatasets.map((key) => [key, 0]));

  for (let i = 0; i < existingCountries.length; i++) {
    const iso2 = existingCountries[i];
    let existing = null;
    try { existing = JSON.parse(pipelineResults[i]?.result ?? 'null'); } catch { /* skip malformed snapshot rows */ }
    if (!existing) continue;
    for (const key of failedDatasets) {
      // Never clobber a freshly-fetched value: only fill countries missing from the new map.
      if (existing[key] != null && datasetMaps[key] instanceof Map && !datasetMaps[key].has(iso2)) {
        datasetMaps[key].set(iso2, existing[key]);
        recoveredCounts[key] += 1;
      }
    }
  }

  for (const key of failedDatasets) {
    console.warn(` [fallback] dataset '${key}' failed — preserved ${recoveredCounts[key]} existing Redis records from prior snapshot`);
  }
}
|
||||
|
||||
export async function seedResilienceStatic() {
|
||||
const seedYear = nowSeedYear();
|
||||
const existingMeta = await readJsonKey(RESILIENCE_STATIC_META_KEY).catch(() => null);
|
||||
@@ -790,6 +835,23 @@ export async function seedResilienceStatic() {
|
||||
}
|
||||
|
||||
const { datasetMaps, failedDatasets } = await fetchAllDatasetMaps();
|
||||
|
||||
try {
|
||||
await recoverFailedDatasets(datasetMaps, failedDatasets, {
|
||||
readIndex: () => readJsonKey(RESILIENCE_STATIC_INDEX_KEY),
|
||||
readPipeline: redisPipeline,
|
||||
});
|
||||
} catch (recoveryErr) {
|
||||
const failure = await preservePreviousSnapshotOnFailure(
|
||||
failedDatasets,
|
||||
seedYear,
|
||||
recoveryErr.message,
|
||||
);
|
||||
const error = new Error(recoveryErr.message);
|
||||
error.failure = failure;
|
||||
throw error;
|
||||
}
|
||||
|
||||
const seededAt = new Date().toISOString();
|
||||
const countryPayloads = finalizeCountryPayloads(datasetMaps, seedYear, seededAt);
|
||||
const manifest = buildManifest(countryPayloads, failedDatasets, seedYear, seededAt);
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
finalizeCountryPayloads,
|
||||
parseEurostatEnergyDataset,
|
||||
parseRsfRanking,
|
||||
recoverFailedDatasets,
|
||||
resolveIso2,
|
||||
shouldSkipSeedYear,
|
||||
} from '../scripts/seed-resilience-static.mjs';
|
||||
@@ -178,6 +179,73 @@ describe('resilience static seed payload assembly', () => {
|
||||
});
|
||||
});
|
||||
|
||||
// Unit tests for recoverFailedDatasets: prior snapshot values are injected for
// failed datasets, fresh partial data always wins, a missing snapshot is tolerated,
// and Redis read failures propagate so the caller can block the publish.
describe('recoverFailedDatasets', () => {
  const existingFao = { source: 'hdx-ipc', year: 2025, phase3plus: 4_500_000, phase4: 1_200_000, phase5: 300_000 };
  const existingSo = { source: 'hdx-ipc', year: 2025, phase3plus: 3_000_000, phase4: null, phase5: null };

  // Builds a full set of empty dataset maps; the fao map can be pre-populated.
  const makeDatasetMaps = (faoOverride = new Map()) => ({
    wgi: new Map([['YE', { source: 'worldbank-wgi' }]]),
    infrastructure: new Map(),
    gpi: new Map(),
    rsf: new Map(),
    who: new Map(),
    fao: faoOverride,
    aquastat: new Map(),
    iea: new Map(),
  });

  it('injects prior fao values when FSIN fails and a prior snapshot exists', async () => {
    const maps = makeDatasetMaps();
    const deps = {
      readIndex: async () => ({ countries: ['YE', 'SO'] }),
      readPipeline: async () => [
        { result: JSON.stringify({ fao: existingFao, wgi: { source: 'worldbank-wgi' } }) },
        { result: JSON.stringify({ fao: existingSo, wgi: null }) },
      ],
    };
    await recoverFailedDatasets(maps, ['fao'], deps);
    assert.deepEqual(maps.fao.get('YE'), existingFao, 'YE fao should be recovered');
    assert.deepEqual(maps.fao.get('SO'), existingSo, 'SO fao should be recovered');
  });

  it('does not overwrite a partial fao success with prior data', async () => {
    const freshFao = { source: 'hdx-ipc', year: 2026, phase3plus: 5_000_000, phase4: null, phase5: null };
    const maps = makeDatasetMaps(new Map([['YE', freshFao]]));
    const deps = {
      readIndex: async () => ({ countries: ['YE'] }),
      readPipeline: async () => [{ result: JSON.stringify({ fao: existingFao }) }],
    };
    await recoverFailedDatasets(maps, ['fao'], deps);
    assert.deepEqual(maps.fao.get('YE'), freshFao, 'fresh partial data should not be replaced');
  });

  it('warns but does not throw when no prior snapshot exists (first-run tolerance)', async () => {
    const maps = makeDatasetMaps();
    const deps = { readIndex: async () => null, readPipeline: async () => [] };
    await assert.doesNotReject(() => recoverFailedDatasets(maps, ['fao'], deps));
    assert.equal(maps.fao.size, 0, 'fao stays empty — no prior data to recover');
  });

  it('throws when Redis index read fails, so caller blocks publish', async () => {
    const maps = makeDatasetMaps();
    const deps = {
      readIndex: async () => { throw new Error('ECONNRESET'); },
      readPipeline: async () => [],
    };
    await assert.rejects(
      () => recoverFailedDatasets(maps, ['fao'], deps),
      /Redis index read also failed.*ECONNRESET/,
    );
  });

  it('throws when Redis pipeline read fails, so caller blocks publish', async () => {
    const maps = makeDatasetMaps();
    const deps = {
      readIndex: async () => ({ countries: ['YE'] }),
      readPipeline: async () => { throw new Error('timeout'); },
    };
    await assert.rejects(
      () => recoverFailedDatasets(maps, ['fao'], deps),
      /Redis pipeline read also failed.*timeout/,
    );
  });
});
|
||||
|
||||
describe('resilience static health registrations', () => {
|
||||
const healthSrc = readFileSync(join(root, 'api', 'health.js'), 'utf8');
|
||||
const seedHealthSrc = readFileSync(join(root, 'api', 'seed-health.js'), 'utf8');
|
||||
|
||||
Reference in New Issue
Block a user