Skip to content

HeouDonkey/CPGHunter

Repository files navigation

CPGvulnHunter Framework

CPGvulnHunter 是一个基于代码属性图(Code Property Graph, CPG)的自动化漏洞检测框架。该框架结合了静态分析和大语言模型(LLM)的能力,提供高精度的漏洞发现和分析。

🔋 核心特性

  • 静态分析 + LLM增强: 结合Joern CPG分析和大语言模型语义理解
  • 高度并发处理: 支持多个Joern服务器并发分析
  • 智能缓存系统: 多层次LLM响应缓存,避免重复API调用
  • Pass流水线架构: 模块化的分析流程,易于扩展和维护
  • 资源管理优化: 自动服务器池管理和内存回收机制

📋 系统要求

  • Python: 3.8+
  • Joern: 需要预先安装并配置路径
  • Java项目: 目标分析项目
  • LLM API: 支持Alibaba DashScope (Qwen模型)

🚀 快速开始

安装依赖

# 安装Python依赖
pip install -r requirements.txt
pip install -e .

基本配置

创建或修改 config.yml 文件:

llm:
  api_key: "your_dashscope_api_key"
  model_name: "qwen-turbo"
  
joern:
  install_path: "/path/to/joern"
  
engine:
  max_workers: 11
  output_dir: "output_new"

运行分析

# 标准分析(包含语义增强)
python run.py

# 快速测试
python quick_semantic_test.py

# 清理残余Joern服务器
./utils/kill_server.sh

🏗️ 框架架构

核心组件

CPGvulnHunter/
├── core/                    # 核心引擎和基础组件
│   ├── engine.py           # VulnerabilityEngine - 主编排引擎
│   ├── cpg.py             # CPG管理和Joern集成
│   ├── task.py            # Task执行单元
│   └── passRegistry.py    # Pass注册和管理
├── passes/                 # 分析Pass实现
├── bridges/                # 外部工具集成
│   ├── joernServerPool.py  # Joern服务器池管理
│   ├── joernWrapper.py     # Joern API封装
│   └── asyncLLMWrapper.py  # 异步LLM调用
└── utils/                  # 工具和缓存系统
    ├── llmCacher.py       # LLM响应缓存
    └── stageLogger.py     # 分阶段日志记录

数据流架构

graph TD
    A[VulnerabilityEngine] --> B[Task]
    B --> C[CPG Management]
    C --> D[JoernServerPool]
    C --> E[Pass Pipeline]
    E --> F[FindTargetPass]
    F --> G[LabelFunctionPass]
    G --> H[PreTaintAnalysisPass]
    H --> I[SemanticModelPass]
    I --> J[TaintAnalysisPass]
    J --> K[VulnCheckPass]
    K --> L[Results Output]
Loading

🔍 CWE-22 Pass工作流程

1. FindTargetPass - 目标识别

功能: 识别可能包含路径穿越漏洞的目标包和函数

工作流程:

  • 获取项目所有包列表
  • 使用LLM分析包名特征,识别文件操作相关包
  • 过滤目标函数(位于目标包且非测试函数)
  • 智能缓存避免重复分析

LLM提示策略:

识别sink包特征:
- 文件操作核心:file, io, stream, reader, writer, nio, path
- 文件系统操作:fs, filesystem, directory, folder  
- 文件处理功能:upload, download, transfer, copy, move
- 压缩解压操作:zip, unzip, archive, extract, tar

输出: findTargetPass.json - 包含目标包和函数列表

2. LabelFunctionPass - 函数标记

功能: 将目标函数分类为Source、Sink或其他类型

两阶段分析:

  • 阶段1: 识别潜在Source(无调用者的根函数)和Sink(目标包中的函数)
  • 阶段2: 使用LLM确认函数的实际类型和参数索引

输出: LabelFunctionPass.json - 标记后的Source和Sink函数

3. PreTaintAnalysisPass - 预污点分析

功能: 预检测Source到Sink之间是否存在数据流路径

优化策略:

  • 批量检测避免无效污点任务
  • 过滤不可达的Source-Sink组合
  • 减少后续完整污点分析的计算开销

输出: pre_taint_analysis_results.json - 可达的污点任务列表

4. SemanticModelPass - 语义模型构建

功能: 构建语义规则以增强污点分析精度

语义规则类型:

  • 传播规则: 参数间的污点传播模式
  • 清洗规则: 识别消除污点的函数调用
  • 转换规则: 污点数据的格式转换

LLM增强: 自动分析函数语义,生成精确的污点传播规则

5. TaintAnalysisPass - 污点流分析

功能: 执行完整的污点流分析,追踪数据流路径

