Graph-enhanced data pipeline prototype for simulated robotic surgery analytics. This project is designed to mirror the technical surface area of Intuitive's Data Services team: streaming telemetry ingestion, graph-based optimization, ML feature/data quality workflows, orchestration, cloud storage, and cloud-native deployment.
- Sensor processing at scale (Amazon/Alexa parallel): Generates high-rate robotic telemetry + vitals and processes events in streaming windows.
- Graph algorithms (Apple Maps/Nokia parallel): Models procedures as a directed graph and uses Dijkstra shortest path to optimize surgical step routing.
- ML dataset/evaluation rigor (Alexa parallel): Builds procedure-level feature tables, trains recovery-risk regression, and runs a fairness disparity audit across demographics.
- Open stack + cloud-native: Python + Kafka + Flink-style processing + dbt + DuckDB + S3 hooks + Airflow + Docker + Kubernetes + GitHub Actions.
- Quantifiable impact framing: Includes latency/reliability metrics and a benchmark simulation demonstrating a ~40% latency reduction for streaming versus a batch-style baseline.
```mermaid
flowchart LR
    A["Surgery Telemetry Simulator\n(Faker + NetworkX)"] --> B["Kafka Topic\n(surggraph.events)"]
    B --> C["Streaming Processor\n(pyFlink-style + IsolationForest)"]
    C --> D["Feature Store Artifacts\nParquet + DuckDB/dbt"]
    D --> E["ML + Fairness\nRecovery Model + Bias Audit"]
    D --> F["Flask API\n/insights /alerts"]
    D --> G["Streamlit Dashboard\nGraph + Latency + Alerts"]
    C --> H["S3 Upload Hook\n(boto3)"]
    I["Airflow DAG"] --> A
    I --> C
    I --> D
```
Text form: Ingest -> Kafka -> Flink-style stream processing -> dbt transforms (DuckDB) -> S3/API/Dashboard.
```
.
├── surggraph_pipeline/
│   ├── data_generation.py
│   ├── kafka_producer.py
│   ├── kafka_consumer.py
│   ├── streaming/flink_job.py
│   ├── analytics/{graph_analytics.py,ml_models.py,fairness.py,insights.py}
│   ├── storage/sink.py
│   ├── api/app.py
│   ├── dashboard/app.py
│   ├── orchestration/airflow_dag.py
│   ├── scripts/
│   │   ├── run_pipeline.py
│   │   ├── generate_and_publish.py
│   │   ├── consume_and_process.py
│   │   ├── export_dbt_sources.py
│   │   └── simulate_metrics.py
│   └── dbt/
│       ├── dbt_project.yml
│       ├── profiles.yml.example
│       ├── data/events_processed.csv
│       └── models/{staging,marts}/...
├── airflow/dags/airflow_dag.py
├── docker-compose.yml
├── docker/*.Dockerfile
├── k8s/*.yaml
├── tests/
├── .github/workflows/ci.yaml
├── requirements.txt
└── requirements-full.txt
```
- Synthetic robotic surgery data
  - Procedure graph nodes:
    `preop_setup -> incision -> trocar_insert -> dissection -> cauterization -> suturing -> closure`
  - Event fields: arm position, force, velocity, vitals, demographics, anomaly injection labels.
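The event schema above can be sketched with a stdlib-only generator. The real simulator uses Faker + NetworkX; the exact field names and value ranges here are illustrative assumptions, not the project's actual schema:

```python
import random

PROCEDURE_STEPS = [
    "preop_setup", "incision", "trocar_insert", "dissection",
    "cauterization", "suturing", "closure",
]

def make_event(rng: random.Random, anomaly_rate: float = 0.05) -> dict:
    """Generate one synthetic telemetry event; roughly anomaly_rate of events
    get an injected anomaly (force spike) plus a ground-truth label."""
    is_anomaly = rng.random() < anomaly_rate
    force = rng.uniform(0.5, 3.0) * (4.0 if is_anomaly else 1.0)
    return {
        "step": rng.choice(PROCEDURE_STEPS),
        "arm_position": [rng.uniform(-1, 1) for _ in range(3)],  # x, y, z
        "force_n": round(force, 3),
        "velocity_mm_s": round(rng.uniform(1, 25), 2),
        "heart_rate_bpm": rng.randint(55, 110),
        "anomaly_label": int(is_anomaly),
    }

rng = random.Random(42)  # fixed seed: deterministic, reproducible runs
events = [make_event(rng) for _ in range(1000)]
```

The seeded `random.Random` instance mirrors the project's deterministic-simulation goal: the same seed always yields the same event stream.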
- Streaming analytics
  - Kafka producer/consumer with local file fallback for offline demo reliability.
  - Flink-style streaming processor:
    - Window aggregations (`avg_force`, `anomaly_rate`, `avg_latency`).
    - IsolationForest anomaly detection on telemetry/vitals.
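The window aggregation step can be illustrated as a minimal tumbling-window reducer in plain Python. It assumes events carry `ts_ms`, `force_n`, `anomaly_label`, and `latency_ms` fields (illustrative names; the real job also runs IsolationForest per window):

```python
from collections import defaultdict

def window_aggregates(events: list[dict], window_ms: int = 1000) -> list[dict]:
    """Bucket events into fixed tumbling windows keyed by floor(ts / window_ms)
    and compute per-window aggregates like those the streaming job emits."""
    buckets: dict[int, list[dict]] = defaultdict(list)
    for ev in events:
        buckets[ev["ts_ms"] // window_ms].append(ev)
    out = []
    for win, evs in sorted(buckets.items()):
        n = len(evs)
        out.append({
            "window_start_ms": win * window_ms,
            "avg_force": sum(e["force_n"] for e in evs) / n,
            "anomaly_rate": sum(e["anomaly_label"] for e in evs) / n,
            "avg_latency": sum(e["latency_ms"] for e in evs) / n,
        })
    return out

sample = [
    {"ts_ms": 100,  "force_n": 1.0, "anomaly_label": 0, "latency_ms": 20},
    {"ts_ms": 900,  "force_n": 3.0, "anomaly_label": 1, "latency_ms": 40},
    {"ts_ms": 1500, "force_n": 2.0, "anomaly_label": 0, "latency_ms": 10},
]
windows = window_aggregates(sample)  # two 1-second windows
```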
- Graph analytics
  - Centrality metrics for bottleneck detection.
  - Dijkstra shortest path and procedure path-efficiency scoring.
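The routing idea can be sketched self-contained with stdlib `heapq` (the pipeline itself uses NetworkX; the edge weights below are hypothetical step costs, not values from the project):

```python
import heapq

# Hypothetical step-transition costs (e.g. expected minutes); node names match
# the procedure graph above, including one risky "shortcut" edge past trocar_insert.
GRAPH = {
    "preop_setup":   {"incision": 4},
    "incision":      {"trocar_insert": 2, "dissection": 9},
    "trocar_insert": {"dissection": 3},
    "dissection":    {"cauterization": 5},
    "cauterization": {"suturing": 4},
    "suturing":      {"closure": 3},
    "closure":       {},
}

def dijkstra(graph, src, dst):
    """Return (total_cost, path) for the cheapest route from src to dst."""
    pq = [(0, src, [src])]  # (cost so far, node, path taken)
    seen = set()
    while pq:
        cost, node, path = heapq.heappop(pq)
        if node == dst:
            return cost, path
        if node in seen:
            continue
        seen.add(node)
        for nxt, w in graph[node].items():
            if nxt not in seen:
                heapq.heappush(pq, (cost + w, nxt, path + [nxt]))
    return float("inf"), []  # dst unreachable from src

cost, path = dijkstra(GRAPH, "preop_setup", "closure")
```

With NetworkX the same query reduces to `nx.dijkstra_path(G, "preop_setup", "closure")`; the hand-rolled version just makes the priority-queue mechanics visible.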
- ML + fairness
  - Procedure-level recovery-hours regression model (RandomForest).
  - Fairness check reports the group error disparity ratio against a pass/fail threshold.
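The disparity check can be illustrated as a worst-over-best group error ratio. This is a common definition, but the exact metric, threshold, and group keys in `fairness.py` may differ; the residuals below are made up for illustration:

```python
def disparity_ratio(errors_by_group: dict[str, list[float]]) -> float:
    """Ratio of worst to best per-group mean absolute error; 1.0 means parity."""
    mae = {g: sum(abs(e) for e in errs) / len(errs)
           for g, errs in errors_by_group.items()}
    return max(mae.values()) / min(mae.values())

# Hypothetical per-group residuals (predicted minus actual recovery hours).
residuals = {
    "group_a": [2.0, -1.0, 3.0],   # MAE = 2.0
    "group_b": [4.0, -5.0, 3.0],   # MAE = 4.0
}
ratio = disparity_ratio(residuals)
passed = ratio <= 1.5  # example pass/fail threshold, not the project's configured value
```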
- Storage + cloud hooks
  - Writes Parquet/JSON artifacts locally.
  - Optional S3 uploads via `boto3` when `S3_BUCKET` is configured.
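A sketch of how such an optional upload hook can be wired (the helper name and `surggraph/` key prefix are assumptions; `upload_file(filename, bucket, key)` is boto3's actual S3 client call):

```python
import os

def maybe_upload(local_path: str, key: str) -> bool:
    """Upload an artifact to S3 only when S3_BUCKET is configured; no-op otherwise."""
    bucket = os.environ.get("S3_BUCKET")
    if not bucket:
        return False  # offline/local run: artifacts stay on disk
    import boto3  # imported lazily so local runs don't require boto3 at all
    s3 = boto3.client("s3", region_name=os.environ.get("AWS_REGION", "us-east-1"))
    s3.upload_file(local_path, bucket, f"surggraph/{key}")
    return True
```

Reading the bucket from the environment (rather than hard-coding it) matches the project's no-credentials-in-code stance.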
- Serving layer
  - Flask API endpoints: `/health`, `/insights`, `/alerts`, `/metrics`.
  - Streamlit dashboard with executive KPIs, graph intelligence panels, and telemetry explorer.
  - Interactive Surgical Scenario Simulator with controls for force, speed, patient risk, and path strategy, plus live outputs for anomaly rate, predicted recovery, path cost, and fairness impact.
  - Interactive Robot Control Console where you manually move the tool tip, execute incision/cauterization/suturing actions, and get live scoring + risk/fairness consequences.
  - The Robot Control Console includes a realistic OR-style visual scene, teleoperation HUD gauges (force/speed/patient risk), and action-quality trend charts for demo storytelling.
  - Robot control includes diagonal joystick movement profiles and phase-aware incision depth control, with a zoomed incision micro-view for precision feedback.
  - Robot control supports direct drag manipulation of arm joints (base/elbow/tip) in a control canvas, with a larger teleoperation bay viewport for live precision control.
- Orchestration + GitOps
  - Airflow DAG for the run sequence (`run_pipeline -> export_dbt_sources -> dbt run`).
  - GitHub Actions pipeline running tests with a coverage threshold (>=80%; current tests exceed 90%).
One-command launcher (recommended):

```bash
./run_all.sh
```

This boots the venv (if needed), runs the pipeline, and starts the API + dashboard together.

Optional custom ports:

```bash
API_PORT=5050 DASHBOARD_PORT=8601 ./run_all.sh
```

Manual setup/run:
```bash
python3 -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
```

Run the end-to-end pipeline:

```bash
python -m surggraph_pipeline.scripts.run_pipeline
```

Run the API:

```bash
python -m surggraph_pipeline.api.app
# http://localhost:5000/insights
```

Run the dashboard:

```bash
streamlit run surggraph_pipeline/dashboard/app.py
# http://localhost:8501
```

Run tests:

```bash
pytest
```

Bring up local Kafka + pipeline + API + dashboard:
```bash
docker compose up --build
```

This launches:

- `zookeeper` + `kafka`
- `pipeline` runner (produces artifacts)
- `api` on port `5000`
- `dashboard` on port `8501`
Generate the seed CSV from pipeline artifacts:

```bash
python -m surggraph_pipeline.scripts.export_dbt_sources
```

Then run dbt:

```bash
cd surggraph_pipeline/dbt
cp profiles.yml.example profiles.yml
dbt seed --profiles-dir .
dbt run --profiles-dir .
dbt test --profiles-dir .
```

DAG file: `airflow/dags/airflow_dag.py`
Main DAG chain:

`run_pipeline -> export_dbt_sources -> run_dbt_models`

(Use `requirements-full.txt` if installing Airflow/pyFlink extras.)
Build local images:

```bash
docker build -f docker/Dockerfile.pipeline -t surggraph-pipeline:latest .
docker build -f docker/Dockerfile.api -t surggraph-api:latest .
docker build -f docker/Dockerfile.dashboard -t surggraph-dashboard:latest .
```

Deploy:
```bash
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/kafka.yaml
kubectl apply -f k8s/pipeline-cronjob.yaml
kubectl apply -f k8s/api-deployment.yaml
kubectl apply -f k8s/dashboard-deployment.yaml
```

Set environment variables:
```bash
export AWS_REGION=us-east-1
export S3_BUCKET=<your-bucket>
```

When set, pipeline artifacts upload to:

- `s3://<bucket>/surggraph/events_processed.parquet`
- `s3://<bucket>/surggraph/stream_windows.parquet`
- `s3://<bucket>/surggraph/procedure_training.parquet`
- `s3://<bucket>/surggraph/model_eval.parquet`
- `s3://<bucket>/surggraph/insights.json`
Pipeline emits:

- `reliability_success_rate`
- `latency_avg_ms`
- `latency_p95_ms`
- `anomaly_events`
- `latency_reduction_pct`
Use the benchmark helper for a quick quantified talking point:

```bash
python -m surggraph_pipeline.scripts.simulate_metrics
```

Typical output shows a ~40% streaming latency reduction versus the synthetic batch-style baseline.
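As a sketch of how such a comparison can be computed (the internals of `simulate_metrics` are not shown here; the latency distributions below are illustrative, not the project's actual simulation):

```python
import random
import statistics

def latency_summary(latencies_ms: list[float]) -> dict:
    """Average + p95 latency, mirroring latency_avg_ms / latency_p95_ms."""
    ordered = sorted(latencies_ms)
    p95 = ordered[int(0.95 * (len(ordered) - 1))]  # nearest-rank style p95
    return {"latency_avg_ms": statistics.mean(ordered), "latency_p95_ms": p95}

rng = random.Random(7)  # deterministic seed for a reproducible benchmark
streaming = [rng.uniform(20, 60) for _ in range(500)]   # per-event streaming latencies
batch = [rng.uniform(40, 100) for _ in range(500)]      # synthetic batch-style baseline

# Percent improvement of streaming mean latency over the batch baseline.
reduction_pct = 100 * (1 - statistics.mean(streaming) / statistics.mean(batch))
```

With these illustrative distributions the reduction lands in the ~40% range, which is the shape of claim the helper script is meant to back up.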
- No hard-coded cloud credentials (environment variables only).
- Deterministic seeded simulation for reproducibility.
- `pytest` + coverage gate configured in `pyproject.toml`.
- Modular code layout for extensibility and maintainability.
- "I modeled da Vinci-like procedural flow as a directed graph and used Dijkstra routing to quantify path efficiency and potential tool-path optimization."
- "I treated streaming latency as a first-class metric and instrumented reliability + p95 latency with reproducible simulation baselines."
- "I separated telemetry ingest, stream enrich/anomaly detection, dbt feature transformations, and serving APIs to mirror production data platform boundaries."
- "I added fairness checks to ML evaluation, reflecting the importance of robust dataset quality and accountable model performance."