Skip to content

feat: add vector database migration solution between different vector DBs#6349

Merged
c121914yu merged 2 commits into
labring:migrate-vector-testfrom
zhangjiongai-debug:feature/vector-db-migration
Feb 2, 2026
Merged

feat: add vector database migration solution between different vector DBs#6349
c121914yu merged 2 commits into
labring:migrate-vector-testfrom
zhangjiongai-debug:feature/vector-db-migration

Conversation

@zhangjiongai-debug

Copy link
Copy Markdown

Summary

This PR implements a migration solution for transferring vector data between different vector databases (PostgreSQL, OceanBase, Milvus) as requested in #6196.

Features:

  • Support offline (stop-the-world) migration mode for reliable data transfer
  • Export vectors from source DB with cursor-based pagination
  • Import vectors to target DB with batch processing (configurable batch size)
  • Preserve original vector IDs when possible (PG ↔ OceanBase)
  • ID remapping for Milvus migrations
  • Validate migration results by comparing counts
  • CLI script for command-line migration
  • Admin API endpoint for programmatic migration

Files added:

  • packages/service/common/vectorDB/migration/ - Core migration logic
    • type.ts - Type definitions
    • exporters.ts - Database-specific export implementations
    • importers.ts - Database-specific import implementations
    • controller.ts - Migration orchestration
    • index.ts - Module exports
  • projects/app/src/pages/api/admin/migrateVector.ts - Admin API endpoint
  • scripts/migrateVector.ts - CLI migration script
  • .claude/design/vector-db-migration.md - Design document

Usage

CLI Migration

# Migrate from PostgreSQL to Milvus
npx ts-node scripts/migrateVector.ts \
  --source pg --target milvus \
  --target-address "http://localhost:19530"

# Migrate from Milvus to PostgreSQL
npx ts-node scripts/migrateVector.ts \
  --source milvus --source-address "http://localhost:19530" \
  --target pg --target-address "postgresql://user:pass@localhost:5432/db"

# Migrate specific team/dataset
npx ts-node scripts/migrateVector.ts \
  --source pg --target milvus \
  --target-address "http://localhost:19530" \
  --team-id "xxx" --dataset-id "yyy"

API Migration

# Start migration
curl -X POST /api/admin/migrateVector \
  -H "Content-Type: application/json" \
  -d '{
    "mode": "offline",
    "sourceType": "pg",
    "targetType": "milvus",
    "targetConfig": {
      "address": "http://localhost:19530"
    }
  }'

# Check migration status
curl /api/admin/migrateVector?action=list

Test plan

  • Test PG → Milvus migration
  • Test Milvus → PG migration
  • Test PG → OceanBase migration
  • Test with large dataset (100k+ vectors)
  • Verify vector search results are consistent after migration

Closes #6196

🤖 Generated with Claude Code

… DBs

This PR implements a migration solution for transferring vector data between
different vector databases (PostgreSQL, OceanBase, Milvus) as requested in labring#6196.

Features:
- Support offline (stop-the-world) migration mode
- Export vectors from source DB with pagination
- Import vectors to target DB with batch processing
- Preserve original vector IDs when possible
- Validate migration results by comparing counts
- CLI script for command-line migration
- Admin API endpoint for programmatic migration

Files added:
- packages/service/common/vectorDB/migration/ - Core migration logic
- projects/app/src/pages/api/admin/migrateVector.ts - Admin API
- scripts/migrateVector.ts - CLI migration script
- .claude/design/vector-db-migration.md - Design document

Closes labring#6196

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@gru-agent

gru-agent Bot commented Jan 30, 2026

Copy link
Copy Markdown
Contributor

TestGru Assignment

Summary

Link CommitId Status Reason
Detail b55b71a 🚫 Skipped No files need to be tested {".claude/design/vector-db-migration.md":"File path does not match include patterns.","packages/service/common/vectorDB/migration/controller.ts":"File path does not match include patterns.","packages/service/common/vectorDB/migration/exporters.ts":"File path does not match include patterns.","packages/service/common/vectorDB/migration/importers.ts":"File path does not match include patterns.","packages/service/common/vectorDB/migration/index.ts":"File path does not match include patterns.","packages/service/common/vectorDB/migration/type.ts":"File path does not match include patterns.","projects/app/src/pages/api/admin/migrateVector.ts":"Can not find valuable test target.\nhandler: Out of scope - The handler function is an API controller that orchestrates requests and responses, interacts with services, performs authentication, and handles business process flow. It involves multiple modules and external dependencies, as well as validation and logging. Unit …

History Assignment

Tip

You can @gru-agent and leave your feedback. TestGru will make adjustments based on your input

@cla-assistant

cla-assistant Bot commented Jan 30, 2026

Copy link
Copy Markdown

CLA assistant check
All committers have signed the CLA.

@cla-assistant

cla-assistant Bot commented Jan 30, 2026

Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@github-actions

github-actions Bot commented Jan 30, 2026

Copy link
Copy Markdown

Preview mcp_server Image:

registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-pr:fatsgpt_mcp_server_28625271623130ca715e8be683b83d19cd1d69f9

@github-actions

github-actions Bot commented Jan 30, 2026

Copy link
Copy Markdown

Preview sandbox Image:

registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-pr:fatsgpt_sandbox_28625271623130ca715e8be683b83d19cd1d69f9

@github-actions

github-actions Bot commented Jan 30, 2026

Copy link
Copy Markdown

Preview fastgpt Image:

registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-pr:fatsgpt_28625271623130ca715e8be683b83d19cd1d69f9

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements a vector database migration solution to transfer data between PostgreSQL, OceanBase, and Milvus vector databases. The implementation provides an offline (stop-the-world) migration mode with CLI and API interfaces for initiating and monitoring migrations.

Changes:

  • Added migration module with exporters/importers for PG, OceanBase, and Milvus
  • Created admin API endpoint for programmatic migration control
  • Implemented CLI script for command-line migrations

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 25 comments.

Show a summary per file
File Description
packages/service/common/vectorDB/migration/type.ts Type definitions and Zod schemas for migration
packages/service/common/vectorDB/migration/exporters.ts Database-specific export implementations
packages/service/common/vectorDB/migration/importers.ts Database-specific import implementations
packages/service/common/vectorDB/migration/controller.ts Migration orchestration and state management
packages/service/common/vectorDB/migration/index.ts Module exports
projects/app/src/pages/api/admin/migrateVector.ts Admin API endpoint
scripts/migrateVector.ts CLI migration script
.claude/design/vector-db-migration.md Design documentation

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +60 to +69
} catch {
// Index might already exist
}

try {
await pool.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
);
} catch {
// Index might already exist

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Poor error handling: The catch blocks silently ignore index creation errors with only a comment. This could mask real issues like insufficient permissions or syntax errors. At minimum, log the error to help with debugging if index creation fails for reasons other than already existing.

Suggested change
} catch {
// Index might already exist
}
try {
await pool.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
);
} catch {
// Index might already exist
} catch (error) {
// Index might already exist, but log any error for debugging
addLog.error('[Migration] Failed to create PostgreSQL index "vector_index"', error);
}
try {
await pool.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
);
} catch (error) {
// Index might already exist, but log any error for debugging
addLog.error(
'[Migration] Failed to create PostgreSQL index "team_dataset_collection_index"',
error
);

Copilot uses AI. Check for mistakes.
Comment on lines +152 to +160
if (afterId) {
whereConditions.push(`id > ${afterId}`);
}
if (teamId) {
whereConditions.push(`team_id = '${teamId}'`);
}
if (datasetId) {
whereConditions.push(`dataset_id = '${datasetId}'`);
}

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: User-controlled input (afterId, teamId, datasetId) is directly interpolated into SQL queries without parameterization. This occurs in the same pattern as the PostgreSQL exporter. Use parameterized queries to prevent SQL injection attacks.

Copilot uses AI. Check for mistakes.
Comment on lines +222 to +236
// Build bulk insert with explicit IDs
const values = records
.map((record) => {
const vectorStr = `[${record.vector.join(',')}]`;
const timestamp = record.createTime.toISOString().replace('T', ' ').replace('Z', '');
return `(${record.id}, '${vectorStr}', '${record.teamId}', '${record.datasetId}', '${record.collectionId}', '${timestamp}')`;
})
.join(',');

const sql = `
INSERT IGNORE INTO ${DatasetVectorTableName} (id, vector, team_id, dataset_id, collection_id, createtime)
VALUES ${values}
`;

await pool.query(sql);

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQL injection vulnerability: Vector record data is directly concatenated into SQL without parameterization, the same as in the PgImporter. Use parameterized queries for all database operations.

Suggested change
// Build bulk insert with explicit IDs
const values = records
.map((record) => {
const vectorStr = `[${record.vector.join(',')}]`;
const timestamp = record.createTime.toISOString().replace('T', ' ').replace('Z', '');
return `(${record.id}, '${vectorStr}', '${record.teamId}', '${record.datasetId}', '${record.collectionId}', '${timestamp}')`;
})
.join(',');
const sql = `
INSERT IGNORE INTO ${DatasetVectorTableName} (id, vector, team_id, dataset_id, collection_id, createtime)
VALUES ${values}
`;
await pool.query(sql);
// Build bulk insert with explicit IDs using parameterized query
const placeholders: string[] = [];
const params: any[] = [];
for (const record of records) {
const vectorStr = `[${record.vector.join(',')}]`;
const timestamp = record.createTime.toISOString().replace('T', ' ').replace('Z', '');
// One placeholder tuple per record
placeholders.push('(?, ?, ?, ?, ?, ?)');
// Bind parameters in the same order as columns
params.push(
record.id,
vectorStr,
record.teamId,
record.datasetId,
record.collectionId,
timestamp
);
}
const sql = `
INSERT IGNORE INTO ${DatasetVectorTableName} (id, vector, team_id, dataset_id, collection_id, createtime)
VALUES ${placeholders.join(',')}
`;
await pool.query(sql, params);

Copilot uses AI. Check for mistakes.
Comment on lines +256 to +272
// Sample check: export a few records and verify they exist in target
// For simplicity, we just compare counts for now
// A more thorough validation would involve vector comparison

return {
passed: countMatch,
countMatch: {
source: sourceCount,
target: targetCount,
match: countMatch
},
sampleMatch: {
checked: 0,
matched: 0,
mismatchedIds: []
}
};

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inadequate validation: The validation logic only compares counts between source and target databases, which is insufficient for ensuring data integrity. The comment acknowledges this but leaves it unimplemented. Consider implementing at least basic sample validation by comparing a subset of actual vector records to ensure the migration was successful and data wasn't corrupted.

Copilot uses AI. Check for mistakes.
Comment on lines +150 to +165
const importResult = await importer.importBatch(records);

// Collect ID mappings
for (const [oldId, newId] of importResult.idMappings) {
idMappings.set(oldId, newId);
}

// Collect errors
allErrors.push(...importResult.errors);
this.state.errors.push(...importResult.errors);

// Update progress
processedCount += records.length;
this.state.syncedRecords = processedCount;
this.state.failedRecords = allErrors.length;
migrationStates.set(this.migrationId, this.state);

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing error handling: If importBatch encounters errors for individual records, the migration continues without any retry mechanism or mechanism to halt on critical failures. The code collects errors but doesn't check if they exceed a threshold or if they indicate a systemic issue. Consider adding logic to halt migration if error rate exceeds a threshold or if critical errors occur.

Copilot uses AI. Check for mistakes.
Comment on lines +448 to +451
const client = await this.getClient();
await client.delete({
collection_name: DatasetVectorTableName,
filter: `id in [${ids.join(',')}]`

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filter expression injection vulnerability: IDs from user input are directly concatenated into the Milvus delete filter expression. Properly escape or validate ID values before inclusion in filter expressions to prevent injection attacks.

Suggested change
const client = await this.getClient();
await client.delete({
collection_name: DatasetVectorTableName,
filter: `id in [${ids.join(',')}]`
// Validate and sanitize IDs to prevent filter expression injection
const idPattern = /^[A-Za-z0-9_-]+$/;
const sanitizedIds = ids.map((id) => {
if (!idPattern.test(id)) {
throw new Error(`Invalid ID format for deletion: ${id}`);
}
return id;
});
if (sanitizedIds.length === 0) return;
// Quote each ID to ensure they are treated as literals in the filter expression
const quotedIds = sanitizedIds.map((id) => `"${id}"`);
const client = await this.getClient();
await client.delete({
collection_name: DatasetVectorTableName,
filter: `id in [${quotedIds.join(',')}]`

Copilot uses AI. Check for mistakes.
Comment on lines +7 to +12
// Database connection configuration
export const DatabaseConfigSchema = z.object({
type: VectorDbTypeSchema,
address: z.string(),
token: z.string().optional() // For Milvus authentication
});

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing input validation: The address field in DatabaseConfigSchema is only validated as a string, with no format or pattern validation. This allows potentially invalid connection strings that could cause runtime errors or security issues. Add validation to ensure addresses conform to expected URL/connection string formats for each database type.

Suggested change
// Database connection configuration
export const DatabaseConfigSchema = z.object({
type: VectorDbTypeSchema,
address: z.string(),
token: z.string().optional() // For Milvus authentication
});
// Basic connection string validators for different database types.
// These are intentionally permissive to avoid rejecting valid but uncommon formats,
// while still catching clearly malformed addresses.
const PG_CONNECTION_STRING_REGEX = /^postgres(?:ql)?:\/\/\S+/i;
const OCEANBASE_CONNECTION_STRING_REGEX = /^oceanbase:\/\/\S+/i;
// Database connection configuration
export const DatabaseConfigSchema = z
.object({
type: VectorDbTypeSchema,
address: z.string(),
token: z.string().optional() // For Milvus authentication
})
.superRefine((value, ctx) => {
const { type, address } = value;
if (!address || typeof address !== 'string') {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ['address'],
message: 'Address must be a non-empty string.'
});
return;
}
switch (type) {
case 'pg':
if (!PG_CONNECTION_STRING_REGEX.test(address)) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ['address'],
message:
"PostgreSQL address must be a valid connection string starting with 'postgres://' or 'postgresql://'."
});
}
break;
case 'oceanbase':
if (!OCEANBASE_CONNECTION_STRING_REGEX.test(address)) {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ['address'],
message:
"OceanBase address must be a valid connection string starting with 'oceanbase://'."
});
}
break;
case 'milvus':
try {
const url = new URL(address);
if (url.protocol !== 'http:' && url.protocol !== 'https:') {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ['address'],
message: 'Milvus address must use http or https scheme.'
});
}
} catch {
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ['address'],
message: 'Milvus address must be a valid URL (e.g., http(s)://host:port).'
});
}
break;
default:
// If a new database type is added but not handled here, do not block validation,
// but this branch makes the intent explicit.
break;
}
});

Copilot uses AI. Check for mistakes.

import type { VectorImporter, VectorRecord, MigrationError } from './type';
import { DatasetVectorTableName, DatasetVectorDbName, VectorVQ } from '../constants';
import { PgClient, connectPg } from '../pg/controller';

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused imports PgClient, connectPg.

Suggested change
import { PgClient, connectPg } from '../pg/controller';

Copilot uses AI. Check for mistakes.
import type { VectorImporter, VectorRecord, MigrationError } from './type';
import { DatasetVectorTableName, DatasetVectorDbName, VectorVQ } from '../constants';
import { PgClient, connectPg } from '../pg/controller';
import { ObClient, getClient as getObClient } from '../oceanbase/controller';

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused imports ObClient, getObClient.

Suggested change
import { ObClient, getClient as getObClient } from '../oceanbase/controller';

Copilot uses AI. Check for mistakes.
import { DatasetVectorTableName } from '../constants';
import { PgClient, connectPg } from '../pg/controller';
import { ObClient } from '../oceanbase/controller';
import { MilvusClient, DataType } from '@zilliz/milvus2-sdk-node';

Copilot AI Jan 30, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused import DataType.

Suggested change
import { MilvusClient, DataType } from '@zilliz/milvus2-sdk-node';
import { MilvusClient } from '@zilliz/milvus2-sdk-node';

Copilot uses AI. Check for mistakes.

@c121914yu c121914yu left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review: feat: add vector database migration solution between different vector DBs

📊 变更概览

✅ 优点

  1. 架构设计清晰: 采用 ETL 模式,导出器/导入器分离,易于扩展
  2. 文档完善: 提供了详细的设计文档
  3. 多种访问方式: 同时提供 CLI 和 API 端点
  4. 进度跟踪: 提供实时的迁移进度和状态管理
  5. 数据验证: 包含迁移后的数据验证机制

⚠️ 问题汇总

🔴 严重问题 (6 个,必须修复)

1. SQL 注入风险 (exporters.ts)

PgExporterOceanBaseExporter 直接拼接用户输入到 SQL 语句,存在严重的 SQL 注入漏洞:

// ❌ 当前代码
whereConditions.push(`team_id = '${teamId}'`);

// ✅ 应该使用参数化查询或 PgClient 的安全格式
const whereConditions: (string | [string, string | number])[] = [];
if (teamId) {
  whereConditions.push(['team_id', teamId]);
}

2. 缺少权限验证 (migrateVector.ts)

API 端点缺少管理员权限检查,任何用户都可以触发迁移操作。应该参考项目中其他 admin API 的权限验证模式:

import { authTemplate } from '@fastgpt/service/support/permission/auth/template';
import { OwnerRoleVal } from '@fastgpt/global/support/permission/constant';

export default async function handler(req, res) {
  await authTemplate({
    req,
    authToken: true,
    authApiKey: true,
    minimumRole: OwnerRoleVal.admin
  });
  // ...
}

3. 内存泄漏风险 (controller.ts:24)

迁移状态存储在内存 Map 中永不清理:

// ❌ 当前代码
const migrationStates = new Map<string, MigrationState>();

// ✅ 应该添加清理机制或使用 MongoDB 持久化

4. 事务一致性问题 (importers.ts)

PostgreSQL 和 OceanBase 导入缺少事务保护,迁移失败时可能导致数据不一致。

5. 错误处理不完整 (controller.ts:149-160)

清理阶段的错误可能被忽略。

6. 并发安全问题 (controller.ts:93)

没有限制同时运行的迁移任务数量。

🟡 建议改进

  • 添加 Zod schema 验证配置
  • 添加单元测试和集成测试
  • 优化性能(缓存 count 查询)
  • 完善错误日志

🟢 可选优化

  • 断点续传功能
  • 增量迁移支持
  • 性能监控指标

🧪 测试建议

  1. 安全性测试: 验证 SQL 注入防护和权限检查
  2. 数据一致性测试: 迁移前后数据对比
  3. 错误恢复测试: 测试失败时的回滚机制
  4. 性能测试: 测试大规模数据迁移

💬 总体评价

  • 代码质量: ⭐⭐⭐☆☆ (3/5) - 架构合理但有安全隐患
  • 安全性: ⭐⭐☆☆☆ (2/5) - 存在严重安全问题
  • 性能: ⭐⭐⭐⭐☆ (4/5) - 分页和批量设计合理
  • 可维护性: ⭐⭐⭐⭐☆ (4/5) - 结构清晰但缺测试

🚀 审查结论

结论: 需要修改 (REQUEST_CHANGES)

必须修复的问题:

  1. ✅ 修复所有 SQL 注入漏洞
  2. ✅ 添加管理员权限验证
  3. ✅ 实现迁移状态清理机制
  4. ✅ 添加数据库事务保护
  5. ✅ 完善错误处理逻辑
  6. ✅ 添加并发控制

请在修复这些问题后重新提交 PR。详细的修复建议和代码示例请参考完整审查报告。

- Fix SQL injection vulnerabilities in PgExporter and OceanBaseExporter
  by using parameterized queries instead of string concatenation
- Add memory leak prevention with automatic cleanup of old migration
  states (24h retention, hourly cleanup)
- Add concurrent migration limit (max 3 simultaneous migrations)
- Add transaction protection for PostgreSQL and OceanBase batch imports
- Improve error handling for connection cleanup phase
- Fix SQL injection in deleteBatch methods

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@zhangjiongai-debug

Copy link
Copy Markdown
Author

@c121914yu 感谢您的详细审查!已根据您的反馈修复了所有 6 个严重问题:

✅ 已修复的问题

# 问题 修复方案
1 SQL 注入风险 exporters.tsimporters.ts 中使用参数化查询替代字符串拼接(PG 使用 $1, $2...,OceanBase 使用 ?
2 权限验证 已有 authCert({ req, authRoot: true }) 验证,只有 root 用户可以访问
3 内存泄漏 添加 cleanupOldStates() 自动清理机制(24小时保留,每小时清理)
4 事务保护 PG 和 OceanBase 批量导入使用事务(BEGIN/COMMIT/ROLLBACK)
5 错误处理 连接关闭时的错误被正确捕获和记录
6 并发控制 添加 MAX_CONCURRENT_MIGRATIONS = 3 限制

修复提交: 28625271 - fix: address PR review security and stability issues

请您重新审查,谢谢!

@c121914yu c121914yu changed the base branch from main to migrate-vector-test January 31, 2026 08:11
@c121914yu

Copy link
Copy Markdown
Collaborator

PR Review: feat: add vector database migration solution between different vector DBs

📊 变更概览

✅ 优点

1. 架构设计合理 ⭐⭐⭐⭐⭐

  • 清晰的模块化结构:exporters、importers、controller 分离
  • 良好的抽象层设计:VectorExporter/VectorImporter 接口
  • 支持三种向量数据库:PostgreSQL、OceanBase、Milvus 的完整组合

2. 安全性改进 ⭐⭐⭐⭐⭐

  • ✅ 已修复 SQL 注入漏洞,使用参数化查询
  • ✅ Admin API 权限验证:confirmAdminRole()
  • ✅ 事务保护:PostgreSQL 和 OceanBase 批量导入使用事务

3. 稳定性增强 ⭐⭐⭐⭐☆

  • ✅ 并发控制:最多 3 个同时迁移任务
  • ✅ 内存泄漏预防:自动清理 24 小时前的状态
  • ✅ 错误处理完善:连接清理阶段有错误捕获

4. 代码质量 ⭐⭐⭐⭐☆

  • ✅ 完整的 TypeScript 类型定义
  • ✅ 详细的 JSDoc 注释
  • ✅ 清晰的日志输出
  • ✅ 设计文档完善 (.claude/design/vector-db-migration.md)

5. 用户体验 ⭐⭐⭐⭐☆

  • CLI 脚本和 Admin API 双接口支持
  • 进度跟踪和状态查询
  • 灵活的过滤选项:teamId、datasetId

⚠️ 问题汇总

🔴 严重问题 (2 个,必须修复)

1. 缺少 MongoDB ID 更新实现

位置: 整个迁移流程
问题描述: 设计文档中提到需要更新 MongoDB 中的向量 ID 映射,但代码中未实现。

当前代码在 importers.ts 中收集了 idMappings:

for (const [oldId, newId] of importResult.idMappings) {
  idMappings.set(oldId, newId);
}

但在迁移完成后,没有代码更新 MongoDB 的 dataset_datas.indexes[].dataId

影响:

  • 如果迁移到 Milvus(ID 会变化),MongoDB 中的 dataId 引用将失效
  • 用户无法搜索已迁移的向量数据

建议:

// 在 controller.ts 的 runOfflineMigration 方法中,迁移完成后添加:
if (idMappings.size > 0) {
  addLog.info(`[Migration] Updating MongoDB ID mappings for ${idMappings.size} vectors...`);

  const MongoClient = require('mongodb').MongoClient;
  // 批量更新 dataset_datas 集合
  // 更新 indexes[].dataId 为新 ID
}

2. Milvus 迁移的 ID 重用问题

位置: packages/service/common/vectorDB/migration/importers.ts L290-L305

问题描述:

const data: InsertReq[] = records.map((record) => {
  const newId = this.idCounter++;
  return {
    // ... 使用自增 ID
  };
});

Milvus 使用自增 ID,但 Milvus 本身有自动 ID 生成机制。如果手动指定 ID 与 Milvus 内部 ID 生成器冲突,会导致插入失败

建议:

// 方案1:使用 Milvus 自动生成的 ID
const data: InsertReq[] = records.map((record, idx) => {
  return {
    data: [record.vector],
    meta: {
      teamId: record.teamId,
      datasetId: record.datasetId,
      collectionId: record.collectionId,
      oldId: record.id, // 保存旧 ID
      createTime: dayjs(record.createTime).unix()
    }
  };
});

// 获取 Milvus 返回的自动生成 ID,建立映射
const insertResp = await this.client.insert({ ... });
const insertIds = insertResp.IDs?.int_id?.data || [];

🟡 建议改进 (5 个)

1. 缺少单元测试

位置: 整个项目
问题: 2399 行新代码,没有任何测试文件

建议:

// 添加测试文件:
// packages/service/common/vectorDB/migration/__tests__/exporters.test.ts
// packages/service/common/vectorDB/migration/__tests__/importers.test.ts
// packages/service/common/vectorDB/migration/__tests__/controller.test.ts

describe('PgExporter', () => {
  it('should export vectors with pagination', async () => {
    // 测试分页导出
  });

  it('should handle empty result', async () => {
    // 测试空结果
  });
});

2. 错误处理不够细粒度

位置: exporters.ts 多处

当前代码:

const whereConditions: string[] = [];
const params: (string | number)[] = [];
let paramIndex = 1;

if (afterId) {
  whereConditions.push(`id > $${paramIndex++}`);
  params.push(afterId);
}

问题: 如果 afterId 不是有效的数字,SQL 查询会失败但没有验证

建议:

if (afterId) {
  const numericId = parseInt(afterId, 10);
  if (isNaN(numericId)) {
    throw new Error(`Invalid afterId: ${afterId}. Must be a number.`);
  }
  whereConditions.push(`id > $${paramIndex++}`);
  params.push(numericId);
}

3. Milvus 连接管理缺少超时设置

位置: exporters.ts L263-L278

当前代码:

this.client = new MilvusClient({
  address: this.address,
  token: this.token
});
await this.client.connectPromise;

问题: 没有超时设置,如果 Milvus 服务无响应,会无限期挂起

建议:

this.client = new MilvusClient({
  address: this.address,
  token: this.token,
  timeout: 30000, // 30秒超时
  connectTimeout: 10000 // 10秒连接超时
});

4. 缺少数据采样验证

位置: controller.ts L347-L359

当前代码:

return {
  passed: countMatch,
  countMatch: {
    source: sourceCount,
    target: targetCount,
    match: countMatch
  },
  sampleMatch: {
    checked: 0,
    matched: 0,
    mismatchedIds: []
  }
};

问题: 设计文档中提到采样验证,但实际实现只比较了数量

建议:

// 随机抽取 100 条记录进行向量内容对比
const sampleSize = 100;
const sampleRecords = await exporter.exportBatch({ limit: sampleSize });
let matchedCount = 0;
const mismatchedIds: string[] = [];

for (const record of sampleRecords.records) {
  const targetVector = await importer.getVectorById(record.id);
  if (targetVector && vectorsEqual(record.vector, targetVector)) {
    matchedCount++;
  } else {
    mismatchedIds.push(record.id);
  }
}

return {
  passed: countMatch && mismatchedIds.length === 0,
  countMatch: { ... },
  sampleMatch: {
    checked: sampleSize,
    matched: matchedCount,
    mismatchedIds
  }
};

5. 日志级别使用不当

位置: controller.ts 多处

问题: 使用 addLog.info 记录所有日志,应该区分不同级别

建议:

// 关键错误
addLog.error(`[Migration] Failed to insert batch: ${error.message}`);

// 警告
addLog.warn(`[Migration] Retry attempt ${retryCount}/${maxRetries}`);

// 调试信息
addLog.debug(`[Migration] Processing batch ${currentBatch}/${totalBatches}`);

// 一般信息
addLog.info(`[Migration] Progress: ${processedCount}/${totalCount}`);

🟢 可选优化 (3 个)

1. 性能优化:批量读取可使用流式处理

位置: exporters.ts 所有导出器

当前: 每次读取 limit+1 条,判断 hasMore

优化建议:

// 对于 PostgreSQL,使用游标
const cursor = pg.query(new Cursor(`
  SELECT id, vector::text, team_id, ...
  FROM ${DatasetVectorTableName}
  ${whereClause}
  ORDER BY id ASC
`));

const batches = [];
while (true) {
  const batch = await cursor.read(batchSize);
  if (batch.length === 0) break;
  batches.push(batch);
}

2. 代码复用:PostgreSQL 和 OceanBase 逻辑相似

位置: exporters.tsimporters.ts

观察: PostgreSQL 和 OceanBase 的查询逻辑几乎相同,只是参数占位符不同($1 vs ?)

优化建议:

// 抽取通用逻辑
abstract class SqlExporter implements VectorExporter {
  abstract getPlaceholder(index: number): string;

  buildQuery(filters: FilterConfig): { query: string; params: any[] } {
    const whereConditions: string[] = [];
    const params: any[] = [];
    let paramIndex = 1;

    if (filters.afterId) {
      whereConditions.push(`id > ${this.getPlaceholder(paramIndex++)}`);
      params.push(filters.afterId);
    }
    // ...
  }
}

class PgExporter extends SqlExporter {
  getPlaceholder(index: number) {
    return `$${index}`;
  }
}

class OceanBaseExporter extends SqlExporter {
  getPlaceholder(index: number) {
    return '?';
  }
}

3. 类型安全:使用 enum 代替字符串字面量

位置: type.ts

当前:

export type VectorDbType = 'pg' | 'oceanbase' | 'milvus';
export type MigrationStatus = 'idle' | 'preparing' | 'full_sync' | 'incremental_sync' | 'completed' | 'failed' | 'cancelled';

优化建议:

export enum VectorDbType {
  Postgres = 'pg',
  OceanBase = 'oceanbase',
  Milvus = 'milvus'
}

export enum MigrationStatus {
  Idle = 'idle',
  Preparing = 'preparing',
  FullSync = 'full_sync',
  IncrementalSync = 'incremental_sync',
  Completed = 'completed',
  Failed = 'failed',
  Cancelled = 'cancelled'
}

🧪 测试建议

单元测试

  1. exporters 测试:

    • 测试分页逻辑 (afterId, hasMore)
    • 测试过滤条件 (teamId, datasetId)
    • 测试错误处理 (连接失败,无效查询)
  2. importers 测试:

    • 测试批量插入
    • 测试 ID 映射
    • 测试事务回滚
  3. controller 测试:

    • 测试状态管理
    • 测试并发限制
    • 测试进度计算

集成测试

  1. PG → Milvus 迁移
  2. Milvus → PG 迁移
  3. PG → OceanBase 迁移
  4. 大数据集测试 (100k+ 向量)

手动测试步骤

# 1. 准备测试数据
# 2. 运行 CLI 迁移
npx ts-node scripts/migrateVector.ts --source pg --target milvus

# 3. 验证数量
# 4. 抽样验证向量内容
# 5. 测试搜索功能

💬 总体评价

代码质量: ⭐⭐⭐⭐☆ (4/5)

  • ✅ 架构设计合理,模块化良好
  • ✅ TypeScript 类型完整
  • ⚠️ 缺少单元测试
  • ✅ 代码风格一致

安全性: ⭐⭐⭐⭐⭐ (5/5)

  • ✅ SQL 注入防护已修复
  • ✅ Admin API 权限验证
  • ✅ 事务保护
  • ✅ 参数化查询

性能: ⭐⭐⭐⭐☆ (4/5)

  • ✅ 批量处理优化
  • ✅ 分页读取避免内存溢出
  • 🟢 可使用流式处理进一步优化
  • ✅ 并发控制

可维护性: ⭐⭐⭐⭐☆ (4/5)

  • ✅ 清晰的模块结构
  • ✅ 完整的文档
  • ✅ 良好的错误处理
  • ⚠️ 需要补充单元测试

功能完整性: ⭐⭐⭐☆☆ (3/5)

  • ✅ 基础迁移功能完整
  • 缺少 MongoDB ID 更新实现 (严重)
  • ⚠️ Milvus ID 生成策略需要改进
  • 🟢 在线迁移模式未实现(设计文档中有,但抛出了"未实现"错误)

🚀 审查结论

状态: 需要修改后通过

必须修复项 (阻塞合并):

  1. SQL 注入漏洞 (已在第二个 commit 中修复)
  2. 实现 MongoDB ID 映射更新
  3. 修复 Milvus ID 生成策略

建议改进项 (合并前处理):

  1. 添加单元测试
  2. 改进错误处理的细粒度
  3. 添加 Milvus 连接超时配置
  4. 实现数据采样验证
  5. 改进日志级别使用

可选优化项 (后续 PR):

  1. 流式处理优化
  2. 代码复用优化
  3. 使用 enum 替代字符串字量

📍 详细代码评论

已在以下位置发现需要改进的问题:

exporters.ts

  • L85-110: PgExporter.exportBatch - 缺少 afterId 数字验证
  • L263-278: MilvusExporter.getClient - 缺少超时配置

importers.ts

  • L290-305: MilvusImporter.importBatch - ID 生成策略有问题

controller.ts

  • L180-250: runOfflineMigration - 缺少 MongoDB ID 更新逻辑
  • L347-359: validate - 采样验证未实现
  • 多处: 日志级别需要区分

测试覆盖

  • 整个项目: 缺少测试文件

📝 其他说明

关于设计文档

  • ✅ 设计文档非常完善,架构清晰
  • ✅ 考虑了停机和不停机两种模式
  • ✅ 有详细的错误处理和回滚策略

关于代码提交历史

  1. 第一个 commit (b55b71a): 初始实现,存在 SQL 注入风险
  2. 第二个 commit (2862527): 修复安全和稳定性问题
    • ✅ 修复 SQL 注入
    • ✅ 添加并发限制
    • ✅ 添加内存泄漏防护
    • ✅ 改进错误处理

说明: 作者对 PR 反馈响应良好,快速修复了安全问题。

建议的后续 PR

  1. 补充单元测试和集成测试
  2. 实现在线迁移模式 (增量同步 + 双写)
  3. 实现 MongoDB ID 映射更新
  4. 添加迁移回滚功能
  5. 添加更详细的文档和使用示例

审查时间: 2026-01-31
审查人: Claude Code PR Review
审查标准: FastGPT 代码规范 + 通用代码质量标准

@c121914yu c121914yu merged commit 747af5b into labring:migrate-vector-test Feb 2, 2026
4 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

不同向量库直接数据迁移方案

3 participants