- Project Summary
- High-level architecture (visual)
- Core components & responsibilities
- Data model & semantic model (Tableau Next)
- Realtime ingestion & pipeline
- Predictive models & ML lifecycle
- APIs & Extension points (Hyper, REST, Webhooks)
- Agentforce automation workflows
- Solana immutable ledger integration
- Tableau dashboards, embeds, and Extensions
- Deployment & infrastructure (Docker, k8s, CI/CD)
- Observability, testing, and SLOs
- Security, authN/authZ, and secrets handling
- Developer guide & local dev quickstart
- Troubleshooting & runbook
- Roadmap & known limitations
- Contributing, license, credits
- Appendices (configs, examples)
Predict.AI (this repo) is an end-to-end predictive maintenance platform built on top of Tableau Cloud / Tableau Next semantics. It unifies time-series and master data into a semantic layer, produces explainable failure probability and Remaining Useful Life (RUL) predictions, automates actions via Agentforce, and appends audit hashes to Solana for immutable compliance records.
Key capabilities:
- Canonical semantic model for Equipment, SensorReadings, Predictions.
- Real-time ingestion (MQTT → Kafka) and streaming feature computation.
- Predictive Engine (probability + RUL) with explainability (SHAP-like).
- Agentforce playbooks to create Field Service Work Orders, reserve parts, notify teams.
- Tableau Hyper pre-aggregates + Extensions for one-click actions.
- Optional Solana on-chain audit of decision hashes.
Below are editable Mermaid diagrams you can paste into GitHub to render.

```mermaid
flowchart LR
subgraph DataSources [Data Sources]
IOT[IoT Sensors]
PLC[PLC / SCADA]
ERP[ERP / Inventory]
CSV[CSV / Vendor APIs]
MES[MES]
end
subgraph IngestLayer [Edge & Ingest]
Edge[Edge Gateway / IoT Bridge]
Ingest[Ingress Service / Kafka Producer]
end
subgraph Processing [Processing & Storage]
Broker["Message Broker (Kafka / Pulsar)"]
Stream["Stream Processor (Flink/ksqlDB)"]
TSDB[(Time-series DB)]
FeatureStore[(Feature Store / HyperTS)]
HyperAPI[Hyper API / Pre-aggregates]
end
subgraph SemanticAndModel [Semantic & ML]
Semantic["Semantic Layer (Tableau Next)"]
Model["Predictive Engine (REST)"]
Explain[Explainability Service]
end
subgraph Action [Action & Orchestration]
Agent[Agentforce Orchestrator]
Tableau[Tableau Cloud / Extensions]
Salesforce[Salesforce Field Service]
Slack[Slack / Push / Mobile]
Solana["Solana Ledger (hashes)"]
end
IOT -->|MQTT/HTTP| Edge
PLC -->|OPC-UA / HTTP| Edge
CSV -->|API / SFTP| Ingest
MES --> Ingest
ERP --> Ingest
Edge -->|Kafka / PubSub| Broker
Ingest --> Broker
Broker --> Stream
Stream --> TSDB
Stream --> FeatureStore
TSDB --> FeatureStore
FeatureStore --> Model
Stream --> Model
Model -->|predictions| Semantic
Model -->|audit_hash| Solana
Semantic --> HyperAPI
HyperAPI --> Tableau
Tableau -->|user actions| Agent
Agent --> Salesforce
Agent --> Slack
```

```mermaid
sequenceDiagram
participant Sensor
participant Edge
participant Broker
participant Stream
participant FeatureStore
participant Model as PredictiveEngine
participant Semantic
participant Tableau
participant Agent as Agentforce
participant SF as Salesforce
participant Sol as Solana
Sensor->>Edge: publish reading (mqtt)
Edge->>Broker: forward event
Broker->>Stream: stream processing (windowing)
Stream->>FeatureStore: update features
FeatureStore->>Model: request features
Model->>Semantic: write prediction (prob, RUL)
Model->>Sol: append audit hash
Semantic->>Tableau: refresh pre-aggregate
Tableau->>User: show alert
User->>Tableau: click "Create Work Order"
Tableau->>Agent: webhook(action:create_case, payload)
Agent->>SF: create WorkOrder
Agent->>ERP: reserve parts
```

```mermaid
flowchart TB
subgraph Cluster [Kubernetes Cluster]
APIG[API Gateway / Ingress]
subgraph Apps
EdgeSvc[edge-proxy]
Broker[Kafka StatefulSet]
StreamProc[flink / stream-processor]
TSDB[TimescaleDB StatefulSet]
FSvc[feature-store]
ModelS["model-server (FastAPI)"]
API[api-gateway]
AgentSvc[agentforce-worker]
HyperJob[hyper-preaggregate job]
Redis[Redis]
Prom[Prometheus]
Graf[Grafana]
end
APIG --> API
API --> ModelS
API --> FSvc
API --> AgentSvc
end
APIG -->|Ingress| LoadBalancer
Prom --> Graf
```
This section enumerates each major component, its responsibilities, and implementation notes.
- Edge / IoT Gateway
  - TLS client certs for device auth, buffering, lightweight filtering, and protocol bridging (MQTT → Kafka).
  - Implementation: Node or Python container with `paho-mqtt`/`asyncio`, or an EMQX configuration.
- Message Broker (Kafka / Pulsar)
  - Durable streaming backbone; partitioned topics (`sensor_readings`, `predictions`, `audits`).
- Streaming Processor
  - Windowed aggregates, enrichment, early anomaly detection.
  - Tech: Flink, ksqlDB, or Faust for smaller stacks.
- Time-series DB & Feature Store
  - TSDB for raw telemetry (Timescale/Influx); Feature Store for online/offline features (Feast/Hopsworks).
  - Materialized views written to Hyper for Tableau.
- Predictive Engine
  - Model server (FastAPI). Exposes `/predict` and `/predict/batch`; returns explainability.
  - Models: XGBoost/LightGBM, optional LSTM ensemble.
  - Writes audit hash (SHA-256 of critical metadata) to Solana; persists full prediction to DB.
- Semantic Layer (Tableau Next)
  - Canonical entities, DMO/semantic model definitions published to Tableau Cloud.
- Hyper API & Pre-Aggregates
  - Precompute aggregates for interactive queries; stream smaller extracts to Tableau Cloud.
- Agentforce Orchestrator
  - Playbook runner: create cases, reserve parts, notify Slack/mobile.
  - Idempotent execution and step-level auditing.
- Solana Ledger
  - Store compact hashes and minimal metadata on-chain; off-chain storage holds the full payload.
- Observability
  - Prometheus metrics; OpenTelemetry tracing; ELK/Loki log aggregation.
- Equipment: id, name, type, installation_date, criticality_score, location_id
- SensorReading: id, equipment_id, ts, sensor_type, value, raw_quality_flag
- Prediction: id, equipment_id, model_version, ts, failure_probability, rul_days, confidence, explanation_json
- MaintenanceEvent: id, equipment_id, workorder_id, start_time, end_time, parts_used
- InventoryItem: part_id, qty_on_hand, reorder_point
- Technician: id, name, skills, availability

```sql
CREATE TABLE equipment (
id UUID PRIMARY KEY,
name TEXT,
type TEXT,
installation_date DATE,
criticality SMALLINT,
location TEXT
);
CREATE TABLE sensor_readings (
id BIGSERIAL PRIMARY KEY,
equipment_id UUID REFERENCES equipment(id),
ts TIMESTAMPTZ NOT NULL,
sensor_type TEXT,
value DOUBLE PRECISION,
raw_quality_flag BOOLEAN DEFAULT FALSE
);
CREATE TABLE predictions (
id UUID PRIMARY KEY,
equipment_id UUID REFERENCES equipment(id),
model_version TEXT,
ts TIMESTAMPTZ NOT NULL,
failure_probability DOUBLE PRECISION,
rul_days DOUBLE PRECISION,
confidence DOUBLE PRECISION,
explanation JSONB
);
```

```yaml
semantic_model:
  name: Predictive_Maintenance_Semantic_Model
  tables:
    - name: Equipment
      source: equipment
      fields: [id, name, type, installation_date, criticality, location]
    - name: SensorReadings
      source: sensor_readings
      fields: [id, equipment_id, ts, sensor_type, value, raw_quality_flag]
    - name: Predictions
      source: predictions
      fields: [id, equipment_id, ts, model_version, failure_probability, rul_days, confidence, explanation]
  relationships:
    - from: Equipment.id
      to: SensorReadings.equipment_id
      type: one_to_many
    - from: Equipment.id
      to: Predictions.equipment_id
      type: one_to_many
```

Publish this model to Tableau Next so dashboards and Hyper queries reference canonical fields (e.g., `Equipment.criticality`, `Predictions.failure_probability`).
- Devices → Edge Gateway (MQTT / HTTP)
- Gateway → Kafka `sensor_readings` topic
- Stream Processor: de-noise, window aggregates → TSDB + Feature Store
- Model scoring (online or triggered) → Predictions stored in DB + semantic layer
- Pre-aggregates produced for Tableau Hyper ingestion

```python
# mqtt_to_kafka.py (simplified)
import json
from kafka import KafkaProducer
import paho.mqtt.client as mqtt

KAFKA_BOOTSTRAP = "kafka:9092"
TOPIC = "sensor_readings"

producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP,
                         value_serializer=lambda v: json.dumps(v).encode())

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload.decode())
    producer.send(TOPIC, payload)

client = mqtt.Client()
client.on_message = on_message
client.connect("edge.local", 1883)
client.subscribe("factory/+/reading")
client.loop_forever()
```

- 1h average, 24h variance
- slope / trend features
- categorical enrichments: `equipment_type`, `criticality`
- time-based features: `hour_of_day`, `shift`
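
The stream processor computes these windows in Flink/ksqlDB; as a rough point of reference, here is a batch pandas sketch of the same features (function and column names are illustrative, not part of the pipeline):

```python
# features_batch.py (illustrative pandas equivalent of the streaming features above)
import pandas as pd

def compute_features(readings: pd.DataFrame) -> pd.DataFrame:
    """readings: columns [equipment_id, ts (datetime), value]; returns rolling features per reading."""
    df = readings.sort_values("ts").set_index("ts")
    grouped = df.groupby("equipment_id")["value"]
    df["avg_1h"] = grouped.transform(lambda s: s.rolling("1h").mean())
    df["var_24h"] = grouped.transform(lambda s: s.rolling("24h").var())
    df["slope_1h"] = grouped.transform(lambda s: s.diff().rolling("1h").mean())  # crude trend proxy
    df["hour_of_day"] = df.index.hour
    return df.reset_index()
```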
Use `tableauhyperapi` to write Hyper extracts (hourly/daily) for Tableau.
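
A minimal write sketch with `tableauhyperapi`; the extract name and pre-aggregate columns are illustrative:

```python
# write_hyper.py (sketch; extract and column names are illustrative)
import datetime
from tableauhyperapi import (
    Connection, CreateMode, HyperProcess, Inserter, SchemaName, SqlType,
    TableDefinition, TableName, Telemetry,
)

table = TableDefinition(TableName("Extract", "predictions_pre_agg"), [
    TableDefinition.Column("equipment_id", SqlType.text()),
    TableDefinition.Column("hour", SqlType.timestamp()),
    TableDefinition.Column("avg_failure_probability", SqlType.double()),
])
rows = [("eq-001", datetime.datetime(2024, 1, 1, 0), 0.12)]  # placeholder pre-aggregate rows

with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(hyper.endpoint, "predictions_pre_agg.hyper", CreateMode.CREATE_AND_REPLACE) as conn:
        conn.catalog.create_schema(SchemaName("Extract"))
        conn.catalog.create_table(table)
        with Inserter(conn, table) as inserter:
            inserter.add_rows(rows)
            inserter.execute()
```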
- Primary: predict failure probability in 7–30 day horizon
- Secondary: estimate RUL (days)
- Explainability: per-prediction contributions (top-5 features)
- Tabular model: XGBoost/LightGBM for probability
- RUL model: regression or survival modeling
- Temporal: LSTM/1D-CNN for complex series (ensemble)
- Explainability: SHAP TreeExplainer / DeepExplainer
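
A minimal sketch of the per-prediction top-5 contributions using SHAP's TreeExplainer, assuming a fitted tree model and a list of feature names:

```python
# explain.py (sketch; assumes a fitted tree model and known feature names)
import numpy as np
import shap

def top_contributions(model, features: np.ndarray, feature_names: list, k: int = 5):
    """Return the k features pushing this prediction hardest, as (name, shap_value) pairs."""
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(features.reshape(1, -1))
    values = np.asarray(shap_values)[-1].ravel()  # positive-class contributions
    order = np.argsort(np.abs(values))[::-1][:k]
    return [(feature_names[i], float(values[i])) for i in order]
```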

```python
# app.py (concept)
from fastapi import FastAPI
import joblib, uuid, datetime

app = FastAPI()
model = joblib.load("/models/failure_predictor_v1.2.pkl")

@app.post("/predict")
def predict(req: dict):
    features = req["features"]
    prob = model.predict_proba([features])[0][1]
    rul = estimate_rul(prob, features)
    pred_id = str(uuid.uuid4())
    # persist to predictions table, enqueue audit write
    return {"prediction_id": pred_id, "failure_probability": float(prob), "rul_days": float(rul)}
```

- Train → validate (time-split CV) → register (MLflow/S3) → staging → promote (manual approval) → production
- Nightly drift checks; automated retrain candidate generation.
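
A minimal sketch of the register step with MLflow; the tracking URI, experiment name, and stand-in model are assumptions (the real pipeline registers the XGBoost/LightGBM model trained above):

```python
# register_model.py (sketch; URIs, names, and the stand-in model are assumptions)
import mlflow
import mlflow.sklearn
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier

mlflow.set_tracking_uri("http://mlflow:5000")   # assumed tracking server
mlflow.set_experiment("failure_predictor")

X, y = np.random.rand(200, 8), np.random.randint(0, 2, 200)   # stand-in for real features/labels
model = GradientBoostingClassifier().fit(X, y)

with mlflow.start_run() as run:
    mlflow.log_metric("roc_auc", 0.5)           # replace with the time-split CV score
    mlflow.sklearn.log_model(model, artifact_path="model")
    mlflow.register_model(f"runs:/{run.info.run_id}/model", name="failure_predictor")
# Promotion from staging to production remains a manual approval step.
```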
- `POST /predict` — single prediction
- `POST /predict/batch` — batch scoring
- `GET /equipment/{id}/health` — latest health + RUL
- `GET /predictions/{id}` — full prediction + explanation
- `POST /actions/create_case` — Agentforce wrapper
- `POST /webhooks/tableau` — receive extension actions (create case, reserve parts)
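
For reference, a hedged example of calling `POST /predict`; the host, port, and the shape of `features` are assumptions and must match what the feature store actually emits:

```python
# call_predict.py (illustrative; host, port, and feature payload are assumptions)
import requests

payload = {
    "equipment_id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
    "features": [0.42, 17.3, 0.91, 5.0],  # ordered feature vector expected by the model
}
resp = requests.post("http://localhost:8080/predict", json=payload, timeout=5)
resp.raise_for_status()
print(resp.json())  # e.g. {"prediction_id": "...", "failure_probability": 0.87, "rul_days": 12.4}
```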
- Periodically push `predictions_pre_agg.hyper` to Tableau Cloud, or call the Hyper API to stream pre-aggregates (publish sketch below).
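
One way to push the extract, sketched with the Tableau Server Client (`tableauserverclient`); the site, project LUID, and token values are placeholders:

```python
# publish_hyper.py (sketch; site, project LUID, and token values are placeholders)
import tableauserverclient as TSC

auth = TSC.PersonalAccessTokenAuth("ci-publisher", "TOKEN_VALUE", site_id="your-site")
server = TSC.Server("https://10ax.online.tableau.com", use_server_version=True)

with server.auth.sign_in(auth):
    datasource = TSC.DatasourceItem(project_id="PROJECT_LUID")
    server.datasources.publish(
        datasource,
        "predictions_pre_agg.hyper",
        mode=TSC.Server.PublishMode.Overwrite,
    )
```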
- Use signed JWTs from Tableau; verify signature prior to processing.
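
A verification sketch using PyJWT; the audience value and required claims are assumptions and should match how the Tableau-side extension signs its tokens:

```python
# verify_webhook.py (sketch; audience and required claims are assumptions)
import jwt  # PyJWT

def verify_tableau_jwt(token: str, public_key: str) -> dict:
    """Reject the webhook unless the signature and claims check out; raises on failure."""
    return jwt.decode(
        token,
        public_key,
        algorithms=["RS256"],
        audience="predict-ai-webhooks",
        options={"require": ["exp", "iss"]},
    )
```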
Playbooks are JSON/YAML-driven. Example playbook:

```yaml
name: high_risk_failure_playbook
trigger:
  when: prediction.failure_probability >= 0.85
actions:
  - type: create_case
    target: Salesforce.FieldService
    payload_template: |
      {
        "subject": "Predicted failure: {{equipment_id}}",
        "priority": "High",
        "description": "Predicted failure probability {{failure_probability}} (RUL: {{rul_days}})"
      }
  - type: reserve_parts
    target: ERP
    payload_template: |
      { "part_id": "{{critical_part}}", "qty": 1 }
  - type: notify
    target: Slack
    payload_template: { "channel": "#maintenance-alerts", "text": "Auto-case created: {{case_id}}" }
```

Agentforce ensures idempotency, retries, and logs step outcomes to `workflow_runs`.
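
A minimal sketch of how a step runner can stay idempotent; the in-memory `workflow_runs` dict stands in for the real `workflow_runs` table, and the adapter call is a placeholder:

```python
# idempotent_steps.py (sketch; the in-memory store and adapter callable are placeholders)
import hashlib

workflow_runs = {}  # idempotency_key -> status (stands in for the workflow_runs table)

def idempotency_key(playbook_id: str, prediction_id: str, step_type: str) -> str:
    """Deterministic key so a retried playbook never repeats a completed step."""
    return hashlib.sha256(f"{playbook_id}:{prediction_id}:{step_type}".encode()).hexdigest()

def run_step(playbook_id: str, prediction_id: str, step_type: str, action) -> str:
    key = idempotency_key(playbook_id, prediction_id, step_type)
    if workflow_runs.get(key) == "succeeded":
        return "skipped"            # already executed; safe to skip on retry
    try:
        action()                    # e.g. create the Salesforce case via its adapter
        workflow_runs[key] = "succeeded"
    except Exception:
        workflow_runs[key] = "failed"
        raise                       # surface so the orchestrator can retry with backoff
    return workflow_runs[key]

# usage: run_step("pb_high_risk_001", "pred-123", "create_case", lambda: print("case created"))
```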
- Store only compact SHA-256 hashes on-chain to minimize cost. Off-chain S3 (encrypted) stores the full payload.
- On prediction, compute `hash = sha256(prediction_json)` and submit `(hash, short_meta)` to the Solana program.
- An off-chain indexer maps `hash -> full payload`.

```python
from solana.rpc.api import Client
from solana.transaction import Transaction

client = Client("https://api.mainnet-beta.solana.com")
sha256_bytes = compute_sha256(audit_record_json)

tx = Transaction()
# create instruction to custom Solana program that appends hash to log account
res = client.send_transaction(tx, signer)
```

Note: handle rate limits and retries; record `pending_onchain` status if the RPC call fails.
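
A sketch of the hash computation and the retry/fallback path; the backoff policy and the `submit` callable are illustrative:

```python
# audit_hash.py (sketch; backoff policy and the submit callable are illustrative)
import hashlib
import json
import time

def compute_sha256(audit_record_json: dict) -> bytes:
    """Hash the canonical JSON form so the same record always yields the same digest."""
    canonical = json.dumps(audit_record_json, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode()).digest()

def submit_with_retry(submit, record: dict, attempts: int = 5) -> str:
    """Try the on-chain append; fall back to pending_onchain so a worker can retry later."""
    digest = compute_sha256(record)
    for attempt in range(attempts):
        try:
            submit(digest)            # e.g. wraps client.send_transaction(...)
            return "onchain"
        except Exception:             # RPC rate limit, timeout, signer error, ...
            time.sleep(2 ** attempt)  # exponential backoff
    return "pending_onchain"
```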
- Equipment Health: floor-map with colored glyphs (Critical/Warning/OK), tooltip shows top factors, RUL.
- Predictive Timeline: sparklines with a prediction band slider (7–30 days).
- Maintenance Scheduler: Gantt scheduler with Agentforce suggestions.
- Parts Forecast: projected demand and auto-PO button.
- Built using the Tableau Extensions SDK (React or vanilla JS). Extensions call `POST /webhooks/tableau` with a signed JWT.
- Embed tokens are rotated; use CSP and iframe sandboxing. Extensions must authenticate via signed JWTs.
- Namespace: `predict-ai-prod`
- Deployments: `edge-proxy`, `kafka`, `streaming-processor`, `timescaledb`, `feature-store`, `model-server`, `api`, `agentforce-worker`, `hyper-worker`
- StatefulSets: `kafka`, `timescaledb`
- Services: `redis`, `prometheus`, `grafana`
- Jobs/CronJobs: Hyper pre-aggregate export, nightly training checks

```yaml
version: '3.7'
services:
  kafka:
    image: wurstmeister/kafka
  zookeeper:
    image: wurstmeister/zookeeper
  model-server:
    build: ./predictive_engine
    ports: ["8000:8000"]
  api:
    build: ./api
    ports: ["8080:8080"]
  timescaledb:
    image: timescale/timescaledb:latest-pg12
    environment:
      POSTGRES_PASSWORD: example
```

- CI: GitHub Actions runs unit tests, lints, builds Docker images, runs model validation on PRs.
- CD: Helm charts + GitHub Actions; `kubectl`/`helm` deploys to the cluster. Model promotions are gated by manual approval.
- `ingest_messages_total`, `ingest_latency_seconds`
- `predictions_total`, `prediction_latency_seconds`, `model_version_gauge`
- `api_errors_total`, `request_duration_seconds`
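
A minimal instrumentation sketch with `prometheus_client` for two of the metrics above; label names and the port are illustrative:

```python
# metrics.py (sketch using prometheus_client; label names and port are illustrative)
from prometheus_client import Counter, Histogram, start_http_server

PREDICTIONS_TOTAL = Counter("predictions_total", "Predictions served", ["model_version"])
PREDICTION_LATENCY = Histogram("prediction_latency_seconds", "Time spent scoring a request")

def score(features, model, model_version="v1.2"):
    with PREDICTION_LATENCY.time():
        prob = model.predict_proba([features])[0][1]
    PREDICTIONS_TOTAL.labels(model_version=model_version).inc()
    return prob

if __name__ == "__main__":
    start_http_server(9100)  # expose /metrics for Prometheus to scrape
```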
- Use OpenTelemetry to propagate trace IDs across services (Edge → Ingest → Model → Agentforce).
- Structured JSON logs and central ELK (or Loki) ingestion.
- `/predict`: 99% of requests < 300 ms
- Ingestion: 99.9% availability
- Mean time to detect anomaly < 2 minutes
- Unit tests (pytest / jest)
- Integration tests: synthetic sensor feed → Kafka → end-to-end pre-aggregate and Tableau update.
- Model tests: nightly drift, calibration, performance checks.
- Inter-service: mTLS (recommended)
- External APIs: OAuth2 / JWTs with short lifetimes
- Tableau webhooks: signed JWTs validated per invocation
- Store in KMS (AWS KMS/GCP KMS) or Vault. Kubernetes secrets encrypted at rest and injected at runtime. Never store raw secrets in logs.
- TLS for all transport (1.2+)
- Encrypt at-rest (S3, Postgres)
- PII redaction in logs; store minimal PII in semantic layer
- Docker & docker-compose
- Python 3.10+, Node 16+
- Local Kafka (dev) or Confluent Cloud
- Tableau Dev site and tokens for embedding

```bash
git clone git@github.com:your-org/predict-ai.git
cd predict-ai
docker-compose build
docker-compose up -d
# run migrations
docker-compose exec api alembic upgrade head
# seed sample data
python scripts/seed_sample_data.py
# run model server locally
cd predictive_engine && uvicorn app:app --reload --port 8000
```

```bash
pytest tests/unit
pytest tests/integration  # requires local kafka & db
```

Prediction latency spikes
- Check model-server CPU/memory and thread pool: `kubectl top pods`
- Check feature store cache misses (cache metrics)
- Consider reverting model version via model registry
Too many false positives
- Verify sensor quality flags (missing ts, unrealistic values)
- Inspect feature importance on affected predictions
- Retrain with improved windowing or add conservative ensemble gating
Agentforce action failures
- Check external endpoint credentials and network ACLs
- Inspect `workflow_runs` for step errors (timeout, 5xx)
- Validate idempotency keys to avoid duplicates
Solana append failures
- RPC rate limits or account signer issues
- Mark audit entries `pending_onchain` and retry with backoff
Tableau pre-aggregate failures
- Confirm Hyper file creation, upload logs, and Tableau Cloud token validity
- Continuous learning loop (retrain on confirmed failures)
- Technician mobile app native integration (offline mode)
- Improve LSTM ensemble for long-term pattern capture
- Cross-factory semantic federation and horizontal scaling
- Differential privacy mode for regulated environments
- Solana on-chain costs prohibit full payload on-chain; only hashes stored.
- Some SCADA vendors use proprietary locked protocols; prebuilt adapters cover many but not all.
- Fork → new branch `feature/xxx` → tests pass → PR with changelog
- Python style: Black / isort / mypy. JS: Prettier / ESLint. Infra: Helm lint and tfsec.
- License: MIT (see `LICENSE`)

```
POSTGRES_HOST=timescaledb
POSTGRES_USER=
POSTGRES_PASSWORD=secret
KAFKA_BOOTSTRAP=kafka:9092
MODEL_REGISTRY_BUCKET=s3://models
TABLEAU_CLOUD_SITE=your-site
TABLEAU_CLOUD_TOKEN=XXXX
SOLANA_RPC=https://api.mainnet-beta.solana.com
SOLANA_SIGNER_KEY=base58key
```

```json
{
"id":"pb_high_risk_001",
"name":"High Risk Failure Playbook",
"trigger":{"type":"threshold","field":"failure_probability","op":">=","value":0.85},
"steps":[
{"type":"create_case","args":{"priority":"High"}},
{"type":"reserve_parts","args":{"strategy":"min_stock"}},
{"type":"notify","args":{"channels":["slack","mobile"]}}
]
}
```

- ROC-AUC: 0.92
- Precision@10%: 0.81
- RUL RMSE: 4.2 days
- Data: 4 years of labeled failures, 2 factories, 120 equipment types