并发架构:

  • 使用BatchedFinalTaintEngine支持大规模并发
  • 最多21个Joern服务器并行处理
  • 智能任务分批和资源管理

分析维度:

  • 数据流路径追踪
  • 污点传播验证
  • 语义规则应用

输出: taint_analysis_results.json - 完整的污点流路径

6. VulnCheckPass - 漏洞确认

功能: 验证检测到的数据流是否构成真实漏洞

验证策略:

  • 路径有效性检查
  • 漏洞模式匹配
  • 误报过滤机制

输出: vulnCheck_results.json - 最终漏洞报告

⚡ 并发处理架构

JoernServerPool设计

# 服务器池配置
max_servers: 21          # 最大并发服务器数
port_range: 9000-9100    # 端口分配范围
auto_restart: true       # 服务器崩溃自动重启
resource_monitoring: true # 资源使用监控

并发执行策略

  • 引擎级并发: VulnerabilityEngine支持多项目并行分析
  • Pass级并发: TaintAnalysisPass内部批量并行处理
  • 任务级并发: 单个任务内的数据流并发追踪

资源管理

  • 自动服务器池生命周期管理
  • 内存使用监控和垃圾回收
  • 服务器崩溃检测和恢复机制

🧠 LLM集成策略

多层缓存系统

LLM缓存架构:
├── source_cache.json      # Source函数分析缓存
├── sink_cache.json        # Sink函数分析缓存  
├── package_cache.json     # 包分类缓存
├── semantic_cache.json    # 语义规则缓存
└── request_cache.json     # 通用请求缓存

智能提示工程

  • 上下文感知: 基于函数签名和调用上下文的精确提示
  • 领域专业化: CWE-22专项的漏洞模式提示
  • 批量优化: 减少API调用次数的批量分析提示

API配置

llm:
  api_key: "your_api_key"
  model_name: "qwen-turbo"  # 或其他支持的模型
  base_url: "https://dashscope.aliyuncs.com/api/v1"
  timeout: 30
  max_retries: 3
  request_interval: 0.1     # API调用间隔

📊 输出结构

分析结果存储在 output_new/{project_name}/ 目录下:

project_output/
├── project_with_semantics_all/     # 语义增强分析结果
│   ├── findTargetPass.json         # 目标识别结果
│   ├── LabelFunctionPass.json      # 函数标记结果
│   ├── pre_taint_analysis_results.json  # 预污点分析
│   ├── semantics_result.json       # 语义模型
│   ├── taint_analysis_results.json # 污点分析结果
│   ├── vulnCheck_results.json      # 最终漏洞报告
│   └── joern_logs/                 # Joern服务器日志
├── project_without_semantics_all/   # 基准分析结果
└── project_llm_cache/              # LLM缓存数据
    └── llm_cache/
        ├── source_cache.json
        ├── sink_cache.json
        └── ...

漏洞报告格式

{
  "vulnerability_id": "vuln_001",
  "cwe_type": "CWE-22",
  "severity": "high",
  "source": {
    "function": "getUserInput",
    "location": "com.example.Controller:45"
  },
  "sink": {
    "function": "readFile", 
    "location": "com.example.FileHandler:123"
  },
  "data_flow_path": [...],
  "confidence": 0.95
}

🛠️ 扩展开发

添加新的漏洞类型

  1. passes/ 下创建新的漏洞类型目录
  2. 实现对应的Pass类(继承BasePass)
  3. passRegistry.py 中注册新Pass
  4. 更新LLM提示模板

自定义Pass开发

class CustomPass:
    def __init__(self, cpg: CPG):
        self.cpg = cpg
        self.logger = get_thread_logger()
    
    def run(self):
        # 实现自定义分析逻辑
        pass

配置扩展

通过 config.yml 支持灵活的参数配置:

custom_pass:
  enable: true
  parameters:
    analysis_depth: 5
    confidence_threshold: 0.8

🐛 故障排除

常见问题

  1. Joern服务器启动失败

    # 检查Joern安装路径
    which joern
    # 手动清理端口
    ./utils/kill_server.sh
  2. 内存不足错误

    # 减少并发度
    engine:
      max_workers: 5
    parallel:
      taint_analysis_workers: 10
  3. LLM API调用失败

    # 增加重试和间隔
    llm:
      max_retries: 5
      request_interval: 0.5

日志分析

  • 主日志:查看引擎级别的执行状态
  • 线程日志:查看具体任务的执行细节
  • Joern日志:查看CPG构建和查询状态

性能调优

  • 根据硬件资源调整 max_workers 参数
  • 使用缓存减少重复的LLM调用
  • 定期清理输出目录避免磁盘空间不足

About

LLM-based vuln-detect framework

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors