A pythonic parallel computing library that just works
pip install ripplex

Add @flow to any function and Ripplex automatically runs independent operations in parallel.
from ripplex import flow
@flow
def get_user_profile(user_id):
# These three calls run in PARALLEL automatically
user = fetch_user_data(user_id) # 1.0s
posts = fetch_user_posts(user_id) # 1.5s
friends = fetch_user_friends(user_id) # 0.8s
# This waits for all three to finish, then runs
return {
"user": user,
"posts": posts,
"friends": friends
}
# Sequential: 3.3 seconds
# With @flow: 1.5 seconds
result = get_user_profile(123)

Zero refactoring required. Just add the decorator.
Process lists in parallel with automatic variable capture.
from ripplex import loop
# Variables from outer scope are automatically available
API_KEY = "secret-key"
BASE_URL = "https://api.example.com"
user_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
@loop(user_ids)
def fetch_user(user_id):
# API_KEY and BASE_URL automatically available!
url = f"{BASE_URL}/users/{user_id}?key={API_KEY}"
return requests.get(url).json()
# The decorated function becomes a list of results!
print(f"Fetched {len(fetch_user)} users in parallel")
print(fetch_user[0]) # First user's data

No setup needed. Outer variables just work.
The @loop decorator returns a special LoopResult object that acts like a list but includes extra metadata:
@loop([1, 2, 0, 4, 0, 6], on_error="collect")
def divide(n):
return 100 / n
# It's a list!
print(divide) # [100, 50, None, 25, None, 16.67]
print(divide[0]) # 100
print(len(divide)) # 6
# But with extras!
print(divide.success_count) # 4 successful
print(divide.total_count) # 6 total attempts
print(divide.all_successful) # False
print(divide.errors) # {2: ZeroDivisionError(...), 4: ZeroDivisionError(...)}

The @loop decorator provides three strategies for handling errors:
data = [1, 2, 0, 4, 0, 6] # Zeros will cause division errors
# Option 1: "continue" (default) - Skip failures, only return successes
@loop(data, on_error="continue")
def divide_continue(n):
return 100 / n
# Returns: [100, 50, 25, 16.67] - failed items removed
# Option 2: "raise" - Stop on first error
@loop(data, on_error="raise")
def divide_raise(n):
return 100 / n
# Raises: ZeroDivisionError (and cancels remaining)
# Option 3: "collect" - Keep None for failures, preserve positions
@loop(data, on_error="collect")
def divide_collect(n):
return 100 / n
# Returns: [100, 50, None, 25, None, 16.67] - preserves list structure

from ripplex import pmap, execute, quick_map
# One-liner parallel map
squared = pmap(lambda x: x**2, range(100))
# Execute with options
results = execute(process_item, items, workers=8, on_error="collect")
# Super quick for prototyping
doubled = quick_map(lambda x: x * 2, [1,2,3,4,5])

import ripplex as rx
# Use short aliases for less typing
@rx.f # Instead of @flow
def complex_pipeline():
data = fetch_data()
processed = transform(data)
return processed
@rx.l(items) # Instead of @loop
def process(item):
return item * 2
# Ultra-short parallel map
results = rx.p(lambda x: x**2, range(10))

Here's a real-world data pipeline using both features:
from ripplex import flow, loop
@flow
def process_sales_data():
# Step 1: Fetch data in parallel
sales = fetch_sales_data() # 2s
customers = fetch_customers() # 1.5s
products = fetch_products() # 1s
# Step 2: Process each sale in parallel
@loop(sales, workers=8)
def enrich_sale(sale):
# customers and products automatically available!
customer = customers[sale['customer_id']]
product = products[sale['product_id']]
return {
**sale,
'customer_name': customer['name'],
'product_name': product['name'],
'profit': sale['price'] - product['cost']
}
# Step 3: Generate reports in parallel
summary = generate_summary(enrich_sale)
insights = analyze_trends(enrich_sale)
return {'sales': enrich_sale, 'summary': summary, 'insights': insights}
# Sequential: ~15 seconds
# With Ripplex: ~6 seconds
result = process_sales_data()

Add debug=True to see what's happening:
@flow(debug=True)
def my_function():
# Shows execution timeline
pass
@loop(items, debug=True)
def process_item(item):
# Shows progress bar
pass

Automatically parallelizes independent operations in a function by analyzing dependencies.
Processes items in parallel with automatic variable capture.
Parameters:
iterable: Items to process (or an integer for range(n))
workers: Number of threads (default: smart auto-detection)
on_error: Error handling strategy:
  "continue": Skip errors, return only successes
  "raise": Stop on first error
  "collect": Include None for failures, preserve list positions
debug: Show progress bar and timing
Returns: LoopResult - an enhanced list with error tracking
Functional parallel map for one-liners:
results = pmap(lambda x: x**2, [1,2,3,4]) # [1,4,9,16]

Execute a function in parallel without decorators:
results = execute(process_item, items, workers=10, on_error="collect")

Simplest parallel map with defaults:
results = quick_map(lambda x: x * 2, items)

Get a human-readable summary of loop execution:
from ripplex import summary
@loop(items, on_error="collect")
def process(item):
return risky_operation(item)
print(summary(process))
# 📊 Execution Summary:
# Total items: 100
# Successful: 95
# Failed: 5
# Success rate: 95.0%