feat: proxy consolidation #34

Merged
thushan merged 11 commits into main from feature/proxy-consolidation
Jul 31, 2025

Conversation

@thushan
Owner

@thushan thushan commented Jul 30, 2025

This PR consolidates the two proxies and removes the excessive duplication between them, now that both are considered stable implementation-wise. It also fixes a few annoying bugs and improves performance.

  • Olla and Sherpa now share the same configuration structs.
  • Olla and Sherpa proxies now reuse a lot more code (but maintain their individual traits).
  • BUGFIX: Path matching with StripPrefix could strip a partial path segment — a /olla prefix would sometimes incorrectly strip from /ollama/… paths, e.g. turning /ollama/api/chat into /ma/api/chat.
  • BREAKING: We used to always send X-Forwarded-Host (as in scout/sherpa), but we now only send it if it's not already present; this may break multi-hop proxy setups.

Beyond the consolidation, we also tuned it further based on the internal benchmarks.

Summary of changes

| Benchmark | Metric | Baseline | Current | Δ | Change |
|---|---|---|---|---|---|
| ErrorHandling | ns/op | 28,126 | 5,972 | 🟢 -78.8% | 4.7× faster |
| ErrorHandling | allocs/op | 154 | 83 | 🟢 -46% | Fewer allocations |
| ErrorHandling | B/op | 12,241 | 7,233 | 🟢 -40.9% | Less memory |
| ConfigUpdates | ns/op | 49.43 | 57.54 | 🟡 +16.4% | Slight trade-off |
| StatsCollection | ns/op | 35.00 | 8.17 | 🟢 -76.6% | 4.3× faster |
| HeaderProcessing | ns/op | 111,672 | 98,838 | 🟢 -11.5% | Faster processing |
| HeaderProcessing | allocs/op | 257 | 224 | 🟢 -12.8% | Fewer allocations |
| HeaderProcessing | B/op | 22,618 | 16,899 | 🟢 -25.3% | Less memory |
| GetStats (Sherpa) | ns/op | 35.59 | 7.75 | 🟢 -78.2% | 4.6× faster |
| GetStats (Olla) | ns/op | 34.94 | 7.72 | 🟢 -77.9% | 4.5× faster |
| ErrorFunction | ns/op | 475.5 | 536.5 | 🔴 +12.8% | Minor regression |
| MemoryUsage (Sherpa) | ns/op | 95,751 | 104,199 | 🔴 +8.8% | Slight increase |
| MemoryUsage (Olla) | ns/op | 94,634 | 98,746 | 🔴 +4.3% | Acceptable |
| MemoryUsage (Olla) | allocs/op | 209 | 187 | 🟢 -10.5% | Improved |
| ConnectionReuse (Sherpa) | ns/op | 87,066 | 69,172 | 🟢 -20.6% | Faster reuse |
| ConnectionReuse (Olla) | ns/op | 25,401 | 10,517 | 🟢 -58.6% | 2.4× faster |
| ConnectionReuse (Olla) | allocs/op | 208 | 185 | 🟢 -11% | Improved |
| CircuitBreaker (Check) | ns/op | 3.20 | 16.64 | 🔴 +420% | New cost from granular tracking |
| CircuitBreaker (Concurrent) | ns/op | — | 15.84 | ⚪ | We added this |
| CircuitBreaker (Threshold) | ns/op | — | 143.7 | ⚪ | We added this |

Olla's refactor reduced latency and allocations across most benchmarks (up to 4.7× faster), with minor regressions in circuit breaker cost due to added tracking.

Baseline

main commit d03fe4...d94

BenchmarkProxyImplementations/Olla/ErrorHandling-16           	   43749	     28126 ns/op	   12241 B/op	     154 allocs/op
BenchmarkProxyImplementations/Olla/ConfigUpdates
BenchmarkProxyImplementations/Olla/ConfigUpdates-16           	23395597	        49.43 ns/op	      80 B/op	       1 allocs/op
BenchmarkProxyImplementations/Olla/StatsCollection
BenchmarkProxyImplementations/Olla/StatsCollection-16         	33153475	        35.00 ns/op	       0 B/op	       0 allocs/op
BenchmarkProxyImplementations/Olla/HeaderProcessing
BenchmarkProxyImplementations/Olla/HeaderProcessing-16        	   10795	    111672 ns/op	   22618 B/op	     257 allocs/op
BenchmarkProxyStats
BenchmarkProxyStats/Sherpa_GetStats
BenchmarkProxyStats/Sherpa_GetStats-16                        	32711808	        35.59 ns/op	       0 B/op	       0 allocs/op
BenchmarkProxyStats/Olla_GetStats
BenchmarkProxyStats/Olla_GetStats-16                          	33994333	        34.94 ns/op	       0 B/op	       0 allocs/op
BenchmarkErrorFunction
BenchmarkErrorFunction-16                                     	 2502780	       475.5 ns/op	     137 B/op	       4 allocs/op
BenchmarkMemoryUsage
BenchmarkMemoryUsage/Sherpa_MemoryAllocations
BenchmarkMemoryUsage/Sherpa_MemoryAllocations-16              	   12224	     95751 ns/op	   18823 B/op	     210 allocs/op
BenchmarkMemoryUsage/Olla_MemoryAllocations
BenchmarkMemoryUsage/Olla_MemoryAllocations-16                	   12624	     94634 ns/op	   18576 B/op	     209 allocs/op
BenchmarkConnectionPooling
BenchmarkConnectionPooling/Sherpa_ConnectionReuse
BenchmarkConnectionPooling/Sherpa_ConnectionReuse-16          	   16390	     87066 ns/op	   31445 B/op	     224 allocs/op
BenchmarkConnectionPooling/Olla_ConnectionReuse
BenchmarkConnectionPooling/Olla_ConnectionReuse-16            	   46867	     25401 ns/op	   20269 B/op	     208 allocs/op
BenchmarkCircuitBreaker
BenchmarkCircuitBreaker/Olla_CircuitBreakerCheck
BenchmarkCircuitBreaker/Olla_CircuitBreakerCheck-16           	362502390	         3.200 ns/op	       0 B/op	       0 allocs/op
BenchmarkObjectPools
BenchmarkObjectPools/Olla_BufferPool
BenchmarkObjectPools/Olla_BufferPool-16                       	100000000	        12.16 ns/op	       0 B/op	       0 allocs/op
BenchmarkObjectPools/Olla_RequestContextPool
BenchmarkObjectPools/Olla_RequestContextPool-16               	100000000	        11.84 ns/op	       0 B/op	       0 allocs/op
PASS

Consolidated & Optimised Further

This includes new rounds of profiling and heavy traffic analysis on our busier nodes.

BenchmarkProxyImplementations/Olla/ErrorHandling-16           	  281624	      5972 ns/op	    7233 B/op	      83 allocs/op
BenchmarkProxyImplementations/Olla/ConfigUpdates
BenchmarkProxyImplementations/Olla/ConfigUpdates-16           	35091412	        57.54 ns/op	      80 B/op	       1 allocs/op
BenchmarkProxyImplementations/Olla/StatsCollection
BenchmarkProxyImplementations/Olla/StatsCollection-16         	145893889	         8.171 ns/op	       0 B/op	       0 allocs/op
BenchmarkProxyImplementations/Olla/HeaderProcessing
BenchmarkProxyImplementations/Olla/HeaderProcessing-16        	   12396	     98838 ns/op	   16899 B/op	     224 allocs/op
BenchmarkProxyStats
BenchmarkProxyStats/Sherpa_GetStats
BenchmarkProxyStats/Sherpa_GetStats-16                        	153806734	         7.750 ns/op	       0 B/op	       0 allocs/op
BenchmarkProxyStats/Olla_GetStats
BenchmarkProxyStats/Olla_GetStats-16                          	154590666	         7.715 ns/op	       0 B/op	       0 allocs/op
BenchmarkErrorFunction
BenchmarkErrorFunction-16                                     	 2212374	       536.5 ns/op	     137 B/op	       4 allocs/op
BenchmarkMemoryUsage
BenchmarkMemoryUsage/Sherpa_MemoryAllocations
BenchmarkMemoryUsage/Sherpa_MemoryAllocations-16              	   12654	    104199 ns/op	   16619 B/op	     206 allocs/op
BenchmarkMemoryUsage/Olla_MemoryAllocations
BenchmarkMemoryUsage/Olla_MemoryAllocations-16                	   12193	     98746 ns/op	   14418 B/op	     187 allocs/op
BenchmarkConnectionPooling
BenchmarkConnectionPooling/Sherpa_ConnectionReuse
BenchmarkConnectionPooling/Sherpa_ConnectionReuse-16          	   21009	     69172 ns/op	   18258 B/op	     213 allocs/op
BenchmarkConnectionPooling/Olla_ConnectionReuse
BenchmarkConnectionPooling/Olla_ConnectionReuse-16            	  141662	     10517 ns/op	   14411 B/op	     185 allocs/op
BenchmarkCircuitBreaker
BenchmarkCircuitBreaker/CircuitBreakerCheck
BenchmarkCircuitBreaker/CircuitBreakerCheck-16                	75242185	        16.64 ns/op	       0 B/op	       0 allocs/op
BenchmarkCircuitBreaker/CircuitBreakerFailureThreshold
BenchmarkCircuitBreaker/CircuitBreakerFailureThreshold-16     	 8753750	       143.7 ns/op	       0 B/op	       0 allocs/op
BenchmarkCircuitBreaker/CircuitBreakerConcurrent
BenchmarkCircuitBreaker/CircuitBreakerConcurrent-16           	71275837	        15.84 ns/op	       0 B/op	       0 allocs/op
PASS

Summary by CodeRabbit

New Features

  • Introduced new Sherpa and Olla proxy services with enhanced performance, streaming, error handling, and connection management tailored for AI inference workloads.
  • Added extensive benchmarking suites comparing proxy implementations on request handling, streaming, configuration updates, and circuit breaker behavior.
  • Implemented detailed, user-friendly error messages for proxy failures with actionable guidance.
  • Added advanced proxy configuration options for connection pooling, timeouts, and TCP tuning.
  • Introduced a new streaming response test script validating incremental streaming quality across multiple LLM providers.

Bug Fixes

  • Improved and rigorously tested route prefix stripping and request path forwarding to prevent double-stripping and ensure accurate upstream paths.
  • Enhanced HTTP header filtering to block sensitive headers and correctly add proxy metadata headers.

Documentation

  • Expanded documentation with detailed instructions for the new streaming response test script and updated test running procedures.

Refactor

  • Replaced previous proxy implementations with refactored, encapsulated Sherpa and Olla services exposing public constructors and methods.
  • Unified proxy core components for event publishing, statistics tracking, and header management.
  • Introduced a worker pool for event bus asynchronous publishing to improve concurrency and resource management.
  • Updated proxy factory to support error-aware proxy creation and improved type clarity.
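The event-bus worker pool mentioned above can be sketched as a fixed set of goroutines draining a buffered job channel, which bounds concurrency and gives a clean shutdown point for leak detection. This is a minimal illustration under assumed names; the actual pkg/eventbus API is not reproduced here.

```go
package main

import "sync"

// WorkerPool fans jobs out to a fixed number of goroutines so publishers
// never spawn unbounded goroutines under load.
type WorkerPool struct {
	jobs chan func()
	wg   sync.WaitGroup
}

// NewWorkerPool starts `workers` goroutines draining a queue of `queue` slots.
func NewWorkerPool(workers, queue int) *WorkerPool {
	p := &WorkerPool{jobs: make(chan func(), queue)}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer p.wg.Done()
			for job := range p.jobs {
				job()
			}
		}()
	}
	return p
}

// Submit enqueues work, applying backpressure when the queue is full.
func (p *WorkerPool) Submit(job func()) {
	p.jobs <- job
}

// Close drains the queue and waits for every worker to exit, so tests can
// assert that no goroutines leak.
func (p *WorkerPool) Close() {
	close(p.jobs)
	p.wg.Wait()
}
```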

Tests

  • Added comprehensive unit, integration, and leak detection tests covering proxy routing, streaming, error handling, circuit breakers, connection pools, and header management.
  • Modernised existing proxy tests to use public APIs and constructors, removing direct internal state inspection.
  • Added new tests for proxy path stripping, routing regressions, and streaming client disconnect handling.
  • Introduced stress and concurrency tests for proxy streaming, event bus worker pools, and goroutine leak detection.

Chores

  • Renamed context key constants for improved clarity and consistency.
  • Updated logger implementations to use centralized context key constants.
  • Improved test scripts and load testing commands with timeouts and error handling enhancements.

thushan added 4 commits July 29, 2025 08:46
refactor and address missing upstream metrics

adds new streaming tester scripts

fix missing log events
update routing and apply better comments
update tests to utilise services

fix missing constants

Invalid http/https in resume context

lint errors

lint fixes

redo header checks

add common tests
@coderabbitai

coderabbitai bot commented Jul 30, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

This update introduces a significant refactor and modularisation of the proxy layer, splitting the Sherpa and Olla proxy implementations into separate, maintainable packages. The changes include new core proxy abstractions, improved error handling, enhanced configuration management, and comprehensive test and benchmark coverage. Legacy monolithic proxy files are removed, replaced with modern, robust, and testable code structures.

Changes

Cohort / File(s) Change Summary
Sherpa Proxy Refactor
internal/adapter/proxy/sherpa/service.go,
internal/adapter/proxy/sherpa/service_streaming.go,
internal/adapter/proxy/sherpa/service_leak_test.go,
internal/adapter/proxy/sherpa/service_leak_stress_test.go
Introduces a new Sherpa proxy implementation with streaming, timeout, and observability support. Adds robust streaming logic with timeouts and cancellation handling. Includes goroutine leak detection and stress tests for streaming components.
Olla Proxy Refactor
internal/adapter/proxy/olla/service.go
Implements a new Olla proxy service optimised for AI streaming, with circuit breakers, connection pooling, object pooling, and advanced TCP tuning.
Core Proxy Abstractions
internal/adapter/proxy/core/base.go,
internal/adapter/proxy/core/common.go,
internal/adapter/proxy/core/common_test.go,
internal/adapter/proxy/core/stats.go
Adds shared proxy abstractions: event types, proxy event bus, atomic stats, header utilities, and comprehensive tests for header management.
Proxy Configuration & Error Handling
internal/adapter/proxy/config.go,
internal/adapter/proxy/common/errors.go,
internal/adapter/proxy/error.go
Refactors configuration with new defaults and advanced pooling options. Centralises user-friendly error handling and exposes error helpers.
Proxy Factory & Registration
internal/adapter/proxy/factory.go
Refactors the proxy factory to use error-returning creators and registers new Sherpa and Olla implementations. Adds utility for listing proxy types.
Legacy Proxy Removal
internal/adapter/proxy/proxy_olla.go,
internal/adapter/proxy/proxy_sherpa.go,
internal/adapter/proxy/config_olla.go
Removes legacy monolithic Olla and Sherpa proxy implementations and their configuration.
Proxy Test Refactor
internal/adapter/proxy/proxy_olla_test.go,
internal/adapter/proxy/proxy_sherpa_test.go,
internal/adapter/proxy/proxy_test.go,
internal/adapter/proxy/proxy_integration_test.go,
internal/adapter/proxy/proxy_headers_test.go
Refactors all proxy tests to use new modular proxy services, adapts to new APIs, and removes direct access to internal state. Updates error handling and stat checks.
New Proxy Routing & Path Handling Tests
internal/adapter/proxy/proxy_path_stripping_test.go,
internal/adapter/proxy/proxy_routing_test.go
Adds comprehensive tests for path stripping, routing, and regression scenarios for both proxy types. Includes benchmarks for path handling.
Proxy Benchmarks
internal/adapter/proxy/benchmark_comparison_test.go,
internal/adapter/proxy/benchmark_refactor_test.go,
internal/adapter/proxy/proxy_benchmark_test.go
Introduces new and refactored benchmarks for performance, streaming, error handling, circuit breakers, and object pools. Removes benchmarks tied to legacy internals.
App Context Key Updates
internal/app/config.go,
internal/app/handlers/handler_common.go,
internal/app/handlers/handler_provider_common.go,
internal/app/handlers/handler_proxy.go,
internal/core/constants/context.go,
internal/router/registry.go
Renames context keys for clarity and consistency across the application and proxy layers. Updates usage throughout the app and router.
Logger Context Key Update
internal/logger/styled_plain.go,
internal/logger/styled_pretty.go
Uses centralised context key constants for request ID in logger implementations.
EventBus Worker Pool
pkg/eventbus/eventbus.go,
pkg/eventbus/eventbus_worker.go,
pkg/eventbus/eventbus_worker_test.go,
pkg/eventbus/eventbus_stress_test.go
Adds a worker pool for asynchronous event publishing in the event bus, improving concurrency control and resource cleanup. Includes tests for worker pool behavior, backpressure, concurrency, and goroutine leak detection.
Documentation and Streaming Test Script
test/scripts/logic/README.md,
test/scripts/logic/test-streaming-responses.py
Adds a new streaming response test script and documents its usage, metrics, and output.
Load Test Script Update
test/scripts/load/test-load-chaos.sh
Adds curl timeout and suppresses output/errors to improve robustness of load test streaming requests.
Makefile Updates
makefile
Adds new test-stress target for running stress tests and modifies test-race to run with -short. Adjusts ready target dependencies and help output accordingly.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Proxy as Proxy (Sherpa/Olla)
    participant Selector
    participant Endpoint
    participant StatsCollector
    participant Logger

    Client->>Proxy: HTTP Request
    Proxy->>Selector: Select healthy endpoint
    Selector-->>Proxy: Endpoint
    Proxy->>Endpoint: Forward request (strip prefix, copy headers)
    Endpoint-->>Proxy: Response (streaming)
    Proxy->>StatsCollector: Record stats (success/failure, latency)
    Proxy->>Logger: Log event / error
    Proxy-->>Client: HTTP Response (streamed)

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~90+ minutes

Complexity: This is a major refactor and modularisation, introducing new core abstractions, proxy implementations, configuration, error handling, and comprehensive tests and benchmarks. The review requires careful attention to concurrency, streaming, error propagation, and interface compatibility, as well as verification of test and benchmark coverage.

Possibly related PRs

  • feat: proxy consolidation #34: Adds the same benchmark comparison test file for Sherpa and Olla proxies, directly related to the benchmark additions in this PR.



📜 Recent review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c4bf056 and 033bbd1.

📒 Files selected for processing (5)
  • internal/adapter/proxy/sherpa/service_leak_stress_test.go (1 hunks)
  • internal/adapter/proxy/sherpa/service_leak_test.go (1 hunks)
  • makefile (3 hunks)
  • pkg/eventbus/eventbus_stress_test.go (1 hunks)
  • pkg/eventbus/eventbus_worker_test.go (1 hunks)



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

🔭 Outside diff range comments (1)
internal/adapter/proxy/factory.go (1)

60-63: Configuration values should be sourced from the provided configuration

The Olla configuration has hardcoded values for MaxIdleConns, IdleConnTimeout, and MaxConnsPerHost. These should ideally come from the provided ports.ProxyConfiguration interface to maintain consistency and allow for runtime configuration.

Consider extending the ports.ProxyConfiguration interface to include these connection pool settings, or use default values from a constants file that can be overridden.

🧹 Nitpick comments (10)
test/scripts/logic/README.md (1)

296-320: Add language specification to code block.

The code block should specify the language for proper syntax highlighting and linting compliance.

-```
+```text
 Testing model: llama3.2
   Testing ollama streaming: ............. [OK]
     → Endpoint: local-ollama
internal/adapter/proxy/common/errors.go (1)

97-111: Consider consolidating string-based error checks.

While the string-based error checking works as a fallback, some of these cases might already be covered by the type assertions above (e.g., "connection refused" should be caught by the syscall.ECONNREFUSED check).

Consider adding a comment explaining that these string checks are fallbacks for cases where error types aren't properly wrapped:

 // HTTP transport errors
+// These string checks serve as fallbacks for errors that might not be properly wrapped
+// or come from different transport implementations
 errStr := err.Error()
internal/adapter/proxy/core/common.go (1)

53-53: Fix typo in comment

-		// TODO: we should consider a more copmrehensive security policy / technique here
+		// TODO: we should consider a more comprehensive security policy / technique here
internal/adapter/proxy/factory.go (1)

96-99: Return a sorted list of proxy types for consistency

The method returns proxy types in random map iteration order, which could lead to inconsistent results across calls.

+import "sort"

 func (f *Factory) GetAvailableTypes() []string {
 	f.mu.RLock()
 	defer f.mu.RUnlock()
 
 	types := make([]string, 0, len(f.creators))
 	for name := range f.creators {
 		types = append(types, name)
 	}
+	sort.Strings(types)
 	return types
 }
internal/adapter/proxy/proxy_integration_test.go (2)

418-497: Circuit breaker test only covers Olla implementation

The circuit breaker test only tests the Olla implementation but comments suggest Sherpa doesn't have circuit breakers. According to the coding guidelines, shared proxy tests should ensure compatibility between both proxy engines. Consider documenting this architectural difference more clearly or testing both implementations' error handling approaches.

Add a comment at the test level explaining why only Olla is tested for circuit breaker functionality, or test both implementations' respective error handling mechanisms.


523-524: Performance test uses hardcoded latency threshold

The performance test uses a hardcoded maxLatency of 100ms which may be too restrictive for CI/CD environments or slower systems.

-const maxLatency = 100 * time.Millisecond
+// Allow configuration via environment variable for CI/CD environments
+maxLatencyStr := os.Getenv("PROXY_TEST_MAX_LATENCY_MS")
+maxLatency := 100 * time.Millisecond
+if maxLatencyStr != "" {
+    if ms, err := strconv.Atoi(maxLatencyStr); err == nil {
+        maxLatency = time.Duration(ms) * time.Millisecond
+    }
+}
test/scripts/logic/test-streaming-responses.py (2)

12-14: Remove unused imports

The static analysis correctly identifies unused imports.

-from typing import Dict, List, Tuple, Optional, Any
+from typing import Dict, List, Optional, Any
 from collections import defaultdict
-from datetime import datetime

195-195: Remove f-string prefixes where no placeholders are used

The static analysis correctly identifies f-strings without placeholders.

-                self.print_color(GREEN, f" [OK]")
+                self.print_color(GREEN, " [OK]")
-                self.print_color(RED, f" [FAIL]")
+                self.print_color(RED, " [FAIL]")

Apply similar changes to lines 417, 435, 485, and 541.

Also applies to: 293-293, 308-308, 417-417, 435-435, 485-485, 541-541

internal/adapter/proxy/sherpa/service_streaming.go (1)

32-33: Consider validating buffer size

The function accepts a buffer but doesn't validate its size. A zero-length buffer would cause issues.

 func (s *Service) streamResponseWithTimeout(clientCtx, upstreamCtx context.Context, w http.ResponseWriter, body io.Reader, buffer []byte, rlog logger.StyledLogger) (int, error) {
+	if len(buffer) == 0 {
+		return 0, fmt.Errorf("buffer size must be greater than zero")
+	}
 	state := &streamState{
 		lastReadTime: time.Now(),
 	}
internal/adapter/proxy/proxy_olla_test.go (1)

399-407: Potential race condition in latency calculation

The test calculates latency after the proxy call if not set, but this could lead to inaccurate measurements since time has passed since the actual request.

Consider asserting that stats.Latency is always set by the proxy:

-    // Calculate latency if not set
-    if stats.Latency == 0 && !stats.StartTime.IsZero() {
-        stats.Latency = time.Since(stats.StartTime).Milliseconds()
-    }
-
-    // Check if latency was calculated (either by proxy or by us)
-    if stats.Latency <= 0 && stats.StartTime.IsZero() {
-        t.Error("ProxyRequestToEndpoints() should record total latency or set StartTime")
+    // The proxy should always set the latency
+    if stats.Latency <= 0 {
+        t.Error("ProxyRequestToEndpoints() should record total latency")
     }

Comment on lines +26 to +34
// ProxyEventMetadata provides type-safe metadata for events
type ProxyEventMetadata struct {
	Message              string
	Model                string
	BytesSent            int64
	BytesAfterDisconnect int
	Counter              int
	StatusCode           int
}

🛠️ Refactor suggestion

Inconsistent integer types in ProxyEventMetadata

The struct uses int for BytesAfterDisconnect and Counter but int64 for BytesSent. This inconsistency could lead to issues on 32-bit systems or when handling large values.

Consider using consistent types:

 type ProxyEventMetadata struct {
     Message              string
     Model                string
     BytesSent            int64
-    BytesAfterDisconnect int
-    Counter              int
-    StatusCode           int
+    BytesAfterDisconnect int64
+    Counter              int64
+    StatusCode           int  // OK to keep as int for HTTP status codes
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// ProxyEventMetadata provides type-safe metadata for events
type ProxyEventMetadata struct {
	Message              string
	Model                string
	BytesSent            int64
	BytesAfterDisconnect int
	Counter              int
	StatusCode           int
}

// ProxyEventMetadata provides type-safe metadata for events
type ProxyEventMetadata struct {
	Message              string
	Model                string
	BytesSent            int64
	BytesAfterDisconnect int64
	Counter              int64
	StatusCode           int // OK to keep as int for HTTP status codes
}
🤖 Prompt for AI Agents
In internal/adapter/proxy/core/base.go around lines 26 to 34, the
ProxyEventMetadata struct uses mixed integer types: int64 for BytesSent and int
for BytesAfterDisconnect and Counter. To ensure consistency and avoid potential
issues on 32-bit systems or with large values, change BytesAfterDisconnect and
Counter fields to int64 to match BytesSent.

Comment on lines +197 to +213

	if configuration.StreamBufferSize == 0 {
		configuration.StreamBufferSize = DefaultStreamBufferSize
	}
	if configuration.MaxIdleConns == 0 {
		configuration.MaxIdleConns = DefaultMaxIdleConns
	}
	if configuration.MaxConnsPerHost == 0 {
		configuration.MaxConnsPerHost = DefaultMaxConnsPerHost
	}
	if configuration.IdleConnTimeout == 0 {
		configuration.IdleConnTimeout = DefaultIdleConnTimeout
	}
	if configuration.ReadTimeout == 0 {
		configuration.ReadTimeout = DefaultReadTimeout
	}


⚠️ Potential issue

Avoid mutating the caller's configuration object

The function directly modifies the passed configuration object to set defaults. This mutation could surprise callers who might not expect their config to be modified.

Consider creating a copy or documenting this behaviour clearly:

 func NewService(
     discoveryService ports.DiscoveryService,
     selector domain.EndpointSelector,
     configuration *Configuration,
     statsCollector ports.StatsCollector,
     logger logger.StyledLogger,
 ) (*Service, error) {
+    // Create a copy to avoid mutating caller's config
+    config := *configuration
+    configuration = &config

     if configuration.StreamBufferSize == 0 {
         configuration.StreamBufferSize = DefaultStreamBufferSize
     }

Suggested change
	if configuration.StreamBufferSize == 0 {
		configuration.StreamBufferSize = DefaultStreamBufferSize
	}
	if configuration.MaxIdleConns == 0 {
		configuration.MaxIdleConns = DefaultMaxIdleConns
	}
	if configuration.MaxConnsPerHost == 0 {
		configuration.MaxConnsPerHost = DefaultMaxConnsPerHost
	}
	if configuration.IdleConnTimeout == 0 {
		configuration.IdleConnTimeout = DefaultIdleConnTimeout
	}
	if configuration.ReadTimeout == 0 {
		configuration.ReadTimeout = DefaultReadTimeout
	}

func NewService(
	discoveryService ports.DiscoveryService,
	selector domain.EndpointSelector,
	configuration *Configuration,
	statsCollector ports.StatsCollector,
	logger logger.StyledLogger,
) (*Service, error) {
	// Create a copy to avoid mutating the caller's config
	config := *configuration
	configuration = &config

	if configuration.StreamBufferSize == 0 {
		configuration.StreamBufferSize = DefaultStreamBufferSize
	}
	if configuration.MaxIdleConns == 0 {
		configuration.MaxIdleConns = DefaultMaxIdleConns
	}
	if configuration.MaxConnsPerHost == 0 {
		configuration.MaxConnsPerHost = DefaultMaxConnsPerHost
	}
	if configuration.IdleConnTimeout == 0 {
		configuration.IdleConnTimeout = DefaultIdleConnTimeout
	}
	if configuration.ReadTimeout == 0 {
		configuration.ReadTimeout = DefaultReadTimeout
	}
	// ... rest of NewService ...
}
🤖 Prompt for AI Agents
In internal/adapter/proxy/olla/service.go around lines 197 to 213, the code
directly modifies the passed configuration object to set default values, which
can cause unexpected side effects for callers. To fix this, create a copy of the
configuration object at the start of the function and apply default values to
the copy instead, leaving the original configuration unmodified. Use this copied
and updated configuration for further processing.

Comment on lines +63 to +69
{
	name:         "prefix_only_partial_match",
	proxyPrefix:  "/olla",
	requestPath:  "/ollama/api/chat",
	expectedPath: "/ma/api/chat", // This is the ACTUAL behavior (a bug!)
	description:  "Partial prefix match incorrectly stripped (BUG)",
},

⚠️ Potential issue

Known bug documented - partial prefix match

The test correctly documents a bug where /olla prefix incorrectly strips from /ollama/api/chat resulting in /ma/api/chat. This should be fixed to prevent incorrect routing.

The path stripping logic needs to ensure it only strips complete path segments. Consider using strings.HasPrefix(path, prefix + "/") or similar logic to avoid partial matches.

🤖 Prompt for AI Agents
In internal/adapter/proxy/proxy_path_stripping_test.go around lines 63 to 69,
the test reveals a bug where the prefix "/olla" is partially matched and
stripped from "/ollama/api/chat", resulting in incorrect path "/ma/api/chat". To
fix this, update the path stripping logic to only remove the prefix if it
matches a complete path segment, for example by checking if the request path
starts with the prefix followed by a slash (prefix + "/") before stripping. This
will prevent partial prefix matches and ensure correct routing.
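The suggested fix can be sketched as a segment-aware strip. This is illustrative only: `stripRoutePrefix` is a hypothetical name, not the repo's actual `util.StripPrefix`.

```go
package main

import (
	"fmt"
	"strings"
)

// stripRoutePrefix removes prefix from path only when the prefix ends at a
// path-segment boundary, so "/olla" strips "/olla/api/chat" but leaves
// "/ollama/api/chat" untouched.
func stripRoutePrefix(path, prefix string) string {
	if path == prefix {
		return "/"
	}
	if strings.HasPrefix(path, prefix+"/") {
		return strings.TrimPrefix(path, prefix)
	}
	return path
}

func main() {
	fmt.Println(stripRoutePrefix("/olla/api/chat", "/olla"))   // /api/chat
	fmt.Println(stripRoutePrefix("/ollama/api/chat", "/olla")) // unchanged: /ollama/api/chat
}
```

Checking for `prefix + "/"` (or exact equality) is what prevents the partial match, because a bare `HasPrefix(path, prefix)` succeeds mid-segment.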

Comment on lines +75 to +88
func (s *Service) createCombinedContext(clientCtx, upstreamCtx context.Context) (context.Context, context.CancelFunc) {
	combinedCtx, cancel := context.WithCancel(context.Background())

	go func() {
		select {
		case <-clientCtx.Done():
			cancel()
		case <-upstreamCtx.Done():
			cancel()
		}
	}()

	return combinedCtx, cancel
}

⚠️ Potential issue

Potential goroutine leak in context combination

The goroutine spawned to monitor contexts will leak if neither context is cancelled before the function returns. This is particularly problematic if the streaming completes normally (EOF).

Consider using a more robust approach:

 func (s *Service) createCombinedContext(clientCtx, upstreamCtx context.Context) (context.Context, context.CancelFunc) {
-	combinedCtx, cancel := context.WithCancel(context.Background())
-
-	go func() {
-		select {
-		case <-clientCtx.Done():
-			cancel()
-		case <-upstreamCtx.Done():
-			cancel()
-		}
-	}()
-
-	return combinedCtx, cancel
+	// Create a context that cancels when either parent cancels
+	combinedCtx, cancel := context.WithCancel(clientCtx)
+	
+	// Monitor upstream context
+	go func() {
+		<-upstreamCtx.Done()
+		cancel()
+	}()
+	
+	return combinedCtx, cancel
 }

Or better yet, avoid the goroutine entirely by checking both contexts in the select statements.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In internal/adapter/proxy/sherpa/service_streaming.go around lines 75 to 88, the
goroutine created to monitor clientCtx and upstreamCtx can leak if neither
context is cancelled before the function returns. To fix this, refactor the
function to avoid spawning a goroutine by using context.WithCancel and
context.WithCancel combined with context.WithDeadline or context.WithTimeout, or
by using context.WithCancel on a derived context that listens to both input
contexts in a single select statement without a separate goroutine. This ensures
the combined context is cancelled appropriately without leaking goroutines.


            try:
                return json.loads(data_str)
            except:

🛠️ Refactor suggestion

Replace bare except clauses with specific exception handling

Bare except: clauses can hide unexpected errors and make debugging difficult.

-            except:
+            except (json.JSONDecodeError, ValueError):
                 return None

and

-                    except:
+                    except (json.JSONDecodeError, ValueError):
                        pass

Also applies to: 231-231

🧰 Tools
🪛 Ruff (0.12.2)

111-111: Do not use bare except

(E722)

🤖 Prompt for AI Agents
In test/scripts/logic/test-streaming-responses.py at lines 111 and 231, replace
the bare except clauses with specific exception types to avoid hiding unexpected
errors. Identify the exceptions that the code is expected to handle in these
blocks and explicitly catch only those exceptions. This will improve error
handling clarity and make debugging easier.

Comment on lines +387 to +513
def main():
parser = argparse.ArgumentParser(description='Test Olla streaming responses')
parser.add_argument('--url', default=TARGET_URL, help='Olla base URL')
parser.add_argument('--max-time', type=int, default=DEFAULT_MAX_STREAM_TIME,
help='Maximum streaming time per test (seconds)')
parser.add_argument('--models', nargs='+', help='Specific models to test')
parser.add_argument('--providers', nargs='+', choices=['openai', 'ollama', 'lmstudio'],
help='Providers to test (default: all)')
parser.add_argument('--sample', action='store_true',
help='Test only a few models per provider')
parser.add_argument('--analyze', action='store_true',
help='Show detailed streaming pattern analysis')

args = parser.parse_args()

providers = args.providers or ['openai', 'ollama', 'lmstudio']

tester = StreamingTester(args.url, args.max_time)
tester.print_header()

if not tester.check_health():
sys.exit(1)

print()

if not tester.fetch_models():
self.print_color(RED, "No models found!")
sys.exit(1)

print()
self.print_color(WHITE, f"Configuration:")
print(f" Max streaming time: {CYAN}{args.max_time}s{RESET}")
print(f" Max tokens: {CYAN}{DEFAULT_MAX_TOKENS}{RESET}")
print(f" Providers: {CYAN}{', '.join(providers)}{RESET}")

# Determine which models to test
if args.models:
models_to_test = args.models
elif args.sample:
# Sample 2-3 models per provider
models_to_test = []
for provider in providers:
provider_models = tester.provider_models.get(provider, [])
models_to_test.extend(provider_models[:3])
models_to_test = list(set(models_to_test)) # Remove duplicates
else:
models_to_test = tester.all_models

self.print_color(WHITE, f"\nTesting {len(models_to_test)} models for streaming capability...")

# Test each model
for model in models_to_test:
tester.test_model_streaming(model, providers, sample_only=args.sample)

# Show analysis if requested
if args.analyze and tester.test_results:
tester.analyze_streaming_patterns()

tester.print_summary()

if __name__ == '__main__':
# Fix 'self' references in main()
class MainRunner:
@staticmethod
def print_color(color: str, message: str):
print(f"{color}{message}{RESET}")

runner = MainRunner()

parser = argparse.ArgumentParser(description='Test Olla streaming responses')
parser.add_argument('--url', default=TARGET_URL, help='Olla base URL')
parser.add_argument('--max-time', type=int, default=DEFAULT_MAX_STREAM_TIME,
help='Maximum streaming time per test (seconds)')
parser.add_argument('--models', nargs='+', help='Specific models to test')
parser.add_argument('--providers', nargs='+', choices=['openai', 'ollama', 'lmstudio'],
help='Providers to test (default: all)')
parser.add_argument('--sample', action='store_true',
help='Test only a few models per provider')
parser.add_argument('--analyze', action='store_true',
help='Show detailed streaming pattern analysis')

args = parser.parse_args()

providers = args.providers or ['openai', 'ollama', 'lmstudio']

tester = StreamingTester(args.url, args.max_time)
tester.print_header()

if not tester.check_health():
sys.exit(1)

print()

if not tester.fetch_models():
runner.print_color(RED, "No models found!")
sys.exit(1)

print()
runner.print_color(WHITE, f"Configuration:")
print(f" Max streaming time: {CYAN}{args.max_time}s{RESET}")
print(f" Max tokens: {CYAN}{DEFAULT_MAX_TOKENS}{RESET}")
print(f" Providers: {CYAN}{', '.join(providers)}{RESET}")

# Determine which models to test
if args.models:
models_to_test = args.models
elif args.sample:
# Sample 2-3 models per provider
models_to_test = []
for provider in providers:
provider_models = tester.provider_models.get(provider, [])
models_to_test.extend(provider_models[:3])
models_to_test = list(set(models_to_test)) # Remove duplicates
else:
models_to_test = tester.all_models

runner.print_color(WHITE, f"\nTesting {len(models_to_test)} models for streaming capability...")

# Test each model
for model in models_to_test:
tester.test_model_streaming(model, providers, sample_only=args.sample)

# Show analysis if requested
if args.analyze and tester.test_results:
tester.analyze_streaming_patterns()

tester.print_summary()

⚠️ Potential issue

Remove duplicate main() function implementation

The file contains two implementations of the main logic (lines 387-446 and 447-513). The second implementation appears to be the correct one as it fixes the 'self' reference issues.

Remove the first main() function (lines 387-446) entirely and keep only the corrected implementation that starts at line 447.

🧰 Tools
🪛 Ruff (0.12.2)

413-413: Undefined name self

(F821)


417-417: Undefined name self

(F821)


417-417: f-string without any placeholders

Remove extraneous f prefix

(F541)


435-435: Undefined name self

(F821)


485-485: f-string without any placeholders

Remove extraneous f prefix

(F541)

🤖 Prompt for AI Agents
In test/scripts/logic/test-streaming-responses.py from lines 387 to 446, there
is a duplicate main() function that contains incorrect 'self' references. Remove
this entire first main() function and retain only the second implementation
starting at line 447, which correctly fixes the 'self' references by using a
MainRunner class for printing. This will eliminate redundancy and fix the
reference errors.

Adds eventbus worker to manage _alot_ of eventbus events

Add tests for workers

properly close context for read timeout

update EventBus for worker replenishing

hmmm revert tests when worker fails

fix eventbus worker

apply reverted tests

update chaos test scripts

streaming fixes, we time for cancellations

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🔭 Outside diff range comments (1)
test/scripts/load/test-load-chaos.sh (1)

23-33: Missing dependency check for timeout

The script now relies on timeout, yet only jq and curl are verified. On macOS/Homebrew the command is usually gtimeout, and some distros don't ship it by default. A failed lookup will abort the script because it runs with set -e.

+if ! command -v timeout &> /dev/null; then
+  if command -v gtimeout &> /dev/null; then
+    timeout() { gtimeout "$@"; }          # Homebrew coreutils fallback (aliases don't expand in scripts)
+  else
+    echo "timeout is required but not installed. Please install coreutils."
+    exit 1
+  fi
+fi
♻️ Duplicate comments (2)
internal/adapter/proxy/olla/service.go (2)

201-217: Avoid mutating the caller's configuration object

This issue was already identified in a previous review. The function still directly modifies the passed configuration object.


492-499: Verify StripPrefix handles edge cases correctly

This issue regarding incorrect path stripping (e.g., /olla from /ollama) was already identified in a previous review.

🧹 Nitpick comments (3)
test/scripts/load/test-load-chaos.sh (2)

89-95: timeout 10s curl … can leave stray processes

timeout wraps only the curl process; head continues reading until EOF. If curl is killed, head waits on a closed pipe for the full read window, wasting time and PIDs under heavy load.

A cleaner, one-liner using curl’s own timer avoids the extra process and ensures the entire pipeline terminates together:

-    timeout 10s curl -s -X POST "$TARGET" \
+    curl -s --max-time 10 -X POST "$TARGET" \
       -H "Content-Type: application/json" \
       -H "User-Agent: OllaChaosMonkey/1.0" \
       -d "$payload" \
       --no-buffer | head -c "$chaos_bytes" > /dev/null 2>&1 || true

57-62: Non-portable fractional sleeps

sleep with sub-second values isn’t POSIX; BusyBox and some minimal Alpine images accept integers only.

Either round to the nearest second or detect support at runtime:

if ! sleep 0.1 2>/dev/null; then
  jitter_sleep() { sleep 1; }   # fallback
fi
internal/adapter/proxy/olla/service.go (1)

261-261: Consider different intervals for cleanup ticker and stale threshold

The cleanup ticker and stale threshold both use 5 minutes, meaning resources are removed as soon as they become eligible. Consider using a longer stale threshold (e.g., 10 minutes) to avoid premature cleanup:

-		cleanupTicker:       time.NewTicker(5 * time.Minute),
+		cleanupTicker:       time.NewTicker(5 * time.Minute),
-	staleThreshold := int64(5 * time.Minute)
+	staleThreshold := int64(10 * time.Minute)

Also applies to: 761-761

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed93e9a and e6930ab.

📒 Files selected for processing (7)
  • internal/adapter/proxy/olla/service.go (1 hunks)
  • internal/adapter/proxy/sherpa/service_leak_test.go (1 hunks)
  • internal/adapter/proxy/sherpa/service_streaming.go (1 hunks)
  • pkg/eventbus/eventbus.go (6 hunks)
  • pkg/eventbus/eventbus_worker.go (1 hunks)
  • pkg/eventbus/eventbus_worker_test.go (1 hunks)
  • test/scripts/load/test-load-chaos.sh (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/adapter/proxy/sherpa/service_streaming.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*_test.go

📄 CodeRabbit Inference Engine (CLAUDE.md)

Unit tests should test individual components in isolation.

Files:

  • pkg/eventbus/eventbus_worker_test.go
  • internal/adapter/proxy/sherpa/service_leak_test.go
internal/{app,adapter}/**/*.go

📄 CodeRabbit Inference Engine (CLAUDE.md)

Endpoints should be exposed at /internal/health and /internal/status.

Files:

  • internal/adapter/proxy/sherpa/service_leak_test.go
  • internal/adapter/proxy/olla/service.go
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to {proxy_sherpa.go,proxy_olla.go} : Proxy implementations should be in `proxy_sherpa.go` and `proxy_olla.go`.
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.
test/scripts/load/test-load-chaos.sh (1)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to test/scripts/security/*.sh : Security tests should validate rate limiting and size restrictions (see /test/scripts/security/).

pkg/eventbus/eventbus_worker_test.go (3)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to **/*_test.go : Unit tests should test individual components in isolation.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.

internal/adapter/proxy/sherpa/service_leak_test.go (5)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Integration tests should test the full request flow through the proxy.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to **/*_test.go : Unit tests should test individual components in isolation.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to {proxy_sherpa.go,proxy_olla.go} : Proxy implementations should be in proxy_sherpa.go and proxy_olla.go.

internal/adapter/proxy/olla/service.go (8)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to {proxy_sherpa.go,proxy_olla.go} : Proxy implementations should be in proxy_sherpa.go and proxy_olla.go.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.788Z
Learning: Applies to internal/adapter/proxy/*.go : Expose the following response headers: X-Olla-Endpoint, X-Olla-Model, X-Olla-Backend-Type, X-Olla-Request-ID, X-Olla-Response-Time.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Integration tests should test the full request flow through the proxy.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to handler_proxy.go : Request routing logic should be implemented in handler_proxy.go.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.788Z
Learning: Applies to internal/{app,adapter}/**/*.go : Endpoints should be exposed at /internal/health and /internal/status.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to **/*_test.go : Unit tests should test individual components in isolation.

🧬 Code Graph Analysis (1)
pkg/eventbus/eventbus_worker.go (1)
pkg/eventbus/eventbus.go (1)
  • EventBus (20-29)
🪛 GitHub Actions: CI
pkg/eventbus/eventbus_worker_test.go

[error] 165-172: TestWorkerPool_ConcurrentPublishing failed: Expected at least 500 events, got 431. Race condition detected during test execution.

pkg/eventbus/eventbus.go

[error] 224-224: Data race detected: concurrent write and read on channel during event bus unsubscribe and publish operations.


[error] 104-104: Data race detected in Subscribe function during channel close operation.


[error] 127-127: Data race detected in Publish function during channel send operation.

🔇 Additional comments (1)
test/scripts/load/test-load-chaos.sh (1)

48-53: Guard against empty TARGET_URL

TARGET=${TARGET_URL}${PROXY_PATH} silently produces an invalid URL when $TARGET_URL is unset. This results in workers hammering the local shell with DNS errors.

Consider aborting early if the variable is empty:

: "${TARGET_URL:?Environment variable TARGET_URL not set}"

Comment on lines +38 to +40
		// Wait for the combined context to be cancelled
		<-combinedCtx.Done()


⚠️ Potential issue

Add timeout protection to prevent test hangs

The test waits indefinitely for combinedCtx.Done() which could cause the test to hang if the context combination logic fails. Consider adding a timeout:

-		// Wait for the combined context to be cancelled
-		<-combinedCtx.Done()
+		// Wait for the combined context to be cancelled with timeout
+		select {
+		case <-combinedCtx.Done():
+			// Good
+		case <-time.After(time.Second):
+			t.Fatal("Combined context did not cancel within timeout")
+		}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-		// Wait for the combined context to be cancelled
-		<-combinedCtx.Done()
+		// Wait for the combined context to be cancelled with timeout
+		select {
+		case <-combinedCtx.Done():
+			// Good
+		case <-time.After(time.Second):
+			t.Fatal("Combined context did not cancel within timeout")
+		}
🤖 Prompt for AI Agents
In internal/adapter/proxy/sherpa/service_leak_test.go around lines 38 to 40, the
test waits indefinitely on combinedCtx.Done(), risking a hang if the context
never cancels. Modify the code to use a select statement that waits on
combinedCtx.Done() or a timeout channel (e.g., time.After with a reasonable
duration) to ensure the test fails or exits if the context does not cancel in
time.

Comment on lines +39 to +55
func (wp *WorkerPool[T]) PublishAsync(event T) {
	// Check if we're shutting down
	select {
	case <-wp.ctx.Done():
		// Worker pool is shutting down, drop the event
		return
	default:
	}

	// Try non-blocking send
	select {
	case wp.eventChan <- event:
		// Successfully queued
	default:
		// Queue is full, drop the event to prevent blocking
	}
}

⚠️ Potential issue

Potential race condition between PublishAsync and Shutdown

There's a small race window between checking ctx.Done() and sending to eventChan. If Shutdown() is called between these operations, it could close the channel causing a panic.

Consider using a mutex or atomic flag to ensure the channel isn't closed while publishing:

 type WorkerPool[T any] struct {
     ctx       context.Context
     eventChan chan T
     bus       *EventBus[T]
     cancel    context.CancelFunc
     wg        sync.WaitGroup
     workers   int
+    mu        sync.RWMutex
+    closed    atomic.Bool
 }

Then in PublishAsync:

 func (wp *WorkerPool[T]) PublishAsync(event T) {
+    if wp.closed.Load() {
+        return
+    }
+
     select {
     case <-wp.ctx.Done():
         return
     default:
     }
 
     select {
     case wp.eventChan <- event:
     default:
     }
 }

And in Shutdown:

 func (wp *WorkerPool[T]) Shutdown() {
+    if !wp.closed.CompareAndSwap(false, true) {
+        return
+    }
     wp.cancel()
     close(wp.eventChan)
     wp.wg.Wait()
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pkg/eventbus/eventbus_worker.go lines 39 to 55, there is a race condition
where PublishAsync checks wp.ctx.Done() before sending to wp.eventChan, but
Shutdown might close the channel in between, causing a panic. To fix this,
introduce a mutex or atomic flag to synchronize access: protect the sending
operation in PublishAsync by locking or checking the flag to ensure the channel
is not closed, and similarly lock or set the flag in Shutdown before closing the
channel to prevent concurrent sends during closure.
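An alternative that sidesteps the send-on-closed-channel race entirely is to never close the producer-facing channel: workers exit via context cancellation, and the channel is reclaimed by the garbage collector once unreferenced. A minimal sketch under that assumption (illustrative types, not the actual `WorkerPool` implementation):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type pool struct {
	ctx    context.Context
	cancel context.CancelFunc
	events chan int
	wg     sync.WaitGroup
}

func newPool(workers int) *pool {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{ctx: ctx, cancel: cancel, events: make(chan int, 64)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for {
				select {
				case <-p.ctx.Done():
					return
				case ev := <-p.events:
					_ = ev // handle event
				}
			}
		}()
	}
	return p
}

func (p *pool) publish(ev int) {
	select {
	case <-p.ctx.Done(): // shutting down: drop
	case p.events <- ev: // queued
	default: // queue full: drop
	}
}

// shutdown cancels workers but never closes the channel, so a concurrent
// publish can at worst enqueue an event nobody reads; it cannot panic.
func (p *pool) shutdown() {
	p.cancel()
	p.wg.Wait()
}

func main() {
	p := newPool(2)
	p.publish(1)
	p.shutdown()
	p.publish(2) // safe after shutdown: dropped without panic
	fmt.Println("no panic")
}
```

The trade-off is that events queued during shutdown are silently dropped rather than drained, which matches the drop-on-full semantics this pool already has.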

Comment on lines 217 to 226
 func (eb *EventBus[T]) unsubscribe(id string) {
-	if sub, exists := eb.subscribers.LoadAndDelete(id); exists {
+	if sub, exists := eb.subscribers.Load(id); exists {
+		// Mark as inactive first to prevent new sends
+		sub.isActive.Store(false)
+		// Remove from map so no new operations can find it
+		eb.subscribers.Delete(id)
+		// Now safe to close the channel
 		close(sub.ch)
 	}
 }

⚠️ Potential issue

Critical race condition between unsubscribe and Publish

The pipeline detected data races between channel operations. The current implementation has a race window where unsubscribe closes the channel while Publish is sending to it. The double-check of isActive doesn't fully prevent this.

Consider using a more robust synchronisation approach:

 type subscriber[T any] struct {
     ch         chan T
     id         string
     lastActive atomic.Int64
     dropped    atomic.Uint64
     isActive   atomic.Bool
+    mu         sync.RWMutex  // Protects channel operations
 }

In Publish:

-        if sub.isActive.Load() {
+        sub.mu.RLock()
+        if sub.isActive.Load() {
             select {
             case sub.ch <- event:
                 sub.lastActive.Store(now)
                 delivered++
             default:
                 sub.dropped.Add(1)
             }
         }
+        sub.mu.RUnlock()

In unsubscribe:

 func (eb *EventBus[T]) unsubscribe(id string) {
     if sub, exists := eb.subscribers.Load(id); exists {
         sub.isActive.Store(false)
         eb.subscribers.Delete(id)
+        sub.mu.Lock()
         close(sub.ch)
+        sub.mu.Unlock()
     }
 }

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 GitHub Actions: CI

[error] 224-224: Data race detected: concurrent write and read on channel during event bus unsubscribe and publish operations.

🤖 Prompt for AI Agents
In pkg/eventbus/eventbus.go lines 217 to 226, there is a race condition where
unsubscribe closes a subscriber's channel while Publish might still be sending
to it, causing unsafe concurrent channel operations. To fix this, implement
stronger synchronization by using a mutex or other locking mechanism around
channel sends and closure to ensure no sends occur after the channel is closed.
Modify Publish to acquire the lock before sending and check subscriber activity,
and in unsubscribe, acquire the same lock before marking inactive, deleting the
subscriber, and closing the channel to prevent concurrent access.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (3)
internal/adapter/proxy/olla/service.go (3)

201-217: Avoid mutating the caller's configuration object

The function directly modifies the passed configuration object to set defaults. This mutation could surprise callers who might not expect their config to be modified.


360-367: Circuit breaker event publication handled elsewhere

The RecordFailure method correctly updates failure counts and state. Event publication for circuit breaker state changes is properly handled in executeBackendRequest (lines 552-560), so no additional event publication is needed here.


488-496: Verify StripPrefix handles edge cases correctly

The buildTargetURL method uses util.StripPrefix which may have path boundary issues as mentioned in the PR objectives (incorrectly stripping "/olla" from "/ollama").

🧹 Nitpick comments (1)
internal/adapter/proxy/olla/service.go (1)

801-825: Consider removing forced garbage collection

The cleanup implementation is comprehensive with proper resource cleanup and goroutine management. However, the forced GC call on line 822 is generally unnecessary as Go's garbage collector is designed to run automatically when needed.

Consider removing the forced GC:

 	s.BaseProxyComponents.Shutdown()
 
-	// force GC to clean up
-	runtime.GC()
-
 	s.Logger.Debug("Olla proxy service cleaned up")
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e6930ab and c4bf056.

📒 Files selected for processing (7)
  • internal/adapter/proxy/core/common.go (1 hunks)
  • internal/adapter/proxy/core/common_test.go (1 hunks)
  • internal/adapter/proxy/olla/service.go (1 hunks)
  • pkg/eventbus/eventbus.go (6 hunks)
  • pkg/eventbus/eventbus_test.go (5 hunks)
  • pkg/eventbus/eventbus_worker.go (1 hunks)
  • pkg/eventbus/eventbus_worker_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • pkg/eventbus/eventbus_worker.go
  • pkg/eventbus/eventbus_worker_test.go
  • internal/adapter/proxy/core/common.go
  • pkg/eventbus/eventbus.go
  • internal/adapter/proxy/core/common_test.go
🧰 Additional context used
📓 Path-based instructions (2)
**/*_test.go

📄 CodeRabbit Inference Engine (CLAUDE.md)

Unit tests should test individual components in isolation.

Files:

  • pkg/eventbus/eventbus_test.go
internal/{app,adapter}/**/*.go

📄 CodeRabbit Inference Engine (CLAUDE.md)

Endpoints should be exposed at /internal/health and /internal/status.

Files:

  • internal/adapter/proxy/olla/service.go
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to {proxy_sherpa.go,proxy_olla.go} : Proxy implementations should be in `proxy_sherpa.go` and `proxy_olla.go`.
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.
Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.
pkg/eventbus/eventbus_test.go (1)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to **/*_test.go : Unit tests should test individual components in isolation.

internal/adapter/proxy/olla/service.go (8)

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to {proxy_sherpa.go,proxy_olla.go} : Proxy implementations should be in proxy_sherpa.go and proxy_olla.go.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.788Z
Learning: Applies to internal/adapter/proxy/*.go : Expose the following response headers: X-Olla-Endpoint, X-Olla-Model, X-Olla-Backend-Type, X-Olla-Request-ID, X-Olla-Response-Time.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Integration tests should test the full request flow through the proxy.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to handler_proxy.go : Request routing logic should be implemented in handler_proxy.go.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Benchmark tests should measure performance of critical paths, proxy engine comparisons, connection pooling efficiency, and circuit breaker behavior.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.788Z
Learning: Applies to internal/{app,adapter}/**/*.go : Endpoints should be exposed at /internal/health and /internal/status.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to internal/adapter/proxy/*_test.go : Shared proxy tests should ensure compatibility between both proxy engines.

Learnt from: CR
PR: thushan/olla#0
File: CLAUDE.md:0-0
Timestamp: 2025-07-27T12:59:29.787Z
Learning: Applies to **/*_test.go : Unit tests should test individual components in isolation.

🔇 Additional comments (12)
pkg/eventbus/eventbus_test.go (6)

98-107: LGTM! Test correctly validates new channel behavior.

The changes properly reflect the architectural shift where subscriber channels remain open after context cancellation to prevent send-on-closed-channel panics. The sleep ensures asynchronous unsubscribe processing completes before verification.


168-188: Improved concurrency testing with explicit goroutine management.

The replacement of errgroup with sync.WaitGroup and explicit subscriber goroutines provides better control over the test's concurrency model. The atomic operations ensure thread-safe event counting, and context cancellation enables graceful shutdown.
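The concurrency model described above can be sketched like this (a simplified stand-in for the test, not its actual code): explicit subscriber goroutines joined with sync.WaitGroup, an atomic counter for thread-safe event counting, and context cancellation for graceful shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// countEvents fans events out to explicit subscriber goroutines and
// returns how many were counted after a clean, context-driven shutdown.
func countEvents(events, subscribers int) int64 {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan struct{})
	var received atomic.Int64
	var wg sync.WaitGroup

	for i := 0; i < subscribers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // graceful shutdown
				case <-ch:
					received.Add(1) // thread-safe counting
				}
			}
		}()
	}
	for i := 0; i < events; i++ {
		ch <- struct{}{} // unbuffered: each send completes on receipt
	}
	cancel()  // signal all subscribers to exit
	wg.Wait() // explicit goroutine management: wait for every exit
	return received.Load()
}

func main() {
	fmt.Println(countEvents(100, 4)) // prints 100: each event counted once
}
```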


214-216: Proper synchronization for async event processing.

The sequence of sleep, context cancellation, and WaitGroup wait ensures all events are processed and subscriber goroutines terminate cleanly before test verification. This is essential for reliable testing of asynchronous systems.


303-310: Correct testing of shutdown behavior with open channels.

The test properly validates that no events are received after shutdown while acknowledging the architectural decision to leave channels open for garbage collection. This approach prevents send-on-closed-channel panics in the event bus implementation.


377-383: Appropriate testing of cleanup behavior.

The test correctly validates that inactive subscribers don't receive events after cleanup. Using the default case provides an immediate non-blocking check, which is suitable for verifying cleanup effectiveness.
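The non-blocking check mentioned above is the standard select-with-default idiom; a small sketch (helper name is illustrative):

```go
package main

import "fmt"

// tryRecv performs an immediate, non-blocking receive: the default case
// fires when nothing is pending, so a silent channel never blocks the test.
func tryRecv(ch <-chan string) (string, bool) {
	select {
	case msg := <-ch:
		return msg, true
	default: // no event pending: report immediately
		return "", false
	}
}

func main() {
	ch := make(chan string, 1)
	if _, ok := tryRecv(ch); !ok {
		fmt.Println("no event pending") // the cleaned-up subscriber case
	}
	ch <- "evt"
	msg, _ := tryRecv(ch)
	fmt.Println(msg) // prints "evt"
}
```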


171-171: Minor formatting adjustment - LGTM.

No functional change to the Subscribe call.

internal/adapter/proxy/olla/service.go (6)

1-48: Excellent documentation and clean imports

The comprehensive package documentation clearly explains Olla's purpose, features, and optimisations compared to Sherpa. The imports are well-organised and appropriate for the functionality.


50-191: Well-designed types and sensible defaults

The constants provide appropriate defaults for AI streaming workloads. The Service struct design with embedded BaseProxyComponents promotes code reuse. Object pooling with proper Reset methods will effectively reduce allocations. Configuration getters with fallback defaults ensure robust operation.
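The pooling-with-Reset pattern praised above looks roughly like this sketch (type and field names are illustrative, not the actual service.go types): objects are zeroed via Reset before returning to a sync.Pool, so stale data never leaks between requests and hot-path allocations are avoided.

```go
package main

import (
	"fmt"
	"sync"
)

// requestState is a hypothetical pooled per-request object.
type requestState struct {
	endpoint string
	bytes    int64
}

// Reset clears all fields so a recycled object carries no stale data.
func (s *requestState) Reset() {
	s.endpoint = ""
	s.bytes = 0
}

var statePool = sync.Pool{
	New: func() any { return &requestState{} },
}

func handle(endpoint string, n int64) string {
	s := statePool.Get().(*requestState)
	defer func() {
		s.Reset() // always clear before returning to the pool
		statePool.Put(s)
	}()
	s.endpoint = endpoint
	s.bytes = n
	return fmt.Sprintf("%s:%d", s.endpoint, s.bytes)
}

func main() {
	fmt.Println(handle("backend-a", 42)) // prints "backend-a:42"
}
```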


271-333: Excellent transport optimisation and connection management

The HTTP transport configuration is well-tuned for AI streaming workloads with appropriate buffer sizes, compression disabled, and HTTP/2 support. Per-endpoint connection pooling prevents cross-endpoint interference. Circuit breaker implementation uses atomic operations correctly for thread safety.


369-447: Robust request handling with proper safeguards

The request flow is well-structured with appropriate panic recovery, circuit breaker integration, and connection tracking. The defer pattern for connection decrement ensures proper cleanup even on failures.
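The defer pattern noted above can be sketched as follows (counter and recovery details are illustrative, not the actual handler): the in-flight count is incremented before proxying and the decrement deferred immediately, so cleanup runs whether the handler returns normally, errors out, or panics.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var inflight atomic.Int64

// proxy simulates a request handler with panic recovery and a
// connection counter that is always decremented on exit.
func proxy(fail bool) (err error) {
	inflight.Add(1)
	defer inflight.Add(-1) // runs on success, error, or panic
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	if fail {
		panic("upstream exploded")
	}
	return nil
}

func main() {
	_ = proxy(false)
	err := proxy(true)
	// Even after a panic, the counter is back to zero.
	fmt.Println(err, inflight.Load())
}
```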


522-567: Excellent backend request execution with comprehensive monitoring

The implementation correctly uses per-endpoint connection pools and provides detailed circuit breaker monitoring. The before/after failure count comparison enables precise logging of state changes. Event publication and user-friendly error handling are well implemented.


569-711: Outstanding streaming implementation with comprehensive edge case handling

The response streaming is exceptionally well-designed with client disconnect detection, backpressure handling, read timeouts, and graceful degradation. The timer reuse pattern avoids allocations in the hot path. Circuit breaker state transition logging provides excellent observability.

@thushan thushan merged commit 72d7c63 into main Jul 31, 2025
2 checks passed
@thushan thushan deleted the feature/proxy-consolidation branch July 31, 2025 11:36
This was referenced Aug 14, 2025