fix: add circuit breaker to OpenClaw worker client (#1636) (#1697)

* fix: add circuit breaker to OpenClaw worker client (#1636)

When the claude-mem worker is unreachable, every plugin event (before_agent_start,
before_prompt_build, tool_result_persist, agent_end) triggered a new fetch that
failed and logged a warning, causing CPU-spinning and continuous log spam.

Add a CLOSED/OPEN/HALF_OPEN circuit breaker: after 3 consecutive network errors
the circuit opens, silently drops all worker calls for 30 s, then sends one probe.
Individual failures are only logged while the circuit is still CLOSED; once open
it logs once ("disabling requests for 30s") and goes quiet until recovery.

Generated by Claude Code
Vibe coded by Ousama Ben Younes

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: limit HALF_OPEN to single probe and move circuitOnSuccess after response.ok check

- Add _halfOpenProbeInFlight flag so only one probe is allowed in HALF_OPEN state;
  concurrent callers are silently dropped until the probe completes (success or failure)
- Move circuitOnSuccess() to after the response.ok check in workerPost, workerPostFireAndForget,
  and workerGetText so non-2xx HTTP responses no longer close the circuit
- Clear _halfOpenProbeInFlight in both circuitOnSuccess and circuitOnFailure, and in circuitReset
- Add regression test covering HALF_OPEN one-probe behavior: non-2xx keeps circuit open,
  2xx closes it

* chore: trigger CodeRabbit re-review

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Ben Younes
2026-04-15 09:58:32 +02:00
committed by GitHub
parent 4538e686ad
commit b411d91885
2 changed files with 298 additions and 3 deletions

View File

@@ -979,3 +979,207 @@ describe("SSE stream integration", () => {
await getService().stop({});
});
});
describe("circuit breaker", () => {
// Reset circuit breaker state before each test by firing gateway_start.
// The circuit is module-level state, so tests would otherwise bleed into each other.
beforeEach(async () => {
const { api, fireEvent } = createMockApi({ workerPort: 59999 });
claudeMemPlugin(api);
await fireEvent("gateway_start", {}, {});
});
it("opens after threshold failures and stops further requests", async () => {
const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 });
claudeMemPlugin(api);
// Reset circuit inside the test body to guard against timers from preceding
// tests (e.g. completionDelayMs timers) that may fire between beforeEach and here.
await fireEvent("gateway_start", {}, {});
// Fire threshold+1 calls so the circuit is open by the end of the loop
// regardless of whether a concurrent timer fires at the exact boundary.
for (let i = 0; i < 4; i++) {
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-open-${i}` });
}
// Circuit is now OPEN. Subsequent calls must be silently dropped.
const logCountBeforeDrop = logs.length;
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-drop" });
const noisyDropLogs = logs.slice(logCountBeforeDrop).filter(
(l) => l.includes("failed") || l.includes("disabling")
);
assert.equal(noisyDropLogs.length, 0, "calls when circuit is open should be silently dropped");
});
it("logs individual failures while circuit is closed, then disabling when it opens", async () => {
const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 });
claudeMemPlugin(api);
await fireEvent("gateway_start", {}, {});
const logsAfterReset = logs.length;
// Fire exactly threshold (3) calls
for (let i = 0; i < 3; i++) {
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-log-${i}` });
}
const newLogs = logs.slice(logsAfterReset);
// At least some failures should have been logged (circuit was active)
assert.ok(newLogs.length > 0, "threshold calls should produce log output");
// Exactly one disabling warning should appear
const disablingLogs = newLogs.filter((l) => l.includes("disabling requests"));
assert.equal(disablingLogs.length, 1, "should emit exactly one disabling warning when circuit opens");
// The last call (the threshold-crossing one) should NOT log an individual failure
const failureLogs = newLogs.filter((l) => l.includes("failed:"));
assert.ok(failureLogs.length < 3, "threshold-crossing call should not log an individual failure");
});
it("resets on gateway_start, allowing connections again", async () => {
const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 });
claudeMemPlugin(api);
await fireEvent("gateway_start", {}, {});
// Open the circuit by firing threshold+1 calls
for (let i = 0; i < 4; i++) {
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-reset-${i}` });
}
// Confirm circuit is open (call is silently dropped)
const logCountWhileOpen = logs.length;
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-while-open" });
assert.equal(
logs.slice(logCountWhileOpen).filter((l) => l.includes("failed") || l.includes("disabling")).length,
0,
"call while circuit is open should be silently dropped"
);
// gateway_start resets the circuit
await fireEvent("gateway_start", {}, {});
// Next call should attempt to connect again (not silently drop)
const logCountAfterReset = logs.length;
await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-after-reset" });
const newLogs = logs.slice(logCountAfterReset);
assert.ok(
newLogs.some((l) => l.includes("failed:") || l.includes("disabling")),
"should attempt worker connection after gateway_start reset"
);
});
it("HALF_OPEN allows only a single probe — non-2xx keeps circuit open, 2xx closes it", async () => {
// ---- Phase 1: open the circuit via network failures (unreachable port) ----
// Reset circuit state first
const resetMock = createMockApi({ workerPort: 59999 });
claudeMemPlugin(resetMock.api);
await resetMock.fireEvent("gateway_start", {}, {});
// Drive 4 failures to ensure circuit is OPEN
for (let i = 0; i < 4; i++) {
await resetMock.fireEvent("before_agent_start", { prompt: "probe-test" }, { sessionKey: `probe-phase1-${i}` });
}
// ---- Phase 2: advance clock so cooldown has elapsed ----
// _circuitOpenedAt was set during Phase 1 using the real Date.now().
// Advancing Date.now by 31s means the next circuitAllow call sees the cooldown elapsed.
const realDateNow = Date.now.bind(Date);
Date.now = () => realDateNow() + 31_000;
try {
// ---- Phase 3: non-2xx probe — circuit should stay OPEN ----
// Start a server that returns 500 for all requests
let serverA: Server | null = null;
const portA: number = await new Promise((resolve) => {
serverA = createServer((_req: IncomingMessage, res: ServerResponse) => {
res.writeHead(500);
res.end();
});
serverA!.listen(0, () => {
const addr = serverA!.address();
resolve((addr as any).port);
});
});
// Reuse the same module-level circuit state — just change the worker port.
// Create a new mock api instance pointed at server A (500 responder).
const mockA = createMockApi({ workerPort: portA });
claudeMemPlugin(mockA.api);
// Do NOT fire gateway_start here — we want the OPEN circuit state from Phase 1.
// The circuit is OPEN but the mocked clock says cooldown elapsed.
// The next call should: transition to HALF_OPEN, set _halfOpenProbeInFlight=true,
// send the probe to server A (which returns 500), then call circuitOnFailure
// and re-open the circuit.
const logCountAtProbe = mockA.logs.length;
await mockA.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-call-non2xx" });
await new Promise((resolve) => setTimeout(resolve, 100));
const probeALogs = mockA.logs.slice(logCountAtProbe);
// After a 500 response, circuitOnFailure is called which logs "disabling requests"
// (because state was HALF_OPEN) and logger.warn logs the 500 status.
assert.ok(
probeALogs.some((l) => l.includes("disabling") || l.includes("returned 500") || l.includes("Worker POST")),
"non-2xx probe should keep circuit open (expected disabling or 500 status log)"
);
// Verify probe flag resets: a second call with cooldown elapsed should be allowed as a new probe
// (i.e., _halfOpenProbeInFlight was cleared by circuitOnFailure).
// But without advancing time further the circuit is OPEN again — so calls are dropped.
const logCountAfterFailedProbe = mockA.logs.length;
await mockA.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-concurrent" });
await new Promise((resolve) => setTimeout(resolve, 100));
const droppedLogs = mockA.logs.slice(logCountAfterFailedProbe).filter(
(l) => l.includes("failed") || l.includes("disabling")
);
assert.equal(droppedLogs.length, 0, "call should be silently dropped while circuit is OPEN again after failed probe");
serverA!.close();
// ---- Phase 4: 2xx probe — circuit should close ----
// Re-open the circuit with fresh failures, then probe with a 200-returning server.
// Reset circuit state first.
const resetMock2 = createMockApi({ workerPort: 59999 });
claudeMemPlugin(resetMock2.api);
await resetMock2.fireEvent("gateway_start", {}, {});
// Drive failures (still using mocked Date.now, but _circuitOpenedAt will be set to
// the mocked time, so cooldown is NOT elapsed yet from the mocked perspective).
// We need to temporarily restore real Date.now while opening the circuit, then
// re-mock it for the probe.
Date.now = realDateNow;
for (let i = 0; i < 4; i++) {
await resetMock2.fireEvent("before_agent_start", { prompt: "probe-test" }, { sessionKey: `probe-phase4-${i}` });
}
// Re-advance the clock past cooldown
Date.now = () => realDateNow() + 31_000;
let serverB: Server | null = null;
const portB: number = await new Promise((resolve) => {
serverB = createServer((_req: IncomingMessage, res: ServerResponse) => {
res.writeHead(200, { "Content-Type": "application/json" });
res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false }));
});
serverB!.listen(0, () => {
const addr = serverB!.address();
resolve((addr as any).port);
});
});
const mockB = createMockApi({ workerPort: portB });
claudeMemPlugin(mockB.api);
// Do NOT fire gateway_start — reuse OPEN circuit state from resetMock2.
const logCountBeforeSuccessProbe = mockB.logs.length;
await mockB.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-call-2xx" });
await new Promise((resolve) => setTimeout(resolve, 150));
const successProbeLogs = mockB.logs.slice(logCountBeforeSuccessProbe);
assert.ok(
successProbeLogs.some((l) => l.includes("restored") || l.includes("circuit closed")),
"2xx probe should close the circuit — expected 'restored' or 'circuit closed' log"
);
serverB!.close();
} finally {
Date.now = realDateNow;
}
});
});

View File

@@ -264,12 +264,80 @@ function workerBaseUrl(port: number): string {
return `http://${_workerHost}:${port}`;
}
// ============================================================================
// Worker Circuit Breaker
// ============================================================================
// Prevents CPU-spinning retry loops when the worker is unreachable.
// After CIRCUIT_BREAKER_THRESHOLD consecutive network errors, the circuit
// opens and all worker calls are silently dropped for CIRCUIT_BREAKER_COOLDOWN_MS.
// After the cooldown, one probe attempt is allowed to check if the worker recovered.
const CIRCUIT_BREAKER_THRESHOLD = 3;
const CIRCUIT_BREAKER_COOLDOWN_MS = 30_000;
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";
let _circuitState: CircuitState = "CLOSED";
let _circuitFailures = 0;
let _circuitOpenedAt = 0;
let _halfOpenProbeInFlight = false;
function circuitAllow(logger: PluginLogger): boolean {
if (_circuitState === "CLOSED") return true;
if (_circuitState === "OPEN") {
if (Date.now() - _circuitOpenedAt >= CIRCUIT_BREAKER_COOLDOWN_MS) {
_circuitState = "HALF_OPEN";
logger.info("[claude-mem] Circuit breaker: probing worker connection");
if (_halfOpenProbeInFlight) return false;
_halfOpenProbeInFlight = true;
return true;
}
return false;
}
// HALF_OPEN: allow one probe through
if (_halfOpenProbeInFlight) return false;
_halfOpenProbeInFlight = true;
return true;
}
function circuitOnSuccess(logger: PluginLogger): void {
if (_circuitState !== "CLOSED") {
logger.info("[claude-mem] Worker connection restored — circuit closed");
}
_circuitState = "CLOSED";
_circuitFailures = 0;
_halfOpenProbeInFlight = false;
}
function circuitOnFailure(logger: PluginLogger): void {
_halfOpenProbeInFlight = false;
_circuitFailures++;
if (
_circuitState === "HALF_OPEN" ||
(_circuitState === "CLOSED" && _circuitFailures >= CIRCUIT_BREAKER_THRESHOLD)
) {
_circuitState = "OPEN";
_circuitOpenedAt = Date.now();
logger.warn(
`[claude-mem] Worker unreachable — disabling requests for ${CIRCUIT_BREAKER_COOLDOWN_MS / 1000}s`
);
}
}
function circuitReset(): void {
_circuitState = "CLOSED";
_circuitFailures = 0;
_circuitOpenedAt = 0;
_halfOpenProbeInFlight = false;
}
async function workerPost(
port: number,
path: string,
body: Record<string, unknown>,
logger: PluginLogger
): Promise<Record<string, unknown> | null> {
if (!circuitAllow(logger)) return null;
try {
const response = await fetch(`${workerBaseUrl(port)}${path}`, {
method: "POST",
@@ -277,13 +345,18 @@ async function workerPost(
body: JSON.stringify(body),
});
if (!response.ok) {
circuitOnFailure(logger);
logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`);
return null;
}
circuitOnSuccess(logger);
return (await response.json()) as Record<string, unknown>;
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
circuitOnFailure(logger);
if (_circuitState !== "OPEN") {
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
}
return null;
}
}
@@ -294,13 +367,24 @@ function workerPostFireAndForget(
body: Record<string, unknown>,
logger: PluginLogger
): void {
if (!circuitAllow(logger)) return;
fetch(`${workerBaseUrl(port)}${path}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body),
}).then((response) => {
if (!response.ok) {
circuitOnFailure(logger);
logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`);
return;
}
circuitOnSuccess(logger);
}).catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
circuitOnFailure(logger);
if (_circuitState !== "OPEN") {
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
}
});
}
@@ -309,16 +393,22 @@ async function workerGetText(
path: string,
logger: PluginLogger
): Promise<string | null> {
if (!circuitAllow(logger)) return null;
try {
const response = await fetch(`${workerBaseUrl(port)}${path}`);
if (!response.ok) {
circuitOnFailure(logger);
logger.warn(`[claude-mem] Worker GET ${path} returned ${response.status}`);
return null;
}
circuitOnSuccess(logger);
return await response.text();
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`);
circuitOnFailure(logger);
if (_circuitState !== "OPEN") {
logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`);
}
return null;
}
}
@@ -856,6 +946,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
// Event: gateway_start — clear session tracking for fresh start
// ------------------------------------------------------------------
api.on("gateway_start", async () => {
circuitReset();
sessionIds.clear();
contextCache.clear();
recentPromptInits.clear();