mirror of
https://github.com/kharonsec/br-acc
synced 2026-05-13 18:36:19 +02:00
- Neo4j query service: CypherLoader + parameterized executor
- Entity endpoints: /entity/{cpf_or_cnpj} lookup + /entity/{id}/connections
- Search endpoint: /search with fulltext index, pagination, type filtering
- Graph endpoint: /graph/{entity_id} with depth/type filtering, nodes + edges
- CPF masking middleware: scans responses, masks non-PEP CPFs, preserves CNPJ
- Pydantic models: EntityResponse, SearchResponse, GraphResponse with source attribution
- 5 .cypher query files (never inline Cypher)
- 58 unit tests passing (ruff + mypy + pytest clean)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
21 lines
662 B
Python
21 lines
662 B
Python
import pytest
|
|
from httpx import AsyncClient
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_graph_rejects_invalid_depth(client: AsyncClient) -> None:
|
|
response = await client.get("/api/v1/graph/test-id?depth=5")
|
|
assert response.status_code == 422
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_graph_accepts_valid_depth(client: AsyncClient) -> None:
|
|
response = await client.get("/api/v1/graph/test-id?depth=2")
|
|
assert response.status_code != 422
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_graph_accepts_entity_types_filter(client: AsyncClient) -> None:
|
|
response = await client.get("/api/v1/graph/test-id?entity_types=person,company")
|
|
assert response.status_code != 422
|