mirror of
https://github.com/kharonsec/br-acc
synced 2026-04-25 17:15:02 +02:00
Add source registry governance and dynamic meta source stats (#6)
Co-authored-by: bruno cesar <brunoclz@brunos-MacBook-Pro.local>
This commit is contained in:
307
scripts/run_source_completeness_gates.py
Normal file
307
scripts/run_source_completeness_gates.py
Normal file
@@ -0,0 +1,307 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Validate Brazil source registry completeness and code alignment."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import re
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
|
||||
REQUIRED_COLUMNS = {
|
||||
"source_id",
|
||||
"name",
|
||||
"category",
|
||||
"tier",
|
||||
"status",
|
||||
"implementation_state",
|
||||
"load_state",
|
||||
"frequency",
|
||||
"in_universe_v1",
|
||||
"primary_url",
|
||||
"pipeline_id",
|
||||
"owner_agent",
|
||||
"access_mode",
|
||||
"notes",
|
||||
}
|
||||
|
||||
VALID_STATUS = {
|
||||
"loaded",
|
||||
"partial",
|
||||
"stale",
|
||||
"blocked_external",
|
||||
"quality_fail",
|
||||
"not_built",
|
||||
}
|
||||
VALID_IMPLEMENTATION = {"implemented", "not_implemented"}
|
||||
VALID_LOAD_STATE = {"loaded", "partial", "not_loaded"}
|
||||
PIPELINE_ENTRY_RE = re.compile(r'^\s*"([a-z0-9_]+)":\s*[A-Za-z_][A-Za-z0-9_]*,\s*$')
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GateResult:
|
||||
name: str
|
||||
passed: bool
|
||||
details: str
|
||||
|
||||
|
||||
def parse_bool(value: str) -> bool:
|
||||
return value.strip().lower() in {"1", "true", "yes", "y"}
|
||||
|
||||
|
||||
def read_registry(path: Path) -> tuple[list[dict[str, str]], list[str]]:
|
||||
if not path.exists():
|
||||
return [], [f"registry file not found: {path}"]
|
||||
|
||||
with path.open(encoding="utf-8", newline="") as csv_file:
|
||||
reader = csv.DictReader(csv_file)
|
||||
missing_cols = REQUIRED_COLUMNS.difference(set(reader.fieldnames or []))
|
||||
if missing_cols:
|
||||
return [], [f"missing required columns: {sorted(missing_cols)}"]
|
||||
rows = list(reader)
|
||||
return rows, []
|
||||
|
||||
|
||||
def parse_runner_pipelines(path: Path) -> tuple[set[str], list[str]]:
|
||||
if not path.exists():
|
||||
return set(), [f"runner file not found: {path}"]
|
||||
|
||||
pipelines: set[str] = set()
|
||||
inside_map = False
|
||||
with path.open(encoding="utf-8") as runner_file:
|
||||
for raw_line in runner_file:
|
||||
line = raw_line.rstrip("\n")
|
||||
if line.startswith("PIPELINES: dict[str, type] = {"):
|
||||
inside_map = True
|
||||
continue
|
||||
if inside_map and line.strip() == "}":
|
||||
break
|
||||
if inside_map:
|
||||
match = PIPELINE_ENTRY_RE.match(line)
|
||||
if match:
|
||||
pipelines.add(match.group(1))
|
||||
if not pipelines:
|
||||
return set(), ["could not parse pipeline ids from runner"]
|
||||
return pipelines, []
|
||||
|
||||
|
||||
def build_gate_results(
|
||||
rows: list[dict[str, str]],
|
||||
runner_pipelines: set[str],
|
||||
expected_universe: int,
|
||||
expected_implemented: int,
|
||||
) -> tuple[list[GateResult], dict[str, int], dict[str, int]]:
|
||||
source_ids = [row["source_id"].strip() for row in rows]
|
||||
duplicate_ids = [sid for sid, count in Counter(source_ids).items() if count > 1]
|
||||
invalid_status = sorted(
|
||||
{
|
||||
row["status"].strip()
|
||||
for row in rows
|
||||
if row["status"].strip() and row["status"].strip() not in VALID_STATUS
|
||||
}
|
||||
)
|
||||
invalid_implementation = sorted(
|
||||
{
|
||||
row["implementation_state"].strip()
|
||||
for row in rows
|
||||
if row["implementation_state"].strip()
|
||||
and row["implementation_state"].strip() not in VALID_IMPLEMENTATION
|
||||
}
|
||||
)
|
||||
invalid_load_state = sorted(
|
||||
{
|
||||
row["load_state"].strip()
|
||||
for row in rows
|
||||
if row["load_state"].strip() and row["load_state"].strip() not in VALID_LOAD_STATE
|
||||
}
|
||||
)
|
||||
|
||||
universe_rows = [row for row in rows if parse_bool(row["in_universe_v1"])]
|
||||
implemented_rows = [
|
||||
row for row in universe_rows if row["implementation_state"].strip() == "implemented"
|
||||
]
|
||||
implemented_ids = {row["source_id"].strip() for row in implemented_rows}
|
||||
|
||||
status_counter = Counter(row["status"].strip() for row in universe_rows)
|
||||
implementation_counter = Counter(
|
||||
row["implementation_state"].strip() for row in universe_rows
|
||||
)
|
||||
|
||||
missing_from_registry = sorted(runner_pipelines - implemented_ids)
|
||||
not_in_runner = sorted(implemented_ids - runner_pipelines)
|
||||
|
||||
gates = [
|
||||
GateResult(
|
||||
name="registry_has_no_duplicate_source_ids",
|
||||
passed=not duplicate_ids,
|
||||
details=f"duplicates={duplicate_ids}" if duplicate_ids else "ok",
|
||||
),
|
||||
GateResult(
|
||||
name="registry_values_are_valid",
|
||||
passed=not invalid_status and not invalid_implementation and not invalid_load_state,
|
||||
details=(
|
||||
f"invalid_status={invalid_status}; "
|
||||
f"invalid_implementation={invalid_implementation}; "
|
||||
f"invalid_load_state={invalid_load_state}"
|
||||
),
|
||||
),
|
||||
GateResult(
|
||||
name="universe_v1_count_matches_expected",
|
||||
passed=len(universe_rows) == expected_universe,
|
||||
details=f"actual={len(universe_rows)} expected={expected_universe}",
|
||||
),
|
||||
GateResult(
|
||||
name="implemented_count_matches_expected",
|
||||
passed=len(implemented_rows) == expected_implemented,
|
||||
details=f"actual={len(implemented_rows)} expected={expected_implemented}",
|
||||
),
|
||||
GateResult(
|
||||
name="runner_pipelines_are_all_marked_implemented",
|
||||
passed=not missing_from_registry,
|
||||
details=f"missing={missing_from_registry}" if missing_from_registry else "ok",
|
||||
),
|
||||
GateResult(
|
||||
name="implemented_registry_ids_exist_in_runner",
|
||||
passed=not not_in_runner,
|
||||
details=f"extra={not_in_runner}" if not_in_runner else "ok",
|
||||
),
|
||||
]
|
||||
return gates, dict(status_counter), dict(implementation_counter)
|
||||
|
||||
|
||||
def write_outputs(
|
||||
output_dir: Path,
|
||||
registry_path: Path,
|
||||
runner_path: Path,
|
||||
expected_universe: int,
|
||||
expected_implemented: int,
|
||||
gates: list[GateResult],
|
||||
status_counter: dict[str, int],
|
||||
implementation_counter: dict[str, int],
|
||||
) -> None:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
payload = {
|
||||
"timestamp_utc": datetime.now(UTC).isoformat(),
|
||||
"registry_path": str(registry_path),
|
||||
"runner_path": str(runner_path),
|
||||
"expected_universe_v1": expected_universe,
|
||||
"expected_implemented": expected_implemented,
|
||||
"status_counter": status_counter,
|
||||
"implementation_counter": implementation_counter,
|
||||
"gates": [gate.__dict__ for gate in gates],
|
||||
"all_passed": all(gate.passed for gate in gates),
|
||||
}
|
||||
(output_dir / "source_completeness_report.json").write_text(
|
||||
json.dumps(payload, indent=2, ensure_ascii=True) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
lines = [
|
||||
"# Source Completeness Gate Report",
|
||||
"",
|
||||
f"- Timestamp (UTC): `{payload['timestamp_utc']}`",
|
||||
f"- Registry: `{registry_path}`",
|
||||
f"- Runner: `{runner_path}`",
|
||||
f"- Expected universe_v1: `{expected_universe}`",
|
||||
f"- Expected implemented: `{expected_implemented}`",
|
||||
"",
|
||||
"## Counters",
|
||||
"",
|
||||
f"- status_counter: `{status_counter}`",
|
||||
f"- implementation_counter: `{implementation_counter}`",
|
||||
"",
|
||||
"## Gate Results",
|
||||
"",
|
||||
]
|
||||
for gate in gates:
|
||||
mark = "PASS" if gate.passed else "FAIL"
|
||||
lines.append(f"- `{mark}` `{gate.name}`: {gate.details}")
|
||||
lines.append("")
|
||||
lines.append(f"## Final: `{'PASS' if payload['all_passed'] else 'FAIL'}`")
|
||||
lines.append("")
|
||||
(output_dir / "source_completeness_report.md").write_text(
|
||||
"\n".join(lines), encoding="utf-8"
|
||||
)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Validate source registry completeness and code alignment."
|
||||
)
|
||||
parser.add_argument(
|
||||
"--registry-path",
|
||||
default="docs/source_registry_br_v1.csv",
|
||||
help="Path to source registry CSV",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--runner-path",
|
||||
default="etl/src/icarus_etl/runner.py",
|
||||
help="Path to ETL runner with PIPELINES map",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--expected-universe-v1",
|
||||
type=int,
|
||||
default=108,
|
||||
help="Expected count for in_universe_v1=true rows",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--expected-implemented",
|
||||
type=int,
|
||||
default=45,
|
||||
help="Expected count for implementation_state=implemented rows",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output-dir",
|
||||
default=f"audit-results/brazil-coverage-{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}",
|
||||
help="Directory for gate reports",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
registry_path = Path(args.registry_path)
|
||||
runner_path = Path(args.runner_path)
|
||||
output_dir = Path(args.output_dir)
|
||||
|
||||
rows, registry_errors = read_registry(registry_path)
|
||||
runner_pipelines, runner_errors = parse_runner_pipelines(runner_path)
|
||||
|
||||
if registry_errors or runner_errors:
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
combined = registry_errors + runner_errors
|
||||
(output_dir / "source_completeness_report.md").write_text(
|
||||
"# Source Completeness Gate Report\n\n"
|
||||
+ "\n".join(f"- FAIL: {msg}" for msg in combined)
|
||||
+ "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
print("\n".join(combined))
|
||||
return 1
|
||||
|
||||
gates, status_counter, implementation_counter = build_gate_results(
|
||||
rows=rows,
|
||||
runner_pipelines=runner_pipelines,
|
||||
expected_universe=args.expected_universe_v1,
|
||||
expected_implemented=args.expected_implemented,
|
||||
)
|
||||
write_outputs(
|
||||
output_dir=output_dir,
|
||||
registry_path=registry_path,
|
||||
runner_path=runner_path,
|
||||
expected_universe=args.expected_universe_v1,
|
||||
expected_implemented=args.expected_implemented,
|
||||
gates=gates,
|
||||
status_counter=status_counter,
|
||||
implementation_counter=implementation_counter,
|
||||
)
|
||||
|
||||
all_passed = all(gate.passed for gate in gates)
|
||||
print("PASS" if all_passed else "FAIL")
|
||||
return 0 if all_passed else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user