"Garbage In, Gold Out" - Real-time data cleaning and enrichment using Confluent and Google Vertex AI. This project was developed specifically for the AI Partner Catalyst: Accelerate Innovation hackathon to demonstrate real-time data cleansing using GenAI.
Click the image above to watch the full demo (2 min).
See how it cleans dirty data streams instantly using Confluent Cloud & Gemini 2.5.
Standardize and fix "dirty" data streams (typos in locations, product names) on the fly using LLMs, transforming a chaotic raw stream into a high-quality analytical dataset without manual intervention.
Real-time pipeline: Confluent Cloud streams → Gemini AI cleaning → Streamlit dashboard.
- Ingestion: Python Producer generates mock transactions with intentional errors.
- Transport: Confluent Cloud (Kafka) streams the raw data to the `raw-data` topic.
- Intelligence: Google Vertex AI (Gemini) processes the JSON, fixes typos, and enriches the data.
- Output: Clean, validated data is produced back to the `clean-data` topic in real time.
- Visualization: A Streamlit dashboard consumes both topics to display a live side-by-side comparison with a history stack.
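The Intelligence step hinges on the prompt sent to Gemini. The helper below is a minimal sketch of how such a prompt could be assembled; the function name, instruction text, and record fields are illustrative assumptions, not the project's actual code.

```python
import json

# Hypothetical cleaning instruction; the real project's prompt may differ.
# It asks the model for strict JSON so the consumer can parse the reply
# and produce it to the clean-data topic.
CLEANING_INSTRUCTIONS = (
    "You are a data-cleaning assistant. Fix typos in the 'location' and "
    "'product' fields of the transaction below, keep all keys unchanged, "
    "and reply with valid JSON only."
)

def build_cleaning_prompt(raw_record: dict) -> str:
    """Wrap one raw transaction in the cleaning instruction."""
    return CLEANING_INSTRUCTIONS + "\n\nTransaction:\n" + json.dumps(raw_record)

# Example dirty record like those the producer might emit (illustrative).
dirty = {"id": 1, "location": "Nwe York", "product": "Labtop"}
prompt = build_cleaning_prompt(dirty)
```

In the consumer, this string would be passed to a Gemini model call (e.g. via the `google-generativeai` SDK) and the model's JSON reply parsed before producing to `clean-data`.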
- Install dependencies:

  ```
  pip install -r requirements.txt
  ```

- Configure environment: create a `client.properties` file in the root directory. You need Confluent credentials and a Google AI Studio key:

  ```properties
  # Confluent settings
  bootstrap.servers=YOUR_BOOTSTRAP_SERVER
  security.protocol=SASL_SSL
  sasl.mechanisms=PLAIN
  sasl.username=YOUR_KAFKA_API_KEY
  sasl.password=YOUR_KAFKA_API_SECRET

  # Google AI settings
  google.api.key=YOUR_GOOGLE_AI_KEY
  ```
- Run the pipeline:

  Terminal 1 (data source):

  ```
  python producer.py
  ```

  Terminal 2 (AI processor):

  ```
  python consumer.py
  ```

  Terminal 3 (live dashboard):

  ```
  streamlit run app.py
  ```
- Verification: open your browser at `http://localhost:8501`. You will see the "Dirty" stream on the left and the AI-cleaned "Enriched" stream on the right, updating in real time.
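Since `client.properties` holds both Kafka and Google settings, the scripts have to parse it themselves (Confluent's Python client takes a plain configuration dict rather than a Java-style `.properties` file). A minimal parser sketch; the function name and usage lines are assumptions for illustration:

```python
def parse_properties(text: str) -> dict:
    """Parse Java-style .properties content into a flat dict.

    Skips blank lines and '#' comments; splits each line on the first '='.
    """
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

# Usage (illustrative): read the file, then split Kafka vs Google settings.
# cfg = parse_properties(open("client.properties").read())
# api_key = cfg.pop("google.api.key")       # for the Gemini client
# producer = confluent_kafka.Producer(cfg)  # remaining keys go to Kafka
```

Popping `google.api.key` before constructing the producer matters, because `confluent-kafka` rejects configuration keys it does not recognize.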
This project is open-source and available under the MIT License.

