feat(experiment): Add run experiment capabilities #3331
Conversation
Nina Kollman seems not to be a GitHub user. You need a GitHub account to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. Have you already signed the CLA but the status is still pending? Let us recheck it.
Note: CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds an experiments subsystem (Experiment class, Experiment models, Evaluator, SSE streaming client), exposes Client.experiment and an async httpx client, adds datasets JSONL retrieval, treats NDJSON as text in HTTPClient, includes Experiment JSONL parsing tests and a sample medical experiment demo with prompts, and adds a runtime dependency.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor Dev as Developer Script
participant Client as SDK Client
participant Exp as Experiment
participant DS as Datasets
participant API as Traceloop API
participant Eval as Evaluator
Dev->>Client: Traceloop.init() / access client.experiment
Dev->>Exp: run(task, dataset_slug, dataset_version, evaluators)
Exp->>API: PUT /experiments/initialize
API-->>Exp: ExperimentInitResponse (experiment, run.id)
Exp->>DS: get_version_jsonl(slug, version)
DS->>API: GET /datasets/{slug}/versions/{version}/jsonl
API-->>DS: JSONL (NDJSON text)
DS-->>Exp: JSONL text
Exp->>Exp: parse JSONL -> rows (concurrent)
loop per row (concurrent)
Exp->>API: POST /experiments/{slug}/runs/{run_id}/task
API-->>Exp: CreateTaskResponse (task_id)
alt evaluators configured & wait_for_results
Exp->>Eval: run_experiment_evaluator(evaluator_slug, task_id, experiment_id, run_id, input)
Eval->>API: POST /evaluators/{slug}/execute
API-->>Eval: {execution_id, stream_url}
Eval->>Eval: SSEClient.wait_for_result(execution_id, stream_url)
Eval->>API: GET {base}/v2{stream_url} (Accept: text/event-stream)
API-->>Eval: SSE events -> final result
Eval-->>Exp: ExecutionResponse(result)
else evaluators triggered only
Exp->>Eval: trigger_experiment_evaluator(...)
Eval-->>Exp: execution_id
end
end
    Exp-->>Dev: (results list, errors list)
```

```mermaid
sequenceDiagram
autonumber
participant Eval as Evaluator
participant HTTP as httpx.AsyncClient
participant SSE as SSEClient
participant API as Traceloop API
Eval->>HTTP: POST /evaluators/{slug}/execute (JSON)
API-->>Eval: 200 {execution_id, stream_url}
Eval->>SSE: wait_for_result(execution_id, stream_url)
SSE->>HTTP: GET {base}/v2{stream_url} (Accept: text/event-stream)
alt 200 OK and valid SSE
API-->>SSE: SSE body (JSON events -> final JSON)
SSE->>SSE: parse to ExecutionResponse
SSE-->>Eval: ExecutionResponse(result)
else non-200 / parse error / timeout
SSE-->>Eval: raise descriptive exception
    end
```
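For orientation, here is a minimal usage sketch of the flow in the diagrams, assuming the surface described in the walkthrough (Traceloop.init, client.experiment.run); the dataset and evaluator slugs are illustrative placeholders, not names from this PR:

```python
import asyncio
from traceloop.sdk import Traceloop

client = Traceloop.init()

def my_task(row):
    # row is one parsed JSONL record from the dataset version
    return {"completion": f"processed: {row.get('question')}"}

async def main():
    # Signature follows the walkthrough/diagram; slugs are hypothetical.
    results, errors = await client.experiment.run(
        task=my_task,
        dataset_slug="my-dataset",
        dataset_version="v1",
        evaluators=["my-evaluator"],
    )
    print(f"{len(results)} results, {len(errors)} errors")

asyncio.run(main())
```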
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 4
♻️ Duplicate comments (2)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1)
34-35: Build stream URL robustly; handle absolute URLs and pre-prefixed “/v2”

Current concatenation can yield invalid URLs like “…/v2/v2/...”. Normalize paths and respect absolute URLs.
```diff
- full_stream_url = f"{self._api_endpoint}/v2{stream_url}"
+ # Build robust stream URL
+ if stream_url.startswith(("http://", "https://")):
+     full_stream_url = stream_url
+ elif stream_url.startswith("/v2/"):
+     full_stream_url = f"{self._api_endpoint}{stream_url}"
+ else:
+     prefix = "" if stream_url.startswith("/") else "/"
+     full_stream_url = f"{self._api_endpoint}/v2{prefix}{stream_url}"
```

packages/sample-app/sample_app/experiment/experiment_example.py (1)
15-16: Fail fast if Traceloop client isn’t initialized

Traceloop.init() can return None when API key/config is missing. Guard to avoid AttributeError.

```diff
 client = Traceloop.init()
+if client is None:
+    raise RuntimeError(
+        "Traceloop client not initialized. Ensure TRACELOOP_API_KEY is set."
+    )
```
🧹 Nitpick comments (9)
packages/traceloop-sdk/traceloop/sdk/experiment/model.py (1)
10-16: Optional: Make models resilient to server-side schema additions (ignore extras)

If the API adds fields, strict parsing can break clients. Consider ignoring unknown fields for response models.
Example (Pydantic v2):
```python
from pydantic import ConfigDict

class ExperimentResponse(BaseModel):
    model_config = ConfigDict(extra="ignore")
    ...
```

Apply similarly to other response-bearing models (ExperimentRunResponse, ExperimentInitResponse, CreateTaskResponse, TaskResponse).
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)
52-59: Docstring drift: remove undocumented parameter and align descriptions

The docstring mentions context_data, which isn’t in the signature. Trim it to avoid confusion and keep the Args accurate.

```diff
 Args:
     evaluator_slug: Slug of the evaluator to execute
-    input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
-    client: Shared HTTP client for connection reuse (optional)
-    context_data: Context data to be passed to the evaluator (optional)
+    input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
+    client: Shared HTTP client for connection reuse (optional)
     evaluator_version: Version of the evaluator to execute (optional)
     timeout_in_sec: Timeout in seconds for execution
```
44-45: Avoid shadowing built-in input; rename parameter for clarity

Shadowing Python’s built-in input can confuse readers and tools. Rename to evaluator_input.

```diff
-    input: Dict[str, str],
+    evaluator_input: Dict[str, str],
@@
-    input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
+    evaluator_input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
@@
-    schema_mapping = InputSchemaMapping(
-        root={k: InputExtractor(source=v) for k, v in input.items()}
-    )
+    schema_mapping = InputSchemaMapping(
+        root={k: InputExtractor(source=v) for k, v in evaluator_input.items()}
+    )
@@
-    body = request.model_dump()
+    body = request.model_dump()
```

Also applies to: 54-55, 63-65, 67-72
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1)
75-78: Decode error body before formatting
error_text is bytes; printing raw bytes yields b'...'. Decode for readability.

```diff
- error_text = await response.aread()
+ raw = await response.aread()
+ error_text = raw.decode("utf-8", errors="replace")
  raise Exception(
      f"Failed to stream results: {response.status_code}, body: {error_text}"
  )
```

packages/sample-app/sample_app/experiment/experiment_example.py (2)
22-23: Validate OPENAI_API_KEY before creating client

Provide a clear error if the environment variable is missing.
```diff
- openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
+ api_key = os.getenv("OPENAI_API_KEY")
+ if not api_key:
+     raise RuntimeError("OPENAI_API_KEY is not set in the environment")
+ openai_client = OpenAI(api_key=api_key)
```
9-9: Avoid sys.path manipulation in samples; prefer package-relative imports

Modifying sys.path can mask import issues. Consider using proper package structure or relative imports.

packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (3)
44-52: Docstring parameters are outdated; align with current signature and behavior

Mentions experiment_name, concurrency, and client, which aren’t parameters. Update to reflect experiment_slug, related_ref, aux, and fixed concurrency behavior.

```diff
-    Args:
-        dataset_slug: Slug of the dataset to use
-        task: Function to run on each dataset row
-        evaluators: List of evaluator slugs to run
-        experiment_name: Name for this experiment run
-        concurrency: Number of concurrent tasks
-        exit_on_error: Whether to exit on first error
-        client: Traceloop client instance (if not provided, will initialize)
+    Args:
+        task: Function to run on each dataset row (sync; see note below)
+        dataset_slug: Slug of the dataset to use (requires dataset_version)
+        dataset_version: Version of the dataset to use
+        evaluators: List of evaluator slugs or (slug, version) tuples to run
+        experiment_slug: Slug for this experiment (auto-generated if not provided)
+        related_ref: Optional reference metadata dict for the run
+        aux: Optional auxiliary metadata dict for the run
+        exit_on_error: Whether to propagate the first error
```
97-105: Optional: Support async task callables without blocking the event loop

If task is async, await it; if it’s a heavy sync function, consider run_in_executor. This keeps the loop responsive.

```diff
+# Allow either sync or async task functions
+import inspect
- task_result = task(row)
+ if inspect.iscoroutinefunction(task):
+     task_result = await task(row)
+ else:
+     task_result = task(row)
```
31-41: Optional: Make concurrency configurable instead of hard-coded 50

Expose a concurrency: int = 50 parameter to tune throughput.

```diff
-    async def run(
+    async def run(
         self,
         task: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
         dataset_slug: Optional[str] = None,
         dataset_version: Optional[str] = None,
         evaluators: Optional[List[EvaluatorDetails]] = None,
         experiment_slug: Optional[str] = None,
         related_ref: Optional[Dict[str, str]] = None,
         aux: Optional[Dict[str, str]] = None,
-        exit_on_error: bool = False,
+        exit_on_error: bool = False,
+        concurrency: int = 50,
     ) -> Tuple[List[TaskResponse], List[str]]:
@@
-        semaphore = asyncio.Semaphore(50)
+        semaphore = asyncio.Semaphore(concurrency)
```

Also applies to: 136-136
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
- packages/sample-app/sample_app/experiment/experiment_example.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/model.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py
- packages/sample-app/sample_app/experiment/experiment_example.py
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
- packages/traceloop-sdk/traceloop/sdk/experiment/model.py
- packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py
🧠 Learnings (1)
📚 Learning: 2025-08-04T15:35:30.188Z
Learnt from: nina-kollman
PR: traceloop/openllmetry#3219
File: packages/traceloop-sdk/traceloop/sdk/datasets/dataset.py:357-360
Timestamp: 2025-08-04T15:35:30.188Z
Learning: In the traceloop SDK Dataset class, the established error handling pattern is that HTTP client methods return None on failure (after catching and logging RequestException), and the Dataset API methods check for None return values and raise Exception with descriptive messages. The update_row_api method is inconsistent with this pattern.
Applied to files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧬 Code graph analysis (5)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)
packages/traceloop-sdk/traceloop/sdk/evaluator/model.py (5)
- InputExtractor (6-7)
- InputSchemaMapping (10-13)
- ExecuteEvaluatorRequest (16-21)
- ExecuteEvaluatorResponse (24-28)
- ExecutionResponse (39-43)

packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (2)

- SSEClient (8-91)
- wait_for_result (18-70)
packages/sample-app/sample_app/experiment/experiment_example.py (3)
packages/sample-app/sample_app/experiment/medical_prompts.py (2)
- refuse_medical_advice_prompt (25-46)
- provide_medical_info_prompt (6-22)

packages/traceloop-sdk/traceloop/sdk/__init__.py (2)

- Traceloop (37-274)
- init (49-206)

packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)

- run (31-157)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (5)
packages/traceloop-sdk/traceloop/sdk/client/http.py (3)
- HTTPClient (7-91)
- put (74-91)
- post (23-37)

packages/traceloop-sdk/traceloop/sdk/datasets/datasets.py (2)

- Datasets (25-207)
- get_version_jsonl (165-170)

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)

- Evaluator (16-102)
- run_experiment_evaluator (38-102)

packages/traceloop-sdk/tests/experiment/test_experiment.py (1)

- experiment (8-11)

packages/traceloop-sdk/traceloop/sdk/experiment/model.py (5)

- InitExperimentRequest (18-26)
- ExperimentInitResponse (51-55)
- CreateTaskRequest (58-62)
- CreateTaskResponse (65-68)
- TaskResponse (10-15)
packages/traceloop-sdk/traceloop/sdk/experiment/model.py (1)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)
run(31-157)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1)
packages/traceloop-sdk/traceloop/sdk/evaluator/model.py (1)
ExecutionResponse(39-43)
🪛 Ruff (0.12.2)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py
47-53: Use a single with statement with multiple contexts instead of nested with statements
Combine with statements
(SIM117)
64-66: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
68-68: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
70-70: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
89-89: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
91-91: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Build Packages (3.11)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Test Packages (3.11)
- GitHub Check: Lint
- GitHub Check: Test Packages (3.12)
🔇 Additional comments (3)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)
37-103: Good: client lifecycle managed and richer error details included

The async client is properly closed when owned; HTTP errors include response text; SSE result fetch is cleanly integrated. Looks solid.
packages/sample-app/sample_app/experiment/experiment_example.py (1)
58-67: LGTM: Correct unpacking of run() return value and error printing

The example now respects the (results, errors) return order and prints errors when present.

Also applies to: 76-85
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)
186-204: LGTM: HTTP error handling follows established SDK pattern

Consistent None-checks with clear exceptions, typed request/response models, and correct path normalization. Looks good.
Caution
Changes requested ❌
Reviewed dceaac4 in 1 minute and 29 seconds.
- Reviewed 49 lines of code in 2 files
- Skipped 0 files when reviewing.
- Skipped posting 1 draft comment. View it below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/sample-app/sample_app/experiment/experiment_example.py:63
   - Draft comment: Added new evaluator 'word-count-medical'; ensure this evaluator is implemented and configured in the backend to match expected behavior.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
Workflow ID: wflow_nm4gWUUNRbnwJDdx
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 0
♻️ Duplicate comments (3)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (3)
151-156: Good: error handling now uses TaskResponse instead of dict probing

Previous feedback flagged "error" in result on Pydantic models. This code correctly checks result.error on a TaskResponse, avoiding TypeError and improving clarity.
79-107: Critical: blocking “requests” calls inside async flow will stall the event loop

run() is async but invokes synchronous HTTP via requests in _init_experiment, get_version_jsonl, and _create_task. This negates concurrency under load and can freeze other coroutines. Wrap these in asyncio.to_thread (minimal change) or move to an async HTTP client.
Also folds in the previously raised guard for dataset_version when dataset_slug is provided.
Apply this diff:
```diff
-        experiment = self._init_experiment(
-            experiment_slug,
-            dataset_slug=dataset_slug,
-            dataset_version=dataset_version,
-            evaluator_slugs=[slug for slug, _ in evaluator_details]
-            if evaluator_details
-            else None,
-            experiment_run_metadata=experiment_run_metadata,
-        )
+        experiment = await asyncio.to_thread(
+            self._init_experiment,
+            experiment_slug,
+            dataset_slug=dataset_slug,
+            dataset_version=dataset_version,
+            evaluator_slugs=[slug for slug, _ in evaluator_details]
+            if evaluator_details
+            else None,
+            experiment_run_metadata=experiment_run_metadata,
+        )
@@
-        rows = []
-        if dataset_slug:
-            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
-            rows = self._parse_jsonl_to_rows(jsonl_data)
+        rows = []
+        if dataset_slug:
+            if not dataset_version:
+                raise ValueError(
+                    "dataset_version is required when dataset_slug is provided"
+                )
+            jsonl_data = await asyncio.to_thread(
+                self._datasets.get_version_jsonl, dataset_slug, dataset_version
+            )
+            rows = self._parse_jsonl_to_rows(jsonl_data)
@@
-                    task_id = self._create_task(
-                        experiment_slug=experiment_slug,
-                        experiment_run_id=run_id,
-                        task_input=row,
-                        task_output=task_result,
-                    ).id
+                    task_id = (
+                        await asyncio.to_thread(
+                            self._create_task,
+                            experiment_slug=experiment_slug,
+                            experiment_run_id=run_id,
+                            task_input=row,
+                            task_output=task_result,
+                        )
+                    ).id
```
210-225: JSONL/NDJSON parsing drops the first record

The parser skips the first line assuming a header; NDJSON rarely includes headers. Parse all non-empty lines and ignore invalid JSON.

```diff
-        lines = jsonl_data.strip().split("\n")
-
-        # Skip the first line (columns definition)
-        for line in lines[1:]:
+        lines = jsonl_data.strip().split("\n")
+        for line in lines:
             if line.strip():
                 try:
                     row_data = json.loads(line)
                     rows.append(row_data)
                 except json.JSONDecodeError:
                     # Skip invalid JSON lines
                     continue
```
🧹 Nitpick comments (7)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (7)
110-127: Parallelize evaluator calls per row to reduce latency

Evaluator executions are awaited sequentially. Use asyncio.gather to run them concurrently and fold exceptions into the result map.

```diff
-                    if evaluator_details:
-                        for evaluator_slug, evaluator_version in evaluator_details:
-                            try:
-                                eval_result = (
-                                    await self._evaluator.run_experiment_evaluator(
-                                        evaluator_slug=evaluator_slug,
-                                        evaluator_version=evaluator_version,
-                                        task_id=task_id,
-                                        experiment_id=experiment.experiment.id,
-                                        experiment_run_id=run_id,
-                                        input=task_result,
-                                        timeout_in_sec=120,
-                                    )
-                                )
-                                eval_results[evaluator_slug] = eval_result.result
-                            except Exception as e:
-                                eval_results[evaluator_slug] = f"Error: {str(e)}"
+                    if evaluator_details:
+                        eval_tasks = [
+                            self._evaluator.run_experiment_evaluator(
+                                evaluator_slug=evaluator_slug,
+                                evaluator_version=evaluator_version,
+                                task_id=task_id,
+                                experiment_id=experiment.experiment.id,
+                                experiment_run_id=run_id,
+                                input=task_result,
+                                timeout_in_sec=120,
+                            )
+                            for evaluator_slug, evaluator_version in evaluator_details
+                        ]
+                        eval_results_list = await asyncio.gather(
+                            *eval_tasks, return_exceptions=True
+                        )
+                        for (evaluator_slug, _), r in zip(
+                            evaluator_details, eval_results_list
+                        ):
+                            if isinstance(r, Exception):
+                                eval_results[evaluator_slug] = f"Error: {str(r)}"
+                            else:
+                                eval_results[evaluator_slug] = r.result
```

Optional follow-up: reuse a single httpx.AsyncClient across all evaluator calls (Evaluator.run_experiment_evaluator supports a client parameter) to avoid connection churn. I can draft that if you want; a sketch follows below.
144-148: Don’t spawn tasks when wait_for_results is False

Currently tasks are created before the early return, causing fire-and-forget execution and hidden work. Return before creating tasks.

```diff
-        tasks = [asyncio.create_task(run_with_semaphore(row)) for row in rows]
-
-        if not wait_for_results:
-            return [], []
+        if not wait_for_results:
+            return [], []
+
+        tasks = [asyncio.create_task(run_with_semaphore(row)) for row in rows]
```
149-161: If stop_on_error is set, cancel remaining tasks to free resources

On first error the loop breaks but pending tasks keep running. Cancel them and await with return_exceptions=True to avoid warnings.

```diff
-        for completed_task in asyncio.as_completed(tasks):
+        for completed_task in asyncio.as_completed(tasks):
             try:
                 result = await completed_task
                 if result.error:
                     errors.append(result.error)
                 else:
                     results.append(result)
             except Exception as e:
                 error_msg = f"Task execution error: {str(e)}"
                 errors.append(error_msg)
                 if stop_on_error:
-                    break
+                    for t in tasks:
+                        if not t.done():
+                            t.cancel()
+                    await asyncio.gather(*tasks, return_exceptions=True)
+                    break
```
59-63: Avoid truncating cuid; keep full ID for uniqueness

Slicing to 11 chars increases collision risk at scale. Prefer the full cuid unless there’s an external length constraint.

```diff
-            os.getenv("TRACELOOP_EXP_SLUG") or "exp-" + str(cuid.cuid())[:11]
+            os.getenv("TRACELOOP_EXP_SLUG") or f"exp-{cuid.cuid()}"
```

If you have a max-length requirement for slugs, note it here and consider hashing.
70-77: Clarify EvaluatorDetails type and input normalization

The code normalizes evaluators into (slug, version) tuples. Ensure EvaluatorDetails is explicitly typed (e.g., Tuple[str, Optional[str]]) in model.py to match this usage, or widen the annotation here to accept Union[str, Tuple[str, Optional[str]]]; see the sketch below.
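A sketch of what that explicit typing could look like (the alias names mirror the comment and are illustrative, not the PR's actual definitions):

```python
from typing import Optional, Tuple, Union

# Either a bare evaluator slug or a (slug, version) pair.
EvaluatorInput = Union[str, Tuple[str, Optional[str]]]
# Normalized form used internally.
EvaluatorDetails = Tuple[str, Optional[str]]

def normalize(e: EvaluatorInput) -> EvaluatorDetails:
    # Bare slugs get an implicit "latest" (None) version.
    return (e, None) if isinstance(e, str) else e
```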
138-138: Make concurrency limit configurable

Semaphore(50) is a reasonable default, but exposing it as a parameter (e.g., max_concurrency: int = 50) helps tune for different environments and dataset sizes.
128-136: Minor: include row context in error to aid debugging (optionally)

When a row fails, consider including a stable identifier from the row (e.g., row.get("id")) rather than the full row, to avoid PII leakage yet keep traceability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- packages/sample-app/sample_app/experiment/experiment_example.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/sample-app/sample_app/experiment/experiment_example.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧠 Learnings (1)
📚 Learning: 2025-08-04T15:35:30.188Z
Learnt from: nina-kollman
PR: traceloop/openllmetry#3219
File: packages/traceloop-sdk/traceloop/sdk/datasets/dataset.py:357-360
Timestamp: 2025-08-04T15:35:30.188Z
Learning: In the traceloop SDK Dataset class, the established error handling pattern is that HTTP client methods return None on failure (after catching and logging RequestException), and the Dataset API methods check for None return values and raise Exception with descriptive messages. The update_row_api method is inconsistent with this pattern.
Applied to files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧬 Code graph analysis (1)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (4)
packages/traceloop-sdk/traceloop/sdk/client/http.py (3)
- HTTPClient (7-91)
- put (74-91)
- post (23-37)

packages/traceloop-sdk/traceloop/sdk/datasets/datasets.py (2)

- Datasets (25-207)
- get_version_jsonl (165-170)

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)

- Evaluator (16-102)
- run_experiment_evaluator (38-102)

packages/traceloop-sdk/traceloop/sdk/experiment/model.py (5)

- InitExperimentRequest (18-26)
- ExperimentInitResponse (51-55)
- CreateTaskRequest (58-62)
- CreateTaskResponse (65-68)
- TaskResponse (10-15)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Lint
- GitHub Check: Test Packages (3.11)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Build Packages (3.11)
```python
task=medical_task_refuse_advice,
evaluators=["medical_advice", "word-count-medical"],
experiment_slug="medical-advice-exp",
stop_on_error=False,
```
add an option to not await on eval results
```python
print(
    "\033[95m🔬 Running experiment with clinical guidance prompt (refuses medical advice)...\033[0m"
)
results_1, errors_1 = asyncio.run(
```
nit: make the run experiment method async and put the asyncio.run when calling run_experiment_example
```python
    experiment_id=experiment_id,
    experiment_run_id=experiment_run_id,
)
api_endpoint = os.environ.get("TRACELOOP_BASE_URL", "https://api.traceloop.com")
```
shouldn't be here - you should get the API endpoint from the http client when initializing the evaluator class (similar to other APIs - https://github.com/traceloop/openllmetry/blob/main/packages/traceloop-sdk/traceloop/sdk/annotation/base_annotation.py)
```python
from .model import ExecutionResponse


class SSEClient:
```
this shouldn't be here - should be in the general http client
| """Handles Server-Sent Events streaming""" | ||
|
|
||
| def __init__(self, shared_client: httpx.AsyncClient): | ||
| self._api_endpoint = os.environ.get( |
do not get it here, you should get it from the outside - users may set the URL and api key when initializing traceloop
Important
Looks good to me! 👍
Reviewed 19e9246 in 1 minute and 41 seconds.
- Reviewed 146 lines of code in 4 files
- Skipped 0 files when reviewing.
- Skipped posting 5 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/sample-app/sample_app/experiment/experiment_example.py:49
   - Draft comment: Potential async task mismatch: The experiment tasks (e.g. medical_task_refuse_advice and medical_task_provide_info) are now defined as async functions and are passed to client.experiment.run. However, the SDK’s run method appears to invoke the task synchronously (e.g. via task(row)) rather than awaiting them. Ensure that Experiment.run is updated to properly await asynchronous tasks to avoid returning coroutine objects.
   - Reason this comment was not posted: Comment looked like it was already resolved.
2. packages/sample-app/sample_app/experiment/experiment_example.py:90
   - Draft comment: Correct use of asyncio.run in the main block ensures asynchronous functions are properly executed.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
3. packages/traceloop-sdk/traceloop/sdk/client/client.py:11
   - Draft comment: Minor formatting change: Extra blank line removed between imports improves code consistency.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
4. packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py:19
   - Draft comment: Formatting improvement: The removal of an extra newline in the Evaluator class leads to cleaner code.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
5. packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py:3
   - Draft comment: Cleanup: The unused import 'os' has been removed, reducing unnecessary dependencies.
   - Reason this comment was not posted: Confidence changes required: 0% <= threshold 50%
Workflow ID: wflow_f0amdRo946luxuIo
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 4
♻️ Duplicate comments (1)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)
227-242: Parse all JSONL lines; don’t drop the first record unconditionally

Unconditionally skipping the first line loses data for standard NDJSON. Attempt to parse every non-empty line and skip only invalid JSON lines.
Apply this diff:
```diff
 def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
     """Parse JSONL string into list of {col_name: col_value} dictionaries"""
     rows = []
     lines = jsonl_data.strip().split("\n")
-
-    # Skip the first line (columns definition)
-    for line in lines[1:]:
+    for line in lines:
         if line.strip():
             try:
                 row_data = json.loads(line)
                 rows.append(row_data)
             except json.JSONDecodeError:
                 # Skip invalid JSON lines
                 continue
     return rows
```
🧹 Nitpick comments (7)
packages/traceloop-sdk/traceloop/sdk/client/client.py (1)
55-60: Add X-Traceloop-SDK-Version to async client headers for parity with sync HTTPClient

This helps server-side analytics and consistent version tracking.
Apply this diff:
```diff
 self._async_http = httpx.AsyncClient(
     headers={
         "Authorization": f"Bearer {self.api_key}",
         "Content-Type": "application/json",
         "User-Agent": f"traceloop-sdk/{__version__}",
+        "X-Traceloop-SDK-Version": __version__,
     },
     timeout=httpx.Timeout(120.0),
 )
```

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)
86-99: Docstring and parameters are out of sync (mentions client/context_data that don’t exist after refactor)

After switching to instance methods, remove stale parameters from docstrings and keep Args in sync.
Apply this diff:
```diff
-    Args:
-        evaluator_slug: Slug of the evaluator to execute
-        input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
-        client: Shared HTTP client for connection reuse (optional)
-        context_data: Context data to be passed to the evaluator (optional)
-        evaluator_version: Version of the evaluator to execute (optional)
-        timeout_in_sec: Timeout in seconds for execution
+    Args:
+        evaluator_slug: Slug of the evaluator to execute
+        input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
+        evaluator_version: Version of the evaluator to execute (optional)
+        timeout_in_sec: Timeout in seconds for execution
```
1-13: SSEClient relies on env for base URL and API key; prefer dependency injection

SSEClient builds Authorization from TRACELOOP_API_KEY and the base URL from TRACELOOP_BASE_URL, ignoring headers/base URL on the shared client. If users configure a custom endpoint or a non-env API key, streaming will 401 or hit the wrong host.
- Pass base URL and API key explicitly to SSEClient, or reuse the shared client’s base URL and headers.
- Minimal change: let SSEClient accept api_endpoint and reuse the shared client’s default headers unless overridden.
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (4)
79-90: Avoid shadowing “experiment”; rename local to clarify and fix references

The variable named experiment refers to the init response, not the class. Rename for clarity and to avoid confusion.
Apply this diff:
```diff
-        experiment = self._init_experiment(
+        exp_init = self._init_experiment(
             experiment_slug,
             dataset_slug=dataset_slug,
             dataset_version=dataset_version,
             evaluator_slugs=[slug for slug, _ in evaluator_details]
             if evaluator_details
             else None,
             experiment_run_metadata=experiment_run_metadata,
         )
-        run_id = experiment.run.id
+        run_id = exp_init.run.id
@@
-                        experiment_id=experiment.experiment.id,
+                        experiment_id=exp_init.experiment.id,
@@
-                        experiment_id=experiment.experiment.id,
+                        experiment_id=exp_init.experiment.id,
```

Also applies to: 115-121, 127-134
91-95: Validate dataset_version when dataset_slug is provided

Calling get_version_jsonl with a None version would hit a bad endpoint. Make this explicit.
Apply this diff:
```diff
-        rows = []
-        if dataset_slug and dataset_version:
-            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
-            rows = self._parse_jsonl_to_rows(jsonl_data)
+        rows = []
+        if dataset_slug:
+            if not dataset_version:
+                raise ValueError("dataset_version is required when dataset_slug is provided")
+            jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
+            rows = self._parse_jsonl_to_rows(jsonl_data)
```
99-107: Support both sync and async task callables

Enable users to provide async tasks without wrapping.
Apply this diff:
```diff
-                try:
-                    task_result = task(row)
+                try:
+                    result = task(row)
+                    # Support async task functions
+                    if hasattr(result, "__await__"):
+                        task_result = await result
+                    else:
+                        task_result = result
```

Additionally, add this import at the top of the file:
```python
# near other imports
from collections.abc import Awaitable  # optional if you prefer isinstance checks
```
61-69: Slug generation: ensure deterministic prefix and length if slug not provided

Not blocking, but consider centralizing slug generation (prefix, length) to avoid accidental changes and aid observability.
If helpful, extract to a helper:
```python
def _default_slug(self) -> str:
    return f"exp_{cuid.cuid()[:12]}"
```

Then use self._experiment_slug or self._default_slug().
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
- packages/sample-app/sample_app/experiment/experiment_example.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/client/client.py (3 hunks)
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/model.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/sample-app/sample_app/experiment/experiment_example.py
- packages/traceloop-sdk/traceloop/sdk/experiment/model.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py
- packages/traceloop-sdk/traceloop/sdk/client/client.py
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧠 Learnings (1)
📚 Learning: 2025-08-04T15:35:30.188Z
Learnt from: nina-kollman
PR: traceloop/openllmetry#3219
File: packages/traceloop-sdk/traceloop/sdk/datasets/dataset.py:357-360
Timestamp: 2025-08-04T15:35:30.188Z
Learning: In the traceloop SDK Dataset class, the established error handling pattern is that HTTP client methods return None on failure (after catching and logging RequestException), and the Dataset API methods check for None return values and raise Exception with descriptive messages. The update_row_api method is inconsistent with this pattern.
Applied to files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧬 Code graph analysis (3)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)
packages/traceloop-sdk/traceloop/sdk/evaluator/model.py (5)
- InputExtractor (6-7)
- InputSchemaMapping (10-13)
- ExecuteEvaluatorRequest (16-21)
- ExecuteEvaluatorResponse (24-28)
- ExecutionResponse (39-43)

packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (2)

- SSEClient (8-91)
- wait_for_result (18-70)
packages/traceloop-sdk/traceloop/sdk/client/client.py (3)
packages/traceloop-sdk/tests/experiment/test_experiment.py (1)
- experiment (8-11)

packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)

- Experiment (19-242)

packages/traceloop-sdk/traceloop/sdk/client/http.py (1)

- HTTPClient (7-91)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (4)
packages/traceloop-sdk/traceloop/sdk/client/http.py (3)
- HTTPClient (7-91)
- put (74-91)
- post (23-37)

packages/traceloop-sdk/traceloop/sdk/datasets/datasets.py (2)

- Datasets (25-207)
- get_version_jsonl (165-170)

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (3)

- Evaluator (15-154)
- run_experiment_evaluator (75-118)
- trigger_experiment_evaluator (121-154)

packages/traceloop-sdk/traceloop/sdk/experiment/model.py (5)

- InitExperimentRequest (18-26)
- ExperimentInitResponse (51-55)
- CreateTaskRequest (58-62)
- CreateTaskResponse (65-68)
- TaskResponse (10-15)
Important
Looks good to me! 👍
Reviewed 5f6941a in 39 seconds.
- Reviewed 79 lines of code in 2 files
- Skipped 0 files when reviewing.
- Skipped posting 2 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py:24
   - Draft comment: Ensure the 'Authorization' header is valid; using self.client.headers.get('Authorization') may yield None, resulting in 'Bearer None'.
   - Reason this comment was not posted: Confidence changes required: 50% <= threshold 50%
2. packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py:101
   - Draft comment: Awaiting the task call; ensure the provided task is an async function as required.
   - Reason this comment was not posted: Comment did not seem useful. Confidence is useful = 0% <= threshold 50%. The comment asks the PR author to ensure that the provided task is an async function, which falls under the rule of not asking the author to double-check intended behavior, and it doesn't provide a specific suggestion or point out a clear issue.
Workflow ID: wflow_5PMC6lIBAGnuSj2U
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 3
♻️ Duplicate comments (4)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (2)
7-12: Keep transport concerns centralized (auth/base URL) in the general HTTP client

Constructing auth headers and base URLs here duplicates responsibilities that belong to your shared HTTP client. Prefer: accept a fully configured AsyncClient and avoid re-deriving auth or base URL logic locally. This aligns with prior feedback.

Action: Remove local auth/header construction (except stream-specific headers) and rely on the shared client’s configuration.
29-35: Robustly build full_stream_url (handle absolute URLs and pre-prefixed “/v2”)

Current concatenation can yield “/v2/v2/…” or malformed URLs. Normalize cases: absolute URLs, already “/v2/”-prefixed paths, and bare paths.

```diff
- full_stream_url = f"{self.client.base_url}/v2{stream_url}"
+ # Build robust stream URL
+ if stream_url.startswith("http://") or stream_url.startswith("https://"):
+     full_stream_url = stream_url
+ elif stream_url.startswith("/v2/"):
+     full_stream_url = f"{self.client.base_url}{stream_url}"
+ else:
+     prefix = "" if stream_url.startswith("/") else "/"
+     full_stream_url = f"{self.client.base_url}/v2{prefix}{stream_url}"
```

packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (2)
27-31: Pass base_url into Evaluator to decouple from env and align with HTTP client config

Instantiate Evaluator with the same base_url as HTTPClient to avoid hidden environment coupling and ensure requests hit the intended API host.
Apply this diff if Evaluator supports base_url:
```diff
-        self._evaluator = Evaluator(async_http_client)
+        self._evaluator = Evaluator(async_http_client, http_client.base_url)
```
227-242: Parse all JSONL lines; don’t drop the first record unconditionally

NDJSON/JSONL does not include a header by default. Skipping the first line can drop a valid record. Attempt to parse every non-empty line and ignore only invalid JSON.

```diff
 def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
     """Parse JSONL string into list of {col_name: col_value} dictionaries"""
     rows = []
     lines = jsonl_data.strip().split("\n")
-    # Skip the first line (columns definition)
-    for line in lines[1:]:
+    for line in lines:
         if line.strip():
             try:
                 row_data = json.loads(line)
                 rows.append(row_data)
             except json.JSONDecodeError:
                 # Skip invalid JSON lines
                 continue
     return rows
```

Ensure tests in tests/experiment/test_experiment.py cover the cases below (a test sketch follows the list):
- JSONL with all-valid lines (first line is data).
- JSONL with a header or malformed first line (ignored gracefully).
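A minimal pytest sketch for those two cases; the helper stands in for Experiment._parse_jsonl_to_rows after the suggested fix, so treat the exact hookup to the class as an assumption:

```python
import json

def _parse_jsonl_to_rows(jsonl_data: str) -> list:
    # Stand-in for the fixed parser: every non-empty line is attempted,
    # invalid JSON lines are skipped.
    rows = []
    for line in jsonl_data.strip().split("\n"):
        if line.strip():
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    return rows

def test_first_line_is_data():
    data = '{"q": "a"}\n{"q": "b"}'
    assert _parse_jsonl_to_rows(data) == [{"q": "a"}, {"q": "b"}]

def test_malformed_first_line_is_skipped():
    data = 'not-json\n{"q": "b"}'
    assert _parse_jsonl_to_rows(data) == [{"q": "b"}]
```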
🧹 Nitpick comments (12)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (5)
31-36: Streaming timeout semantics — consider disabling read timeout for SSE

For long-running streams, a read timeout equal to timeout_in_sec can prematurely abort an otherwise healthy stream. Prefer a connect/write timeout and no read timeout for true SSE behavior, or expose a separate read_timeout parameter.

```diff
- timeout=httpx.Timeout(timeout_in_sec),
+ # For streaming, it's safer to allow indefinite reads
+ # (httpx.Timeout requires all four values when no default is given)
+ timeout=httpx.Timeout(connect=timeout_in_sec, read=None,
+                       write=timeout_in_sec, pool=timeout_in_sec),
```
46-53: Preserve stack traces via exception chaining (raise ... from e)

Re-raising without chaining loses the original context and violates Ruff B904.

```diff
-        except httpx.ConnectError as e:
-            raise Exception(
-                f"Failed to connect to stream URL: {full_stream_url}. Error: {e}"
-            )
-        except httpx.TimeoutException as e:
-            raise Exception(f"Stream request timed out: {e}")
-        except Exception as e:
-            raise Exception(f"Unexpected error in SSE stream: {e}")
+        except httpx.ConnectError as e:
+            raise Exception(
+                f"Failed to connect to stream URL: {full_stream_url}. Error: {e}"
+            ) from e
+        except httpx.TimeoutException as e:
+            raise Exception(f"Stream request timed out: {e}") from e
+        except Exception as e:
+            raise Exception(f"Unexpected error in SSE stream: {e}") from e
```
55-61: Decode error body and type-annotate response

Return a readable error body and add a precise type hint.

```diff
-    async def _handle_sse_response(self, response) -> ExecutionResponse:
+    async def _handle_sse_response(self, response: httpx.Response) -> ExecutionResponse:
         """Handle SSE response: check status and parse result"""
         if response.status_code != 200:
-            error_text = await response.aread()
+            error_bytes = await response.aread()
+            error_text = error_bytes.decode("utf-8", errors="replace")
             raise Exception(
                 f"Failed to stream results: {response.status_code}, body: {error_text}"
             )
```
66-74: Support SSE “data:” frames and chain parsing exceptions

Servers often send SSE as lines prefixed with data:; parse the last data payload if present. Also apply exception chaining (B904).

```diff
 def _parse_sse_result(self, response_text: str) -> ExecutionResponse:
     """Parse SSE response text into ExecutionResponse"""
     try:
-        response_data = json.loads(response_text)
+        # Support both raw JSON and SSE-formatted payloads (`data: { ... }`)
+        if "data:" in response_text:
+            data_lines = [
+                ln[len("data:"):].strip()
+                for ln in response_text.splitlines()
+                if ln.startswith("data:")
+            ]
+            payload = data_lines[-1] if data_lines else response_text
+        else:
+            payload = response_text
+        response_data = json.loads(payload)
         return ExecutionResponse(**response_data)
     except json.JSONDecodeError as e:
-        raise Exception(f"Failed to parse SSE result as JSON: {e}")
+        raise Exception(f"Failed to parse SSE result as JSON: {e}") from e
     except Exception as e:
-        raise Exception(f"Failed to parse response into ExecutionResponse: {e}")
+        raise Exception(f"Failed to parse response into ExecutionResponse: {e}") from e
```
63-65: Optional: true event streaming vs. whole-body read

Reading the entire body with aread() defeats incremental streaming. If you expect multiple SSE events, iterate lines with async for line in response.aiter_lines() and emit events until completion. A minimal streaming loop could look like the sketch below, assuming multiple events are expected from this endpoint.
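A sketch of that loop, assuming JSON payloads arrive as SSE data: frames and the final frame carries the full result (the endpoint's actual framing is an assumption, not confirmed by this PR):

```python
import json
import httpx

async def stream_last_event(client: httpx.AsyncClient, url: str) -> dict:
    # Iterate SSE lines incrementally instead of reading the whole body.
    last_payload = None
    async with client.stream(
        "GET", url, headers={"Accept": "text/event-stream"}
    ) as response:
        response.raise_for_status()
        async for line in response.aiter_lines():
            if line.startswith("data:"):
                last_payload = line[len("data:"):].strip()
    if last_payload is None:
        raise RuntimeError("No SSE data frames received")
    return json.loads(last_payload)
```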
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (7)
22-26: Remove or initialize unused attribute _async_http_client

The private attribute is declared but never set/used. Either assign it in __init__ or remove the field to satisfy Flake8 and reduce noise.

```diff
-    _async_http_client: httpx.AsyncClient
```

If you intend to keep it for later use:

```python
def __init__(self, http_client: HTTPClient, async_http_client: httpx.AsyncClient, experiment_slug: str):
    self._datasets = Datasets(http_client)
    self._evaluator = Evaluator(async_http_client)
    self._http_client = http_client
    self._experiment_slug = experiment_slug
    self._async_http_client = async_http_client
```
79-87: Avoid blocking the event loop during experiment initialization

_init_experiment performs synchronous network I/O (requests). Wrap the call with asyncio.to_thread to keep the event loop responsive when run() is awaited in async contexts.

```diff
-        experiment = self._init_experiment(
-            experiment_slug,
-            dataset_slug=dataset_slug,
-            dataset_version=dataset_version,
-            evaluator_slugs=[slug for slug, _ in evaluator_details]
-            if evaluator_details
-            else None,
-            experiment_run_metadata=experiment_run_metadata,
-        )
+        experiment = await asyncio.to_thread(
+            self._init_experiment,
+            experiment_slug,
+            dataset_slug=dataset_slug,
+            dataset_version=dataset_version,
+            evaluator_slugs=[slug for slug, _ in evaluator_details] if evaluator_details else None,
+            experiment_run_metadata=experiment_run_metadata,
+        )
```
91-95: Validate dataset_version when dataset_slug is provided

Currently, providing dataset_slug without dataset_version silently yields zero rows. This is error-prone. Fail fast with a clear message.

```diff
 rows = []
-if dataset_slug and dataset_version:
+if dataset_slug and dataset_version:
     jsonl_data = self._datasets.get_version_jsonl(dataset_slug, dataset_version)
     rows = self._parse_jsonl_to_rows(jsonl_data)
+elif dataset_slug and not dataset_version:
+    raise ValueError("dataset_version is required when dataset_slug is provided")
```
109-142: Run evaluators per row concurrently to reduce per-row latency

Evaluator executions for a row are currently sequential. If evaluator endpoints are independent, parallelize them to reduce latency, especially with many evaluators.

```diff
-                    if evaluator_details:
-                        for evaluator_slug, evaluator_version in evaluator_details:
-                            try:
-                                if wait_for_results:
-                                    eval_result = (
-                                        await self._evaluator.run_experiment_evaluator(
-                                            evaluator_slug=evaluator_slug,
-                                            evaluator_version=evaluator_version,
-                                            task_id=task_id,
-                                            experiment_id=experiment.experiment.id,
-                                            experiment_run_id=run_id,
-                                            input=task_result,
-                                            timeout_in_sec=120,
-                                        )
-                                    )
-                                    eval_results[evaluator_slug] = eval_result.result
-                                else:
-                                    await self._evaluator.trigger_experiment_evaluator(
-                                        evaluator_slug=evaluator_slug,
-                                        evaluator_version=evaluator_version,
-                                        task_id=task_id,
-                                        experiment_id=experiment.experiment.id,
-                                        experiment_run_id=run_id,
-                                        input=task_result,
-                                    )
-
-                                    eval_results[evaluator_slug] = (
-                                        f"Triggered execution of {evaluator_slug}"
-                                    )
-
-                            except Exception as e:
-                                eval_results[evaluator_slug] = f"Error: {str(e)}"
+                    if evaluator_details:
+                        async def run_one(e_slug: str, e_version: Optional[str]):
+                            try:
+                                if wait_for_results:
+                                    r = await self._evaluator.run_experiment_evaluator(
+                                        evaluator_slug=e_slug,
+                                        evaluator_version=e_version,
+                                        task_id=task_id,
+                                        experiment_id=experiment.experiment.id,
+                                        experiment_run_id=run_id,
+                                        input=task_result,
+                                        timeout_in_sec=120,
+                                    )
+                                    return e_slug, r.result
+                                else:
+                                    await self._evaluator.trigger_experiment_evaluator(
+                                        evaluator_slug=e_slug,
+                                        evaluator_version=e_version,
+                                        task_id=task_id,
+                                        experiment_id=experiment.experiment.id,
+                                        experiment_run_id=run_id,
+                                        input=task_result,
+                                    )
+                                    return e_slug, f"Triggered execution of {e_slug}"
+                            except Exception as e:
+                                return e_slug, f"Error: {str(e)}"
+
+                        pairs = await asyncio.gather(
+                            *[run_one(s, v) for s, v in evaluator_details],
+                            return_exceptions=False,
+                        )
+                        eval_results = {slug: res for slug, res in pairs}
```

Note: If Evaluator closes the shared AsyncClient internally, concurrent calls may fail; see separate note.
121-123: Make evaluator timeout configurable

Hard-coding 120s can be too short or too long depending on evaluator behavior. Consider a parameter (e.g., evaluator_timeout_sec: int = 120) on run(), passed through here.
61-63: Minor: consider not truncating CUIDs

Slicing the CUID to 11 chars slightly increases collision risk under high throughput. Not a blocker, but consider using the full cuid() or a short, collision-resistant slug variant if the library provides one.
27-31: Double-check Evaluator’s client lifecycle

If Evaluator closes the provided httpx.AsyncClient per request, subsequent calls here will fail. Ensure Evaluator either:
- Creates and closes ephemeral clients per call, or
- Treats the injected client as owned by the caller and does not close it.
If it currently closes the injected client, pass a new AsyncClient per invocation or refactor Evaluator to not close caller-owned clients; a sketch of that ownership pattern follows below.
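A common ownership-pattern sketch (illustrative, not the SDK's actual code): close the client only if this object created it.

```python
import httpx
from typing import Optional

class Evaluator:
    def __init__(self, client: Optional[httpx.AsyncClient] = None):
        # Track whether we own the client; caller-owned clients stay open.
        self._owns_client = client is None
        self._client = client or httpx.AsyncClient()

    async def aclose(self) -> None:
        # Only close clients this instance created itself.
        if self._owns_client:
            await self._client.aclose()
```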
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
- packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py
🧠 Learnings (1)
📚 Learning: 2025-08-04T15:35:30.188Z
Learnt from: nina-kollman
PR: traceloop/openllmetry#3219
File: packages/traceloop-sdk/traceloop/sdk/datasets/dataset.py:357-360
Timestamp: 2025-08-04T15:35:30.188Z
Learning: In the traceloop SDK Dataset class, the established error handling pattern is that HTTP client methods return None on failure (after catching and logging RequestException), and the Dataset API methods check for None return values and raise Exception with descriptive messages. The update_row_api method is inconsistent with this pattern.
Applied to files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧬 Code graph analysis (2)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (4)
packages/traceloop-sdk/traceloop/sdk/client/http.py (3)
- HTTPClient (7-91)
- put (74-91)
- post (23-37)

packages/traceloop-sdk/traceloop/sdk/datasets/datasets.py (2)

- Datasets (25-207)
- get_version_jsonl (165-170)

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (3)

- Evaluator (15-154)
- run_experiment_evaluator (75-118)
- trigger_experiment_evaluator (121-154)

packages/traceloop-sdk/traceloop/sdk/experiment/model.py (5)

- InitExperimentRequest (18-26)
- ExperimentInitResponse (51-55)
- CreateTaskRequest (58-62)
- CreateTaskResponse (65-68)
- TaskResponse (10-15)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (1)
packages/traceloop-sdk/traceloop/sdk/evaluator/model.py (1)
ExecutionResponse(39-43)
🪛 Ruff (0.12.2)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py
47-49: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
51-51: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
53-53: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
72-72: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
74-74: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Build Packages (3.11)
- GitHub Check: Lint
🔇 Additional comments (2)
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (2)
153-159: Good use of a semaphore to cap concurrency

The concurrency guard (50) prevents overload. Nice.

161-165: Fire-and-forget mode is handled safely

You still await task creation/triggers to complete, avoiding orphaned tasks. Returning ([], []) is consistent with the docstring.
```python
headers = {
    "Authorization": f"Bearer {self.client.headers.get('Authorization')}",
    "Accept": "text/event-stream",
    "Cache-Control": "no-cache",
}
```
Fix Authorization header handling — avoid double “Bearer ” and missing auth
Prepending "Bearer " to whatever is already in self.client.headers["Authorization"] can produce headers like Bearer Bearer <token> or Bearer None. Let the shared client supply Authorization, and fail fast if it’s missing.
```diff
- headers = {
-     "Authorization": f"Bearer {self.client.headers.get('Authorization')}",
-     "Accept": "text/event-stream",
-     "Cache-Control": "no-cache",
- }
+ # Let the shared client carry Authorization to avoid duplicating "Bearer "
+ headers = {
+     "Accept": "text/event-stream",
+     "Cache-Control": "no-cache",
+ }
+ if "Authorization" not in self.client.headers:
+     raise ValueError(
+         "Missing Authorization in shared_client headers; "
+         "initialize the client with proper auth before using SSEClient."
+     )
```
+ )
🤖 Prompt for AI Agents
In packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py around lines
23 to 27, the code naively prepends "Bearer " to
self.client.headers.get('Authorization'), which can produce "Bearer Bearer
<token>" or "Bearer None"; instead read the raw authorization value from
self.client.headers, fail fast if it's missing (raise a clear exception), and
set the Authorization header to that exact value (do not add another "Bearer "
prefix); keep Accept and Cache-Control as-is.
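As a quick illustration of the recommended approach, the sketch below reuses the shared client's Authorization value verbatim and fails fast when it is absent; `build_sse_headers` is a hypothetical helper, not part of the SDK:

```python
import httpx

def build_sse_headers(shared_client: httpx.AsyncClient) -> dict:
    """Hypothetical helper: reuse the shared client's auth header as-is."""
    auth = shared_client.headers.get("Authorization")
    if not auth:
        raise ValueError("shared client is missing an Authorization header")
    return {
        "Authorization": auth,  # already formatted, e.g. "Bearer <token>"
        "Accept": "text/event-stream",
        "Cache-Control": "no-cache",
    }
```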
import asyncio
import json
from typing import Any, List, Callable, Optional, Tuple, Dict
💡 Verification agent
🧩 Analysis chain
Fix type hint for task and support both sync and async callables; avoid blocking the event loop during task creation
- The current type hints suggest a sync callable but you always await it. Either require an async callable or support both.
- Also, _create_task uses a synchronous HTTP client (requests) inside an async function, which blocks the event loop. Offload to a thread.
-import asyncio
+import asyncio
+import inspect
import json
-from typing import Any, List, Callable, Optional, Tuple, Dict
+from typing import Any, List, Callable, Optional, Tuple, Dict, Awaitable, Union
@@
- task: Callable[[Optional[Dict[str, Any]]], Dict[str, Any]],
+ task: Callable[[Optional[Dict[str, Any]]], Union[Dict[str, Any], Awaitable[Dict[str, Any]]]],
@@
- try:
- task_result = await task(row)
- task_id = self._create_task(
- experiment_slug=experiment_slug,
- experiment_run_id=run_id,
- task_input=row,
- task_output=task_result,
- ).id
+ try:
+ maybe = task(row)
+ task_result = await maybe if inspect.isawaitable(maybe) else maybe
+ task_id = (
+ await asyncio.to_thread(
+ self._create_task,
+ experiment_slug=experiment_slug,
+ experiment_run_id=run_id,
+ task_input=row,
+ task_output=task_result,
+ )
+ ).id
Verification tip: ensure all current call sites pass either an async def or a regular def; both will now work. If you prefer to mandate async, change the type to Awaitable[...] and drop the inspect branch.
Also applies to: 35-35, 99-107
🏁 Script executed:
#!/usr/bin/env bash
set -e
# List files in the experiment directory
ls -R packages/traceloop-sdk/traceloop/sdk/experiment
# Show the first 200 lines of experiment.py to inspect imports and signature
sed -n '1,200p' packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
# Locate the definition of _create_task
grep -R "def _create_task" -n packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
# Show lines around _create_task implementation (context)
sed -n '80,140p' packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
# Check for any use of a synchronous HTTP client (requests)
grep -R "import requests" -n packages/traceloop-sdk/traceloop/sdk/experiment/experiment.pyLength of output: 11069
🏁 Script executed:
#!/usr/bin/env bash
set -e
# Show lines around _create_task implementation for verification
sed -n '200,260p' packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
Length of output: 1605
Require Async Task or Support Both Sync & Async; Offload _create_task Calls to Thread
The run method’s task parameter is currently declared as a synchronous callable (Callable[[Optional[Dict[str, Any]]], Dict[str, Any]]), yet in run_single_row you always await task(row). Meanwhile, _create_task uses the synchronous HTTPClient.post inside an async function, which will block the event loop. To address both issues:
• Update the task type hint to handle both sync and async callables, or restrict to async-only:
- Option A (async-only):
  from typing import Awaitable

  async def run(
      self,
      task: Callable[[Optional[Dict[str, Any]]], Awaitable[Dict[str, Any]]],
      ...
  ) -> Tuple[...]:
- Option B (both sync & async):
  import inspect
  from typing import Awaitable, Union

  async def run(
      self,
      task: Callable[[Optional[Dict[str, Any]]], Union[Dict[str, Any], Awaitable[Dict[str, Any]]]],
      ...
  ) -> Tuple[...]:
  Then in run_single_row:
  maybe = task(row)
  task_result = await maybe if inspect.isawaitable(maybe) else maybe
• Offload _create_task to a thread to avoid blocking:
- task_id = self._create_task(
- experiment_slug=experiment_slug,
- experiment_run_id=run_id,
- task_input=row,
- task_output=task_result,
- ).id
+ task_id = (
+ await asyncio.to_thread(
+ self._create_task,
+ experiment_slug,
+ run_id,
+ row,
+ task_result,
+ )
+ ).id
Locations:
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py:
  • async def run(...task: Callable...) (lines 8–11)
  • run_single_row body (lines 35–70)
- Same file: _create_task definition (lines 208–217)
Addressing these will ensure correct type-checking and prevent blocking the event loop when creating tasks.
🤖 Prompt for AI Agents
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py lines ~2-70 and
~208-217: The run() signature currently types task as a sync Callable but
run_single_row always awaits task and _create_task calls blocking
HTTPClient.post inside async code; update the type hints and runtime handling so
task can be async or sync (either change to an Awaitable-returning Callable for
async-only or accept Union[Dict, Awaitable[Dict]] and use inspect.isawaitable to
await only when needed), add necessary imports (inspect, Awaitable/Union), and
in run_single_row call the task and await conditionally; additionally offload
the blocking _create_task work to a thread executor (use asyncio.to_thread or
loop.run_in_executor) so HTTPClient.post does not block the event loop and
adjust _create_task to be invoked appropriately from async context.
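Putting both fixes together, a minimal sketch of the dual sync/async handling plus thread offloading might look like this; `create_task_blocking` stands in for the synchronous HTTP call and is not the SDK's real method:

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, Dict, Optional, Union

TaskFn = Callable[[Optional[Dict[str, Any]]], Union[Dict[str, Any], Awaitable[Dict[str, Any]]]]

def create_task_blocking(row: Dict[str, Any], output: Dict[str, Any]) -> str:
    """Stand-in for the blocking HTTPClient.post call; returns a task id."""
    return "task-123"

async def run_single_row(task: TaskFn, row: Dict[str, Any]) -> str:
    maybe = task(row)
    # Await only if the caller handed us a coroutine or other awaitable.
    result = await maybe if inspect.isawaitable(maybe) else maybe
    # Offload the blocking call so the event loop stays responsive.
    return await asyncio.to_thread(create_task_blocking, row, result)

async def async_task(row):  # both styles now work
    return {"answer": 42}

# asyncio.run(run_single_row(async_task, {"q": "?"}))
# asyncio.run(run_single_row(lambda r: {"answer": 42}, {"q": "?"}))
```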
for completed_task in asyncio.as_completed(tasks):
    try:
        result = await completed_task
        if result.error:
            errors.append(result.error)
        else:
            results.append(result)
    except Exception as e:
        error_msg = f"Task execution error: {str(e)}"
        errors.append(error_msg)
        if stop_on_error:
            break
🛠️ Refactor suggestion
Cancel outstanding tasks when stop_on_error is triggered
Breaking out of the loop leaves other tasks running. Cancel them to free resources and avoid noisy background exceptions.
  for completed_task in asyncio.as_completed(tasks):
      try:
          result = await completed_task
          if result.error:
              errors.append(result.error)
          else:
              results.append(result)
      except Exception as e:
          error_msg = f"Task execution error: {str(e)}"
          errors.append(error_msg)
          if stop_on_error:
-             break
+             # Cancel remaining tasks and wait for their cancellation
+             pending = [t for t in tasks if not t.done()]
+             for t in pending:
+                 t.cancel()
+             await asyncio.gather(*pending, return_exceptions=True)
+             break
🤖 Prompt for AI Agents
In packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py around lines
166 to 178, when stop_on_error causes the loop to break the other asyncio tasks
are left running; modify the error branch so that before breaking you collect
the remaining tasks that are not done, call cancel() on each, then await them
(use asyncio.gather on the pending tasks with return_exceptions=True) to ensure
they are cancelled and their exceptions are consumed; this will free resources
and prevent noisy background exceptions.
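To see the cancellation pattern end to end, here is a runnable toy version; the `might_fail` coroutine is a placeholder for the real per-row work:

```python
import asyncio

async def might_fail(i: int) -> int:
    await asyncio.sleep(i * 0.05)
    if i == 2:
        raise RuntimeError("boom")
    return i

async def run(stop_on_error: bool = True):
    tasks = [asyncio.create_task(might_fail(i)) for i in range(10)]
    results, errors = [], []
    for completed in asyncio.as_completed(tasks):
        try:
            results.append(await completed)
        except Exception as e:
            errors.append(f"Task execution error: {e}")
            if stop_on_error:
                pending = [t for t in tasks if not t.done()]
                for t in pending:
                    t.cancel()
                # Consume cancellations so none are logged as "never retrieved"
                await asyncio.gather(*pending, return_exceptions=True)
                break
    return results, errors

# print(asyncio.run(run()))
```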
nirga
left a comment
Approved. Note the bug with the class methods in the evaluator class; I think CodeRabbit's suggestion is good.
api_endpoint = os.environ.get("TRACELOOP_BASE_URL", "https://api.traceloop.com")
body = request.model_dump()

client = cls._async_http_client
You can't access it from a class method (it's an instance attribute, not a class attribute).
Important
Looks good to me! 👍
Reviewed d788cbb in 2 minutes and 2 seconds.
- Reviewed 170 lines of code in 3 files
- Skipped 0 files when reviewing.
- Skipped posting 5 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/traceloop-sdk/tests/experiment/test_experiment.py:11
- Draft comment:
Setting base_url on the mocked HTTPClient is good; ensure the HTTPClient mock interface includes this attribute for downstream usage.
- Reason this comment was not posted:
Comment did not seem useful. Usefulness confidence = 20% <= threshold 50%. The comment is suggesting to ensure that the mock interface includes a specific attribute. This is a form of asking the author to double-check something, which is against the rules. The comment does not provide a specific code suggestion or ask for a specific test to be written.
2. packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py:22
- Draft comment:
The new 'api_endpoint' parameter in the Evaluator constructor ensures the endpoint is explicitly provided. Consider renaming it to 'base_url' for consistency with other parts of the code.
- Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50%. The comment is about a changed part of the code (new parameter). However, the suggestion to rename for consistency requires knowledge of other parts of the codebase that I can't see. Without evidence of this consistency issue, I can't verify if this rename would actually improve anything. The current name api_endpoint is clear and descriptive. The parameter name api_endpoint is actually quite clear and specific - it explicitly indicates this is an API endpoint. base_url would be more generic and potentially less precise. Even if other parts of the code use base_url, api_endpoint might be a better name here since it's more specific to this use case. This comment should be deleted because it suggests a rename without clear evidence that it would improve consistency, and the current name is already clear and appropriate.
3. packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py:46
- Draft comment:
The try/except block in _execute_evaluator_request was removed. Consider adding contextual error handling or using a custom exception to preserve error details.
- Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 10% vs. threshold = 50%. The code still handles the most important error case (non-200 responses) explicitly. The removed try/except was very generic and just re-wrapped exceptions with a new message, which doesn't add much value. The current approach of letting HTTP client errors propagate directly provides more detailed error information. The change actually seems like an improvement in error handling. I could be wrong about the value of wrapping exceptions - there might be a good reason to normalize all errors into a common format that I'm not seeing. While normalizing errors can be useful, the generic try/except was just adding a wrapper message without preserving the original error details. The current approach provides better error information. The comment should be deleted. The error handling change appears to be an intentional improvement that provides better error details, not a regression that needs to be fixed.
4. packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py:91
- Draft comment:
run_experiment_evaluator no longer wraps the evaluator call in a try/except block. Ensure that upstream callers are prepared to handle these raw exceptions if additional context is needed.
- Reason this comment was not posted:
Comment looked like it was already resolved.
5. packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py:28
- Draft comment:
Passing http_client.base_url to Evaluator is a good approach for consistency. For clarity, consider renaming the Evaluator constructor parameter from 'api_endpoint' to 'base_url'.
- Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable: usefulness confidence = 20% vs. threshold = 50%. While the suggestion for consistency makes sense in principle, I cannot verify if 'api_endpoint' is actually the current parameter name since I don't have access to the Evaluator class definition. The comment is making assumptions about code I cannot see. Additionally, parameter renaming suggestions are relatively minor and could be considered obvious refactoring. I might be too quick to dismiss a valid consistency improvement. Parameter naming consistency across a codebase can be important for maintainability. Without seeing the Evaluator class, I cannot verify if this suggestion is correct or if the inconsistency actually exists. We should err on the side of removing comments that make assumptions about unseen code. Delete this comment because it makes assumptions about code we cannot verify, and parameter renaming suggestions are relatively minor.
Workflow ID: wflow_SdN9P4z6V5FTdNar
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 2
♻️ Duplicate comments (1)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)
14-21: Good shift to instance-based design and injected client
Converting to instance methods with an injected AsyncClient resolves the earlier classmethod/closure issues and avoids closing a shared client. This aligns with previous review feedback.
🧹 Nitpick comments (4)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (4)
22-25: Normalize api_endpoint; avoid double slashes and align with client.base_url
Sanitize the injected api_endpoint to prevent accidental double slashes when composing URLs. Also, consider ensuring the provided AsyncClient has base_url configured consistently with api_endpoint to keep SSEClient behavior correct.
Apply:
- self._api_endpoint = api_endpoint
+ # normalize to avoid trailing slashes (e.g., "https://api.../" -> "https://api...")
+ self._api_endpoint = api_endpoint.rstrip("/")
Optionally, validate that async_http_client.base_url (if set) matches api_endpoint to prevent SSE stream URL mismatches.
31-37: Don't shadow built-in name 'input'; rename to 'inputs' and fix docstrings
Using the parameter name input shadows Python's built-in, hurts readability, and can trigger linters. Rename across signatures, usages, and docstrings.
- input: Dict[str, str],
+ inputs: Dict[str, str],
@@
- root={k: InputExtractor(source=v) for k, v in input.items()}
+ root={k: InputExtractor(source=v) for k, v in inputs.items()}
@@
- input: Dict[str, str],
+ inputs: Dict[str, str],
@@
- input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
- client: Shared HTTP client for connection reuse (optional)
- context_data: Context data to be passed to the evaluator (optional)
+ inputs: Dict mapping evaluator input field names to their values. {field_name: value, ...}
@@
- input: Dict[str, str],
+ inputs: Dict[str, str],
@@
- input: Dict mapping evaluator input field names to their values
- evaluator_version: Version of the evaluator to execute (optional)
- client: Shared HTTP client for connection reuse (optional)
+ inputs: Dict mapping evaluator input field names to their values
+ evaluator_version: Version of the evaluator to execute (optional)
@@
- task_id, experiment_id, experiment_run_id, input, evaluator_version
+ task_id, experiment_id, experiment_run_id, inputs, evaluator_version
@@
- task_id, experiment_id, experiment_run_id, input, evaluator_version
+ task_id, experiment_id, experiment_run_id, inputs, evaluator_version
Also applies to: 69-75, 112-114, 81-86, 118-126, 130-132, 91-93
78-90: Tidy docstrings: remove stale params and keep arguments concise
The docstrings still mention client and context_data parameters that aren't in the signatures. Clean them up for accuracy.
@@
- input: Dict mapping evaluator input field names to their values. {field_name: value, ...}
- client: Shared HTTP client for connection reuse (optional)
- context_data: Context data to be passed to the evaluator (optional)
+ inputs: Dict mapping evaluator input field names to their values. {field_name: value, ...}
@@
- input: Dict mapping evaluator input field names to their values
- evaluator_version: Version of the evaluator to execute (optional)
- client: Shared HTTP client for connection reuse (optional)
+ inputs: Dict mapping evaluator input field names to their values
+ evaluator_version: Version of the evaluator to execute (optional)
Also applies to: 115-129
95-104: Optional: add minimal tracing/logging around execution_id for observability
A debug-level log of the execution_id and evaluator_slug can simplify troubleshooting without affecting behavior.
Example (if you have a logger in scope):
+ # logger.debug(f"Evaluator '{evaluator_slug}' started with execution_id={execute_response.execution_id}")Also applies to: 134-139
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- packages/traceloop-sdk/tests/experiment/test_experiment.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- packages/traceloop-sdk/tests/experiment/test_experiment.py
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py
🧬 Code graph analysis (1)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (2)
packages/traceloop-sdk/traceloop/sdk/evaluator/model.py (5)
InputExtractor (6-7), InputSchemaMapping (10-13), ExecuteEvaluatorRequest (16-21), ExecuteEvaluatorResponse (24-28), ExecutionResponse (39-43)
packages/traceloop-sdk/traceloop/sdk/evaluator/stream_client.py (2)
SSEClient (7-74), wait_for_result (13-53)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Test Packages (3.11)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Lint
- GitHub Check: Build Packages (3.11)
🔇 Additional comments (1)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)
35-44: Request building looks solid
Comprehension to create InputSchemaMapping is clear and efficient. No issues here.
Important
Looks good to me! 👍
Reviewed e466cef in 1 minute and 22 seconds.
- Reviewed 51 lines of code in 3 files
- Skipped 0 files when reviewing.
- Skipped posting 3 draft comments. View those below.
- Modify your settings and rules to customize what types of comments Ellipsis leaves. And don't forget to react with 👍 or 👎 to teach Ellipsis.
1. packages/traceloop-sdk/traceloop/sdk/client/client.py:55
- Draft comment:
Good addition: setting 'base_url=self.api_endpoint' in AsyncClient ensures consistent URL handling. Verify proper trailing slash normalization.
- Reason this comment was not posted:
Confidence changes required: 0% <= threshold 50%. None.
2. packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py:21
- Draft comment:
Simplifies Evaluator by removing redundant 'api_endpoint'; using relative URL in '_execute_evaluator_request' leverages AsyncClient's base_url. Ensure client is always properly configured.
- Reason this comment was not posted:
Confidence changes required: 0% <= threshold 50%. None.
3. packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py:26
- Draft comment:
Updated Evaluator instantiation to pass only the async_http_client, aligning with the modified Evaluator constructor.
- Reason this comment was not posted:
Confidence changes required: 0% <= threshold 50%. None.
Workflow ID: wflow_PhRosbFmDudxzXK5
You can customize by changing your verbosity settings, reacting with 👍 or 👎, replying to comments, or adding code review rules.
Actionable comments posted: 2
♻️ Duplicate comments (3)
packages/traceloop-sdk/traceloop/sdk/client/client.py (1)
54-62: Own and expose the AsyncClient lifecycle to prevent socket leaks
You create a shared httpx.AsyncClient but never expose a way to close it. Add an explicit close() or async-context-managed shutdown on Client. This was raised earlier and remains unresolved.
Apply this diff to add lifecycle methods:
 class Client:
 @@
     def __init__(
         self,
         api_key: str,
         app_name: str = sys.argv[0],
         api_endpoint: str = "https://api.traceloop.com",
     ):
 @@
         self.experiment = Experiment(self._http, self._async_http, experiment_slug)
+
+    async def aclose(self) -> None:
+        """Close the shared async HTTP client."""
+        await self._async_http.aclose()
+
+    def close(self) -> None:
+        """Synchronous helper to close the async client."""
+        try:
+            import anyio
+            anyio.run(self.aclose)
+        except Exception:
+            import asyncio
+            asyncio.get_event_loop().run_until_complete(self.aclose())
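Assuming the aclose()/close() methods above are adopted, callers could shut the shared client down deterministically like this; the constructor arguments and import path are illustrative:

```python
import asyncio
from traceloop.sdk.client.client import Client  # assumed import path

async def main():
    client = Client(api_key="...", app_name="demo")
    try:
        ...  # run experiments, evaluators, etc.
    finally:
        await client.aclose()  # release pooled sockets

# asyncio.run(main())
```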
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)
54-63: Accept all 2xx, raise rich HTTP errors, and guard JSON decoding
Limiting success to 200 and unguarded JSON parsing will misclassify valid responses (e.g., 202) and obscure failures. Use raise_for_status and catch JSON errors.
- response = await client.post(
-     full_url, json=body, timeout=httpx.Timeout(timeout_in_sec)
- )
- if response.status_code != 200:
-     raise Exception(
-         f"Failed to execute evaluator {evaluator_slug}: "
-         f"{response.status_code} – {response.text}"
-     )
- result_data = response.json()
+ response = await client.post(
+     full_url, json=body, timeout=httpx.Timeout(timeout_in_sec)
+ )
+ try:
+     response.raise_for_status()  # accepts any 2xx
+     result_data = response.json()
+ except httpx.HTTPStatusError as e:
+     body_text = e.response.text if e.response is not None else ""
+     status = e.response.status_code if e.response is not None else "n/a"
+     raise Exception(
+         f"Failed to execute evaluator {evaluator_slug}: {status} – {body_text}"
+     ) from e
+ except ValueError as e:
+     raise Exception(
+         f"Failed to parse execute response as JSON: {e} – body: {response.text}"
+     ) from e
  return ExecuteEvaluatorResponse(**result_data)
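For context, here is the same error-handling shape in isolation, runnable against any httpx AsyncClient; the URL and exception messages are illustrative:

```python
import httpx

async def execute(client: httpx.AsyncClient, slug: str, body: dict) -> dict:
    response = await client.post(f"/v2/evaluators/{slug}/execute", json=body)
    try:
        response.raise_for_status()  # any 2xx passes; 4xx/5xx raise
        return response.json()
    except httpx.HTTPStatusError as e:
        raise Exception(
            f"Failed to execute evaluator {slug}: "
            f"{e.response.status_code} - {e.response.text}"
        ) from e
    except ValueError as e:  # response body was not valid JSON
        raise Exception(f"Unparseable execute response: {response.text}") from e
```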
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)
226-241: Parse all JSONL lines; don't drop the first record
NDJSON doesn't have a header row; skipping the first line discards data. Parse every non-empty line and skip only invalid JSON.
  def _parse_jsonl_to_rows(self, jsonl_data: str) -> List[Dict[str, Any]]:
      """Parse JSONL string into list of {col_name: col_value} dictionaries"""
      rows = []
      lines = jsonl_data.strip().split("\n")
-     # Skip the first line (columns definition)
-     for line in lines[1:]:
+     for line in lines:
          if line.strip():
              try:
                  row_data = json.loads(line)
                  rows.append(row_data)
              except json.JSONDecodeError:
                  # Skip invalid JSON lines
                  continue
      return rows
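A quick self-check of the corrected parser, as a standalone function, confirming the first record survives and malformed lines are skipped:

```python
import json

def parse_jsonl_to_rows(jsonl_data: str) -> list:
    rows = []
    for line in jsonl_data.strip().split("\n"):
        if line.strip():
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError:
                continue  # tolerate malformed lines
    return rows

sample = '{"q": "a"}\n{"q": "b"}\nnot json\n{"q": "c"}'
assert [r["q"] for r in parse_jsonl_to_rows(sample)] == ["a", "b", "c"]
```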
🧹 Nitpick comments (5)
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (3)
78-88: Fix docstring: remove non-existent parameters and list actual args
Docstring mentions client/context_data which are not present; list the real params to avoid confusion.
""" - Execute evaluator with input schema mapping and wait for result - - Args: - evaluator_slug: Slug of the evaluator to execute - input: Dict mapping evaluator input field names to their values. {field_name: value, ...} - client: Shared HTTP client for connection reuse (optional) - context_data: Context data to be passed to the evaluator (optional) - evaluator_version: Version of the evaluator to execute (optional) - timeout_in_sec: Timeout in seconds for execution + Execute evaluator with input schema mapping and wait for result. + + Args: + evaluator_slug: Slug of the evaluator to execute. + task_id: Task ID for the evaluation. + experiment_id: Experiment ID. + experiment_run_id: Experiment run ID. + input: Dict mapping evaluator input field names to their values. {field_name: value, ...} + timeout_in_sec: Timeout in seconds for execution. + evaluator_version: Version of the evaluator to execute (optional). Returns: ExecutionResponse: The evaluation result from SSE stream """
116-124: Fix docstring: remove 'client' and list actual args
Keep the public docs accurate; list only supported parameters.
""" Trigger evaluator execution without waiting for result (fire-and-forget) Args: evaluator_slug: Slug of the evaluator to execute task_id: Task ID for the evaluation experiment_id: Experiment ID experiment_run_id: Experiment run ID input: Dict mapping evaluator input field names to their values evaluator_version: Version of the evaluator to execute (optional) - client: Shared HTTP client for connection reuse (optional) Returns: str: The execution_id that can be used to check results later """
24-31: Avoid shadowing built-in 'input'; rename parameter to 'inputs' (or similar)
Using input as a parameter name shadows the built-in function and can trip linters. Prefer inputs or input_data consistently.
Apply in this file:
- def _build_evaluator_request(
+ def _build_evaluator_request(
      task_id: str,
      experiment_id: str,
      experiment_run_id: str,
-     input: Dict[str, str],
+     inputs: Dict[str, str],
      evaluator_version: Optional[str] = None,
  ) -> ExecuteEvaluatorRequest:
@@
- schema_mapping = InputSchemaMapping(
-     root={k: InputExtractor(source=v) for k, v in input.items()}
+ schema_mapping = InputSchemaMapping(
+     root={k: InputExtractor(source=v) for k, v in inputs.items()}
  )
- input: Dict[str, str],
+ inputs: Dict[str, str],
@@
- request = self._build_evaluator_request(
-     task_id, experiment_id, experiment_run_id, input, evaluator_version
+ request = self._build_evaluator_request(
+     task_id, experiment_id, experiment_run_id, inputs, evaluator_version
  )
- input: Dict[str, str],
+ inputs: Dict[str, str],
@@
- request = self._build_evaluator_request(
-     task_id, experiment_id, experiment_run_id, input, evaluator_version
+ request = self._build_evaluator_request(
+     task_id, experiment_id, experiment_run_id, inputs, evaluator_version
  )
Also applies to: 65-74, 104-112
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (2)
165-176: Cancel pending tasks when stop_on_error triggers to avoid orphaned work
Breaking without cancellation leaves other tasks running and can produce noisy exceptions.
  for completed_task in asyncio.as_completed(tasks):
      try:
          result = await completed_task
          if result.error:
              errors.append(result.error)
          else:
              results.append(result)
      except Exception as e:
          error_msg = f"Task execution error: {str(e)}"
          errors.append(error_msg)
          if stop_on_error:
-             break
+             pending = [t for t in tasks if not t.done()]
+             for t in pending:
+                 t.cancel()
+             await asyncio.gather(*pending, return_exceptions=True)
+             break
120-133: Follow-up if renaming Evaluator input param: update call sites
If you adopt the renaming away from input in Evaluator, update these keyword arguments accordingly to keep the API consistent.
- input=task_result,
+ inputs=task_result,
@@
- input=task_result,
+ inputs=task_result,
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (3)
- packages/traceloop-sdk/traceloop/sdk/client/client.py (3 hunks)
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1 hunks)
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Store API keys only in environment variables/secure vaults; never hardcode secrets in code
Use Flake8 for code linting and adhere to its rules
Files:
- packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
- packages/traceloop-sdk/traceloop/sdk/client/client.py
- packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py
🧠 Learnings (1)
📚 Learning: 2025-08-04T15:35:30.188Z
Learnt from: nina-kollman
PR: traceloop/openllmetry#3219
File: packages/traceloop-sdk/traceloop/sdk/datasets/dataset.py:357-360
Timestamp: 2025-08-04T15:35:30.188Z
Learning: In the traceloop SDK Dataset class, the established error handling pattern is that HTTP client methods return None on failure (after catching and logging RequestException), and the Dataset API methods check for None return values and raise Exception with descriptive messages. The update_row_api method is inconsistent with this pattern.
Applied to files:
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Test Packages (3.12)
- GitHub Check: Test Packages (3.10)
- GitHub Check: Test Packages (3.11)
- GitHub Check: Build Packages (3.11)
- GitHub Check: Lint
🔇 Additional comments (3)
packages/traceloop-sdk/traceloop/sdk/client/client.py (1)
55-61: LGTM: base_url set on AsyncClient enables SSE relative URLs
Setting base_url on the shared AsyncClient is correct and unblocks SSEClient building full stream URLs from relative paths.
packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)
53-56: LGTM: use of relative path with a base_url-configured AsyncClient
Posting to a relative path is fine given the caller configures base_url on the shared client.
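A small demonstration of why this works: with base_url set, relative request paths (and by extension the SSE stream path) resolve against it. The endpoint and path below are illustrative:

```python
import asyncio
import httpx

async def demo():
    async with httpx.AsyncClient(base_url="https://api.traceloop.com") as client:
        # build_request composes the final URL without sending anything
        request = client.build_request("POST", "/v2/evaluators/foo/execute", json={})
        # Resolves to https://api.traceloop.com/v2/evaluators/foo/execute
        print(request.url)

asyncio.run(demo())
```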
packages/traceloop-sdk/traceloop/sdk/experiment/experiment.py (1)
152-164: LGTM: bounded concurrency and non-blocking trigger path
Using a semaphore of 50 keeps concurrency under control, and the wait_for_results=False path fires tasks and returns promptly.
feat(instrumentation): ... or fix(instrumentation): ...
Important
Adds experiment capabilities with dataset-driven tasks, evaluators, and streaming results, including unit tests and dependency updates.
- Experiment class in experiment.py for running dataset-driven experiments with optional evaluators and streaming results.
- Evaluator class in evaluator.py for executing evaluators with SSE-based streaming.
- get_version_jsonl() added to datasets.py for fetching dataset versions in JSONL format.
- experiment_example.py demonstrates medical-prompt experiments.
- test_experiment.py covers JSONL parsing behavior and error tolerance.
- cuid runtime dependency added in pyproject.toml.
This description was created by Ellipsis for e466cef. You can customize this summary. It will automatically update as commits are pushed.
Summary by CodeRabbit
New Features
Tests
Chores