🌊 Stream Refinery: Real-time AI data cleaning pipeline. Consumes "dirty" streams from Confluent Kafka, processes them with Google Gemini 2.5, and visualizes enriched JSONs instantly in Streamlit.

Stream Refinery 🌊🧠

Python Confluent Google AI Streamlit License

"Garbage In, Gold Out": real-time data cleaning and enrichment using Confluent and Google Vertex AI. This project was developed for the AI Partner Catalyst: Accelerate Innovation hackathon to demonstrate real-time data cleansing using GenAI.

📺 Demo Video

Stream Refinery Demo

Click the image above to watch the full demo (2 min).

See how it cleans dirty data streams instantly using Confluent Cloud & Gemini 2.5.

Project Goal

Standardize and fix "dirty" data streams (typos in locations, product names) on the fly using LLMs, transforming a chaotic raw stream into a high-quality analytical dataset without manual intervention.

πŸ—οΈ Architecture

Stream Refinery Architecture

Real-time pipeline: Confluent Cloud streams ➑️ Gemini AI cleaning ➑️ Streamlit Dashboard.

  1. Ingestion: Python Producer generates mock transactions with intentional errors.
  2. Transport: Confluent Cloud (Kafka) streams the raw data to the raw-data topic.
  3. Intelligence: Google Vertex AI (Gemini) processes the JSON, fixes typos, and enriches the data.
  4. Output: Clean, validated data is produced back to the clean-data topic in real-time.
  5. Visualization: A Streamlit dashboard consumes both topics to display a live side-by-side comparison with a history stack.
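The cleaning step (3–4) can be sketched roughly as follows. This is an illustration only, not the project's actual `consumer.py`: the prompt wording and helper names are assumptions, and the Gemini call itself is simulated so the sketch stays self-contained.

```python
import json

def build_cleaning_prompt(raw_record: dict) -> str:
    """Ask the model to fix typos and return corrected JSON only (hypothetical prompt)."""
    return (
        "Fix any typos in the 'location' and 'product' fields of this "
        "transaction and return ONLY the corrected JSON object:\n"
        + json.dumps(raw_record)
    )

def parse_clean_response(text: str) -> dict:
    """Parse the model reply, tolerating the ```json fences LLMs often wrap around output."""
    body = text.strip()
    if body.startswith("```"):
        body = body.strip("`").strip()
        if body.startswith("json"):
            body = body[len("json"):]
    return json.loads(body)

if __name__ == "__main__":
    dirty = {"id": "tx-1", "location": "Lodnon", "product": "Laptpo Pro 15"}
    prompt = build_cleaning_prompt(dirty)
    # In the real pipeline the prompt goes to Gemini; here we simulate a typical reply.
    reply = '```json\n{"id": "tx-1", "location": "London", "product": "Laptop Pro 15"}\n```'
    clean = parse_clean_response(reply)
    # The validated dict would then be produced back to the clean-data topic.
    print(clean)
```

Parsing defensively matters here: the cleaned record is only useful downstream if it is guaranteed to be valid JSON before being produced to `clean-data`.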

Setup

  1. Install dependencies:

    pip install -r requirements.txt
  2. Configure Environment: Create a client.properties file in the root directory. You need Confluent credentials and a Google AI Studio key:

    # Confluent Settings
    bootstrap.servers=YOUR_BOOTSTRAP_SERVER
    security.protocol=SASL_SSL
    sasl.mechanisms=PLAIN
    sasl.username=YOUR_KAFKA_API_KEY
    sasl.password=YOUR_KAFKA_API_SECRET
    
    # Google AI Settings
    google.api.key=YOUR_GOOGLE_AI_KEY
  3. Run the pipeline:

    Terminal 1 (Data Source):

    python producer.py

    Terminal 2 (AI Processor):

    python consumer.py

    Terminal 3 (Live Dashboard):

    streamlit run app.py
  4. Verification: Open your browser at http://localhost:8501. You will see the "Dirty" stream on the left and the AI-cleaned "Enriched" stream on the right appearing in real-time.
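Since `client.properties` uses Java-style `key=value` lines rather than an INI file with sections, the scripts need a small parser for it. A minimal sketch of how the file from step 2 might be loaded (the helper name and the split-out of the Google key are assumptions, not necessarily how the project's scripts do it):

```python
def load_properties(lines) -> dict:
    """Parse Java-style key=value lines, skipping blanks and # comments."""
    props = {}
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

SAMPLE = """\
# Confluent Settings
bootstrap.servers=YOUR_BOOTSTRAP_SERVER
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN

# Google AI Settings
google.api.key=YOUR_GOOGLE_AI_KEY
""".splitlines()

if __name__ == "__main__":
    conf = load_properties(SAMPLE)          # in practice: open("client.properties")
    google_key = conf.pop("google.api.key")  # keep the AI key out of the Kafka config
    print(sorted(conf))  # remaining keys can be passed to confluent_kafka clients
```

Popping `google.api.key` before building the Kafka client avoids passing an unknown property to `confluent_kafka`, which would otherwise reject or warn about it.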

📄 License

This project is open-source and available under the MIT License.
