Skip to content

chore: merge websocket-proxy#726

Merged
danyalprout merged 39 commits intomainfrom
danyal/merge-websocket-proxy
Feb 14, 2026
Merged

chore: merge websocket-proxy#726
danyalprout merged 39 commits intomainfrom
danyal/merge-websocket-proxy

Conversation

@danyalprout
Copy link
Copy Markdown
Collaborator

@danyalprout danyalprout commented Feb 13, 2026

Description

Merge the websocket-proxy (last piece of infra) into the mono-repo. Everything is from base/rollup-boost:websocket-proxy-release except for:

f05a190
3b19e34

I'll enable workspace linting and setup the binary in follow up PRs

ferranbt and others added 30 commits May 28, 2025 22:29
* Add a workspace

* go

* Fix tests concurrency

* Update websocket proxy location

* Fix

* Fix

* Update cargo run commands

* Use workspace dependencies

* Fix issues

* update deny

* More deny stuff
* Add authenticated endpoint to websocket proxy

* Mark code as dead
* disconnect client on lag

* add back lagged_connections metric
* Add fb crate + block ext api

* Trying to plug builderExt

* Finish with the trait

* Fix lint

* Commit changes

* Address url feedback

* Add missing env vars

* Address feedback

* Add feedback

* Update
* Fix race condition in FB (check that we return flashblocks for correct endpoint)
use std::mem::replace to avoid double locking

* run make fmt

* Add payload cleaning on FCU

* Add test
ws proxy: brotli compress each message from upstream before sending downstream
* chore: remove integration feature for websocket-proxy

* chore: received message metric per upstream source

* fix: ensure we reconnect when we receive a close

* Reformat

* Add a metric to track # of bytes broadcasted
Split the single `rate_limited_requests` metric into two separate counters:
- `per_ip_rate_limited_requests`: Tracks rejections due to per-IP connection limits
- `global_rate_limited_requests`: Tracks rejections due to global instance limits

This provides better observability into which rate limiting mechanism is being
triggered, enabling more targeted monitoring and alerting.

Changes:
- Add RateLimitType enum to distinguish between limit types
- Update RateLimitError to include limit_type field
- Modify server.rs to increment appropriate metric based on limit type
- Update both InMemoryRateLimit and RedisRateLimit implementations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Add structured logging to capture the client IP address whenever a per-IP
rate limit is hit. This provides better observability for identifying which
specific IPs are hitting rate limits for security monitoring and debugging.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Separate rate limiting metrics by limit type
This reverts commit 190e68145a818c594cfeca373ca977fc689ff5ef.
let mut conn = self.redis_client.get_connection()?;

let instance_heartbeat_pattern = format!("{}:instance:*:heartbeat", self.key_prefix);
let instance_heartbeats: Vec<String> = conn.keys(instance_heartbeat_pattern)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KEYS is O(N) across the entire keyspace and is explicitly discouraged for production use by Redis docs. This is called on every connection attempt (line 337) and periodically in the cleanup background task. Under load, this will cause latency spikes and can block the Redis server.

Use SCAN for the cleanup path, and for per-IP lookups consider restructuring the key scheme (e.g., a Redis Hash per IP) to avoid pattern scanning on the hot path.

{
let mut compressor =
brotli::CompressorWriter::new(&mut compressed_data_bytes, 4096, 5, 22);
compressor.write_all(data_bytes).unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.unwrap() inside the brotli compression closure will panic and crash the entire proxy if a single message fails to compress. This closure runs for every upstream message in the broadcast path.

Suggested change
compressor.write_all(data_bytes).unwrap();
compressor.write_all(data_bytes).expect("brotli compression failed for upstream message");

Or better yet, handle the error by logging and skipping the message rather than crashing.

ip_addr_http_header: self.ip_addr_http_header.clone(),
});

let listener = tokio::net::TcpListener::bind(self.listen_addr).await.unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.unwrap() on TcpListener::bind in non-test server code. If the port is already in use, this will panic with a non-descriptive message. Same for .unwrap() on lines 105 and 110. Consider using expect() with context or propagating the error.

Self { key_to_application: api_keys }
}

pub fn get_application_for_key(&self, api_key: &String) -> Option<&String> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&String parameter should be &str per Rust API guidelines. This forces callers to have a String rather than accepting any string reference.

Suggested change
pub fn get_application_for_key(&self, api_key: &String) -> Option<&String> {
pub fn get_application_for_key(&self, api_key: &str) -> Option<&String> {

Comment on lines +40 to +42
[dev-dependencies]
testcontainers.workspace = true
testcontainers-modules = { workspace = true, features = ["redis"] }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing [lints] workspace = true — required by the project's workspace conventions per CLAUDE.md ("All crate and binary Cargo.toml files must inherit lints from the workspace").

Suggested change
[dev-dependencies]
testcontainers.workspace = true
testcontainers-modules = { workspace = true, features = ["redis"] }
[dev-dependencies]
testcontainers.workspace = true
testcontainers-modules = { workspace = true, features = ["redis"] }
[lints]
workspace = true

Comment on lines +181 to +199
tokio::spawn(async move {
loop {
if let Err(e) = self_clone.update_heartbeat() {
error!(
message = "Failed to update heartbeat in background task",
error = e.to_string()
);
}

if let Err(e) = self_clone.cleanup_stale_instances() {
error!(
message = "Failed to cleanup stale instances in background task",
error = e.to_string()
);
}

tokio::time::sleep(self_clone.heartbeat_interval / 2).await;
}
});
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This background task runs in an infinite loop with no cancellation mechanism — it will run forever even after the proxy shuts down. The CancellationToken used elsewhere in the codebase should be threaded through here. As-is, calling start_background_tasks leaks a tokio task that can never be stopped.

@danyalprout
Copy link
Copy Markdown
Collaborator Author

These are good follow ups from our bot, I'll do these in follow ups (this is just merging the process from our other repo)

@danyalprout danyalprout marked this pull request as ready for review February 13, 2026 23:46
error = e.to_string()
);
self.metrics.upstream_errors.increment(1);
self.metrics.upstream_connections.decrement(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upstream_connections.decrement(1) is called for every error from connect_and_listen, but the gauge is only incremented on line 163 when the connection is actually established. If connect_async fails (line 150-158), the method returns Err without ever incrementing the gauge — yet this error path still decrements it, driving the gauge negative.

Move the decrement into connect_and_listen so it only fires when the gauge was previously incremented (i.e., after line 163), or guard it with a flag/check.

}

#[derive(Debug, PartialEq)]
pub enum AuthenticationParseError {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thiserror is already a dependency of this crate but AuthenticationParseError hand-rolls Display and Error impls. Using #[derive(thiserror::Error)] would be consistent with RateLimitError in this same codebase.

Also, NoData() is a tuple variant with zero elements — prefer NoData as a unit variant (no parens).

return false;
}

let json_result: Result<Value, _> = serde_json::from_str(value.unwrap().as_str());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is_err() check followed by .unwrap() is a split-check anti-pattern. If someone later removes or reorders the is_err() guard, the unwrap() will panic. Use match or let Ok(value) = value else { return false }; to make the error handling self-contained.

Suggested change
let json_result: Result<Value, _> = serde_json::from_str(value.unwrap().as_str());
let Ok(value) = String::from_utf8(uncompressed_data) else {
return false;
};
let json_result: Result<Value, _> = serde_json::from_str(value.as_str());

@github-actions
Copy link
Copy Markdown
Contributor

Review Summary

This PR merges the websocket-proxy into the monorepo. The code is generally well-structured with good test coverage. Below are the findings from this review (in addition to inline comments from prior runs that remain valid).

New findings (inline comments posted)

Severity File Issue
Bug subscriber.rs:119 upstream_connections gauge is decremented on every error, but only incremented on successful connection — will go negative on connection failures
Bug rate_limit.rs:396 Redis DECR in release() can go negative (no underflow guard), unlike the in-memory implementation which correctly clamps to zero
Nit auth.rs:14 AuthenticationParseError hand-rolls Display/Error when thiserror is already a dependency; NoData() should be a unit variant NoData
Nit filter.rs:89-94 Split is_err() check + .unwrap() anti-pattern; should use let Ok(value) = ... else { return false }

Previously raised findings (still applicable)

  • rate_limit.rs: Mutex .unwrap() in production code, TOCTOU race in Redis rate limiting, KEYS command on hot path, background task without cancellation
  • main.rs:252: .unwrap() in brotli compression closure (panics on compression failure)
  • server.rs:103-110: .unwrap() on TcpListener::bind and axum::serve in non-test code
  • auth.rs:93: &String parameter should be &str
  • lib.rs: pub mod declarations violate project conventions; missing #![doc = include_str!("../README.md")]
  • Cargo.toml: Missing [lints] workspace = true

Recommendation

The subscriber gauge bug and Redis DECR underflow are correctness issues worth fixing before merge. The other items are improvements that could be addressed in follow-up PRs as mentioned in the PR description.

@danyalprout danyalprout added this pull request to the merge queue Feb 14, 2026
Merged via the queue into main with commit e431c76 Feb 14, 2026
21 checks passed
@danyalprout danyalprout deleted the danyal/merge-websocket-proxy branch February 14, 2026 00:17
danyalprout pushed a commit that referenced this pull request Mar 5, 2026
## Description

Include govenance token in predeploylist

## Metadata

Fixes #726
mw2000 pushed a commit that referenced this pull request Mar 7, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
mw2000 pushed a commit that referenced this pull request Mar 9, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
mw2000 pushed a commit that referenced this pull request Mar 16, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
haardikk21 added a commit that referenced this pull request Mar 17, 2026
* Restructure the repo as a workspace (#233)

* Add a workspace

* go

* Fix tests concurrency

* Update websocket proxy location

* Fix

* Fix

* Update cargo run commands

* Use workspace dependencies

* Fix issues

* update deny

* More deny stuff

* websocket proxy: Add authenticated endpoint settings (#261)

* Add authenticated endpoint to websocket proxy

* Mark code as dead

* websocket proxy: disconnect downstream client if lagging (#269)

* disconnect client on lag

* add back lagged_connections metric

* Flashblocks with extension trait (#270)

* Add fb crate + block ext api

* Trying to plug builderExt

* Finish with the trait

* Fix lint

* Commit changes

* Address url feedback

* Add missing env vars

* Address feedback

* Add feedback

* Update

* brotli compress each message from upstream before sending downstream

* hide compression behind a feature flag

* remove feature flag, enable_compression arg, fix integration test

* fix clippy errors

* Fix FB race (#307)

* Fix race condition in FB (check that we return flashblocks for correct endpoint)
use std::mem::replace to avoid double locking

* run make fmt

* Add payload cleaning on FCU

* Add test

* chore: overall websocket limits should be per instance (#311)

* Initial batch of metrics for flashblocks service (#300)

* chore: minor tweaks to websocket-proxy (#353)

* chore: remove integration feature for websocket-proxy

* chore: received message metric per upstream source

* fix: ensure we reconnect when we receive a close

* Reformat

* Add a metric to track # of bytes broadcasted

* fix: ensure we ping/pong upstream and disconnect when they become unresponsive

* Add some additional logging

* feat: add ws filtering by addresses and topics with configurable match logic

* feat: add client ping/pong health checks

* Cherry pick ping/pong clients

* Fix initial backoff interval

* Separate rate limiting metrics by limit type

Split the single `rate_limited_requests` metric into two separate counters:
- `per_ip_rate_limited_requests`: Tracks rejections due to per-IP connection limits
- `global_rate_limited_requests`: Tracks rejections due to global instance limits

This provides better observability into which rate limiting mechanism is being
triggered, enabling more targeted monitoring and alerting.

Changes:
- Add RateLimitType enum to distinguish between limit types
- Update RateLimitError to include limit_type field
- Modify server.rs to increment appropriate metric based on limit type
- Update both InMemoryRateLimit and RedisRateLimit implementations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Log IP address when per-IP rate limit is exceeded

Add structured logging to capture the client IP address whenever a per-IP
rate limit is hit. This provides better observability for identifying which
specific IPs are hitting rate limits for security monitoring and debugging.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* add ping metrics

* Refactor websocket proxy to use Vec<u8> instead of String

* Revert "Refactor websocket proxy to use Vec<u8> instead of String"

This reverts commit 190e68145a818c594cfeca373ca977fc689ff5ef.

* add metrics

* fix

* update comment

* update

* send timeout

* configurable timeout

* update

* fix tests

* fixes for merging websocket proxy

* minimal ci fixes

---------

Co-authored-by: Ferran Borreguero <ferran.borreguero@gmail.com>
Co-authored-by: Haardik <hhaardik@uwaterloo.ca>
Co-authored-by: Haardik H <haardik.haardik@coinbase.com>
Co-authored-by: Solar Mithril <mikawamp@gmail.com>
Co-authored-by: shana <avalonche@protonmail.com>
Co-authored-by: Tobi Akerele <tobi.akerele@coinbase.com>
Co-authored-by: Cody Wang <cody.wang@coinbase.com>
Co-authored-by: Claude <noreply@anthropic.com>
haardikk21 pushed a commit that referenced this pull request Mar 17, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
mw2000 pushed a commit that referenced this pull request Mar 19, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
mw2000 pushed a commit that referenced this pull request Mar 23, 2026
* refactor: make validity e2e tests configurable

* feat: define OutputProposal + WaitForLatestBlockNumber

* test: add proving_test

* fix: rm now redundant _ProveSingleRange test

* chore: reorg adapters.go

* chore: verify L2 block number aligns

* feat: create ValiditySystem wrapper with DatabaseURL()

* feat: define Database client + verifyRange

* test: some tests for block-based splitting

* deps: update tests/optimism

* feat: configurable intervals for l2oo deployment

* ci: more parallel e2e jobs

* test: _RangeIntervalLargerThanSubmission

* feat: enable parallel execution

* make vars automatically exported for fdg-contracts deployment

* fix: consolidate config paths

* doc: comment for per-test isolated system

* deps: point to op-succinct-sysgo rev

* fix: forge fmt

* fix: smt went wrong with last fmt check!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants