Skip to content

refactor(metrics): remove unused ReportsAddressFlag / Reporter flow#2286

Merged
nimrod-teich merged 1 commit into
refactor/usage-pipelinefrom
refactor/remove-reports-flow
May 11, 2026
Merged

refactor(metrics): remove unused ReportsAddressFlag / Reporter flow#2286
nimrod-teich merged 1 commit into
refactor/usage-pipelinefrom
refactor/remove-reports-flow

Conversation

@nimrod-teich

Copy link
Copy Markdown
Contributor

Summary

Stacked on top of #2285 — depends on the OTel relay-usage emitter PR.

The --reports-be-address pipeline was wired through every layer of the consumer but never enabled in production. Default flag value "" made NewConsumerReportsClient return nil, and every AppendReport / AppendConflict call short-circuited via nil-receiver guards. The interface, types, and parameter plumbing remained as dead weight. This PR rips it all out.

What this removes

Files

  • protocol/metrics/consumer_reports_client.go
  • protocol/metrics/consumer_reports_client_test.go

Types & API

  • metrics.Reporter interface
  • metrics.ReportsRequest, metrics.ConflictRequest, metrics.ConflictContainer
  • metrics.NewReportsRequest, metrics.NewConflictRequest constructors
  • metrics.ConsumerReportsClient and metrics.NewConsumerReportsClient

Flag & config

  • --reports-be-address flag (rpcconsumer + rpcsmartrouter)
  • AnalyticsServerAddresses.ReportsAddressFlag field (both)
  • The reportsSendBEAddress const

Parameters threaded through dead plumbing

  • RPCConsumerServer.reporter field + ServeRPCRequests(... reporter ...) parameter
  • lavasession.NewConsumerSessionManager(... reporter ...) parameter
  • lavasession.NewReportedProviders(reporter, chainId) — both arguments gone
  • ReportedProviders.reporter and ReportedProviders.chainId fields
  • ReportedProviders.AppendReport method
  • errorsForReport []error parameter on blockProvider, ReportProvider, reportProviderLocked (only ever populated to construct the now-dropped report; the local copy in OnSessionFailure is removed too — no remaining readers)

Why this is safe

  • Runtime equivalence: with the default empty flag, NewConsumerReportsClient returned nil, and every AppendReport was a guarded no-op. Removing the call sites is observably identical to the current production deployment.
  • No external references: the Reporter interface and report types are not exported beyond protocol/metrics for any external consumer.

What changed in tests

  • protocol/lavasession/reported_providers_test.go — updated for the new NewReportedProviders() and ReportProvider(addr, errors, disconnections, cb) signatures.
  • protocol/lavasession/consumer_session_manager_test.go, protocol/rpcconsumer/rpcconsumer_server_test.go, protocol/rpcsmartrouter/rpcsmartrouter_test.go, protocol/integration/protocol_test.go, protocol/chainlib/consumer_ws_subscription_manager_test.go — drop the now-removed nil reporter args and nil-trailing call params from NewConsumerSessionManager / ServeRPCRequests / blockProvider invocations.

Test plan

  • go build ./... clean
  • go test ./protocol/lavasession/ -count=1 passes
  • go test ./protocol/metrics/ -count=1 passes
  • go test ./protocol/rpcconsumer/ -count=1 passes
  • go test ./protocol/rpcsmartrouter/ -count=1 passes
  • golangci-lint run --config .golangci.yml ./protocol/... clean

Stacking note

Set the base branch to refactor/usage-pipeline (the OTel PR). Once that merges to main, retarget this PR's base to main.

@qodo-code-review

Copy link
Copy Markdown

Review Summary by Qodo

Remove unused ReportsAddressFlag and Reporter flow from metrics pipeline

🐞 Bug fix ✨ Enhancement

Grey Divider

Walkthroughs

Description
• Remove unused --reports-be-address flag and entire reporter pipeline
• Delete ConsumerReportsClient, Reporter interface, and related types
• Simplify ReportedProviders by removing reporter and chainId fields
• Remove errorsForReport parameter from provider blocking flow
• Update all call sites across rpcconsumer, rpcsmartrouter, and tests
Diagram
flowchart LR
  A["Consumer Reports Client"] -->|removed| B["Reporter Interface"]
  B -->|removed| C["ReportsRequest Types"]
  D["blockProvider calls"] -->|simplify| E["Remove errorsForReport param"]
  F["NewConsumerSessionManager"] -->|remove reporter param| G["NewReportedProviders"]
  G -->|simplify| H["ReportedProviders"]
  I["--reports-be-address flag"] -->|removed| J["AnalyticsServerAddresses"]
Loading

Grey Divider

File Changes

1. protocol/metrics/consumer_reports_client.go 🐞 Bug fix +0/-114

Delete entire ConsumerReportsClient implementation file

protocol/metrics/consumer_reports_client.go


2. protocol/metrics/consumer_reports_client_test.go 🧪 Tests +0/-102

Delete ConsumerReportsClient test file

protocol/metrics/consumer_reports_client_test.go


