## Conversation
* Add a workspace * go * Fix tests concurrency * Update websocket proxy location * Fix * Fix * Update cargo run commands * Use workspace dependencies * Fix issues * update deny * More deny stuff
* Add authenticated endpoint to websocket proxy * Mark code as dead
* disconnect client on lag * add back lagged_connections metric
* Add fb crate + block ext api * Trying to plug builderExt * Finish with the trait * Fix lint * Commit changes * Address url feedback * Add missing env vars * Address feedback * Add feedback * Update
* Fix race condition in FB (check that we return flashblocks for correct endpoint) use std::mem::replace to avoid double locking * run make fmt * Add payload cleaning on FCU * Add test
ws proxy: brotli compress each message from upstream before sending downstream
* chore: remove integration feature for websocket-proxy * chore: received message metric per upstream source * fix: ensure we reconnect when we receive a close * Reformat * Add a metric to track # of bytes broadcasted
Split the single `rate_limited_requests` metric into two separate counters: - `per_ip_rate_limited_requests`: Tracks rejections due to per-IP connection limits - `global_rate_limited_requests`: Tracks rejections due to global instance limits This provides better observability into which rate limiting mechanism is being triggered, enabling more targeted monitoring and alerting. Changes: - Add RateLimitType enum to distinguish between limit types - Update RateLimitError to include limit_type field - Modify server.rs to increment appropriate metric based on limit type - Update both InMemoryRateLimit and RedisRateLimit implementations 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Add structured logging to capture the client IP address whenever a per-IP rate limit is hit. This provides better observability for identifying which specific IPs are hitting rate limits for security monitoring and debugging. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
Separate rate limiting metrics by limit type
Add ping metrics
This reverts commit 190e68145a818c594cfeca373ca977fc689ff5ef.
```rust
let mut conn = self.redis_client.get_connection()?;

let instance_heartbeat_pattern = format!("{}:instance:*:heartbeat", self.key_prefix);
let instance_heartbeats: Vec<String> = conn.keys(instance_heartbeat_pattern)?;
```
KEYS is O(N) across the entire keyspace and is explicitly discouraged for production use by Redis docs. This is called on every connection attempt (line 337) and periodically in the cleanup background task. Under load, this will cause latency spikes and can block the Redis server.
Use SCAN for the cleanup path, and for per-IP lookups consider restructuring the key scheme (e.g., a Redis Hash per IP) to avoid pattern scanning on the hot path.
```rust
{
    let mut compressor =
        brotli::CompressorWriter::new(&mut compressed_data_bytes, 4096, 5, 22);
    compressor.write_all(data_bytes).unwrap();
```
.unwrap() inside the brotli compression closure will panic and crash the entire proxy if a single message fails to compress. This closure runs for every upstream message in the broadcast path.
```diff
- compressor.write_all(data_bytes).unwrap();
+ compressor.write_all(data_bytes).expect("brotli compression failed for upstream message");
```
Or better yet, handle the error by logging and skipping the message rather than crashing.
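A std-only sketch of the log-and-skip approach. The brotli `CompressorWriter` is stood in by a hypothetical failing `Write` sink, since the relevant failure mode is simply an `io::Error` from `write_all`:

```rust
use std::io::{self, Write};

// Hypothetical stand-in for the brotli compressor: any `Write` sink whose
// `write_all` can fail with an `io::Error`.
struct FlakySink {
    fail: bool,
}

impl Write for FlakySink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        if self.fail {
            Err(io::Error::new(io::ErrorKind::Other, "compression failed"))
        } else {
            Ok(buf.len())
        }
    }
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Compress-and-forward one message; on failure, log and skip the message
/// instead of panicking the whole broadcast path.
fn forward_message(sink: &mut impl Write, msg: &[u8]) -> bool {
    match sink.write_all(msg) {
        Ok(()) => true,
        Err(e) => {
            eprintln!("skipping upstream message, compression failed: {e}");
            false
        }
    }
}

fn main() {
    assert!(forward_message(&mut FlakySink { fail: false }, b"payload"));
    assert!(!forward_message(&mut FlakySink { fail: true }, b"payload"));
}
```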
```rust
    ip_addr_http_header: self.ip_addr_http_header.clone(),
});

let listener = tokio::net::TcpListener::bind(self.listen_addr).await.unwrap();
```
.unwrap() on TcpListener::bind in non-test server code. If the port is already in use, this will panic with a non-descriptive message. Same for .unwrap() on lines 105 and 110. Consider using expect() with context or propagating the error.
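The same pattern with context attached, sketched against std's blocking `TcpListener` rather than tokio's (the address here is a placeholder, not the proxy's actual config):

```rust
use std::net::TcpListener;

fn main() {
    // Port 0 asks the OS for any free port; on failure the panic message
    // names the address instead of the bare "called `unwrap()` on an `Err`".
    let addr = "127.0.0.1:0";
    let listener = TcpListener::bind(addr)
        .unwrap_or_else(|e| panic!("failed to bind listener on {addr}: {e}"));
    println!("listening on {}", listener.local_addr().expect("listener has no local addr"));
}
```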
```rust
    Self { key_to_application: api_keys }
}

pub fn get_application_for_key(&self, api_key: &String) -> Option<&String> {
```
&String parameter should be &str per Rust API guidelines. This forces callers to have a String rather than accepting any string reference.
```diff
- pub fn get_application_for_key(&self, api_key: &String) -> Option<&String> {
+ pub fn get_application_for_key(&self, api_key: &str) -> Option<&String> {
```
```toml
[dev-dependencies]
testcontainers.workspace = true
testcontainers-modules = { workspace = true, features = ["redis"] }
```
Missing [lints] workspace = true — required by the project's workspace conventions per CLAUDE.md ("All crate and binary Cargo.toml files must inherit lints from the workspace").
```diff
  [dev-dependencies]
  testcontainers.workspace = true
  testcontainers-modules = { workspace = true, features = ["redis"] }
+
+ [lints]
+ workspace = true
```
```rust
tokio::spawn(async move {
    loop {
        if let Err(e) = self_clone.update_heartbeat() {
            error!(
                message = "Failed to update heartbeat in background task",
                error = e.to_string()
            );
        }

        if let Err(e) = self_clone.cleanup_stale_instances() {
            error!(
                message = "Failed to cleanup stale instances in background task",
                error = e.to_string()
            );
        }

        tokio::time::sleep(self_clone.heartbeat_interval / 2).await;
    }
});
```
This background task runs in an infinite loop with no cancellation mechanism — it will run forever even after the proxy shuts down. The CancellationToken used elsewhere in the codebase should be threaded through here. As-is, calling start_background_tasks leaks a tokio task that can never be stopped.
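A minimal std-threads sketch of the cancellation idea. In the actual codebase this would be `tokio_util::sync::CancellationToken` awaited via `tokio::select!`, but the shape is the same: the loop checks a shared shutdown signal each iteration, so the task can actually terminate:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Spawn the heartbeat/cleanup loop; it exits once `shutdown` is set instead
/// of looping forever, and returns how many iterations it ran.
fn spawn_heartbeat(shutdown: Arc<AtomicBool>) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut ticks = 0u32;
        while !shutdown.load(Ordering::Relaxed) {
            ticks += 1; // update_heartbeat() / cleanup_stale_instances() would run here
            thread::sleep(Duration::from_millis(10));
        }
        ticks
    })
}

fn main() {
    let shutdown = Arc::new(AtomicBool::new(false));
    let task = spawn_heartbeat(Arc::clone(&shutdown));
    thread::sleep(Duration::from_millis(50));
    shutdown.store(true, Ordering::Relaxed); // analogous to cancelling the token
    let ticks = task.join().expect("background task panicked");
    assert!(ticks >= 1);
}
```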
These are good follow-ups from our bot; I'll do these in follow-up PRs (this is just merging the process from our other repo).
```rust
    error = e.to_string()
);
self.metrics.upstream_errors.increment(1);
self.metrics.upstream_connections.decrement(1);
```
upstream_connections.decrement(1) is called for every error from connect_and_listen, but the gauge is only incremented on line 163 when the connection is actually established. If connect_async fails (line 150-158), the method returns Err without ever incrementing the gauge — yet this error path still decrements it, driving the gauge negative.
Move the decrement into connect_and_listen so it only fires when the gauge was previously incremented (i.e., after line 163), or guard it with a flag/check.
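A self-contained sketch of the fix, with an atomic counter standing in for the metrics gauge: the decrement lives on the same path as the increment, so a connect failure that never incremented the gauge can never drive it negative.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Stand-in for the metrics gauge.
static UPSTREAM_CONNECTIONS: AtomicI64 = AtomicI64::new(0);

/// Sketch of connect_and_listen: increment only once the connection is
/// established, and decrement on that same path.
fn connect_and_listen(connect_ok: bool) -> Result<(), &'static str> {
    if !connect_ok {
        // connect_async failed: the gauge was never incremented, leave it alone
        return Err("connect failed");
    }
    UPSTREAM_CONNECTIONS.fetch_add(1, Ordering::Relaxed);
    // ... the listen loop runs here until the connection drops ...
    UPSTREAM_CONNECTIONS.fetch_sub(1, Ordering::Relaxed);
    Err("connection closed")
}

fn main() {
    let _ = connect_and_listen(false); // early failure: gauge untouched
    let _ = connect_and_listen(true); // full cycle: +1 then -1
    assert_eq!(UPSTREAM_CONNECTIONS.load(Ordering::Relaxed), 0);
}
```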
```rust
}

#[derive(Debug, PartialEq)]
pub enum AuthenticationParseError {
```
thiserror is already a dependency of this crate but AuthenticationParseError hand-rolls Display and Error impls. Using #[derive(thiserror::Error)] would be consistent with RateLimitError in this same codebase.
Also, NoData() is a tuple variant with zero elements — prefer NoData as a unit variant (no parens).
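A hand-rolled sketch of the suggested shape. The `InvalidKey` variant and the message strings are hypothetical, not from the source; with `thiserror` the whole `Display` impl would collapse to `#[error("...")]` attributes on a `#[derive(thiserror::Error)]` enum:

```rust
use std::fmt;

// Hypothetical reconstruction: `NoData` as a unit variant (no empty parens).
#[derive(Debug, PartialEq)]
pub enum AuthenticationParseError {
    NoData,
    InvalidKey(String),
}

// With thiserror, this impl is generated from #[error("...")] attributes.
impl fmt::Display for AuthenticationParseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoData => write!(f, "no authentication data provided"),
            Self::InvalidKey(k) => write!(f, "invalid api key: {k}"),
        }
    }
}

impl std::error::Error for AuthenticationParseError {}

fn main() {
    assert_eq!(
        AuthenticationParseError::NoData.to_string(),
        "no authentication data provided"
    );
}
```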
```rust
    return false;
}

let json_result: Result<Value, _> = serde_json::from_str(value.unwrap().as_str());
```
This is_err() check followed by .unwrap() is a split-check anti-pattern. If someone later removes or reorders the is_err() guard, the unwrap() will panic. Use match or let Ok(value) = value else { return false }; to make the error handling self-contained.
```diff
- let json_result: Result<Value, _> = serde_json::from_str(value.unwrap().as_str());
+ let Ok(value) = String::from_utf8(uncompressed_data) else {
+     return false;
+ };
+ let json_result: Result<Value, _> = serde_json::from_str(value.as_str());
```
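A runnable sketch of the self-contained pattern; `serde_json` is swapped for a plain integer parse to keep it dependency-free. The point is that each fallible step handles its own failure inline, so no later `unwrap()` depends on an earlier `is_err()` guard staying in place:

```rust
/// Validate a decompressed payload: reject invalid UTF-8, then reject
/// anything that fails to parse (parse stands in for serde_json::from_str).
fn is_valid_payload(uncompressed_data: Vec<u8>) -> bool {
    let Ok(value) = String::from_utf8(uncompressed_data) else {
        return false;
    };
    value.trim().parse::<i64>().is_ok()
}

fn main() {
    assert!(is_valid_payload(b"42".to_vec()));
    assert!(!is_valid_payload(vec![0xff, 0xfe])); // not valid UTF-8
    assert!(!is_valid_payload(b"not a number".to_vec()));
}
```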
## Review Summary

This PR merges the websocket-proxy into the monorepo. The code is generally well-structured with good test coverage. Below are the findings from this review (in addition to inline comments from prior runs that remain valid).

### New findings (inline comments posted)
### Previously raised findings (still applicable)
### Recommendation

The subscriber gauge bug and Redis DECR underflow are correctness issues worth fixing before merge. The other items are improvements that could be addressed in follow-up PRs as mentioned in the PR description.
## Description

Include governance token in predeploylist

## Metadata

Fixes #726
* refactor: make validity e2e tests configurable * feat: define OutputProposal + WaitForLatestBlockNumber * test: add proving_test * fix: rm now redundant _ProveSingleRange test * chore: reorg adapters.go * chore: verify L2 block number aligns * feat: create ValiditySystem wrapper with DatabaseURL() * feat: define Database client + verifyRange * test: some tests for block-based splitting * deps: update tests/optimism * feat: configurable intervals for l2oo deployment * ci: more parallel e2e jobs * test: _RangeIntervalLargerThanSubmission * feat: enable parallel execution * make vars automatically exported for fdg-contracts deployment * fix: consolidate config paths * doc: comment for per-test isolated system * deps: point to op-succinct-sysgo rev * fix: forge fmt * fix: smt went wrong with last fmt check!
* Restructure the repo as a workspace (#233)
* Add a workspace
* go
* Fix tests concurrency
* Update websocket proxy location
* Fix
* Fix
* Update cargo run commands
* Use workspace dependencies
* Fix issues
* update deny
* More deny stuff
* websocket proxy: Add authenticated endpoint settings (#261)
* Add authenticated endpoint to websocket proxy
* Mark code as dead
* websocket proxy: disconnect downstream client if lagging (#269)
* disconnect client on lag
* add back lagged_connections metric
* Flashblocks with extension trait (#270)
* Add fb crate + block ext api
* Trying to plug builderExt
* Finish with the trait
* Fix lint
* Commit changes
* Address url feedback
* Add missing env vars
* Address feedback
* Add feedback
* Update
* brotli compress each message from upstream before sending downstream
* hide compression behind a feature flag
* remove feature flag, enable_compression arg, fix integration test
* fix clippy errors
* Fix FB race (#307)
* Fix race condition in FB (check that we return flashblocks for correct endpoint) use std::mem::replace to avoid double locking
* run make fmt
* Add payload cleaning on FCU
* Add test
* chore: overall websocket limits should be per instance (#311)
* Initial batch of metrics for flashblocks service (#300)
* chore: minor tweaks to websocket-proxy (#353)
* chore: remove integration feature for websocket-proxy
* chore: received message metric per upstream source
* fix: ensure we reconnect when we receive a close
* Reformat
* Add a metric to track # of bytes broadcasted
* fix: ensure we ping/pong upstream and disconnect when they become unresponsive
* Add some additional logging
* feat: add ws filtering by addresses and topics with configurable match logic
* feat: add client ping/pong health checks
* Cherry pick ping/pong clients
* Fix initial backoff interval
* Separate rate limiting metrics by limit type (split the single `rate_limited_requests` metric into `per_ip_rate_limited_requests` and `global_rate_limited_requests`)
* Log IP address when per-IP rate limit is exceeded
* add ping metrics
* Refactor websocket proxy to use Vec<u8> instead of String
* Revert "Refactor websocket proxy to use Vec<u8> instead of String" (reverts commit 190e68145a818c594cfeca373ca977fc689ff5ef)
* add metrics
* fix
* update comment
* update
* send timeout
* configurable timeout
* update
* fix tests
* fixes for merging websocket proxy
* minimal ci fixes

---------

Co-authored-by: Ferran Borreguero <ferran.borreguero@gmail.com>
Co-authored-by: Haardik <hhaardik@uwaterloo.ca>
Co-authored-by: Haardik H <haardik.haardik@coinbase.com>
Co-authored-by: Solar Mithril <mikawamp@gmail.com>
Co-authored-by: shana <avalonche@protonmail.com>
Co-authored-by: Tobi Akerele <tobi.akerele@coinbase.com>
Co-authored-by: Cody Wang <cody.wang@coinbase.com>
Co-authored-by: Claude <noreply@anthropic.com>
## Description
Merge the websocket-proxy (last piece of infra) into the mono-repo. Everything is from base/rollup-boost:websocket-proxy-release except for:
f05a190
3b19e34
I'll enable workspace linting and set up the binary in follow-up PRs.