DCVPG is an open-source framework for defining, validating, and enforcing data contracts in modern data pipelines. It catches schema drift, quality violations, and freshness SLA breaches before they reach production.
Data pipelines break silently. A backend team renames a column, an upstream job starts sending nulls, or an overnight load produces 10× fewer rows than expected — and nobody knows until a BI dashboard breaks or a finance report is wrong.
DCVPG solves this by making data quality a first-class, code-reviewed, automatically enforced contract between teams.
- Define — Write a YAML contract that describes exactly what data you expect: field names, types, nullability, value ranges, allowed values, row-count SLAs, and freshness deadlines
- Validate — Run DCVPG before your pipeline writes to production; violations are caught and reported immediately
- Quarantine — Failed batches are isolated with full metadata; downstream jobs never see bad data
- Alert — Slack, PagerDuty, Teams, or webhook notifications with violation details
- Heal — AI Auto-Healer proposes an updated contract and opens a GitHub PR for human review
| Category | Capability |
|---|---|
| Contracts | Schema, nullability, type, unique, range, allowed values, format (regex), row-count SLA, freshness SLA |
| AI Generation | Claude profiles a live table and drafts the contract YAML automatically |
| AI Auto-Healer | On CRITICAL violations, an LLM proposes a fix and opens a GitHub PR |
| AI Anomaly Detection | Statistical baseline monitoring per-field (volume, null-rate, value distribution) |
| AI RCA Agent | Root-cause analysis agent that explains why a pipeline failed |
| Quarantine Engine | Isolates failed batches; replay after a contract fix is merged |
| Schema Drift Detection | Compares live source schema vs contract definition; alerts on changes |
| Connectors | PostgreSQL, MySQL, Snowflake, BigQuery, S3, GCS, REST API, CSV/Parquet file |
| Custom Rules | Write Python validation rules extending the built-in rule set |
| Orchestrator Operators | Native Airflow operator, Prefect task, and Dagster asset check |
| REST API | Full FastAPI backend with auth, pagination, and Prometheus metrics |
| Streamlit Dashboard | 6-page real-time monitoring UI |
| MCP Server | 10 tools for Claude Desktop / Cursor to manage pipelines by chat |
| Alerting | Slack, PagerDuty, Microsoft Teams, generic webhook |
DCVPG wraps around your existing pipelines without requiring rewrites. You add it as a quality gate that runs before data reaches production:
Your existing pipeline:
extract → transform → [DCVPG validates] → load to production
↑
stops bad data here
Step 1 — Generate a contract from your live table (no hand-writing needed):
pip install "dcvpg[ai]"
export ANTHROPIC_API_KEY=sk-ant-...
dcvpg init my_project && cd my_project
# Edit dcvpg.config.yaml to point at your database
dcvpg generate --source postgres_main --table orders --output-dir ./contracts
dcvpg register contracts/orders.yamlStep 2 — Add one operator to your existing Airflow DAG:
from dcvpg.orchestrators.airflow.operators.contract_validator import DataContractValidatorOperator
validate = DataContractValidatorOperator(
task_id="validate_orders",
contract_name="orders_raw",
config_path="/opt/airflow/dcvpg.config.yaml",
)
# Gate your existing load task — nothing else changes
extract >> transform >> validate >> load_to_prodThat's it. DCVPG does not replace or rebuild your pipeline — it guards the data flowing through it.
Write the contract first as the formal agreement between the team that produces the data and the team that consumes it:
# 1. Author the contract (or generate from a staging table)
dcvpg generate --source postgres_staging --table orders --output-dir ./contracts
# 2. Register and validate as part of every PR / CI run
dcvpg register contracts/orders.yaml
dcvpg validate --allA GitHub Actions workflow is scaffolded automatically by dcvpg init — every push runs dcvpg validate --all so contract regressions are caught in CI before they ever hit production.
pip install dcvpg
dcvpg init my_project
cd my_projectEdit dcvpg.config.yaml to point at your database and connections, then:
# Optional: generate a contract from a live table using AI
export ANTHROPIC_API_KEY=sk-ant-...
dcvpg generate --source postgres_main --table orders --output-dir ./contracts
# Register the contract
dcvpg register contracts/orders.yaml
# Validate
dcvpg validate --allSee the Quick Start Guide for the full walkthrough including Docker Compose setup.
# Core (PostgreSQL, file connectors, validation engine, CLI)
pip install dcvpg
# + AI features (contract generation, auto-healing, anomaly detection)
pip install "dcvpg[ai]"
# + MCP server (Claude Desktop / Cursor integration)
pip install "dcvpg[mcp]"
# + Airflow operator
pip install "dcvpg[airflow]"
# + Prefect task
pip install "dcvpg[prefect]"
# + Dagster asset check
pip install "dcvpg[dagster]"
# Everything
pip install "dcvpg[all]"Requirements: Python 3.11+, PostgreSQL 15+ (for quarantine & audit storage)
Contracts are plain YAML files. Here is a complete example:
contract:
name: orders_raw
version: "1.2"
description: "Raw orders table from the e-commerce backend."
owner_team: data-engineering
source_owner: backend-team
pipeline_tags: [crm, revenue]
source_connection: postgres_main
source_table: orders
# Row-count and freshness SLAs
row_count_min: 1000
row_count_max: 5000000
sla_freshness_hours: 6
schema:
- field: id
type: integer
nullable: false
unique: true
- field: status
type: string
nullable: false
allowed_values: ["active", "inactive", "pending"]
- field: amount
type: float
nullable: true
min: 0.0
max: 999999.99
- field: email
type: string
nullable: false
format: email # Regex-backed format validation
- field: created_at
type: timestamp
nullable: false
# Reference a custom Python validation rule
custom_rules:
- rule: no_weekend_orders.NoWeekendOrders
params:
date_field: created_atFull field reference: Contract Authoring Guide
Extend the built-in rule set with plain Python:
# custom_rules/no_weekend_orders.py
import pandas as pd
from dcvpg.engine.rules.base_rule import BaseRule
from dcvpg.engine.models import ValidationResult
class NoWeekendOrders(BaseRule):
def validate(self, data: pd.DataFrame, field: str, params: dict) -> ValidationResult:
dates = pd.to_datetime(data[params.get("date_field", field)], errors="coerce")
weekend_count = int((dates.dt.dayofweek >= 5).sum())
if weekend_count > 0:
return ValidationResult(
passed=False, field=field,
violation_type="WEEKEND_ORDER_FOUND",
rows_affected=weekend_count,
expected_value="Orders must be placed on weekdays (Mon–Fri)",
)
return ValidationResult(passed=True, field=field)Register in config:
extensions:
custom_rules_dir: ./custom_rulesFull guide: Custom Rules
from dcvpg.orchestrators.airflow.operators.contract_validator import DataContractValidatorOperator
validate = DataContractValidatorOperator(
task_id="validate_orders",
contract_name="orders_raw",
config_path="/opt/airflow/dcvpg.config.yaml",
)from dcvpg.orchestrators.prefect.tasks import validate_contract
@flow
def my_flow():
validate_contract(contract_name="orders_raw", config_path="./dcvpg.config.yaml")from dcvpg.orchestrators.dagster.assets import build_contract_asset_check
orders_check = build_contract_asset_check("orders_raw", config_path="./dcvpg.config.yaml")DCVPG ships a Model Context Protocol server with 10 tools, letting you manage pipelines from Claude Desktop or Cursor by chat.
pip install "dcvpg[mcp]"
dcvpg mcp-server startExample prompts:
- "What pipelines are currently failing?"
- "Show me the violation details for the orders pipeline."
- "Is there any schema drift in the payments contract?"
- "Generate a contract for the
userstable in postgres_main." - "Open a PR to fix the type mismatch in the orders contract."
- "Replay batch abc-123 now that the fix is merged."
| Tool | Description |
|---|---|
get_pipeline_status |
Live health of all pipelines |
get_violation_detail |
Full violation breakdown for a pipeline |
list_quarantine_batches |
All quarantined batches with metadata |
get_schema_diff |
Contract vs live source schema drift report |
create_fix_pr |
Open a GitHub PR to update a broken contract |
replay_quarantine |
Re-validate and release a quarantined batch |
generate_contract |
AI-generate a contract from a live data source |
get_incident_summary |
Incidents in the last N days |
get_contract_detail |
Full spec, rules, ownership, version history |
approve_contract_update |
Merge an approved PR and reload the contract |
Full setup guide: MCP Setup
uvicorn dcvpg.api.main:app --reload
# → http://localhost:8000/docsKey endpoints:
| Method | Endpoint | Description |
|---|---|---|
GET |
/health |
Health check |
GET |
/api/v1/contracts |
List all contracts |
POST |
/api/v1/contracts/generate |
AI-generate a contract |
GET |
/api/v1/pipelines |
List pipeline run history |
GET |
/api/v1/quarantine |
List quarantined batches |
GET |
/metrics |
Prometheus metrics |
All endpoints (except /health and /metrics) require an Authorization: <key> header.
┌─────────────────────────────────────────────────────────────────┐
│ DCVPG Framework │
│ │
│ CLI / REST API / MCP Server / Streamlit Dashboard │
│ │ │
│ ▼ │
│ Validator ◄── Contract Registry ◄── YAML Contracts │
│ │ │
│ ┌─────┴──────┐ │
│ │ Rules │ schema · type · null · range · │
│ │ Engine │ unique · format · allowed_values · │
│ │ │ row-count SLA · freshness SLA · │
│ └─────┬──────┘ custom Python rules │
│ │ │
│ ┌─────┴──────────────────┐ │
│ │ Connectors │ PostgreSQL · MySQL · Snowflake │
│ │ │ BigQuery · S3 · GCS · File │
│ └─────┬──────────────────┘ │
│ │ │
│ ┌─────┴──────────────────┐ │
│ │ Quarantine Engine │──► PostgreSQL audit store │
│ │ Alert Manager │──► Slack · PagerDuty · Teams │
│ └─────┬──────────────────┘ │
│ │ │
│ ┌─────┴──────────────────┐ │
│ │ AI Agents │ ContractGenerator │
│ │ (Anthropic Claude) │ AutoHealer → GitHub PR │
│ │ │ AnomalyDetector │
│ │ │ RCA Agent │
│ └────────────────────────┘ │
│ │
│ Orchestrators: Airflow · Prefect · Dagster │
└─────────────────────────────────────────────────────────────────┘
| Guide | Description |
|---|---|
| Quick Start | Get up and running in 5 minutes |
| Contract Authoring | Full YAML field reference |
| Connectors | Configure PostgreSQL, Snowflake, S3, and more |
| Custom Rules | Write Python validation extensions |
| MCP Setup | Claude Desktop / Cursor integration |
| API Reference | REST API endpoints |
See CONTRIBUTING.md. Pull requests are welcome for new connectors, rule types, and orchestrator integrations.
Apache 2.0 — see LICENSE.