This library provides a high-performance Python client for Baseten.co endpoints including embeddings, reranking, and classification. It was built for massively concurrent POST requests to any URL, including those outside of baseten.co. PerformanceClient releases the GIL while performing requests in Rust, and supports simultaneous sync and async usage. It was benchmarked at >1200 rps per client in our blog. PerformanceClient is built on top of pyo3, reqwest and tokio and is MIT licensed.
pip install baseten_performance_client
import os
import asyncio
from baseten_performance_client import PerformanceClient, OpenAIEmbeddingsResponse, RerankResponse, ClassificationResponse
api_key = os.environ.get("BASETEN_API_KEY")
base_url_embed = "https://model-yqv4yjjq.api.baseten.co/environments/production/sync"
# Also works with OpenAI or Mixedbread.
# base_url_embed = "https://api.openai.com" or "https://api.mixedbread.com"
# Basic client setup
client = PerformanceClient(base_url=base_url_embed, api_key=api_key)
# Advanced setup with HTTP version selection and connection pooling
from baseten_performance_client import HttpClientWrapper
http_wrapper = HttpClientWrapper(http_version=1) # HTTP/1.1 (default)
advanced_client = PerformanceClient(
base_url=base_url_embed,
api_key=api_key,
http_version=1, # HTTP/1.1
client_wrapper=http_wrapper # Share connection pool
)

from baseten_performance_client import RequestProcessingPreference
texts = ["Hello world", "Example text", "Another sample"]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
response = client.embed(
input=texts,
model="my_model",
preference=preference
)
# Accessing embedding data
print(f"Model used: {response.model}")
print(f"Total tokens used: {response.usage.total_tokens}")
print(f"Total time: {response.total_time:.4f}s")
if response.individual_batch_request_times:
for i, batch_time in enumerate(response.individual_batch_request_times):
print(f" Time for batch {i}: {batch_time:.4f}s")
for i, embedding_data in enumerate(response.data):
print(f"Embedding for text {i} (original input index {embedding_data.index}):")
# embedding_data.embedding can be List[float] or str (base64)
if isinstance(embedding_data.embedding, list):
print(f" First 3 dimensions: {embedding_data.embedding[:3]}")
print(f" Length: {len(embedding_data.embedding)}")
# Using the numpy() method (requires numpy to be installed)
import numpy as np
numpy_array = response.numpy()
print("\nEmbeddings as NumPy array:")
print(f" Shape: {numpy_array.shape}")
print(f" Data type: {numpy_array.dtype}")
if numpy_array.shape[0] > 0:
print(f" First 3 dimensions of the first embedding: {numpy_array[0][:3]}")

Note: The embed method is versatile and can be used with any embeddings service, e.g. OpenAI API embeddings, not just for Baseten deployments.
async def async_embed():
from baseten_performance_client import RequestProcessingPreference
texts = ["Async hello", "Async example"]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
response = await client.async_embed(
input=texts,
model="my_model",
preference=preference
)
print("Async embedding response:", response.data)
# To run:
# asyncio.run(async_embed())

Comparison against pip install openai for /v1/embeddings. Tested with ./scripts/compare_latency_openai.py with a mini_batch_size of 128, and 4 server-side replicas. Results with OpenAI are similar; OpenAI allows a max mini_batch_size of 2048.
| Number of inputs / embeddings | Number of Tasks | PerformanceClient (s) | AsyncOpenAI (s) | Speedup |
|---|---|---|---|---|
| 128 | 1 | 0.12 | 0.13 | 1.08× |
| 512 | 4 | 0.14 | 0.21 | 1.50× |
| 8 192 | 64 | 0.83 | 1.95 | 2.35× |
| 131 072 | 1 024 | 4.63 | 39.07 | 8.44× |
| 2 097 152 | 16 384 | 70.92 | 903.68 | 12.74× |
The batch_post method is generic. It can be used to send POST requests to any URL, not limited to Baseten endpoints. The input and output can be any JSON item.
from baseten_performance_client import RequestProcessingPreference
payload1 = {"model": "my_model", "input": ["Batch request sample 1"]}
payload2 = {"model": "my_model", "input": ["Batch request sample 2"]}
preference = RequestProcessingPreference(
max_concurrent_requests=32,
timeout_s=360,
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360, # Total operation timeout
extra_headers={"x-custom-header": "value"} # Custom headers
)
response_obj = client.batch_post(
url_path="/v1/embeddings", # Example path, adjust to your needs
payloads=[payload1, payload2],
preference=preference
)
print(f"Total time for batch POST: {response_obj.total_time:.4f}s")
for i, (resp_data, headers, time_taken) in enumerate(zip(response_obj.data, response_obj.response_headers, response_obj.individual_request_times)):
print(f"Response {i+1}:")
print(f" Data: {resp_data}")
print(f" Headers: {headers}")
print(f" Time taken: {time_taken:.4f}s")

async def async_batch_post_example():
from baseten_performance_client import RequestProcessingPreference
payload1 = {"model": "my_model", "input": ["Async batch sample 1"]}
payload2 = {"model": "my_model", "input": ["Async batch sample 2"]}
preference = RequestProcessingPreference(
max_concurrent_requests=32,
timeout_s=360,
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360, # Total operation timeout
extra_headers={"x-custom-header": "value"} # Custom headers
)
response_obj = await client.async_batch_post(
url_path="/v1/embeddings",
payloads=[payload1, payload2],
preference=preference
)
print(f"Async total time for batch POST: {response_obj.total_time:.4f}s")
for i, (resp_data, headers, time_taken) in enumerate(zip(response_obj.data, response_obj.response_headers, response_obj.individual_request_times)):
print(f"Async Response {i+1}:")
print(f" Data: {resp_data}")
print(f" Headers: {headers}")
print(f" Time taken: {time_taken:.4f}s")
# To run:
# asyncio.run(async_batch_post_example())

Reranking compatible with BEI or text-embeddings-inference.
from baseten_performance_client import RequestProcessingPreference
query = "What is the best framework?"
documents = ["Doc 1 text", "Doc 2 text", "Doc 3 text"]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
rerank_response = client.rerank(
query=query,
texts=documents,
model="rerank-model", # Optional model specification
return_text=True,
preference=preference
)
for res in rerank_response.data:
print(f"Index: {res.index} Score: {res.score}")

async def async_rerank():
from baseten_performance_client import RequestProcessingPreference
query = "Async query sample"
docs = ["Async doc1", "Async doc2"]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
response = await client.async_rerank(
query=query,
texts=docs,
model="rerank-model", # Optional model specification
return_text=True,
preference=preference
)
for res in response.data:
print(f"Async Index: {res.index} Score: {res.score}")
# To run:
# asyncio.run(async_rerank())

Predict (classification endpoint) compatible with BEI or text-embeddings-inference.
from baseten_performance_client import RequestProcessingPreference
texts_to_classify = [
"This is great!",
"I did not like it.",
"Neutral experience."
]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
classify_response = client.classify(
inputs=texts_to_classify,
model="classification-model", # Optional model specification
preference=preference
)
for group in classify_response.data:
for result in group:
print(f"Label: {result.label}, Score: {result.score}")

async def async_classify():
from baseten_performance_client import RequestProcessingPreference
texts = ["Async positive", "Async negative"]
preference = RequestProcessingPreference(
batch_size=16,
max_concurrent_requests=32,
timeout_s=360,
max_chars_per_request=256000, # Character limit per request
hedge_delay=0.5, # Enable hedging with 0.5s delay
total_timeout_s=360 # Total operation timeout
)
response = await client.async_classify(
inputs=texts,
model="classification-model", # Optional model specification
preference=preference
)
for group in response.data:
for res in group:
print(f"Async Label: {res.label}, Score: {res.score}")
# To run:
# asyncio.run(async_classify())

The RequestProcessingPreference class provides a unified way to configure all request processing parameters. This is the recommended approach for advanced configuration, as it provides better type safety and clearer intent.
from baseten_performance_client import RequestProcessingPreference
# Create a preference with custom settings
preference = RequestProcessingPreference(
max_concurrent_requests=64, # Parallel requests (default: 128)
batch_size=32, # Items per batch (default: 128)
timeout_s=30.0, # Per-request timeout (default: 3600.0)
hedge_delay=0.5, # Hedging delay (default: None)
hedge_budget_pct=0.15, # Hedge budget percentage (default: 0.10)
retry_budget_pct=0.08, # Retry budget percentage (default: 0.05)
max_retries=3, # Maximum HTTP retries (default: 4)
initial_backoff_ms=250, # Initial backoff in milliseconds (default: 125)
total_timeout_s=300.0 # Total operation timeout (default: None)
)
# Use with any method
response = client.embed(
input=["text1", "text2"],
model="my_model",
preference=preference
)
# Also works with async methods
response = await client.async_embed(
input=["text1", "text2"],
model="my_model",
preference=preference
)

Property-based Configuration: You can also modify preferences after creation using property setters:
# Create preference and modify properties
preference = RequestProcessingPreference()
preference.max_concurrent_requests = 64 # Set parallel requests
preference.batch_size = 32 # Set batch size
preference.timeout_s = 30.0 # Set timeout
preference.hedge_delay = 0.5 # Enable hedging
preference.hedge_budget_pct = 0.15 # Set hedge budget
preference.retry_budget_pct = 0.08 # Set retry budget
preference.max_retries = 3 # Set max retries
preference.initial_backoff_ms = 250 # Set backoff
# Use with any method
response = client.embed(
input=["text1", "text2"],
model="my_model",
preference=preference
)

Budget Percentages:
- hedge_budget_pct: Percentage of total requests allocated for hedging (default: 10%)
- retry_budget_pct: Percentage of total requests allocated for retries (default: 5%)
- Maximum allowed: 300% for both budgets
Retry Configuration:
- max_retries: Maximum number of HTTP retries (default: 4, max: 4)
- initial_backoff_ms: Initial backoff duration in milliseconds (default: 125, range: 50-30000)
- Backoff uses exponential backoff with jitter
The client supports request hedging for improved latency by sending duplicate requests after a specified delay:
# Enable hedging with 0.5 second delay
preference = RequestProcessingPreference(
hedge_delay=0.5, # Send hedge request after 0.5s
max_chars_per_request=256000,
total_timeout_s=360
)
response = client.embed(
input=texts,
model="my_model",
preference=preference
)

Use custom headers with batch_post:
preference = RequestProcessingPreference(
extra_headers={
"x-custom-header": "value",
"authorization": "Bearer token"
}
)
response = client.batch_post(
url_path="/v1/embeddings",
payloads=payloads,
preference=preference
)

Choose between HTTP/1.1 and HTTP/2:
# HTTP/1.1 (default, better for high concurrency)
client_http1 = PerformanceClient(base_url, api_key, http_version=1)
# HTTP/2 (better for single requests)
client_http2 = PerformanceClient(base_url, api_key, http_version=2)

Share connection pools across multiple clients:
from baseten_performance_client import HttpClientWrapper
# Create shared wrapper
wrapper = HttpClientWrapper(http_version=1)
# Reuse across multiple clients
client1 = PerformanceClient(base_url="https://api1.example.com", client_wrapper=wrapper)
client2 = PerformanceClient(base_url="https://api2.example.com", client_wrapper=wrapper)

Route all HTTP requests through a proxy (e.g., for connection pooling with Envoy):
from baseten_performance_client import HttpClientWrapper
# Create wrapper with HTTP proxy
wrapper = HttpClientWrapper(
http_version=1,
proxy="http://envoy-proxy.local:8080"
)
# Share the wrapper across multiple clients
client1 = PerformanceClient(
base_url="https://api1.example.com",
api_key="your_key",
client_wrapper=wrapper
)
client2 = PerformanceClient(
base_url="https://api2.example.com",
api_key="your_key",
client_wrapper=wrapper
)
# Both clients will use the same connection pool and proxy

You can also specify the proxy directly when creating a client:
client = PerformanceClient(
base_url="https://api.example.com",
api_key="your_key",
proxy="http://envoy-proxy.local:8080"
)

The client can raise several types of errors. Here's how to handle common ones:
- requests.exceptions.HTTPError: This error is raised for HTTP issues, such as authentication failures (e.g., 403 Forbidden if the API key is wrong), server errors (e.g., 5xx), or if the endpoint is not found (404). You can inspect e.response.status_code and e.response.text (or e.response.json() if the body is JSON) for more details.
- ValueError: This error can occur due to invalid input parameters (e.g., an empty input list for embed, or invalid batch_size or max_concurrent_requests values). It can also be raised by response.numpy() if embeddings are not float vectors or have inconsistent dimensions.
Here's an example demonstrating how to catch these errors for the embed method:
import requests
from baseten_performance_client import RequestProcessingPreference
# client = PerformanceClient(base_url="your_baseten_url", api_key="your_baseten_api_key")
texts_to_embed = ["Hello world", "Another text example"]
try:
preference = RequestProcessingPreference(
batch_size=2,
max_concurrent_requests=4,
timeout_s=60 # Timeout in seconds
)
response = client.embed(
input=texts_to_embed,
model="your_embedding_model", # Replace with your actual model name
preference=preference
)
# Process successful response
print(f"Model used: {response.model}")
print(f"Total tokens: {response.usage.total_tokens}")
for item in response.data:
embedding_preview = item.embedding[:3] if isinstance(item.embedding, list) else "Base64 Data"
print(f"Index {item.index}, Embedding (first 3 dims or type): {embedding_preview}")
except requests.exceptions.HTTPError as e:
print(f"An HTTP error occurred: {e}, code {e.args[0]}")

For asynchronous methods (async_embed, async_rerank, async_classify, async_batch_post), the same exceptions will be raised by the await call and can be caught using a try...except block within an async def function.
# Install prerequisites
sudo apt-get install patchelf
# Install cargo if not already installed.
# Set up a Python virtual environment
python -m venv .venv
source .venv/bin/activate
# Install development dependencies
pip install maturin[patchelf] pytest requests numpy
# Build and install the Rust extension in development mode
maturin develop
cargo fmt
# Run tests
pytest tests

Feel free to contribute to this repo — tag @michaelfeil for review.
MIT License