3. protocol/lavasession/reported_providers.go ✨ Enhancement +6/-22

Remove reporter and chainId fields, simplify API

protocol/lavasession/reported_providers.go


View more (10)
4. protocol/lavasession/reported_providers_test.go 🧪 Tests +13/-13

Update tests for simplified ReportedProviders constructor

protocol/lavasession/reported_providers_test.go


5. protocol/lavasession/consumer_session_manager.go ✨ Enhancement +9/-12

Remove reporter parameter and errorsForReport plumbing

protocol/lavasession/consumer_session_manager.go


6. protocol/lavasession/consumer_session_manager_test.go 🧪 Tests +8/-8

Update test calls to remove reporter and error parameters

protocol/lavasession/consumer_session_manager_test.go


7. protocol/rpcconsumer/rpcconsumer.go ✨ Enhancement +3/-9

Remove ReportsAddressFlag and reporter initialization

protocol/rpcconsumer/rpcconsumer.go


8. protocol/rpcconsumer/rpcconsumer_server.go ✨ Enhancement +0/-3

Remove reporter field and ServeRPCRequests parameter

protocol/rpcconsumer/rpcconsumer_server.go


9. protocol/rpcconsumer/rpcconsumer_server_test.go 🧪 Tests +2/-2

Update test to remove reporter parameter

protocol/rpcconsumer/rpcconsumer_server_test.go


10. protocol/rpcsmartrouter/rpcsmartrouter.go ✨ Enhancement +2/-8

Remove ReportsAddressFlag and reporter initialization

protocol/rpcsmartrouter/rpcsmartrouter.go


11. protocol/rpcsmartrouter/rpcsmartrouter_test.go 🧪 Tests +5/-5

Update test calls to remove reporter parameter

protocol/rpcsmartrouter/rpcsmartrouter_test.go


12. protocol/chainlib/consumer_ws_subscription_manager_test.go 🧪 Tests +1/-1

Update test to remove reporter parameter

protocol/chainlib/consumer_ws_subscription_manager_test.go


13. protocol/integration/protocol_test.go 🧪 Tests +2/-2

Update integration tests to remove reporter parameters

protocol/integration/protocol_test.go


Grey Divider

Qodo Logo

@qodo-code-review

qodo-code-review Bot commented May 4, 2026

Copy link
Copy Markdown

Code Review by Qodo

🐞 Bugs (1) 📘 Rule violations (0)

Grey Divider


Remediation recommended

1. Stale reports config example 🐞 Bug ⚙ Maintainability
Description
The --reports-be-address flag/config was removed, but the consumer example YAML still mentions
reports-be-address, which will now be silently ignored and can mislead operators during
configuration.
Code

protocol/rpcconsumer/rpcconsumer.go[908]

-	cmdRPCConsumer.Flags().String(reportsSendBEAddress, "", "address to send reports to")
Evidence
The PR removes the reports address plumbing from RPCConsumer (no ReportsAddressFlag field
anymore), but the shipped example config still documents the old key, creating documentation/config
drift.

protocol/rpcconsumer/rpcconsumer.go[103-116]
config/consumer_examples/full_consumer_example.yml[158-159]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

### Issue description
The example consumer config still documents `reports-be-address`, but the PR removes the corresponding flag/config plumbing. This creates documentation drift and can confuse users because the key will be ignored.

### Issue Context
The reports pipeline (`ConsumerReportsClient` / `Reporter`) and the `--reports-be-address` flag were removed in this PR.

### Fix Focus Areas
- config/consumer_examples/full_consumer_example.yml[158-159]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

Qodo Logo

@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch 2 times, most recently from dc541f7 to e283c21 Compare May 6, 2026 08:46
Stacked on top of refactor(metrics): OTel-based relay-usage emitter.

The reports-backend pipeline was wired through but never used in production:
the --reports-be-address flag defaulted to "" and NewConsumerReportsClient
returned nil for that case, making every AppendReport / AppendConflict
call a no-op. The plumbing still threaded a metrics.Reporter through
RPCConsumerServer, ConsumerSessionManager, and ReportedProviders, plus
a useless errorsForReport []error parameter on blockProvider /
ReportProvider that existed solely to populate the dropped report
payload. Strip it all out.

Removed
- protocol/metrics/consumer_reports_client.go and its test
- types: Reporter, ReportsRequest, ConflictRequest,
  NewReportsRequest, NewConflictRequest, ConsumerReportsClient
- AnalyticsServerAddresses.ReportsAddressFlag (rpcconsumer + rpcsmartrouter)
- the --reports-be-address flag (rpcconsumer + rpcsmartrouter)
- NewConsumerReportsClient construction sites
- RPCConsumerServer.reporter field, the matching ServeRPCRequests
  parameter, and the dead pass-through assignment
- ConsumerSessionManager.NewConsumerSessionManager reporter parameter
- ReportedProviders.reporter and ReportedProviders.chainId fields
- ReportedProviders.AppendReport method
- errorsForReport []error parameter on
  ConsumerSessionManager.blockProvider, ReportProvider,
  reportProviderLocked (the value was used only for the dropped
  report payload — the local copy in OnSessionFailure is also
  removed since it had no other readers)

