Merge pull request #662 from lizekai-hash/feat/langchain-code-reviewer

feat(agents): add LangChain code review agent with A2A protocol
This commit is contained in:
Jaber Jaber
2026-03-27 16:44:22 +03:00
committed by GitHub
6 changed files with 453 additions and 0 deletions

View File

@@ -0,0 +1 @@
__pycache__/

View File

@@ -0,0 +1,187 @@
"""
LangChain Code Review Agent — core review logic.
Supports OpenAI, Ollama, and any LangChain-compatible LLM.
"""
import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# System prompt used as the "system" message of every review request.
# It tells the model to answer in Chinese (keeping code and technical terms in
# their original language) and to follow a fixed Markdown report layout:
# summary, detailed findings grouped by severity, and a score table.
# NOTE(review): this text is handed to ChatPromptTemplate.from_messages, which
# presumably parses it as a template — literal curly braces added here would
# likely need escaping; confirm before editing.
SYSTEM_PROMPT = """\
You are a principal-level code reviewer with 15+ years of production experience \
across multiple languages (Python, Rust, TypeScript, Java, Go, C/C++).
You receive code snippets, diffs, or pull request descriptions and produce a \
structured, actionable review report.
You MUST respond in **中文**, but keep code snippets, variable names, and \
technical terms in their original language.
# ── 审核维度(按优先级排序) ──────────────────────────────
## 1. 正确性 (Correctness)
- 逻辑错误、off-by-one、边界条件
- 空指针 / None / undefined 未处理
- 错误处理不完整(吞异常、漏 catch、panic 路径)
- 并发问题:竞态条件、死锁、数据竞争
- 类型安全:隐式转换、溢出、精度丢失
- 资源泄漏:未关闭的文件/连接/锁
## 2. 安全性 (Security)
- SQL / NoSQL / OS 命令注入
- XSS、CSRF、SSRF
- 硬编码密钥、token、密码
- 不安全的反序列化
- 路径穿越Path Traversal
- 缺少输入校验 / 输出编码
- 权限检查缺失或绕过
- 敏感数据明文日志
## 3. 性能 (Performance)
- 算法复杂度不合理O(n²) 可优化为 O(n)
- 不必要的内存分配 / 拷贝
- N+1 查询、缺少批量操作
- 阻塞 I/O 在异步上下文中
- 缺少缓存 / 索引
- 热路径上的正则编译 / 反射
## 4. 可维护性 (Maintainability)
- 命名不清晰、缩写歧义
- 函数过长(>50行建议拆分
- 重复代码DRY 违反)
- 职责不单一SRP 违反)
- 缺少必要注释(复杂业务逻辑、非显而易见的决策)
- 魔法数字 / 字符串
- 耦合过紧、依赖方向不合理
## 5. 测试 (Testing)
- 关键路径缺少单元测试
- 测试覆盖了 happy path 但遗漏了 edge case
- 测试中有硬编码依赖(时间、文件路径、网络)
- Mock 过度导致测试失去意义
## 6. 风格 (Style)
- 不符合语言惯例Pythonic、Rust idiom 等)
- 格式不一致(应由 formatter 处理的除外)
- 不必要的复杂写法
# ── 严重级别 ──────────────────────────────────────────
| 级别 | 含义 | 是否阻塞合并 |
|------|------|-------------|
| 🔴 **[必须修复]** | 存在 bug、安全漏洞或数据丢失风险 | 是 |
| 🟡 **[建议修复]** | 不影响功能但会影响可维护性或性能 | 否,但强烈建议 |
| 🔵 **[小建议]** | 风格、命名等微小改进 | 否 |
| 🟢 **[亮点]** | 写得好的地方,值得肯定 | — |
# ── 输出格式 ──────────────────────────────────────────
严格按以下 Markdown 格式输出:
```
## 📋 总结
**结论**: [✅ 通过 / ⚠️ 需要修改 / 💬 仅评论]
**概述**: [1-2 句话总体评价]
**发现统计**: 🔴 X 个必须修复 | 🟡 X 个建议修复 | 🔵 X 个小建议 | 🟢 X 个亮点
---
## 🔍 详细发现
### 🔴 [必须修复] 问题标题
- **位置**: `文件名` 第 X-Y 行
- **问题**: 具体描述
- **原因**: 为什么这是个问题,可能造成什么后果
- **修复建议**:
(给出修复后的代码)
### 🟡 [建议修复] 问题标题
...
### 🔵 [小建议] 问题标题
...
### 🟢 [亮点] 优点标题
- **位置**: `文件名` 第 X-Y 行
- **说明**: 为什么这段代码写得好
---
## 📊 评分
| 维度 | 分数 | 说明 |
|------|------|------|
| 正确性 | X/10 | 一句话说明 |
| 安全性 | X/10 | 一句话说明 |
| 性能 | X/10 | 一句话说明 |
| 可维护性 | X/10 | 一句话说明 |
| 测试 | X/10 | 一句话说明 |
| **综合** | **X/10** | 一句话总结 |
```
# ── 审核原则 ──────────────────────────────────────────
1. **先肯定,再指出问题** — 不要只挑毛病,好的代码也要指出来
2. **解释 WHY不仅是 WHAT** — 每个问题都要说清楚「为什么不好」和「可能导致什么后果」
3. **给出具体修复代码** — 不要只说"这里有问题",要给出改好后的写法
4. **区分严重级别** — 不要把小问题标成必须修复,也不要把严重 bug 标成小建议
5. **尊重作者** — 用建设性的语气,避免 "这是错的" 这种措辞,用 "这里可以改进为..."
6. **不纠结格式** — 如果项目有 formatter/linter格式问题跳过
7. **关注变更本身** — 如果是 diff只审核变更的部分不要评论未修改的代码
8. **没有代码时** — 直接要求提交代码,不要编造审核结果"""
def _build_llm():
    """Build the chat model selected by environment variables.

    Selection order:

    1. ``USE_OLLAMA`` set to ``1``/``true``/``yes`` (case-insensitive):
       ``ChatOllama`` with ``OLLAMA_MODEL`` (default ``qwen2.5``) and
       ``OLLAMA_BASE_URL`` (default ``http://localhost:11434``).
    2. ``LLM_PROVIDER=deepseek``: ``ChatOpenAI`` pointed at
       ``DEEPSEEK_BASE_URL`` (default ``https://api.deepseek.com``) with
       ``DEEPSEEK_MODEL`` (default ``deepseek-chat``) and ``DEEPSEEK_API_KEY``.
    3. Anything else: ``ChatOpenAI`` with ``OPENAI_MODEL``
       (default ``gpt-4o-mini``).

    Returns:
        A LangChain chat model configured with ``temperature=0.2``.

    Raises:
        RuntimeError: If the DeepSeek provider is selected but
            ``DEEPSEEK_API_KEY`` is unset or empty.
    """
    use_ollama = os.getenv("USE_OLLAMA", "").lower() in ("1", "true", "yes")
    if use_ollama:
        # Imported lazily so the Ollama package is only required when used.
        from langchain_ollama import ChatOllama

        return ChatOllama(
            model=os.getenv("OLLAMA_MODEL", "qwen2.5"),
            base_url=os.getenv("OLLAMA_BASE_URL", "http://localhost:11434"),
            temperature=0.2,
        )

    provider = os.getenv("LLM_PROVIDER", "openai").lower()
    if provider == "deepseek":
        api_key = os.getenv("DEEPSEEK_API_KEY")
        if not api_key:
            # Fail fast with a clear message instead of passing api_key=None.
            # NOTE(review): with api_key=None the OpenAI client is expected to
            # fall back to OPENAI_API_KEY, which would send that key to the
            # DeepSeek endpoint — confirm against the langchain-openai docs.
            raise RuntimeError(
                "LLM_PROVIDER=deepseek requires the DEEPSEEK_API_KEY "
                "environment variable to be set"
            )
        client_kwargs = {
            "model": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
            "api_key": api_key,
            "base_url": os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com"),
        }
    else:
        client_kwargs = {"model": os.getenv("OPENAI_MODEL", "gpt-4o-mini")}

    # Both OpenAI-compatible providers share the same client class and limits.
    from langchain_openai import ChatOpenAI

    return ChatOpenAI(temperature=0.2, max_tokens=4096, **client_kwargs)
class CodeReviewAgent:
    """LangChain-based code review agent.

    On construction the agent builds an LLM via ``_build_llm()`` and wires it
    into a chain: system prompt + human input -> LLM -> plain string output.
    """

    def __init__(self):
        """Create the LLM, the two-message prompt, and the review chain."""
        self.llm = _build_llm()
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", SYSTEM_PROMPT),
            ("human", "{input}"),
        ])
        self.chain = self.prompt | self.llm | StrOutputParser()

    def review(self, code_or_diff: str) -> str:
        """Review the given code or diff.

        Args:
            code_or_diff: Source code, git diff, or PR description to review.
                ``None`` is tolerated and treated like empty input.

        Returns:
            The chain's output for non-blank input, or a fixed
            "No code provided" message when the input is ``None``, empty,
            or whitespace-only (the LLM is not called in that case).
        """
        # Guard against None as well as blank strings so callers get the
        # friendly message instead of an AttributeError from None.strip().
        if code_or_diff is None or not code_or_diff.strip():
            return "No code provided. Please submit code or a diff to review."
        return self.chain.invoke({"input": code_or_diff})

View File

@@ -0,0 +1,10 @@
# Add this section to your ~/.openfang/config.toml
# to register the LangChain code review agent.
[a2a]
enabled = true
listen_path = "/a2a"
# Array-of-tables entry describing one external agent.
[[a2a.external_agents]]
# The example workflow passes this name as agent_name to the a2a_send tool.
name = "langchain-code-reviewer"
# Must point at the running agent server; server.py listens on port 9100
# unless the PORT environment variable overrides it.
url = "http://127.0.0.1:9100"

