lucylow/tableau-aipredict

Tableau: AI-Powered Predictive Maintenance


Table of contents

  1. Project Summary
  2. High-level architecture (visual)
  3. Core components & responsibilities
  4. Data model & semantic model (Tableau Next)
  5. Realtime ingestion & pipeline
  6. Predictive models & ML lifecycle
  7. APIs & Extension points (Hyper, REST, Webhooks)
  8. Agentforce automation workflows
  9. Solana immutable ledger integration
  10. Tableau dashboards, embeds, and Extensions
  11. Deployment & infrastructure (Docker, k8s, CI/CD)
  12. Observability, testing, and SLOs
  13. Security, authN/authZ, and secrets handling
  14. Developer guide & local dev quickstart
  15. Troubleshooting & runbook
  16. Roadmap & known limitations
  17. Contributing, license, credits
  18. Appendices (configs, examples)

Project summary

Predict.AI (this repo) is an end-to-end predictive maintenance platform built on top of Tableau Cloud / Tableau Next semantics. It unifies time-series and master data into a semantic layer, produces explainable failure probability and Remaining Useful Life (RUL) predictions, automates actions via Agentforce, and appends audit hashes to Solana for immutable compliance records.

Key capabilities:

  • Canonical semantic model for Equipment, SensorReadings, Predictions.
  • Real-time ingestion (MQTT → Kafka) and streaming feature computation.
  • Predictive Engine (probability + RUL) with explainability (SHAP-like).
  • Agentforce playbooks to create Field Service Work Orders, reserve parts, notify teams.
  • Tableau Hyper pre-aggregates + Extensions for one-click actions.
  • Optional Solana on-chain audit of decision hashes.

High-level architecture (visual)

Below are editable Mermaid diagrams you can paste into GitHub to render.

System architecture — component flow

flowchart LR
  subgraph DataSources [Data Sources]
    IOT[IoT Sensors]
    PLC[PLC / SCADA]
    ERP[ERP / Inventory]
    CSV[CSV / Vendor APIs]
    MES[MES]
  end

  subgraph IngestLayer [Edge & Ingest]
    Edge[Edge Gateway / IoT Bridge]
    Ingest[Ingress Service / Kafka Producer]
  end

  subgraph Processing [Processing & Storage]
    Broker["Message Broker (Kafka / Pulsar)"]
    Stream["Stream Processor (Flink/ksqlDB)"]
    TSDB[(Time-series DB)]
    FeatureStore[(Feature Store / HyperTS)]
    HyperAPI[Hyper API / Pre-aggregates]
  end

  subgraph SemanticAndModel [Semantic & ML]
    Semantic["Semantic Layer (Tableau Next)"]
    Model["Predictive Engine (REST)"]
    Explain[Explainability Service]
  end

  subgraph Action [Action & Orchestration]
    Agent[Agentforce Orchestrator]
    Tableau[Tableau Cloud / Extensions]
    Salesforce[Salesforce Field Service]
    Slack[Slack / Push / Mobile]
    Solana["Solana Ledger (hashes)"]
  end

  IOT -->|MQTT/HTTP| Edge
  PLC -->|OPC-UA / HTTP| Edge
  CSV -->|API / SFTP| Ingest
  MES --> Ingest
  ERP --> Ingest

  Edge -->|Kafka / PubSub| Broker
  Ingest --> Broker
  Broker --> Stream
  Stream --> TSDB
  Stream --> FeatureStore
  TSDB --> FeatureStore
  FeatureStore --> Model
  Stream --> Model
  Model -->|predictions| Semantic
  Model -->|audit_hash| Solana
  Semantic --> HyperAPI
  HyperAPI --> Tableau
  Tableau -->|user actions| Agent
  Agent --> Salesforce
  Agent --> Slack

Sequence diagram — prediction → action

sequenceDiagram
  participant Sensor
  participant Edge
  participant Broker
  participant Stream
  participant FeatureStore
  participant Model as PredictiveEngine
  participant Semantic
  participant Tableau
  participant Agent as Agentforce
  participant SF as Salesforce
  participant Sol as Solana

  Sensor->>Edge: publish reading (mqtt)
  Edge->>Broker: forward event
  Broker->>Stream: stream processing (windowing)
  Stream->>FeatureStore: update features
  FeatureStore->>Model: request features
  Model->>Semantic: write prediction (prob, RUL)
  Model->>Sol: append audit hash
  Semantic->>Tableau: refresh pre-aggregate
  Tableau->>User: show alert
  User->>Tableau: click "Create Work Order"
  Tableau->>Agent: webhook(action:create_case, payload)
  Agent->>SF: create WorkOrder
  Agent->>ERP: reserve parts

Deployment topology (Kubernetes-oriented)

flowchart TB
  subgraph Kubernetes Cluster
    APIG[API Gateway / Ingress]
    subgraph Apps
      EdgeSvc[edge-proxy]
      Broker[Kafka StatefulSet]
      StreamProc[flink / stream-processor]
      TSDB[TimescaleDB StatefulSet]
      FSvc[feature-store]
      ModelS["model-server (FastAPI)"]
      API[api-gateway]
      AgentSvc[agentforce-worker]
      HyperJob[hyper-preaggregate job]
      Redis[Redis]
      Prom[Prometheus]
      Graf[Grafana]
    end
    APIG --> API
    API --> ModelS
    API --> FSvc
    API --> AgentSvc
  end

  APIG -->|Ingress| LoadBalancer
  Prom --> Graf

Core components & responsibilities

This section enumerates each major component, responsibilities, and implementation notes.

  1. Edge / IoT Gateway

    • TLS client certs for device auth, buffering, lightweight filtering and protocol bridging (MQTT → Kafka).
    • Implementation: Node or Python container with paho-mqtt/asyncio or emqx config.
  2. Message Broker (Kafka / Pulsar)

    • Durable streaming backbone; partitioned topics (sensor_readings, predictions, audits).
  3. Streaming Processor

    • Windowed aggregates, enrichment, early anomaly detection.
    • Tech: Flink, ksqlDB, or Faust for smaller stacks.
  4. Time-series DB & Feature Store

    • TSDB for raw telemetry (Timescale/Influx), Feature Store for online/offline features (Feast/Hopsworks).
    • Materialized views written to Hyper for Tableau.
  5. Predictive Engine

    • Model server (FastAPI). Exposes /predict and /predict/batch, returns explainability.
    • Models: XGBoost/LightGBM, optional LSTM ensemble.
    • Writes audit hash (sha256 of critical metadata) to Solana; persists full prediction to DB.
  6. Semantic Layer (Tableau Next)

    • Canonical entities, DMO/semantic model definitions published to Tableau Cloud.
  7. Hyper API & Pre-Aggregates

    • Precompute aggregates for interactive queries; stream smaller extracts to Tableau Cloud.
  8. Agentforce Orchestrator

    • Playbook runner: create cases, reserve parts, notify Slack/mobile.
    • Idempotent execution and step-level auditing.
  9. Solana Ledger

    • Store compact hashes and minimal metadata on-chain; off-chain storage holds full payload.
  10. Observability

    • Prometheus metrics; OpenTelemetry tracing; ELK/Loki log aggregation.

Data model & semantic model (Tableau Next)

Core entities

  • Equipment: id, name, type, installation_date, criticality, location
  • SensorReading: id, equipment_id, ts, sensor_type, value, raw_quality_flag
  • Prediction: id, equipment_id, model_version, ts, failure_probability, rul_days, confidence, explanation (JSON)
  • MaintenanceEvent: id, equipment_id, workorder_id, start_time, end_time, parts_used
  • InventoryItem: part_id, qty_on_hand, reorder_point
  • Technician: id, name, skills, availability

Example Timescale schema (abridged)

CREATE TABLE equipment (
  id UUID PRIMARY KEY,
  name TEXT,
  type TEXT,
  installation_date DATE,
  criticality SMALLINT,
  location TEXT
);

CREATE TABLE sensor_readings (
  id BIGSERIAL PRIMARY KEY,
  equipment_id UUID REFERENCES equipment(id),
  ts TIMESTAMPTZ NOT NULL,
  sensor_type TEXT,
  value DOUBLE PRECISION,
  raw_quality_flag BOOLEAN DEFAULT FALSE
);

CREATE TABLE predictions (
  id UUID PRIMARY KEY,
  equipment_id UUID REFERENCES equipment(id),
  model_version TEXT,
  ts TIMESTAMPTZ NOT NULL,
  failure_probability DOUBLE PRECISION,
  rul_days DOUBLE PRECISION,
  confidence DOUBLE PRECISION,
  explanation JSONB
);

Semantic model (conceptual YAML)

semantic_model:
  name: Predictive_Maintenance_Semantic_Model
  tables:
    - name: Equipment
      source: equipment
      fields: [id, name, type, installation_date, criticality, location]
    - name: SensorReadings
      source: sensor_readings
      fields: [id, equipment_id, ts, sensor_type, value, raw_quality_flag]
    - name: Predictions
      source: predictions
      fields: [id, equipment_id, ts, model_version, failure_probability, rul_days, confidence, explanation]
  relationships:
    - from: Equipment.id
      to: SensorReadings.equipment_id
      type: one_to_many
    - from: Equipment.id
      to: Predictions.equipment_id
      type: one_to_many

Publish this model to Tableau Next so dashboards and Hyper queries reference canonical fields (e.g., Equipment.criticality, Predictions.failure_probability).


Realtime ingestion & pipeline

Flow summary

  1. Devices → Edge Gateway (MQTT / HTTP)
  2. Gateway → Kafka sensor_readings topic
  3. Stream Processor: de-noise, window aggregates → TSDB + Feature Store
  4. Model scoring (online or triggered) → Predictions stored in DB + semantic layer
  5. Pre-aggregates produced for Tableau Hyper ingestion

Sample MQTT → Kafka bridge (Python, simplified)

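# mqtt_to_kafka.py (simplified)
import json
import logging

from kafka import KafkaProducer
import paho.mqtt.client as mqtt

KAFKA_BOOTSTRAP = "kafka:9092"
TOPIC = "sensor_readings"
MQTT_TOPIC = "factory/+/reading"

producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP,
                         value_serializer=lambda v: json.dumps(v).encode())

def on_connect(client, userdata, flags, reason_code, properties):
    # Subscribe here so the subscription is restored after a reconnect.
    client.subscribe(MQTT_TOPIC)

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode())
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Drop malformed readings instead of crashing the bridge.
        logging.warning("dropping malformed reading on %s", msg.topic)
        return
    producer.send(TOPIC, payload)

# paho-mqtt 2.x needs the callback API version; on 1.x use mqtt.Client()
# and drop the `properties` argument from on_connect.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
client.on_message = on_message
client.connect("edge.local", 1883)
client.loop_forever()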

Stream feature engineering

  • 1h average, 24h variance
  • slope / trend features
  • categorical enrichments: equipment_type, criticality
  • time-based features: hour_of_day, shift
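
The windowed features above can be sketched in plain Python. This is only an illustration of the arithmetic, not the stream processor's actual code: the function names, the `(ts_seconds, value)` tuple layout, and the use of UTC epoch seconds are all assumptions, and a Flink or ksqlDB job would express the same windows declaratively.

```python
from statistics import fmean, pvariance

HOUR = 3600.0  # seconds


def _slope(points):
    """Least-squares slope (value units per second); None if it is undefined."""
    if len(points) < 2:
        return None
    mean_t = fmean(t for t, _ in points)
    mean_v = fmean(v for _, v in points)
    denom = sum((t - mean_t) ** 2 for t, _ in points)
    if denom == 0:
        return None  # all readings share one timestamp
    return sum((t - mean_t) * (v - mean_v) for t, v in points) / denom


def window_features(readings, now):
    """readings: iterable of (ts_seconds, value); now: epoch seconds (UTC)."""
    last_1h = [v for t, v in readings if now - HOUR <= t <= now]
    last_24h = [(t, v) for t, v in readings if now - 24 * HOUR <= t <= now]
    values_24h = [v for _, v in last_24h]
    return {
        "avg_1h": fmean(last_1h) if last_1h else None,
        "var_24h": pvariance(values_24h) if values_24h else None,
        "slope_24h": _slope(last_24h),
        "hour_of_day": int(now // HOUR) % 24,
    }
```

Categorical enrichments such as equipment_type would be joined in from the equipment table rather than computed per window.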

Materialize to Hyper

Use tableauhyperapi to write Hyper extracts (hourly/daily) for Tableau.


Predictive models & ML lifecycle

Objectives

  • Primary: predict failure probability over a 7–30 day horizon
  • Secondary: estimate RUL (days)
  • Explainability: per-prediction contributions (top-5 features)
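
As a rough sketch of the top-5 explainability output, the helper below ranks per-feature contributions by absolute magnitude. It assumes the contributions have already been computed (for example, SHAP values for one prediction) and arrive as a plain dict; `top_contributions` and the output field names are hypothetical, not part of this repo.

```python
def top_contributions(contributions, k=5):
    """Return the k features with the largest absolute contribution.

    contributions: dict mapping feature name -> signed contribution.
    The sign is kept so a reader can tell whether a feature raised or
    lowered the predicted failure probability.
    """
    ranked = sorted(contributions.items(), key=lambda kv: abs(kv[1]), reverse=True)
    return [{"feature": name, "contribution": value} for name, value in ranked[:k]]
```

A list in this shape could be stored in the explanation column of the predictions table.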

Architecture

  • Tabular model: XGBoost/LightGBM for probability
  • RUL model: regression or survival modeling
  • Temporal: LSTM/1D-CNN for complex series (ensemble)
  • Explainability: SHAP TreeExplainer / DeepExplainer

Model server (FastAPI example)

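# app.py (concept)
import uuid
from typing import List

import joblib
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()
model = joblib.load("/models/failure_predictor_v1.2.pkl")  # loaded once at startup

class PredictRequest(BaseModel):
    features: List[float]  # ordered as the model was trained

def estimate_rul(prob: float, features: List[float]) -> float:
    # Not defined in this README: plug in the regression/survival RUL model here.
    raise NotImplementedError("wire in the RUL model")

@app.post("/predict")
def predict(req: PredictRequest):
    # Column 1 of predict_proba is P(failure) for a binary classifier.
    prob = model.predict_proba([req.features])[0][1]
    rul = estimate_rul(prob, req.features)
    pred_id = str(uuid.uuid4())
    # persist to predictions table, enqueue audit write
    return {"prediction_id": pred_id, "failure_probability": float(prob), "rul_days": float(rul)}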

Model lifecycle

  • Train → validate (time-split CV) → register (MLflow/S3) → staging → promote (manual approval) → production
  • Nightly drift checks; automated retrain candidate generation.
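
The README does not say which statistic the nightly drift check uses. One common choice is the Population Stability Index (PSI), sketched below under that assumption; the function name, the equal-width binning, and the default of 10 bins are illustrative only.

```python
import math


def psi(expected, actual, bins=10, eps=1e-6):
    """Population Stability Index between a reference and a recent sample.

    Bins are equal-width over the reference range; values outside that
    range are clamped into the first or last bin.
    """
    lo, hi = min(expected), max(expected)
    width = (hi - lo) / bins or 1.0  # avoid zero width for constant data

    def proportions(values):
        counts = [0] * bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), bins - 1)] += 1
        # eps keeps empty bins from producing log(0)
        return [max(c / len(values), eps) for c in counts]

    e, a = proportions(expected), proportions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))
```

A frequently quoted rule of thumb treats PSI above roughly 0.25 as a significant shift, but the threshold that triggers a retrain candidate here would need to be tuned per feature.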

APIs & Extension points (Hyper, REST, Webhooks)

Exposed endpoints (recommended)

  • POST /predict — single prediction
  • POST /predict/batch — batch scoring
  • GET /equipment/{id}/health — latest health + RUL
  • GET /predictions/{id} — full prediction + explanation
  • POST /actions/create_case — agentforce wrapper
  • POST /webhooks/tableau — receive extension actions (create case, reserve parts)

Hyper API flow

  • Periodically push predictions_pre_agg.hyper to Tableau Cloud or call Hyper API to stream pre-aggregates.

Webhook authenticity

  • Use signed JWTs from Tableau; verify signature prior to processing.
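
A minimal sketch of the signature check, using only the standard library. It assumes the webhook JWT is signed with HS256 and a shared secret, and that an `exp` claim is required; both are assumptions, since the README does not state the algorithm. `sign_hs256` exists only so the example can be exercised locally. A production service would more likely use a maintained JWT library.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url_decode(segment):
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def _b64url_encode(raw):
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def sign_hs256(claims, secret):
    """Build a token for local testing (hypothetical helper)."""
    header = _b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url_encode(json.dumps(claims).encode())
    sig = hmac.new(secret, f"{header}.{payload}".encode(), hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url_encode(sig)}"


def verify_hs256(token, secret, now=None):
    """Return the claims if the signature and expiry check out, else raise ValueError."""
    try:
        header_b64, payload_b64, sig_b64 = token.split(".")
        header = json.loads(_b64url_decode(header_b64))
        claims = json.loads(_b64url_decode(payload_b64))
        signature = _b64url_decode(sig_b64)
    except ValueError as exc:
        raise ValueError("malformed token") from exc
    if not isinstance(header, dict) or header.get("alg") != "HS256":
        raise ValueError("unexpected algorithm")  # never accept "none"
    expected = hmac.new(secret, f"{header_b64}.{payload_b64}".encode(),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, signature):  # constant-time compare
        raise ValueError("bad signature")
    current = time.time() if now is None else now
    if not isinstance(claims, dict) or claims.get("exp", 0) <= current:
        raise ValueError("token expired or missing exp")
    return claims
```

The handler for POST /webhooks/tableau would call `verify_hs256` before touching the payload and reject the request on any `ValueError`.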

Agentforce automation workflows

Playbooks are JSON/YAML-driven. Example playbook:

name: high_risk_failure_playbook
trigger:
  when: prediction.failure_probability >= 0.85
actions:
  - type: create_case
    target: Salesforce.FieldService
    payload_template: |
      {
        "subject":"Predicted failure: {{equipment_id}}",
        "priority":"High",
        "description":"Predicted failure probability {{failure_probability}} (RUL: {{rul_days}})"
      }
  - type: reserve_parts
    target: ERP
    payload_template: |
      { "part_id": "{{critical_part}}", "qty": 1 }
  - type: notify
    target: Slack
    payload_template: |
      { "channel": "#maintenance-alerts", "text": "Auto-case created: {{case_id}}" }

Agentforce ensures idempotency, retries, and logs step outcomes to workflow_runs.
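
One way to picture the idempotency guarantee: derive a deterministic key per (playbook, prediction, step) and skip any step whose key already has a recorded outcome. The sketch below is hypothetical. `StepRunner` and its in-memory dict stand in for the workflow_runs store, and it does not reflect Agentforce's actual API.

```python
import hashlib


class StepRunner:
    """Runs each playbook step at most once per idempotency key."""

    def __init__(self):
        self._results = {}  # in-memory stand-in for the workflow_runs table

    @staticmethod
    def idempotency_key(playbook, prediction_id, step_type):
        raw = f"{playbook}:{prediction_id}:{step_type}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def run_once(self, key, action):
        # A retry with the same key returns the stored outcome
        # instead of creating a second case or reservation.
        if key in self._results:
            return self._results[key]
        result = action()
        self._results[key] = result
        return result
```

With a persistent store, the lookup and the write would need to be atomic (for example, a unique constraint on the key) so that concurrent retries cannot both run the action.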


Solana immutable ledger integration

Design principles

  • Store only compact SHA-256 hashes on-chain to minimize cost. Off-chain S3 (encrypted) stores the full payload.
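  • On prediction, compute hash = sha256(prediction_json) over a canonical serialization (stable key order and separators, so the same record always yields the same hash) and submit (hash, short_meta) to the Solana program.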
  • Off-chain indexer maps hash -> full payload.
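
A small sketch of the hashing step, assuming the audit record is a JSON-serializable dict. `audit_hash` is a hypothetical helper name. Sorting keys and fixing separators makes the serialization deterministic, so anyone holding the off-chain payload can recompute the hash and compare it with the on-chain value.

```python
import hashlib
import json


def audit_hash(record):
    """SHA-256 hex digest of a canonical JSON form of the record."""
    canonical = json.dumps(
        record,
        sort_keys=True,          # key order must not affect the hash
        separators=(",", ":"),   # no incidental whitespace
        ensure_ascii=False,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Float formatting can differ across languages, so verifiers written outside Python would need to agree on the same canonical form.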

Pseudocode: write audit hash

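import hashlib

from solana.rpc.api import Client
from solana.transaction import Transaction  # import path depends on the solana-py release

client = Client("https://api.mainnet-beta.solana.com")
# audit_record_json: canonical JSON string of the audit record (built elsewhere)
sha256_bytes = hashlib.sha256(audit_record_json.encode("utf-8")).digest()
tx = Transaction()
# create instruction to custom Solana program that appends hash to log account
# signer: keypair loaded from SOLANA_SIGNER_KEY (not shown)
res = client.send_transaction(tx, signer)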

Note: handle rate limits and retries; record pending_onchain status if RPC fails.
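
The retry behaviour in the note might look like the sketch below. `submit_with_backoff` is a hypothetical helper. `submit` is any zero-argument callable that sends the transaction and returns its signature, and the injectable `sleep` is there only to make the function testable. Catching bare `Exception` is a simplification; real code would catch the RPC client's specific error types.

```python
import time


def submit_with_backoff(submit, attempts=5, base_delay=0.5, sleep=time.sleep):
    """Try submit() with exponential backoff; fall back to pending_onchain."""
    last_error = None
    for attempt in range(attempts):
        try:
            return {"status": "confirmed", "signature": submit()}
        except Exception as exc:  # simplification: narrow this in real code
            last_error = str(exc)
            if attempt < attempts - 1:
                sleep(base_delay * 2 ** attempt)  # 0.5s, 1s, 2s, ...
    # Every attempt failed: record the entry so a later job can retry it.
    return {"status": "pending_onchain", "error": last_error}
```

A background worker could then periodically pick up pending_onchain rows and call the same function again.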


Tableau dashboards, embeds, and Extensions

Dashboard examples

  • Equipment Health: floor-map with colored glyphs (Critical/Warning/OK), tooltip shows top factors, RUL.
  • Predictive Timeline: sparklines + prediction band slider (7–30 days).
  • Maintenance Scheduler: Gantt scheduler with Agentforce suggestions.
  • Parts Forecast: projected demand and auto-PO button.

Viz Extensions

  • Built using Tableau Extensions SDK (React or vanilla). Extensions call POST /webhooks/tableau with signed JWT.

Embed & security

  • Embed tokens rotated; use CSP and iframe sandboxing. Extensions must authenticate via signed JWT.

Deployment & infrastructure (Docker, k8s, CI/CD)

Recommended k8s layout

  • Namespace: predict-ai-prod
  • Deployments: edge-proxy, streaming-processor, feature-store, model-server, api, agentforce-worker, hyper-worker
  • StatefulSets: kafka, timescaledb
  • Services: redis, prometheus, grafana
  • Jobs/CronJobs: Hyper pre-aggregate export, nightly training checks

Local docker-compose.yml (abridged)

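version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
  kafka:
    image: wurstmeister/kafka
    depends_on: [zookeeper]
    environment:
      # required by the wurstmeister image to locate ZooKeeper and advertise itself
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: kafka
  model-server:
    build: ./predictive_engine
    ports: ["8000:8000"]
  api:
    build: ./api
    ports: ["8080:8080"]
  timescaledb:
    image: timescale/timescaledb:latest-pg12
    environment:
      POSTGRES_PASSWORD: example  # local development only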

CI/CD

  • CI: GitHub Actions runs unit tests, lints, builds Docker images, runs model validation on PRs.
  • CD: GitHub Actions deploys Helm charts to the cluster (helm/kubectl). Model promotions are gated by manual approval.

Observability, testing, and SLOs

Metrics to export (Prometheus)

  • ingest_messages_total, ingest_latency_seconds
  • predictions_total, prediction_latency_seconds, model_version_gauge
  • api_errors_total, request_duration_seconds

Tracing & logs

  • Use OpenTelemetry to propagate trace IDs across services (Edge → Ingest → Model → Agentforce).
  • Structured JSON logs and central ELK (or Loki) ingestion.

SLOs

  • /predict: 99% of requests complete in under 300 ms
  • Ingestion: 99.9% availability
  • Mean time to detect anomaly < 2 minutes
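
To make the latency SLO concrete, the sketch below checks a batch of recorded latencies against a percentile threshold using the nearest-rank method. The function names are hypothetical. In practice the same check would usually be a Prometheus histogram query over prediction_latency_seconds rather than application code.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[max(rank, 1) - 1]


def meets_latency_slo(latencies_ms, pct=99, threshold_ms=300):
    """True when the pct-th percentile latency is under the threshold."""
    return percentile(latencies_ms, pct) < threshold_ms
```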

Tests

  • Unit tests (pytest / jest)
  • Integration tests: synthetic sensor feed → Kafka → end-to-end pre-aggregate and Tableau update.
  • Model tests: nightly drift, calibration, performance checks.

Security, authN/authZ, and secrets handling

Auth patterns

  • Inter-service: mTLS (recommended)
  • External APIs: OAuth2 with short-lived JWTs
  • Tableau webhooks: signed JWTs validated per invocation

Secrets & key management

  • Store in KMS (AWS KMS/GCP KMS) or Vault. Kubernetes secrets encrypted at rest and injected at runtime. Never store raw secrets in logs.

Data protection

  • TLS for all transport (1.2+)
  • Encrypt at-rest (S3, Postgres)
  • PII redaction in logs; store minimal PII in semantic layer

Developer guide & local dev quickstart

Prereqs

  • Docker & docker-compose
  • Python 3.10+, Node 16+
  • Kafka dev or Confluent cloud
  • Tableau Dev site and tokens for embedding

Quickstart (local)

git clone git@github.com:your-org/predict-ai.git
cd predict-ai
docker-compose build
docker-compose up -d
# run migrations
docker-compose exec api alembic upgrade head
# seed sample data
python scripts/seed_sample_data.py
# run model server locally (stop the containerized one first; both bind port 8000)
docker-compose stop model-server
cd predictive_engine && uvicorn app:app --reload --port 8000

Tests

pytest tests/unit
pytest tests/integration  # requires local kafka & db

Troubleshooting & runbook

Prediction latency spikes

  • Check model-server CPU/memory and thread-pool saturation (kubectl top pods).
  • Check feature store cache misses (cache metrics)
  • Consider reverting model version via model registry

Too many false positives

  • Verify sensor quality flags (missing ts, unrealistic values)
  • Inspect feature importance on affected predictions
  • Retrain with improved windowing or add conservative ensemble gating

Agentforce action failures

  • Check credentials for external endpoints and network ACLs
  • Inspect workflow_runs for step errors (timeout, 5xx)
  • Validate idempotency keys to avoid duplicates

Solana append failures

  • RPC rate limits or account signer issues
  • Mark audit entries pending_onchain and retry with backoff

Tableau pre-aggregate failures

  • Confirm Hyper file creation, upload logs, and Tableau Cloud token validity

Roadmap & known limitations

Short-term

  • Continuous learning loop (retrain on confirmed failures)
  • Technician mobile app native integration (offline mode)
  • Improve LSTM ensemble for long-term pattern capture

Medium-term

  • Cross-factory semantic federation and horizontal scaling
  • Differential privacy mode for regulated environments

Known limitations

  • Solana on-chain costs prohibit full payload on-chain; only hashes stored.
  • Some SCADA vendors use proprietary locked protocols; prebuilt adapters cover many but not all.

Contributing, license, credits

  • Fork → new branch feature/xxx → tests pass → PR with changelog
  • Python style: Black / isort / mypy. JS: Prettier / ESLint. Infra: Helm lint and tfsec.
  • License: MIT (see LICENSE)

Appendices (configs, examples)

.env.example

POSTGRES_HOST=timescaledb
POSTGRES_USER=
POSTGRES_PASSWORD=secret
KAFKA_BOOTSTRAP=kafka:9092
MODEL_REGISTRY_BUCKET=s3://models
TABLEAU_CLOUD_SITE=your-site
TABLEAU_CLOUD_TOKEN=XXXX
SOLANA_RPC=https://api.mainnet-beta.solana.com
SOLANA_SIGNER_KEY=base58key

Example playbook (JSON)

{
  "id":"pb_high_risk_001",
  "name":"High Risk Failure Playbook",
  "trigger":{"type":"threshold","field":"failure_probability","op":">=","value":0.85},
  "steps":[
    {"type":"create_case","args":{"priority":"High"}},
    {"type":"reserve_parts","args":{"strategy":"min_stock"}},
    {"type":"notify","args":{"channels":["slack","mobile"]}}
  ]
}
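
As an illustration of how a threshold trigger like the one above could be evaluated, the sketch below maps the `op` string to a comparison function. `trigger_fires` and the supported operator set are assumptions; the README does not specify how Agentforce interprets triggers.

```python
import operator

# Supported comparison operators (assumed set)
OPS = {
    ">=": operator.ge,
    ">": operator.gt,
    "<=": operator.le,
    "<": operator.lt,
    "==": operator.eq,
}


def trigger_fires(trigger, prediction):
    """Return True when the prediction satisfies a threshold trigger."""
    if trigger.get("type") != "threshold":
        raise ValueError(f"unsupported trigger type: {trigger.get('type')!r}")
    compare = OPS[trigger["op"]]  # KeyError on an unknown operator
    return compare(prediction[trigger["field"]], trigger["value"])
```

For the playbook above, a prediction with failure_probability 0.9 would fire the trigger and one with 0.5 would not.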

Example model evaluation summary (v1.2)

  • ROC-AUC: 0.92
  • Precision@10%: 0.81
  • RUL RMSE: 4.2 days
  • Data: 4 years labeled failures, 2 factories, 120 equipment types

About

01-07-2026. predictai TABLEAU