Tests
- protocol/lavasession/reported_providers_test.go updated for the new
  signatures.
- All in-tree callers of NewConsumerSessionManager,
  ServeRPCRequests, and the reported-providers helpers updated to
  drop the now-removed parameters (rpcconsumer, rpcsmartrouter,
  chainlib/consumer_ws_subscription_manager_test, integration/protocol_test).

No production behavior change — the entire path was already a runtime
no-op given the empty default flag.
@nimrod-teich nimrod-teich force-pushed the refactor/remove-reports-flow branch from f1bc721 to 3645f17 Compare May 11, 2026 09:23
@nimrod-teich nimrod-teich merged commit 058bbf0 into refactor/usage-pipeline May 11, 2026
3 checks passed
@nimrod-teich nimrod-teich deleted the refactor/remove-reports-flow branch May 11, 2026 09:25
nimrod-teich added a commit that referenced this pull request May 11, 2026
…2286)

Stacked on top of refactor(metrics): OTel-based relay-usage emitter.

The reports-backend pipeline was wired through but never used in production:
the --reports-be-address flag defaulted to "" and NewConsumerReportsClient
returned nil for that case, making every AppendReport / AppendConflict
call a no-op. The plumbing still threaded a metrics.Reporter through
RPCConsumerServer, ConsumerSessionManager, and ReportedProviders, plus
a useless errorsForReport []error parameter on blockProvider /
ReportProvider that existed solely to populate the dropped report
payload. Strip it all out.

Removed
- protocol/metrics/consumer_reports_client.go and its test
- types: Reporter, ReportsRequest, ConflictRequest,
  NewReportsRequest, NewConflictRequest, ConsumerReportsClient
- AnalyticsServerAddresses.ReportsAddressFlag (rpcconsumer + rpcsmartrouter)
- the --reports-be-address flag (rpcconsumer + rpcsmartrouter)
- NewConsumerReportsClient construction sites
- RPCConsumerServer.reporter field, the matching ServeRPCRequests
  parameter, and the dead pass-through assignment
- ConsumerSessionManager.NewConsumerSessionManager reporter parameter
- ReportedProviders.reporter and ReportedProviders.chainId fields
- ReportedProviders.AppendReport method
- errorsForReport []error parameter on
  ConsumerSessionManager.blockProvider, ReportProvider,
  reportProviderLocked (the value was used only for the dropped
  report payload — the local copy in OnSessionFailure is also
  removed since it had no other readers)

Tests
- protocol/lavasession/reported_providers_test.go updated for the new
  signatures.
- All in-tree callers of NewConsumerSessionManager,
  ServeRPCRequests, and the reported-providers helpers updated to
  drop the now-removed parameters (rpcconsumer, rpcsmartrouter,
  chainlib/consumer_ws_subscription_manager_test, integration/protocol_test).

No production behavior change — the entire path was already a runtime
no-op given the empty default flag.
nimrod-teich added a commit that referenced this pull request May 20, 2026
…2286)

Stacked on top of refactor(metrics): OTel-based relay-usage emitter.

The reports-backend pipeline was wired through but never used in production:
the --reports-be-address flag defaulted to "" and NewConsumerReportsClient
returned nil for that case, making every AppendReport / AppendConflict
call a no-op. The plumbing still threaded a metrics.Reporter through
RPCConsumerServer, ConsumerSessionManager, and ReportedProviders, plus
a useless errorsForReport []error parameter on blockProvider /
ReportProvider that existed solely to populate the dropped report
payload. Strip it all out.

Removed
- protocol/metrics/consumer_reports_client.go and its test
- types: Reporter, ReportsRequest, ConflictRequest,
  NewReportsRequest, NewConflictRequest, ConsumerReportsClient
- AnalyticsServerAddresses.ReportsAddressFlag (rpcconsumer + rpcsmartrouter)
- the --reports-be-address flag (rpcconsumer + rpcsmartrouter)
- NewConsumerReportsClient construction sites
- RPCConsumerServer.reporter field, the matching ServeRPCRequests
  parameter, and the dead pass-through assignment
- ConsumerSessionManager.NewConsumerSessionManager reporter parameter
- ReportedProviders.reporter and ReportedProviders.chainId fields
- ReportedProviders.AppendReport method
- errorsForReport []error parameter on
  ConsumerSessionManager.blockProvider, ReportProvider,
  reportProviderLocked (the value was used only for the dropped
  report payload — the local copy in OnSessionFailure is also
  removed since it had no other readers)

Tests
- protocol/lavasession/reported_providers_test.go updated for the new
  signatures.
- All in-tree callers of NewConsumerSessionManager,
  ServeRPCRequests, and the reported-providers helpers updated to
  drop the now-removed parameters (rpcconsumer, rpcsmartrouter,
  chainlib/consumer_ws_subscription_manager_test, integration/protocol_test).

No production behavior change — the entire path was already a runtime
no-op given the empty default flag.
nimrod-teich added a commit that referenced this pull request May 20, 2026
…oval (#2285)

* refactor(metrics): OTel-based relay-usage emitter (replaces Kafka client)

Replaces the custom Kafka producer with an OpenTelemetry log-based usage
emitter. Each rpcconsumer/rpcsmartrouter process emits one OTLP log record
per relay to the local OTel collector; the collector handles fan-out to
whatever backends the operator configures (Kafka, S3, ClickHouse, ...) via
exporter YAML — no code change to swap destinations.

Off by default. With --usage-otel-enabled=false (the default) the relay
path pays exactly one virtual call per relay (NoopUsageSink.Emit, which is
empty). No SDK setup, no goroutines, no batching, no resource cost.

Wire format / API
- New UsageEventSink interface (Emit/Stats/Close); implementations must be
  non-blocking on the relay path. Full buffers must drop, never block.
- NoopUsageSink: zero-cost default.
- OTelUsageSink: OTLP/HTTP exporter, BatchProcessor with configurable
  queue/batch/flush/timeout. Resource attributes: service.name +
  service.instance.id (default hostname-pid; overridable for fleets that
  want a chain or role suffix). Per-relay attributes carry project, chain,
  api_interface, api_method, cu, latency, success/cache/write/archive
  flags, hedge_count, provider, origin.

Removals (no back-compat — the Kafka pipeline was unused)
- consumer_kafka_client.go and its test
- All --relay-kafka-* flags
- The Kafka-specific fields on AnalyticsServerAddresses

New flags
- --usage-otel-enabled (default false) — master switch
- --usage-otel-endpoint (default "" → SDK reads OTEL_EXPORTER_OTLP_ENDPOINT
  or localhost:4317)
- --usage-otel-insecure (default true; expected target is a sidecar)
- --usage-otel-queue-size, --usage-otel-batch-size,
  --usage-otel-flush-interval, --usage-otel-export-timeout
- --usage-otel-service-name (default lava-rpcconsumer / lava-rpcsmartrouter)
- --usage-otel-service-instance-id (default hostname-pid)

Why service.instance.id = hostname-pid
- Multiple consumer processes per bare-metal host (one per chain) sharing
  one local collector is a supported deployment shape. hostname-pid makes
  each process individually identifiable downstream without conflating
  process identity with billing dimensions.
- Chain stays as a per-event attribute (chain="..."), so aggregation by
  chain × project × method is GROUP BY at query time.

Dependency / collateral
- Bumps OTel from v1.21 to v1.30 (logs API stable). Indirectly bumps grpc
  from 1.62 to 1.66; replaces the deprecated grpc.DialContext+WithBlock
  in upstream_grpc_pool.go with grpc.NewClient + Connect/WaitForStateChange
  to preserve the original "fast-fail on unreachable target" semantics.

Tests
- usage_sink_test.go: NoopUsageSink zero-cost, RelayUsageEvent fidelity.
- otel_usage_sink_test.go: defaults application, hostname fallback path,
  nil-safety on every method, behavior under unreachable endpoint
  (Emit doesn't panic or block; SDK queues; counters advance).

* refactor(metrics): route optimizer-QoS reports through the OTel usage sink

Same shape as the relay-usage emitter (off by default, non-blocking,
no aggregation in the producer). The QoS client becomes a pure
periodic sampler over the registered optimizers; downstream computes
rates / averages by GROUP BY at query time.

Producer-side changes
- ConsumerOptimizerQoSClient is now an OTel pass-through:
  - Drops every per-(chain, provider) counter and running sum:
    chainIdToProviderToRelaysCount, chainIdToProviderToNodeErrorsCount,
    chainIdToProviderToSelectionCount, chainIdToProviderToQoSScoreSum,
    chainIdToProviderToLastRNGValue, chainIdToTotalSelections.
  - Drops the calculate* helpers (NodeErrorRate, SelectionRate,
    AverageSelectionQoSScore, getLastRNGValue, getProviderChainSelectionCount,
    getProviderChainRelaysCount, getProviderChainNodeErrorsCount,
    getProviderChainMapCounterValue).
  - Drops the Set* mutator methods (SetRelaySentToProvider,
    SetNodeErrorToProvider, SetProviderSelected) and their dead
    callers in consumer_metrics_manager and rpcconsumer_logs.
  - UpdatePairingListStake stays as a side channel — it learns the
    per-chain provider-address SET, the current epoch, and the
    current pairing-list stake per provider (single-level map, no
    per-epoch history).
  - On each sampling tick, sampleAndEmit asks every optimizer for its
    current scores (via the existing CalculateQoSScoresForMetrics) and
    forwards one OTel log record per (chain, provider) to the usage
    sink. The latest batch is cached for the optional Prometheus-style
    scrape endpoint enabled by --optimizer-qos-listen.

Wire format (OptimizerQoSReportToSend)
- Drops the derived/aggregated fields: NodeErrorRate, SelectionRate,
  SelectionQoSScore, SelectionRNGValue, SelectionCount.
- Drops the legacy raw EWMA fields: sync_score, availability_score,
  latency_score, generic_score. They were the optimizer's internal
  score-store state; the WRS-normalized scores (selection_*) are what
  actually drive provider selection — that's the signal downstream
  analytics want. The raw EWMA is one indirection too many for queryers
  and bloated every emitted record by 4 floats.
- Adds provider_stake (json: provider_stake) populated from the
  per-(chain, provider) snapshot UpdatePairingListStake builds. Each
  optimizer-QoS sample carries the current pairing-list stake for the
  provider that report describes.
- Keeps envelope (timestamp, chain, provider, consumer hostname/pub
  address, geo, epoch, entry index) plus the WRS normalized scores
  (selection_*) and weighted contributions (*_contribution).

Sink interface
- UsageEventSink gains EmitOptimizerQoS(OptimizerQoSReportToSend).
- NoopUsageSink is a no-op for both event types.
- OTelUsageSink emits OTLP log records with body="optimizer_qos" and
  the QoS attributes flat at the top level (matches the Athena
  JsonSerDe table layout — separate bucket per event type, no `event`
  discriminator column).

Removed (no back-compat — pipeline switches to OTel)
- --optimizer-qos-server-address flag and OptimizerQosServerAddressFlag
- --optimizer-qos-push-interval flag and OptimizerQosServerPushInterval
  (push cadence is now controlled by the OTel BatchProcessor)
- AnalyticsServerAddresses.OptimizerQoSAddress on rpcconsumer and
  rpcsmartrouter
- The endpointAddress / interval parameters from
  NewConsumerOptimizerQoSClient (now takes the UsageEventSink directly)
- handleOptimizerQoS HTTP handler (was the receive end of the now-
  removed --optimizer-qos-server-address push; never wired into a
  route, flagged dead by the linter).

OptimizerQoSReport (the optimizer's output struct) is unchanged — its
SyncScore/AvailabilityScore/LatencyScore/GenericScore fields stay
populated but are simply not propagated to the wire envelope.

Construction trigger
- The QoS client is now created when --optimizer-qos-listen=true
  (Prometheus scrape endpoint) OR --usage-otel-enabled=true (OTel
  emission), in either order. The two are independent toggles.

Tests
- usage_sink_test.go: NoopUsageSink.EmitOptimizerQoS zero-cost path.
- otel_usage_sink_test.go: nil-safety + non-blocking emission for the
  QoS path under an unreachable endpoint, counters advance for both
  Emit and EmitOptimizerQoS.
- optimizer_selection_score_test.go: updated to the new constructor
  signature.

Lint clean (gofmt, gofumpt, ineffassign, staticcheck), full test suite
green for metrics + rpcconsumer + rpcsmartrouter + lavasession +
integration.

* docs: drop stale --relay-kafka-* and --optimizer-qos-server-* references

The two removed pipelines (Kafka relay analytics, QoS push to
delta.lavanet.xyz) still had flag examples in the rpcconsumer /
rpcsmartrouter READMEs and a "Kafka" mention in the analytics.go doc
comment. Replaced with the OTel-based replacement flags
(--usage-otel-enabled and friends) so the docs match the binary.

Pure docs / comment change — no code touched.

* refactor(chainlib): rename dapp-id wire key to project-id

The HTTP header / gRPC metadata key the consumer reads to identify
the requesting project was named dapp-id — leftover terminology from
when projects were called "dapps". The downstream wire format and
analytics already use "project" (RelayMetrics.ProjectHash, the
RelayUsageEvent json:"project" field, the Athena `project` column),
so the wire-level header name was the only place still saying
dapp-id. Rename everywhere we read it.

- HTTP: c.Get("dapp-id") -> c.Get("project-id")
  (protocol/chainlib/common.go: extractDappIDFromFiberContext)
- gRPC: metadataValues["dapp-id"] -> metadataValues["project-id"]
  (extractDappIDFromGrpcHeader)
- Fiber Locals key (passes the value from the HTTP upgrade context
  into the websocket handler) renamed for consistency:
  c.Locals("dapp-id", ...) -> c.Locals("project-id", ...)
- Tests updated.

This is a wire-level breaking change: clients and any reverse proxy
in front of the consumer must now send "project-id" instead of
"dapp-id". The bare-metal nginx gateway already extracts the project
hash from the URL path / subdomain but currently doesn't forward it
as a header at all (HTTP) or forwards it as the wrong header
(X-LAVA-ProjectHash on gRPC) — see Magma-Devs/ansible-internal for
the matching nginx config update.

Internal Go names (extractDappIDFromFiberContext, generateNewDappID,
RelayMetrics.ProjectHash, etc.) kept as-is to limit scope. The
default fallback string ("DefaultDappID") is a downstream-visible
constant and stays unchanged.

* fix(chainlib): clone project-id at extraction to avoid fasthttp buffer aliasing

fiber.Ctx.Get returns a string aliased to fasthttp's per-request header
buffer (zero-copy via unsafe). RelayMetrics.ProjectHash is enqueued
asynchronously into the OTel BatchLogProcessor and serialized only when
the batch flushes — by then the request has returned, fasthttp has
recycled the underlying buffer, and a subsequent request on the same
worker may have written shorter strings (Accept-Encoding, client IPs,
etc.) into the same backing array. The result is a partially-overwritten
project hash: short prefix from the new write, suffix bytes from the
original 32-char hash that the new write didn't reach.

Observed in testnet S3 output as a buffet of corrupted project values
all exactly 32 chars long, with the user's real hash as suffix:

  e6eaddb0d8063a696a1a37e716f09374    (real)
  gzipddb0d8063a696a1a37e716f09374    ("gzip" + tail[4..])
  172.68.40.693a696a1a37e716f09374    (CF edge IP + tail[12..])
  188.40.110.49a696a1a37e716f09374    (CF edge IP + tail[13..])

strings.Clone allocates a fresh backing array; the resulting string is
safe to retain past the handler return. 32 bytes per relay is trivial
allocator load (Go's concurrent GC is tuned for exactly this shape) so
no need for sync.Pool reuse or string interning.

Apply the same fix to the gRPC extraction path. gRPC metadata values
can alias the receive buffer depending on the transport implementation;
the cost of cloning is the same and the hazard is the same.

Drive-by audit candidates with the same hazard if the OTel pipeline
extends — Origin, ApiMethod, ProviderAddress on RelayUsageEvent are all
header- or request-derived. Not corrupted today only because their
contents aren't fixed-width hashes so partial overwrites are less
visible. Worth a follow-up sweep that clones at the point each crosses
into RelayMetrics.

* refactor(metrics): remove unused ReportsAddressFlag / Reporter flow (#2286)

Stacked on top of refactor(metrics): OTel-based relay-usage emitter.

The reports-backend pipeline was wired through but never used in production:
the --reports-be-address flag defaulted to "" and NewConsumerReportsClient
returned nil for that case, making every AppendReport / AppendConflict
call a no-op. The plumbing still threaded a metrics.Reporter through
RPCConsumerServer, ConsumerSessionManager, and ReportedProviders, plus
a useless errorsForReport []error parameter on blockProvider /
ReportProvider that existed solely to populate the dropped report
payload. Strip it all out.

Removed
- protocol/metrics/consumer_reports_client.go and its test
- types: Reporter, ReportsRequest, ConflictRequest,
  NewReportsRequest, NewConflictRequest, ConsumerReportsClient
- AnalyticsServerAddresses.ReportsAddressFlag (rpcconsumer + rpcsmartrouter)
- the --reports-be-address flag (rpcconsumer + rpcsmartrouter)
- NewConsumerReportsClient construction sites
- RPCConsumerServer.reporter field, the matching ServeRPCRequests
  parameter, and the dead pass-through assignment
- ConsumerSessionManager.NewConsumerSessionManager reporter parameter
- ReportedProviders.reporter and ReportedProviders.chainId fields
- ReportedProviders.AppendReport method
- errorsForReport []error parameter on
  ConsumerSessionManager.blockProvider, ReportProvider,
  reportProviderLocked (the value was used only for the dropped
  report payload — the local copy in OnSessionFailure is also
  removed since it had no other readers)

Tests
- protocol/lavasession/reported_providers_test.go updated for the new
  signatures.
- All in-tree callers of NewConsumerSessionManager,
  ServeRPCRequests, and the reported-providers helpers updated to
  drop the now-removed parameters (rpcconsumer, rpcsmartrouter,
  chainlib/consumer_ws_subscription_manager_test, integration/protocol_test).

No production behavior change — the entire path was already a runtime
no-op given the empty default flag.

* fix(metrics, chainlib): populate Origin/Success on RelayMetrics + cover WS gap

Two billing-relevant gaps surfaced by reviewing OTel pipeline coverage
across all listeners.

(1) Origin and Success absent from OTel events
--------------------------------------------------
The Origin attribute on RelayUsageEvent was always empty: data.Origin
was set inside SendMetrics (the legacy in-memory aggregator), which
AddMetricFor* calls AFTER usageSink.Emit(NewRelayUsageEvent(data)).
The OTel event was constructed from a RelayMetrics whose Origin field
had not yet been populated, so every emitted record carried origin="".

Success had a parallel ordering bug specific to the smart router. On
the consumer path, ConsumerMetricsManager.SetRelayMetrics(data, err)
sets data.Success = err == nil before Emit. On the smart-router path,
SmartRouterMetricsManager.SetRelayMetrics is a no-op (the smart router
has its own prometheus surface), so data.Success was never set before
Emit and every smart-router OTel event recorded success=false
regardless of relay outcome.

Reorder both paths so data.Success and data.Origin are populated on
the RelayMetrics before Emit. Drop the now-redundant err parameter
from SendMetrics (Success is set upstream).

Clone Origin where it enters from a buffer-aliased source:
- Websocket Locals storage in
  constructFiberCallbackWithHeaderAndParameterExtraction
  (chainlib/common.go) — c.Get returns a fasthttp-aliased string and
  the value lives in Locals past the request lifetime.
- gRPC metadata read in AddMetricForGrpc — metadata values can alias
  the receive buffer per the transport implementation; same hazard as
  the project-id fix in 6d5e4a9.

HTTP path uses strings.Join which always allocates, so no extra clone
needed. ApiMethod (api.Name from preloaded spec) and ProviderAddress
(session/pairing data) were flagged in 6d5e4a9 as audit candidates,
but neither is request-derived — no clone needed.

(2) Normal WS relays + subscription deliveries were not emitting
----------------------------------------------------------------
consumer_websocket_manager.go's normal-relay branch (the path taken by
every non-subscribe WS message — eth_call, eth_getBalance, ...) fell
into continue before reaching the single AddMetricForWebSocket call at
the bottom of the loop. CUs consumed by every normal WS relay were
silently dropped from the OTel analytics pipeline.

The subscription-delivery goroutine also did not emit per-message:
each streamed subscription event (eth_subscribe newHeads, etc.) is a
billed relay against the same project / chain / API / provider as the
initial subscribe, but only the eth_subscribe START was recorded.

Fix both:
- Normal relay branch: emit AddMetricForWebSocket(metricsData, err,
  websocketConn) before continue. Restructured the if/else so a single
  emit point covers both success and failure with the right err value.
- Subscription delivery: snapshot the populated RelayMetrics into a
  by-value local before launching the streaming goroutine, then emit
  one event per delivered message with a fresh timestamp. By-value
  snapshot is necessary because the start-of-subscription emit races
  with per-message emits on the same pointer (AddMetricForWebSocket
  mutates Success and Origin).

Coverage now matches the PR's billing claim: every relay across HTTP,
WS (normal + subscribe + per-delivery), gRPC unary, TendermintRPC,
REST, and UTXO chains lands in the OTel pipeline.

* test(metrics, chainlib): cover OTel event ordering, sampler, sanitizeFloat, WS Locals

Gaps surfaced while reviewing OTel pipeline coverage; nothing here was
covered before.

Relay-usage emit ordering (rpcconsumer_logs_test.go)
- captureSink: in-package fake that records both Emit and
  EmitOptimizerQoS calls for assertion.
- TestAddMetricForHttp_OTelEventPopulated: Success=true and Origin
  reach the OTel event. Uses a nil metrics manager (resolved to
  NoOpConsumerMetrics via SafeMetrics), which mirrors the smart-router
  code path where SetRelayMetrics is a no-op.
- TestAddMetricForHttp_FailurePropagatesSuccessFalse: error →
  Success=false.
- TestAddMetricForGrpc_OTelEventPopulated: gRPC metadata Origin reaches
  the OTel event; metadata.Pairs used so the wire-level key lowercasing
  matches what the gRPC transport delivers.

Optimizer-QoS sampler (consumer_optimizer_qos_client_test.go, new file)
- sanitizeFloat: NaN/+Inf/-Inf → 0; finite values preserved.
- TestSampleAndEmit_EmitsPerChainProvider: fakeOptimizer + stake
  snapshot via UpdatePairingListStake. Asserts one record per
  (chain, provider), provider_stake populated from the snapshot,
  envelope fields wired correctly, WRS scores and contributions
  propagated, GetReportsToSend caches the same batch.
- TestSampleAndEmit_SkipsNilReports: nil entries from the optimizer
  don't leak into the sink or the scrape cache.
- TestSampleAndEmit_NoStakeNoEmit: chain registered but
  UpdatePairingListStake not yet fired → no records.
- TestRegisterOptimizer_IgnoresDuplicate: second registration for the
  same chain is rejected; first wins.

Websocket Origin path (chainlib/common_test.go)
- TestConstructFiberCallback_StashesOriginInLocals: real fiber server
  + gorilla websocket dial with Origin header; asserts the websocket
  handler reads the same value back from
  c.Locals(metrics.OriginHeaderKey).
- TestConstructFiberCallback_NoOriginWhenMetricsDisabled: same setup
  with isMetricEnabled=false → Locals absent.

Race-clean (go test -race) on metrics and chainlib for the new tests.

* refactor(metrics): polish OTel pipeline — ticker, service-name, sampling-interval var

Four small follow-ups from the post-review polish pass; no behavior change
beyond what each item describes.

(1) QoS sampler: time.After → time.NewTicker
StartOptimizersQoSReportsCollecting used time.After inside a for-select,
allocating a fresh timer every tick (at the default 1s cadence, that's
~3600 timers/hr per running sampler) and never stopping it on ctx.Done.
Replace with a single time.NewTicker + defer ticker.Stop().

(2) Drop OTel service-name asymmetry
applyOTelSinkDefaults had a baked-in default of "lava-rpcconsumer",
which is wrong-by-name when the sink is wired into the smart router.
The smart router worked around it with a wiring-layer fallback to
"lava-rpcsmartrouter" — dead code in practice because the cobra flag's
defValue already supplies the right name to both binaries.

Drop the package-level default and the smart-router fallback. The
consumer and smart router each rely on their own flag defValue
("lava-rpcconsumer" / "lava-rpcsmartrouter"); the constructor logs
the resolved name at startup so an empty one is operator-visible.

(3) Eliminate package-level OptimizerQosServerSamplingInterval var
The flag was bound via DurationVar(&metrics.OptimizerQosServerSamplingInterval, …),
making the metrics package hold mutable global state whose only consumers
are two rpcconsumer/rpcsmartrouter Start() functions. Replace with the
codebase's standard pattern: Flags().Duration(…) at registration, then
viper.GetDuration(…) at the use site, into a local variable shared
between the two StartXxx calls in each binary.

(4) gRPC streaming audit — confirmed no gap
chainlib/grpcproxy.makeProxyFunc implements
grpc.UnknownServiceHandler with a one-shot RecvMsg → callBack → SendMsg
pattern. Streaming gRPC RPCs are collapsed to unary semantics by the
proxy (first request → one response → end of stream), so every gRPC
call lands on chainlib/grpc.go's AddMetricForGrpc path regardless of
whether the spec marks the method as unary or streaming. No additional
emit needed.

Bundles a few aligned in-flight edits in the same files (HTTP→4318
comment updates, SinkStats simplification, nil-report skip refinement)
that match the PR body's design.

* refactor(metrics, rpcsmartrouter): SinkStats simplification + gRPC connect state-walk

Two in-flight refinements that belong to the PR scope but had been
sitting in the working tree without a home commit.

(1) SinkStats: drop Failed / Dropped fields, keep only Sent
The OTel SDK doesn't expose per-record send / fail / drop status from
its API surface — those counters live inside the SDK and surface
through its own diagnostics. Shadowing them in SinkStats was misleading
(producer-side Sent was the only counter we actually maintained).

Drop the unused fields. otel_usage_sink.go was already updated in
84c6a59 to construct SinkStats{Sent: …}, which works against either
the 3-field or 1-field struct; this commit just makes the struct match
its actual content.

(2) gRPC connect: replace single WaitForStateChange with a state-walk
upstream_grpc_pool.go.connect previously did:

    grpcConn.Connect()
    if !grpcConn.WaitForStateChange(connectCtx, connectivity.Idle) { … }

WaitForStateChange returns true on ANY state transition, including
Idle → TransientFailure, so the channel could end up "connected" while
actually failing. With WithBlock + WithReturnConnectionError gone in
the grpc 1.65 migration, we have to walk the state machine ourselves:

  - Ready → success.
  - Shutdown → fast-fail (channel can't recover from Shutdown).
  - Connecting / TransientFailure / Idle → keep waiting under the
    configured timeout.

This preserves the original "fast-fail on unreachable target, succeed
only at Ready" semantics that WithBlock provided.

* fix(chainlib): tag subscription deliveries with own method + flat CU

Per-message subscription emits were inheriting the originating
*_subscribe's method and spec CU (e.g. eth_subscribe at 1000 on
Ethereum), so SUM(cu) across the resulting events over-billed by Nx
on chatty subscriptions like eth_subscribe newPendingTransactions.

Override at the per-delivery emit site:
  ApiMethod    = SubscriptionDeliveryMethod  ("subscription_delivery")
  ComputeUnits = DefaultSubscriptionDeliveryCU (10)

Originating subscribe still carries the spec CU under its real method.
Downstream billing is plain SUM(cu) - no subscription-specific dedup
rule, no discriminator field; each event accurately states its cost.

* refactor(metrics, provideroptimizer): drop unused cu/requestedBlock from CalculateQoSScoresForMetrics

The implementation forwards to weightedSelector.CalculateProviderScores
which doesn't take cu or requestedBlock; the only in-tree caller was
constructing cu=10 and requestedBlock=LATEST_BLOCK that got passed in
the front door and straight out the trash. A signature that lies about
its dependencies misleads readers and invites pointless tuning attempts.

Drop from the interface, the implementation, the call site (and its
unused spectypes import), and the test fake. Zero behavior change.

* chore(deps): bump OTel stack to v1.38.0 / log v0.14.0

Was on the v1.28.0 / v0.4.0 era (June 2024 release train). Bumped to
the highest pair compatible with the project's go 1.23.3 directive:

  go.opentelemetry.io/otel                                       v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/sdk                                   v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/trace                                 v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/log                                   v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/sdk/log                               v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp    v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/exporters/otlp/otlpmetric/*           v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/otlp/otlptrace/*            v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/stdout/*                    v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/prometheus                  v0.50.0  -> v0.60.0
  go.opentelemetry.io/contrib/exporters/autoexport               v0.53.0  -> v0.63.0

v1.42.0+ would require go 1.25.0 transitively; capped at v1.38.0 to
keep the project's go directive at 1.23.3.

The OTel prometheus exporter v0.60.0 needs newer prometheus/client_golang
APIs (NewConstNativeHistogram, model.UTF8Validation), so the in-repo
replace directives were bumped accordingly:

  github.com/prometheus/client_golang  v1.17.0  -> v1.22.0
  github.com/prometheus/common         v0.45.0  -> v0.63.0

API surface in protocol/metrics/otel_usage_sink.go and protocol/tracing/*
unchanged; full protocol test suite passes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant