CPGvulnHunter 是一个基于代码属性图(Code Property Graph, CPG)的自动化漏洞检测框架。该框架结合了静态分析和大语言模型(LLM)的能力,提供高精度的漏洞发现和分析。
- 静态分析 + LLM增强: 结合Joern CPG分析和大语言模型语义理解
- 高度并发处理: 支持多个Joern服务器并发分析
- 智能缓存系统: 多层次LLM响应缓存,避免重复API调用
- Pass流水线架构: 模块化的分析流程,易于扩展和维护
- 资源管理优化: 自动服务器池管理和内存回收机制
- Python: 3.8+
- Joern: 需要预先安装并配置路径
- Java项目: 目标分析项目
- LLM API: 支持Alibaba DashScope (Qwen模型)
# 安装Python依赖
pip install -r requirements.txt
pip install -e .创建或修改 config.yml 文件:
llm:
api_key: "your_dashscope_api_key"
model_name: "qwen-turbo"
joern:
install_path: "/path/to/joern"
engine:
max_workers: 11
output_dir: "output_new"# 标准分析(包含语义增强)
python run.py
# 快速测试
python quick_semantic_test.py
# 清理残余Joern服务器
./utils/kill_server.shCPGvulnHunter/
├── core/ # 核心引擎和基础组件
│ ├── engine.py # VulnerabilityEngine - 主编排引擎
│ ├── cpg.py # CPG管理和Joern集成
│ ├── task.py # Task执行单元
│ └── passRegistry.py # Pass注册和管理
├── passes/ # 分析Pass实现
├── bridges/ # 外部工具集成
│ ├── joernServerPool.py # Joern服务器池管理
│ ├── joernWrapper.py # Joern API封装
│ └── asyncLLMWrapper.py # 异步LLM调用
└── utils/ # 工具和缓存系统
├── llmCacher.py # LLM响应缓存
└── stageLogger.py # 分阶段日志记录
graph TD
A[VulnerabilityEngine] --> B[Task]
B --> C[CPG Management]
C --> D[JoernServerPool]
C --> E[Pass Pipeline]
E --> F[FindTargetPass]
F --> G[LabelFunctionPass]
G --> H[PreTaintAnalysisPass]
H --> I[SemanticModelPass]
I --> J[TaintAnalysisPass]
J --> K[VulnCheckPass]
K --> L[Results Output]
功能: 识别可能包含路径穿越漏洞的目标包和函数
工作流程:
- 获取项目所有包列表
- 使用LLM分析包名特征,识别文件操作相关包
- 过滤目标函数(位于目标包且非测试函数)
- 智能缓存避免重复分析
LLM提示策略:
识别sink包特征:
- 文件操作核心:file, io, stream, reader, writer, nio, path
- 文件系统操作:fs, filesystem, directory, folder
- 文件处理功能:upload, download, transfer, copy, move
- 压缩解压操作:zip, unzip, archive, extract, tar
输出: findTargetPass.json - 包含目标包和函数列表
功能: 将目标函数分类为Source、Sink或其他类型
两阶段分析:
- 阶段1: 识别潜在Source(无调用者的根函数)和Sink(目标包中的函数)
- 阶段2: 使用LLM确认函数的实际类型和参数索引
输出: LabelFunctionPass.json - 标记后的Source和Sink函数
功能: 预检测Source到Sink之间是否存在数据流路径
优化策略:
- 批量检测避免无效污点任务
- 过滤不可达的Source-Sink组合
- 减少后续完整污点分析的计算开销
输出: pre_taint_analysis_results.json - 可达的污点任务列表
功能: 构建语义规则以增强污点分析精度
语义规则类型:
- 传播规则: 参数间的污点传播模式
- 清洗规则: 识别消除污点的函数调用
- 转换规则: 污点数据的格式转换
LLM增强: 自动分析函数语义,生成精确的污点传播规则
功能: 执行完整的污点流分析,追踪数据流路径
并发架构:
- 使用
BatchedFinalTaintEngine支持大规模并发 - 最多21个Joern服务器并行处理
- 智能任务分批和资源管理
分析维度:
- 数据流路径追踪
- 污点传播验证
- 语义规则应用
输出: taint_analysis_results.json - 完整的污点流路径
功能: 验证检测到的数据流是否构成真实漏洞
验证策略:
- 路径有效性检查
- 漏洞模式匹配
- 误报过滤机制
输出: vulnCheck_results.json - 最终漏洞报告
# 服务器池配置
max_servers: 21 # 最大并发服务器数
port_range: 9000-9100 # 端口分配范围
auto_restart: true # 服务器崩溃自动重启
resource_monitoring: true # 资源使用监控- 引擎级并发: VulnerabilityEngine支持多项目并行分析
- Pass级并发: TaintAnalysisPass内部批量并行处理
- 任务级并发: 单个任务内的数据流并发追踪
- 自动服务器池生命周期管理
- 内存使用监控和垃圾回收
- 服务器崩溃检测和恢复机制
LLM缓存架构:
├── source_cache.json # Source函数分析缓存
├── sink_cache.json # Sink函数分析缓存
├── package_cache.json # 包分类缓存
├── semantic_cache.json # 语义规则缓存
└── request_cache.json # 通用请求缓存
- 上下文感知: 基于函数签名和调用上下文的精确提示
- 领域专业化: CWE-22专项的漏洞模式提示
- 批量优化: 减少API调用次数的批量分析提示
llm:
api_key: "your_api_key"
model_name: "qwen-turbo" # 或其他支持的模型
base_url: "https://dashscope.aliyuncs.com/api/v1"
timeout: 30
max_retries: 3
request_interval: 0.1 # API调用间隔分析结果存储在 output_new/{project_name}/ 目录下:
project_output/
├── project_with_semantics_all/ # 语义增强分析结果
│ ├── findTargetPass.json # 目标识别结果
│ ├── LabelFunctionPass.json # 函数标记结果
│ ├── pre_taint_analysis_results.json # 预污点分析
│ ├── semantics_result.json # 语义模型
│ ├── taint_analysis_results.json # 污点分析结果
│ ├── vulnCheck_results.json # 最终漏洞报告
│ └── joern_logs/ # Joern服务器日志
├── project_without_semantics_all/ # 基准分析结果
└── project_llm_cache/ # LLM缓存数据
└── llm_cache/
├── source_cache.json
├── sink_cache.json
└── ...
{
"vulnerability_id": "vuln_001",
"cwe_type": "CWE-22",
"severity": "high",
"source": {
"function": "getUserInput",
"location": "com.example.Controller:45"
},
"sink": {
"function": "readFile",
"location": "com.example.FileHandler:123"
},
"data_flow_path": [...],
"confidence": 0.95
}- 在
passes/下创建新的漏洞类型目录 - 实现对应的Pass类(继承BasePass)
- 在
passRegistry.py中注册新Pass - 更新LLM提示模板
class CustomPass:
def __init__(self, cpg: CPG):
self.cpg = cpg
self.logger = get_thread_logger()
def run(self):
# 实现自定义分析逻辑
pass通过 config.yml 支持灵活的参数配置:
custom_pass:
enable: true
parameters:
analysis_depth: 5
confidence_threshold: 0.8-
Joern服务器启动失败
# 检查Joern安装路径 which joern # 手动清理端口 ./utils/kill_server.sh
-
内存不足错误
# 减少并发度 engine: max_workers: 5 parallel: taint_analysis_workers: 10
-
LLM API调用失败
# 增加重试和间隔 llm: max_retries: 5 request_interval: 0.5
- 主日志:查看引擎级别的执行状态
- 线程日志:查看具体任务的执行细节
- Joern日志:查看CPG构建和查询状态
- 根据硬件资源调整
max_workers参数 - 使用缓存减少重复的LLM调用
- 定期清理输出目录避免磁盘空间不足