mohosy/SurgGraph-Pipeline

SurgGraph Pipeline

Graph-enhanced data pipeline prototype for simulated robotic surgery analytics. This project is designed to mirror the technical surface area of Intuitive's Data Services team: streaming telemetry ingestion, graph-based optimization, ML feature/data quality workflows, orchestration, cloud storage, and cloud-native deployment.

Why This Is Relevant To Intuitive + Paul McCabe

  • Sensor processing at scale (Amazon/Alexa parallel): Generates high-rate robotic telemetry + vitals and processes events in streaming windows.
  • Graph algorithms (Apple Maps/Nokia parallel): Models procedures as a directed graph and uses Dijkstra shortest path to optimize surgical step routing.
  • ML dataset/evaluation rigor (Alexa parallel): Builds procedure-level feature tables, trains recovery-risk regression, and runs a fairness disparity audit across demographics.
  • Open stack + cloud-native: Python + Kafka + Flink-style processing + dbt + DuckDB + S3 hooks + Airflow + Docker + Kubernetes + GitHub Actions.
  • Quantifiable impact framing: Includes latency/reliability metrics and a benchmark simulation demonstrating a ~40% latency reduction for streaming versus a batch-style baseline.
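The fairness disparity audit mentioned above amounts to comparing per-group model error and checking the spread against a threshold. A minimal sketch, where the column names ("group", "y_true", "y_pred") and the 1.25 threshold are illustrative assumptions, not the project's actual schema:

```python
# Group error-disparity check: ratio of worst to best group MAE.
# Column names and the threshold are hypothetical, for illustration only.
import pandas as pd

def disparity_ratio(df: pd.DataFrame, threshold: float = 1.25):
    df = df.assign(abs_err=(df["y_true"] - df["y_pred"]).abs())
    group_mae = df.groupby("group")["abs_err"].mean()
    ratio = group_mae.max() / group_mae.min()
    return ratio, bool(ratio <= threshold)  # (disparity, pass/fail)

demo = pd.DataFrame({
    "group":  ["a", "a", "b", "b"],
    "y_true": [10.0, 12.0, 10.0, 12.0],
    "y_pred": [11.0, 11.0, 10.5, 11.5],
})
ratio, passed = disparity_ratio(demo)  # group a MAE = 1.0, group b MAE = 0.5
```

A ratio of 1.0 means identical error across groups; here the 2.0 ratio fails the (illustrative) 1.25 gate.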

Architecture

flowchart LR
    A["Surgery Telemetry Simulator\n(Faker + NetworkX)"] --> B["Kafka Topic\n(surggraph.events)"]
    B --> C["Streaming Processor\n(pyFlink-style + IsolationForest)"]
    C --> D["Feature Store Artifacts\nParquet + DuckDB/dbt"]
    D --> E["ML + Fairness\nRecovery Model + Bias Audit"]
    D --> F["Flask API\n/insights /alerts"]
    D --> G["Streamlit Dashboard\nGraph + Latency + Alerts"]
    C --> H["S3 Upload Hook\n(boto3)"]
    I["Airflow DAG"] --> A
    I --> C
    I --> D

Text form: Ingest -> Kafka -> Flink-style stream processing -> dbt transforms (DuckDB) -> S3/API/Dashboard.
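The Flink-style windowed stage in the middle of that flow can be sketched roughly as below. Field names ("ts", "force_n", "latency_ms") and the IsolationForest contamination setting are assumptions for illustration, not the project's actual schema:

```python
# Sketch of the streaming stage: IsolationForest anomaly labeling,
# then tumbling-window aggregates like those the pipeline emits.
import pandas as pd
from sklearn.ensemble import IsolationForest

events = pd.DataFrame({
    "ts": pd.date_range("2024-01-01", periods=6, freq="5s"),
    "force_n": [1.2, 1.3, 1.1, 9.5, 1.2, 1.4],
    "latency_ms": [40, 42, 38, 120, 41, 39],
})

# IsolationForest flags outlying telemetry rows (fit_predict returns -1).
clf = IsolationForest(contamination=0.2, random_state=42)
events["is_anomaly"] = clf.fit_predict(events[["force_n", "latency_ms"]]) == -1

# Tumbling 10-second windows with avg_force / avg_latency / anomaly_rate.
windows = events.groupby(pd.Grouper(key="ts", freq="10s")).agg(
    avg_force=("force_n", "mean"),
    avg_latency=("latency_ms", "mean"),
    anomaly_rate=("is_anomaly", "mean"),
)
```

A real Flink job would compute these incrementally per window rather than over a batch frame, but the aggregates are the same.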

Repository Layout

.
├── surggraph_pipeline/
│   ├── data_generation.py
│   ├── kafka_producer.py
│   ├── kafka_consumer.py
│   ├── streaming/flink_job.py
│   ├── analytics/{graph_analytics.py,ml_models.py,fairness.py,insights.py}
│   ├── storage/sink.py
│   ├── api/app.py
│   ├── dashboard/app.py
│   ├── orchestration/airflow_dag.py
│   ├── scripts/
│   │   ├── run_pipeline.py
│   │   ├── generate_and_publish.py
│   │   ├── consume_and_process.py
│   │   ├── export_dbt_sources.py
│   │   └── simulate_metrics.py
│   └── dbt/
│       ├── dbt_project.yml
│       ├── profiles.yml.example
│       ├── data/events_processed.csv
│       └── models/{staging,marts}/...
├── airflow/dags/airflow_dag.py
├── docker-compose.yml
├── docker/*.Dockerfile
├── k8s/*.yaml
├── tests/
├── .github/workflows/ci.yaml
├── requirements.txt
└── requirements-full.txt

Core Capabilities

  1. Synthetic robotic surgery data
  • Procedure graph nodes: preop_setup -> incision -> trocar_insert -> dissection -> cauterization -> suturing -> closure
  • Event fields: arm position, force, velocity, vitals, demographics, anomaly injection labels.
  2. Streaming analytics
  • Kafka producer/consumer with local file fallback for offline demo reliability.
  • Flink-style streaming processor:
    • Window aggregations (avg_force, anomaly_rate, avg_latency)
    • IsolationForest anomaly detection on telemetry/vitals.
  3. Graph analytics
  • Centrality metrics for bottleneck detection.
  • Dijkstra shortest path and procedure path-efficiency scoring.
  4. ML + fairness
  • Procedure-level recovery-hours regression model (RandomForest).
  • Fairness check reports group error disparity ratio and pass/fail threshold.
  5. Storage + cloud hooks
  • Writes Parquet/JSON artifacts locally.
  • Optional S3 uploads via boto3 when S3_BUCKET is configured.
  6. Serving layer
  • Flask API endpoints: /health, /insights, /alerts, /metrics.
  • Streamlit dashboard with executive KPIs, graph intelligence panels, and telemetry explorer.
  • Interactive Surgical Scenario Simulator with controls for force, speed, patient risk, and path strategy, plus live outputs for anomaly rate, predicted recovery, path cost, and fairness impact.
  • Interactive Robot Control Console: manually move the tool tip, drag arm joints (base/elbow/tip) directly on a control canvas, and execute incision/cauterization/suturing actions with live scoring and risk/fairness consequences.
  • Console visuals for demo storytelling: an OR-style scene with a large teleoperation bay viewport, HUD gauges (force/speed/patient risk), diagonal joystick movement profiles, phase-aware incision depth control with a zoomed micro-view, and action-quality trend charts.
  7. Orchestration + GitOps
  • Airflow DAG for run sequence (run_pipeline -> export_dbt_sources -> dbt run).
  • GitHub Actions pipeline running tests with coverage threshold (>=80%, current tests hit >90%).
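The Dijkstra routing in capability 3 can be sketched with NetworkX as below. Only the node names come from the procedure graph above; the edge costs, the shortcut edge, and the efficiency formula are illustrative assumptions:

```python
# Dijkstra shortest path over the procedure graph, plus a simple
# path-efficiency score (optimal cost vs. nominal sequential cost).
# Edge costs and the shortcut edge are hypothetical.
import networkx as nx

steps = ["preop_setup", "incision", "trocar_insert", "dissection",
         "cauterization", "suturing", "closure"]

G = nx.DiGraph()
for a, b in zip(steps, steps[1:]):
    G.add_edge(a, b, cost=1.0)           # nominal sequential path
G.add_edge("dissection", "suturing", cost=1.5)  # shortcut: skip cauterization

path = nx.dijkstra_path(G, "preop_setup", "closure", weight="cost")
cost = nx.dijkstra_path_length(G, "preop_setup", "closure", weight="cost")

nominal = float(len(steps) - 1)          # cost of the fully sequential route
efficiency = cost / nominal              # < 1.0 means a cheaper route exists
```

Here Dijkstra prefers the 5.5-cost shortcut route over the 6.0-cost sequential one, which is the kind of signal the path-efficiency score surfaces.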

Quickstart (Local Python)

One command launcher (recommended):

./run_all.sh

This boots the venv (if needed), runs the pipeline, and starts API + dashboard together.

Optional custom ports:

API_PORT=5050 DASHBOARD_PORT=8601 ./run_all.sh

Manual setup/run:

python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Run end-to-end pipeline:

python -m surggraph_pipeline.scripts.run_pipeline

Run API:

python -m surggraph_pipeline.api.app
# http://localhost:5000/insights

Run dashboard:

streamlit run surggraph_pipeline/dashboard/app.py
# http://localhost:8501

Run tests:

pytest

Kafka + Containers (Docker)

Bring up local Kafka + pipeline + API + dashboard:

docker compose up --build

This launches:

  • zookeeper + kafka
  • pipeline runner (produces artifacts)
  • api on port 5000
  • dashboard on port 8501

dbt (DuckDB as Snowflake Stand-In)

Generate seed CSV from pipeline artifacts:

python -m surggraph_pipeline.scripts.export_dbt_sources

Then run dbt:

cd surggraph_pipeline/dbt
cp profiles.yml.example profiles.yml
dbt seed --profiles-dir .
dbt run --profiles-dir .
dbt test --profiles-dir .

Airflow

DAG file: airflow/dags/airflow_dag.py

Main DAG chain:

  1. run_pipeline
  2. export_dbt_sources
  3. run_dbt_models

(Use requirements-full.txt if installing Airflow/pyFlink extras.)

Kubernetes (Minikube)

Build local images:

docker build -f docker/Dockerfile.pipeline -t surggraph-pipeline:latest .
docker build -f docker/Dockerfile.api -t surggraph-api:latest .
docker build -f docker/Dockerfile.dashboard -t surggraph-dashboard:latest .

Deploy:

kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/kafka.yaml
kubectl apply -f k8s/pipeline-cronjob.yaml
kubectl apply -f k8s/api-deployment.yaml
kubectl apply -f k8s/dashboard-deployment.yaml

AWS Extension (Free Tier-Friendly)

Set environment variables:

export AWS_REGION=us-east-1
export S3_BUCKET=<your-bucket>

When set, pipeline artifacts upload to:

  • s3://<bucket>/surggraph/events_processed.parquet
  • s3://<bucket>/surggraph/stream_windows.parquet
  • s3://<bucket>/surggraph/procedure_training.parquet
  • s3://<bucket>/surggraph/model_eval.parquet
  • s3://<bucket>/surggraph/insights.json
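An env-gated upload hook consistent with that behavior might look like the sketch below. The function name and the lazy import are illustrative choices, not the project's actual storage/sink.py:

```python
# Hypothetical S3 upload hook: uploads only when S3_BUCKET is set,
# mirroring the env-driven behavior described above.
import os

def upload_artifact(local_path: str, key: str) -> bool:
    bucket = os.environ.get("S3_BUCKET")
    if not bucket:
        return False  # no bucket configured: keep artifacts local-only
    import boto3  # deferred so local-only runs don't require boto3
    s3 = boto3.client("s3", region_name=os.environ.get("AWS_REGION"))
    s3.upload_file(local_path, bucket, f"surggraph/{key}")
    return True
```

With no credentials in code, the same binary runs locally and in the cloud; only the environment decides.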

Performance + Reliability Signals

Pipeline emits:

  • reliability_success_rate
  • latency_avg_ms
  • latency_p95_ms
  • anomaly_events
  • latency_reduction_pct

Use the benchmark helper for a quick, quantified talking point:

python -m surggraph_pipeline.scripts.simulate_metrics

Typical output shows a ~40% streaming latency reduction versus the synthetic batch baseline.
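The reduction figure is simple arithmetic over the two latency distributions; a back-of-envelope version, where the sample latencies are illustrative and not what the script actually simulates:

```python
# Illustrative computation of the emitted metrics; sample latencies
# are made up, not simulate_metrics output.
import statistics

batch_latency_ms = [200, 220, 210, 230, 240]
stream_latency_ms = [120, 130, 125, 135, 128]

batch_avg = statistics.mean(batch_latency_ms)     # latency_avg_ms (batch)
stream_avg = statistics.mean(stream_latency_ms)   # latency_avg_ms (stream)
stream_p95 = statistics.quantiles(stream_latency_ms, n=20)[-1]  # latency_p95_ms

reduction_pct = 100 * (batch_avg - stream_avg) / batch_avg  # latency_reduction_pct
```

With these sample numbers the reduction comes out to 42%, in line with the ~40% figure quoted above.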

Security + Engineering Practices

  • No hard-coded cloud credentials (environment variables only).
  • Deterministic seeded simulation for reproducibility.
  • pytest + coverage gate configured in pyproject.toml.
  • Modular code layout for extensibility and maintainability.

Interview Talking Points

  • "I modeled da Vinci-like procedural flow as a directed graph and used Dijkstra routing to quantify path efficiency and potential tool-path optimization."
  • "I treated streaming latency as a first-class metric and instrumented reliability + p95 latency with reproducible simulation baselines."
  • "I separated telemetry ingest, stream enrich/anomaly detection, dbt feature transformations, and serving APIs to mirror production data platform boundaries."
  • "I added fairness checks to ML evaluation, reflecting the importance of robust dataset quality and accountable model performance."