View File

@@ -0,0 +1,6 @@
langchain>=0.3
langchain-openai>=0.3
langchain-core>=0.3
langchain-ollama>=0.3
fastapi>=0.115
uvicorn>=0.34

View File

@@ -0,0 +1,226 @@
"""
LangChain Code Review Agent — A2A-compatible server.
Exposes a code review agent via Google's A2A protocol so that
OpenFang workflows can call it as an external agent.
Start:
OPENAI_API_KEY=sk-xxx python server.py
# or with Ollama (no key needed):
USE_OLLAMA=1 python server.py
Endpoints:
GET /.well-known/agent.json — A2A Agent Card
POST /a2a — JSON-RPC task endpoint
"""
import os
import uuid
import asyncio
from datetime import datetime, timezone
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from agent import CodeReviewAgent
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Interface the server binds to; the default "0.0.0.0" accepts connections on
# every network interface.
HOST = os.getenv("HOST", "0.0.0.0")
# Listening port; int() raises ValueError at import time if PORT is not numeric.
PORT = int(os.getenv("PORT", "9100"))
# Public base URL used to build the agent-card URL and the startup banner.
BASE_URL = os.getenv("BASE_URL", f"http://127.0.0.1:{PORT}")
app = FastAPI(title="LangChain Code Review Agent")
# Constructed at import time, so LLM configuration errors surface on startup.
agent = CodeReviewAgent()
# In-memory task store
# Maps task id -> task dict. handle_tasks_send adds entries; nothing in this
# module removes them, so the store grows for the lifetime of the process.
tasks: dict[str, dict] = {}
# ---------------------------------------------------------------------------
# A2A Agent Card
# ---------------------------------------------------------------------------
# Static agent card returned as-is by GET /.well-known/agent.json.
# "url" is derived from BASE_URL and points at the JSON-RPC endpoint (/a2a).
# NOTE(review): "stateTransitionHistory" is advertised as True, but the task
# dicts in this module only hold the current status — confirm this is intended.
AGENT_CARD = {
"name": "langchain-code-reviewer",
"description": (
"LangChain-powered code review agent. "
"Analyzes code for bugs, security issues, performance problems, "
"and style violations. Returns structured review with severity levels."
),
"url": f"{BASE_URL}/a2a",
"version": "0.1.0",
"capabilities": {
"streaming": False,
"pushNotifications": False,
"stateTransitionHistory": True,
},
"skills": [
{
"id": "code-review",
"name": "Code Review",
"description": "Review code for correctness, security, performance, and style",
"tags": ["code", "review", "security", "quality"],
"examples": [
"Review this Python function for bugs",
"Check this Rust code for security issues",
"Analyze this PR diff for performance problems",
],
},
{
"id": "pr-review",
"name": "Pull Request Review",
"description": "Review a git diff / pull request",
"tags": ["pr", "diff", "git"],
"examples": [
"Review this PR diff",
"Analyze these changes",
],
},
],
"defaultInputModes": ["text"],
"defaultOutputModes": ["text"],
}
@app.get("/.well-known/agent.json")
async def agent_card():
    """Serve the static agent card as a JSON response."""
    card_response = JSONResponse(content=AGENT_CARD)
    return card_response
# ---------------------------------------------------------------------------
# A2A JSON-RPC Endpoint
# ---------------------------------------------------------------------------
@app.post("/a2a")
async def a2a_endpoint(request: Request):
    """Dispatch a JSON-RPC request to the matching task handler.

    Supported methods are ``tasks/send``, ``tasks/get`` and ``tasks/cancel``.
    Every outcome, including errors, is returned as a JSON-RPC envelope in a
    ``JSONResponse``:

    * body is not valid JSON          -> error ``-32700`` (Parse error)
    * body is not a JSON object       -> error ``-32600`` (Invalid Request)
    * ``params`` is not a JSON object -> error ``-32602`` (Invalid params)
    * unknown ``method``              -> error ``-32601`` (Method not found)

    Args:
        request: Incoming HTTP request whose body carries the JSON-RPC call.

    Returns:
        ``JSONResponse`` holding either a ``result`` or an ``error`` member.
    """
    try:
        body = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass
        # ValueError; report them as a protocol error instead of a 500.
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32700, "message": "Parse error"},
        })

    if not isinstance(body, dict):
        # Arrays (batches) and scalars have no .get(); reject them explicitly.
        return JSONResponse(content={
            "jsonrpc": "2.0",
            "id": None,
            "error": {"code": -32600, "message": "Invalid Request"},
        })

    jsonrpc = body.get("jsonrpc", "2.0")
    req_id = body.get("id", 1)
    method = body.get("method", "")
    params = body.get("params")
    if params is None:
        # Treat a missing or explicit-null params member as "no parameters".
        params = {}
    if not isinstance(params, dict):
        # The handlers read params by key, so positional params are rejected.
        return JSONResponse(content={
            "jsonrpc": jsonrpc,
            "id": req_id,
            "error": {"code": -32602, "message": "Invalid params"},
        })

    if method == "tasks/send":
        return await handle_tasks_send(jsonrpc, req_id, params)
    elif method == "tasks/get":
        return handle_tasks_get(jsonrpc, req_id, params)
    elif method == "tasks/cancel":
        return handle_tasks_cancel(jsonrpc, req_id, params)
    else:
        return JSONResponse(content={
            "jsonrpc": jsonrpc,
            "id": req_id,
            "error": {"code": -32601, "message": f"Method not found: {method}"},
        })
async def handle_tasks_send(jsonrpc: str, req_id: int, params: dict):
    """Create a task, run the code review for it, and return the final task.

    The text of every well-formed text part in ``params["message"]`` is joined
    with newlines and passed to ``agent.review`` in a worker thread so the
    event loop is not blocked. The task is stored in ``tasks`` before the
    review starts, so it is visible to ``tasks/get`` and ``tasks/cancel``
    while it is still in the ``working`` state.

    Args:
        jsonrpc: JSON-RPC version string echoed back in the response.
        req_id: Request id echoed back in the response.
        params: Request parameters; ``message`` and ``sessionId`` are read.

    Returns:
        ``JSONResponse`` whose ``result`` is the task dict. Its state is
        ``completed`` on success, ``failed`` if the review raised, or
        ``cancelled`` if the task was cancelled while the review was running.
    """
    message = params.get("message", {})
    session_id = params.get("sessionId")
    task_id = str(uuid.uuid4())
    user_input = _extract_text(message)

    task = {
        "id": task_id,
        "sessionId": session_id,
        "status": {"state": "working", "message": None},
        "messages": [message],
        "artifacts": [],
    }
    tasks[task_id] = task

    try:
        review_result = await asyncio.to_thread(agent.review, user_input)
    except Exception as e:
        # Boundary handler: report the failure on the task instead of
        # letting the request end in an HTTP 500.
        if not _is_cancelled(task):
            task["status"] = {"state": "failed", "message": str(e)}
            task["messages"].append({
                "role": "agent",
                "parts": [{"type": "text", "text": f"Review failed: {e}"}],
            })
    else:
        # A tasks/cancel request may have arrived while the review was
        # running; keep that state rather than overwriting it.
        if not _is_cancelled(task):
            task["messages"].append({
                "role": "agent",
                "parts": [{"type": "text", "text": review_result}],
            })
            task["status"] = {"state": "completed", "message": None}
            task["artifacts"] = [
                {
                    "name": "code-review-report",
                    "description": "Structured code review report",
                    "parts": [{"type": "text", "text": review_result}],
                    "index": 0,
                    "lastChunk": True,
                }
            ]

    return JSONResponse(content={
        "jsonrpc": jsonrpc,
        "id": req_id,
        "result": task,
    })


def _extract_text(message) -> str:
    """Join the text of all well-formed text parts of *message* with newlines."""
    if not isinstance(message, dict):
        return ""
    parts = message.get("parts")
    if not isinstance(parts, list):
        return ""
    # Skip parts that are not objects or lack a string "text" member so one
    # malformed part cannot crash the request.
    return "\n".join(
        part["text"]
        for part in parts
        if isinstance(part, dict)
        and part.get("type") == "text"
        and isinstance(part.get("text"), str)
    )


def _is_cancelled(task: dict) -> bool:
    """Return True if the task's current state is ``cancelled``."""
    return task["status"].get("state") == "cancelled"
def handle_tasks_get(jsonrpc: str, req_id: int, params: dict):
    """Look up a stored task by id and return it in a JSON-RPC envelope.

    Args:
        jsonrpc: JSON-RPC version string echoed back in the response.
        req_id: Request id echoed back in the response.
        params: Request parameters; only ``id`` (the task id) is read.

    Returns:
        ``JSONResponse`` with the task as ``result``, or an ``error`` with
        code ``-32000`` when no task has that id.
    """
    task_id = params.get("id", "")
    envelope = {"jsonrpc": jsonrpc, "id": req_id}
    found = tasks.get(task_id)
    if found is not None:
        envelope["result"] = found
    else:
        envelope["error"] = {
            "code": -32000,
            "message": f"Task not found: {task_id}",
        }
    return JSONResponse(content=envelope)
def handle_tasks_cancel(jsonrpc: str, req_id: int, params: dict):
    """Mark a stored task as cancelled unless it has already finished.

    Args:
        jsonrpc: JSON-RPC version string echoed back in the response.
        req_id: Request id echoed back in the response.
        params: Request parameters; only ``id`` (the task id) is read.

    Returns:
        ``JSONResponse`` with the updated task as ``result``; an ``error``
        with code ``-32000`` when no task has that id; or an ``error`` with
        code ``-32002`` when the task is already ``completed`` or ``failed``.
    """
    task_id = params.get("id", "")
    task = tasks.get(task_id)
    if task is None:
        return JSONResponse(content={
            "jsonrpc": jsonrpc,
            "id": req_id,
            "error": {"code": -32000, "message": f"Task not found: {task_id}"},
        })

    current_state = task["status"].get("state")
    if current_state in ("completed", "failed"):
        # A finished task must keep its real outcome; relabelling it as
        # cancelled would misreport what happened to the review.
        return JSONResponse(content={
            "jsonrpc": jsonrpc,
            "id": req_id,
            "error": {
                "code": -32002,
                "message": (
                    f"Task cannot be canceled: {task_id} is already {current_state}"
                ),
            },
        })

    task["status"] = {"state": "cancelled", "message": None}
    return JSONResponse(content={
        "jsonrpc": jsonrpc,
        "id": req_id,
        "result": task,
    })
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    """Report liveness together with the number of tasks held in memory."""
    task_count = len(tasks)
    return {
        "status": "ok",
        "agent": "langchain-code-reviewer",
        "tasks": task_count,
    }
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Print where the service can be reached, then hand control to uvicorn.
    startup_banner = (
        f"Starting LangChain Code Review Agent on {HOST}:{PORT}",
        f"Agent Card: {BASE_URL}/.well-known/agent.json",
        f"A2A endpoint: {BASE_URL}/a2a",
    )
    for banner_line in startup_banner:
        print(banner_line)
    uvicorn.run(app, host=HOST, port=PORT)

View File

@@ -0,0 +1,23 @@
{
"id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"name": "langchain-code-review-pipeline",
"description": "Code review pipeline: uses LangChain external agent for deep review, then OpenFang Writer agent to format the final report.",
"created_at": "2026-03-16T00:00:00Z",
"steps": [
{
"name": "review-code",
"agent": { "name": "a2a-proxy" },
"prompt_template": "Use the a2a_send tool to send the following code to the external agent for code review. Set agent_name to langchain-code-reviewer and set message to the code below. Return the complete review result:\n\n{{input}}",
"mode": "sequential",
"timeout_secs": 300,
"output_var": "review_result"
},
{
"name": "format-report",
"agent": { "name": "Writer" },
"prompt_template": "Format the following code review into a clean, professional report. Preserve all severity levels and scores. Add a brief executive summary at the top:\n\n{{review_result}}",
"mode": "sequential",
"timeout_secs": 120
}
]
}