refactor: OTel usage pipeline + project-id rename + reporter-flow removal #2285

Merged: nimrod-teich merged 13 commits into main from refactor/usage-pipeline on May 20, 2026

@nimrod-teich nimrod-teich commented May 4, 2026

Scope

This PR bundles three independent refactors that all originate from the same consumer-telemetry cutover. Each is reviewable standalone; they ship together because they all touch the same call sites and there's no in-tree dependency that crosses them.

  1. OTel-based usage pipeline — replaces the Kafka producer and the push-style optimizer-QoS server with a single OTLP/HTTP log sink (two event types: relay_usage, optimizer_qos). Off by default. Bulk of the diff.
  2. dapp-id → project-id wire-key rename: the canonical project-attribution wire key changes in chainlib (HTTP header, gRPC metadata, fiber Locals). Includes a fasthttp buffer-aliasing fix at the same extraction sites.
  3. Reporter / ReportsAddressFlag removal: the off-host reporter flow (already disabled in production) is dropped along with its plumbing (errorsForReport, metrics.Reporter interface, consumer_reports_client.go, consumer_relayserver_client.go, two-field signature shrink on blockProvider / ReportProvider).

Scope 1: OTel usage pipeline (relay_usage + optimizer_qos)

Summary

Replaces the custom Kafka producer with an OpenTelemetry log-based usage emitter that handles two event types:

  • relay_usage — one log record per relay, emitted by RPCConsumerLogs.AddMetricFor*
  • optimizer_qos — one log record per (chain, provider) per sampling tick, emitted by ConsumerOptimizerQoSClient

Each rpcconsumer / rpcsmartrouter process emits OTLP/HTTP log records to a local OTel collector; the collector handles fan-out to whatever backend(s) the operator chooses (Kafka, S3, ClickHouse, …) via exporter YAML — no consumer code changes to swap destinations.

Off by default. With --usage-otel-enabled=false (the default), both relay and QoS paths pay one virtual call per event (NoopUsageSink.{Emit,EmitOptimizerQoS}, both empty). No SDK setup, no goroutines, no batching, no resource cost. Flip the flag on once your collector is ready.

How it works

┌──────────────────────────────────────────────────────────────────────┐
│  Bare-metal host (one of N)                                          │
│                                                                      │
│  ┌──────────────┐   ┌──────────────┐   ┌──────────────┐              │
│  │ rpcconsumer  │   │ rpcconsumer  │   │ rpcconsumer  │              │
│  │  (chain A)   │   │  (chain B)   │   │  (chain C)   │   ...        │
│  └──────┬───────┘   └──────┬───────┘   └──────┬───────┘              │
│         │ OTLP/HTTP        │ OTLP/HTTP        │ OTLP/HTTP            │
│         │ (relay_usage +   │ (relay_usage +   │ (relay_usage +       │
│         │  optimizer_qos)  │  optimizer_qos)  │  optimizer_qos)      │
│         └──────────────────┼──────────────────┘                      │
│                            ▼                                         │
│                    ┌──────────────────┐                              │
│                    │  OTel Collector  │  one per host                │
│                    │  (you have it)   │  filter by body, route to    │
│                    │                  │  per-event-type exporter     │
│                    └────────┬─────────┘                              │
└─────────────────────────────┼────────────────────────────────────────┘
                              │
            ┌─────────────────┴─────────────────┐
            ▼                 ▼                 ▼
      ┌──────────┐      ┌──────────┐      ┌──────────┐
      │  Kafka   │      │ S3 +     │      │ Click-   │
      │ exporter │      │ Athena   │      │ House    │
      │          │      │ (one     │      │ exporter │
      │          │      │ bucket   │      │          │
      │          │      │ per ev.) │      │          │
      └──────────┘      └──────────┘      └──────────┘
            ▲                 ▲                 ▲
            │                 │                 │
        choose any 1, 2, or all 3 by editing the
        collector's exporters: YAML — no code change

The relay and QoS paths' only contact with telemetry is NoopUsageSink.{Emit,EmitOptimizerQoS} (off) or OTelUsageSink.{Emit,EmitOptimizerQoS} → BatchLogProcessor.OnEmit (non-blocking enqueue). Backpressure shows up as dropped events, which are accounted for in the SDK's internal counters.
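The non-blocking enqueue described above can be illustrated with a bounded channel. This is a minimal sketch using only the standard library, not the OTel SDK's BatchLogProcessor; the boundedQueue type and its counters are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// boundedQueue is a hypothetical stand-in for the SDK's batch queue:
// a fixed-capacity channel plus counters for accepted and dropped events.
type boundedQueue struct {
	ch      chan string
	sent    atomic.Uint64
	dropped atomic.Uint64
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{ch: make(chan string, capacity)}
}

// Enqueue never blocks the caller: when the queue is full the event is
// dropped and counted instead of waiting for room.
func (q *boundedQueue) Enqueue(event string) bool {
	select {
	case q.ch <- event:
		q.sent.Add(1)
		return true
	default:
		q.dropped.Add(1)
		return false
	}
}

func main() {
	q := newBoundedQueue(2) // nothing drains the queue in this demo
	for i := 0; i < 5; i++ {
		q.Enqueue(fmt.Sprintf("relay_usage-%d", i))
	}
	fmt.Println("sent:", q.sent.Load(), "dropped:", q.dropped.Load())
}
```

With a capacity of 2 and no consumer, the first two events are accepted and the remaining three are dropped, which is the trade-off the relay path makes to stay non-blocking.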

Per-event code path when enabled

relay_usage:                            optimizer_qos:

relay completes                         sampling tick fires
     │                                       │
     ▼                                       ▼
chainlib.AddMetricFor*                  ConsumerOptimizerQoSClient
(already `go ...` launched)             .sampleAndEmit
     │                                       │
     ▼                                       ▼
RPCConsumerLogs.usageSink.Emit          for each registered optimizer:
     │                                    optimizer.CalculateQoSScores
     ▼                                    ForMetrics → for each
OTelUsageSink.Emit                       (chain, provider):
     │                                    usageSink.EmitOptimizerQoS
     ▼                                       │
BatchLogProcessor.OnEmit                     ▼
(non-blocking enqueue;                  OTelUsageSink.EmitOptimizerQoS
 full queue → SDK drops)                     │
                                             ▼
                                        BatchLogProcessor.OnEmit

       (background, shared by both event types)
                            │
                            ▼
                   BatchProcessor flushes on
                   BatchSize / FlushInterval / Shutdown
                            │
                            ▼
                   OTLP/HTTP → localhost collector
                            │
                            ▼
                   collector exporters (operator's choice)
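The background stage in the diagram flushes on three triggers: batch size, flush interval, and shutdown. The sketch below models those triggers with a goroutine-friendly loop over a channel. It assumes a simplified string record and a hypothetical runBatcher helper; the real work is done by the OTel SDK's batch processor.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher drains in and calls export when batchSize records have
// accumulated, when flushInterval elapses with records pending, or when
// in is closed (shutdown). It returns after the final flush.
func runBatcher(in <-chan string, batchSize int, flushInterval time.Duration, export func([]string)) {
	ticker := time.NewTicker(flushInterval)
	defer ticker.Stop()

	batch := make([]string, 0, batchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		export(batch)
		batch = make([]string, 0, batchSize) // fresh slice; export may keep the old one
	}

	for {
		select {
		case rec, ok := <-in:
			if !ok {
				flush() // shutdown: drain whatever is left
				return
			}
			batch = append(batch, rec)
			if len(batch) >= batchSize {
				flush() // size trigger
			}
		case <-ticker.C:
			flush() // time trigger
		}
	}
}

func main() {
	in := make(chan string, 8)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("rec-%d", i)
	}
	close(in)

	// A long interval keeps the demo deterministic: only size and shutdown fire.
	runBatcher(in, 2, time.Hour, func(b []string) {
		fmt.Println("export", len(b), "records")
	})
}
```

Five queued records with a batch size of 2 produce two full batches and one final batch of a single record flushed on shutdown.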

What changes

Sink layer (new)

  • UsageEventSink interface in protocol/metrics/usage_sink.go: Emit / EmitOptimizerQoS / Stats / Close. Implementations must be non-blocking.
  • NoopUsageSink — zero-cost default for both event types; the relay and QoS paths each pay one inlinable empty call when telemetry is off.
  • OTelUsageSink — single OTLP/HTTP exporter, BatchLogProcessor with configurable queue / batch / flush / timeout. Resource attributes: service.name + service.instance.id (default hostname-pid). Two log bodies routed by the collector: relay_usage and optimizer_qos.
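A rough sketch of how the sink layer might fit together. The method names come from the list above, but the signatures, the payload types, SinkStats, countingSink, and newSink are all assumptions made for illustration; the actual definitions live in protocol/metrics/usage_sink.go.

```go
package main

import "fmt"

// RelayUsageEvent and OptimizerQoSSample are trimmed, hypothetical payloads.
type RelayUsageEvent struct {
	Project string
	Chain   string
	CU      uint64
}

type OptimizerQoSSample struct {
	Chain    string
	Provider string
}

// SinkStats is a hypothetical counter snapshot.
type SinkStats struct {
	Sent    uint64
	Dropped uint64
}

// UsageEventSink mirrors the method set named in the PR; signatures are assumed.
type UsageEventSink interface {
	Emit(RelayUsageEvent)
	EmitOptimizerQoS(OptimizerQoSSample)
	Stats() SinkStats
	Close() error
}

// NoopUsageSink is the disabled path: every method is empty.
type NoopUsageSink struct{}

func (NoopUsageSink) Emit(RelayUsageEvent)                {}
func (NoopUsageSink) EmitOptimizerQoS(OptimizerQoSSample) {}
func (NoopUsageSink) Stats() SinkStats                    { return SinkStats{} }
func (NoopUsageSink) Close() error                        { return nil }

// countingSink stands in for the enabled sink; it only counts events.
type countingSink struct{ stats SinkStats }

func (s *countingSink) Emit(RelayUsageEvent)                { s.stats.Sent++ }
func (s *countingSink) EmitOptimizerQoS(OptimizerQoSSample) { s.stats.Sent++ }
func (s *countingSink) Stats() SinkStats                    { return s.stats }
func (s *countingSink) Close() error                        { return nil }

// newSink picks the implementation from the master switch.
func newSink(enabled bool) UsageEventSink {
	if !enabled {
		return NoopUsageSink{}
	}
	return &countingSink{}
}

func main() {
	for _, enabled := range []bool{false, true} {
		sink := newSink(enabled)
		sink.Emit(RelayUsageEvent{Project: "proj-1", Chain: "ETH1", CU: 10})
		sink.EmitOptimizerQoS(OptimizerQoSSample{Chain: "ETH1", Provider: "provider-1"})
		fmt.Println("enabled:", enabled, "sent:", sink.Stats().Sent)
		_ = sink.Close()
	}
}
```

Callers hold only the interface, so the relay and QoS paths look the same whether telemetry is on or off.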

relay_usage

  • RPCConsumerLogs holds a UsageEventSink (was *ConsumerKafkaClient); AddMetricFor{Http,WebSocket,Grpc} calls usageSink.Emit(NewRelayUsageEvent(data)).
  • Per-relay attributes (flat in the OTel log body): project, chain, api_interface, api_method, cu, latency_ms, success, cache_hit, is_write, is_archive, is_batch, is_debug_trace, hedge_count, provider, origin.
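To make the flat attribute layout concrete, here is a sketch of a relay_usage record as a Go struct. The attribute names are taken from the list above; the Go field names, the field types, and the flatAttributes helper are assumptions, and the real event is built by NewRelayUsageEvent.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RelayUsageEvent lists the per-relay attributes; the types are assumed.
type RelayUsageEvent struct {
	Project      string `json:"project"`
	Chain        string `json:"chain"`
	APIInterface string `json:"api_interface"`
	APIMethod    string `json:"api_method"`
	CU           uint64 `json:"cu"`
	LatencyMs    int64  `json:"latency_ms"`
	Success      bool   `json:"success"`
	CacheHit     bool   `json:"cache_hit"`
	IsWrite      bool   `json:"is_write"`
	IsArchive    bool   `json:"is_archive"`
	IsBatch      bool   `json:"is_batch"`
	IsDebugTrace bool   `json:"is_debug_trace"`
	HedgeCount   int    `json:"hedge_count"`
	Provider     string `json:"provider"`
	Origin       string `json:"origin"`
}

// flatAttributes round-trips the event through JSON to get the flat
// key/value view a downstream table would see (numbers become float64).
func flatAttributes(e RelayUsageEvent) (map[string]any, error) {
	raw, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	attrs := map[string]any{}
	if err := json.Unmarshal(raw, &attrs); err != nil {
		return nil, err
	}
	return attrs, nil
}

// sampleEvent returns made-up values for the demo.
func sampleEvent() RelayUsageEvent {
	return RelayUsageEvent{
		Project:      "proj-1",
		Chain:        "ETH1",
		APIInterface: "jsonrpc",
		APIMethod:    "eth_blockNumber",
		CU:           10,
		LatencyMs:    42,
		Success:      true,
		Provider:     "provider-1",
		Origin:       "example.com",
	}
}

func main() {
	attrs, err := flatAttributes(sampleEvent())
	if err != nil {
		panic(err)
	}
	fmt.Println(len(attrs), attrs["project"], attrs["api_method"])
}
```

Because no field uses omitempty, false and zero values are still present in the output, so every record carries the same 15 columns.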

optimizer_qos

  • ConsumerOptimizerQoSClient is now a pure periodic sampler — drops every per-(chain, provider) counter, running sum, and aggregation helper (NodeErrorRate, SelectionRate, AverageSelectionQoSScore, etc.). Downstream computes rates / averages by GROUP BY at query time.
  • UpdatePairingListStake stays as a side channel: it learns the current chain → provider set, the current epoch, and the current pairing-list stake per provider. No per-epoch history retained.
  • On each tick, sampleAndEmit asks every optimizer for CalculateQoSScoresForMetrics and forwards one OTel log record per (chain, provider). Nil reports from the optimizer are skipped so the in-memory latest cache (read by the optional Prometheus scrape endpoint) only ever holds real records.
  • Per-sample attributes (flat): ts, provider, consumer_hostname, consumer_pub_address, chain_id, geo_location, epoch, entry_index, provider_stake, the WRS-normalized scores (selection_availability/_latency/_sync/_stake/_composite), and the weighted contributions (availability_contribution/latency_contribution/sync_contribution/stake_contribution).
  • The legacy raw EWMA fields (sync_score, availability_score, latency_score, generic_score) are not on the wire — they were the optimizer's internal state; the WRS-normalized scores are what actually drive provider selection. Saved 4 floats per record. OptimizerQoSReport (the optimizer's output struct) is unchanged.
  • Construction trigger: the QoS client is now created when --optimizer-qos-listen=true (Prometheus scrape endpoint) OR --usage-otel-enabled=true (OTel emission), in either order. The two toggles are independent.
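The sampling behaviour described in the list above (one record per (chain, provider), nil reports skipped, latest batch cached) can be sketched as follows. The optimizer interface, the qosReport and qosSample types, and this simplified sampleAndEmit signature are hypothetical; the real CalculateQoSScoresForMetrics signature may differ.

```go
package main

import "fmt"

// qosReport is a trimmed, hypothetical per-provider report.
type qosReport struct {
	Provider           string
	SelectionComposite float64
}

// optimizer is a hypothetical view of a registered optimizer; entries in
// the returned slice may be nil.
type optimizer interface {
	CalculateQoSScoresForMetrics() []*qosReport
}

// qosSample is the record forwarded to the sink for one (chain, provider).
type qosSample struct {
	Chain              string
	Provider           string
	SelectionComposite float64
}

// sampleAndEmit forwards one sample per non-nil report and returns the
// batch that would back the scrape cache.
func sampleAndEmit(optimizers map[string]optimizer, emit func(qosSample)) []qosSample {
	latest := []qosSample{}
	for chain, opt := range optimizers {
		for _, report := range opt.CalculateQoSScoresForMetrics() {
			if report == nil {
				continue // never cache or emit a zero-valued record
			}
			sample := qosSample{
				Chain:              chain,
				Provider:           report.Provider,
				SelectionComposite: report.SelectionComposite,
			}
			emit(sample)
			latest = append(latest, sample)
		}
	}
	return latest
}

// fixedOptimizer is a test double that returns a canned report list.
type fixedOptimizer struct{ reports []*qosReport }

func (f fixedOptimizer) CalculateQoSScoresForMetrics() []*qosReport { return f.reports }

func main() {
	opts := map[string]optimizer{
		"ETH1": fixedOptimizer{reports: []*qosReport{
			{Provider: "provider-1", SelectionComposite: 0.9},
			nil,
			{Provider: "provider-2", SelectionComposite: 0.4},
		}},
	}
	latest := sampleAndEmit(opts, func(s qosSample) {
		fmt.Printf("emit chain=%s provider=%s composite=%.1f\n", s.Chain, s.Provider, s.SelectionComposite)
	})
	fmt.Println("cached samples:", len(latest))
}
```

No aggregation happens here: each tick is a raw snapshot, and rates or averages are left to the query layer.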

Lifecycle wiring

usageSink.Close() is deferred immediately after construction in both rpcconsumer.Start and rpcsmartrouter.Start. On process exit the BatchProcessor drains its pending queue (up to ExportTimeout) instead of dropping it.

What's removed (no back-compat — pipelines switch to OTel)

Kafka pipeline (relay_usage)

  • consumer_kafka_client.go and its test
  • All --relay-kafka-* flags
  • The Kafka-specific fields on AnalyticsServerAddresses

Push pipeline (optimizer_qos)

  • --optimizer-qos-server-address flag and OptimizerQosServerAddressFlag
  • --optimizer-qos-push-interval flag (push cadence is now controlled by the OTel BatchProcessor)
  • AnalyticsServerAddresses.OptimizerQoSAddress
  • endpointAddress / interval parameters on NewConsumerOptimizerQoSClient
  • handleOptimizerQoS HTTP handler (dead — never wired into a route)

New flags

| Flag | Default | Purpose |
|---|---|---|
| --usage-otel-enabled | false | master switch (covers both event types) |
| --usage-otel-endpoint | "" | OTLP/HTTP endpoint; empty defers to OTEL_EXPORTER_OTLP_ENDPOINT or localhost:4318 |
| --usage-otel-insecure | true | skip TLS (expected target is a sidecar collector) |
| --usage-otel-queue-size | 50000 | in-memory queue capacity; full → drop |
| --usage-otel-batch-size | 1000 | flush trigger by record count |
| --usage-otel-flush-interval | 500ms | flush trigger by elapsed time |
| --usage-otel-export-timeout | 10s | per-batch OTLP export timeout |
| --usage-otel-service-name | lava-rpcconsumer / lava-rpcsmartrouter | OTel service.name resource attribute |
| --usage-otel-service-instance-id | hostname-pid | OTel service.instance.id; useful when running multiple processes per host |

Multi-process-per-host deployments

The expected shape — one rpcconsumer process per chain on a bare-metal host, all sharing one local OTel collector — is supported out of the box. The hostname-pid default for service.instance.id keeps each process individually identifiable downstream without conflating process identity with billing dimensions. Chain remains an event attribute on both event types, so aggregation by chain × project × method (relay_usage) or by chain × provider (optimizer_qos) is just GROUP BY at query time, regardless of which process emitted.

If you'd rather encode chain in the instance ID for ops legibility, the flag override is available: --usage-otel-service-instance-id="$HOSTNAME-eth".
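The hostname-pid default and the flag override could be resolved roughly as below. This is a sketch: the function names and the "unknown-host" fallback string are assumptions (the PR only says a hostname-error fallback exists).

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// defaultInstanceID builds "<hostname>-<pid>". The hostname lookup is
// injected so the error path can be exercised without touching the OS.
func defaultInstanceID(hostname func() (string, error), pid int) string {
	host, err := hostname()
	if err != nil || host == "" {
		host = "unknown-host" // assumed fallback value, not taken from the PR
	}
	return fmt.Sprintf("%s-%d", host, pid)
}

// resolveInstanceID prefers an explicit flag value over the default.
func resolveInstanceID(flagValue string) string {
	if flagValue != "" {
		return flagValue
	}
	return defaultInstanceID(os.Hostname, os.Getpid())
}

// Lookup stubs used by the demo.
func fixedHostname() (string, error)   { return "host1", nil }
func failingHostname() (string, error) { return "", errors.New("hostname unavailable") }

func main() {
	fmt.Println(defaultInstanceID(fixedHostname, 42))
	fmt.Println(defaultInstanceID(failingHostname, 42))
	fmt.Println(resolveInstanceID("host1-eth"))
}
```

Two processes on the same host get different PIDs and therefore different instance IDs, while an operator-supplied value such as host1-eth passes through untouched.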

Sample collector exporter config (what the operator owns)

One pipeline per event type — filter by body, route to its own bucket. Each bucket holds one event shape so no event discriminator column is needed downstream.

receivers:
  otlp:
    protocols:
      http:
        endpoint: 127.0.0.1:4318

processors:
  filter/relay_usage:
    error_mode: ignore
    logs:
      log_record: ['body != "relay_usage"']
  filter/optimizer_qos:
    error_mode: ignore
    logs:
      log_record: ['body != "optimizer_qos"']
  transform/usage:
    error_mode: ignore
    log_statements:
      - context: log
        statements:
          - set(attributes["ts"], UnixNano(time))
          - set(attributes["service_name"], resource.attributes["service.name"])
          - set(attributes["service_instance_id"], resource.attributes["service.instance.id"])
          - set(body, attributes)
  batch:
    send_batch_size: 1000
    timeout: 500ms

exporters:
  awss3/relay_usage:
    s3uploader:
      region: us-east-1
      s3_bucket: lava-relay-usage-mainnet
      s3_partition: hour
    marshaler: body
  awss3/optimizer_qos:
    s3uploader:
      region: us-east-1
      s3_bucket: lava-optimizer-qos-mainnet
      s3_partition: hour
    marshaler: body

service:
  pipelines:
    logs/relay_usage:
      receivers: [otlp]
      processors: [filter/relay_usage, transform/usage, batch]
      exporters: [awss3/relay_usage]
    logs/optimizer_qos:
      receivers: [otlp]
      processors: [filter/optimizer_qos, transform/usage, batch]
      exporters: [awss3/optimizer_qos]

The cloud-side counterpart (S3 buckets + Glue tables + Athena workgroup) is in Magma-Devs/devops#21. The collector wiring on the bare-metal hosts is in Magma-Devs/ansible-internal#268.

Dependency / collateral changes

  • OTel bumped v1.21 → v1.30 (logs API stable in v1.27+).
  • gRPC indirectly bumped 1.62 → 1.66.
  • Replaced the deprecated grpc.DialContext + WithBlock in upstream_grpc_pool.go with grpc.NewClient + a state-walk loop on Connect() / WaitForStateChange that only returns success at connectivity.Ready — preserves the original "fast-fail on unreachable target" semantics (a single WaitForStateChange(_, Idle) would have accepted Idle → TransientFailure as success).
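The state-walk loop from the last bullet can be modelled without the gRPC dependency. The sketch below defines a local State type and a conn interface that mirror connectivity.State and the Connect / GetState / WaitForStateChange methods on *grpc.ClientConn; waitUntilReady, scriptedConn, and dialScripted are hypothetical names, and the real loop in upstream_grpc_pool.go may be structured differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// State is a local stand-in for grpc's connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

// conn lists the three client-connection methods the loop relies on.
type conn interface {
	Connect()
	GetState() State
	WaitForStateChange(ctx context.Context, source State) bool
}

// waitUntilReady succeeds only at Ready, fails fast on Shutdown, and keeps
// waiting through Idle / Connecting / TransientFailure until ctx expires.
func waitUntilReady(ctx context.Context, c conn) error {
	c.Connect()
	for {
		s := c.GetState()
		if s == Ready {
			return nil
		}
		if s == Shutdown {
			return errors.New("connection is shut down")
		}
		if !c.WaitForStateChange(ctx, s) {
			return ctx.Err() // deadline hit before reaching Ready
		}
	}
}

// scriptedConn is a test double that replays a fixed state sequence.
type scriptedConn struct {
	states []State
	pos    int
}

func (c *scriptedConn) Connect()        {}
func (c *scriptedConn) GetState() State { return c.states[c.pos] }
func (c *scriptedConn) WaitForStateChange(ctx context.Context, _ State) bool {
	if c.pos+1 >= len(c.states) {
		<-ctx.Done() // no more transitions: wait out the deadline
		return false
	}
	c.pos++
	return true
}

// dialScripted runs the loop against a scripted sequence under a timeout.
func dialScripted(timeout time.Duration, states ...State) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return waitUntilReady(ctx, &scriptedConn{states: states})
}

func main() {
	fmt.Println(dialScripted(time.Second, Idle, Connecting, TransientFailure, Connecting, Ready))
	fmt.Println(dialScripted(20*time.Millisecond, Idle, Connecting, TransientFailure))
}
```

The second call shows why a single wait on Idle is not enough: the sequence leaves Idle but never reaches Ready, so the loop reports the deadline error instead of treating the first transition as success.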

API breaking changes (in-tree callers updated)

  • NewRPCConsumerLogs(...) now takes UsageEventSink instead of *ConsumerKafkaClient.
  • NewConsumerOptimizerQoSClient(...) now takes UsageEventSink instead of (endpointAddress, interval).
  • AnalyticsServerAddresses.RelayKafka* fields → AnalyticsServerAddresses.UsageOTel*. AnalyticsServerAddresses.OptimizerQoSAddress removed.
  • ConsumerOptimizerQoSClient no longer exposes Set* mutator methods (SetRelaySentToProvider, SetNodeErrorToProvider, SetProviderSelected) — dead callers in consumer_metrics_manager and rpcconsumer_logs removed.

Scope 2: dapp-id → project-id wire rename

The consumer reads a tenant identifier from every relay request and stamps it onto the per-relay usage event as the project attribute. The wire key was historically dapp-id; this renames it to project-id everywhere:

  • HTTP header name: Project-Id (case-insensitive on the wire)
  • gRPC metadata key: project-id (lower-cased per gRPC convention)
  • Fiber Locals key for the HTTP-to-websocket handoff: project-id
  • New constant chainlib.ProjectIDHeader = "project-id" is the single source of truth at every call site.
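A small sketch of what the rename means at the extraction site, using net/http as a stand-in for fiber. ProjectIDHeader matches the constant named above; defaultProject is a hypothetical placeholder for DefaultDappID, whose actual value is not shown in this PR, and headerWith is a demo helper.

```go
package main

import (
	"fmt"
	"net/http"
)

// ProjectIDHeader mirrors chainlib.ProjectIDHeader from the PR.
const ProjectIDHeader = "project-id"

// defaultProject is a hypothetical placeholder for DefaultDappID.
const defaultProject = "default"

// projectFromHeader reads the project identifier. http.Header.Get
// canonicalizes the key, so the lookup is case-insensitive.
func projectFromHeader(h http.Header) string {
	if v := h.Get(ProjectIDHeader); v != "" {
		return v
	}
	return defaultProject
}

// headerWith builds a header set with a single key/value pair.
func headerWith(key, value string) http.Header {
	h := http.Header{}
	h.Set(key, value)
	return h
}

func main() {
	fmt.Println(projectFromHeader(headerWith("Project-Id", "proj-1")))
	fmt.Println(projectFromHeader(headerWith("PROJECT-ID", "proj-1")))
	fmt.Println(projectFromHeader(headerWith("dapp-id", "proj-1")))
}
```

The last call shows the lack of a back-compat shim: a request that still sends dapp-id falls through to the default attribution.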

fasthttp buffer-aliasing fix (same extraction sites)

fiber.Ctx.Get returns strings that alias fasthttp's per-request header buffer (zero-copy via unsafe). The buffer is recycled when the request completes and reused for subsequent requests on the same worker. RelayMetrics.ProjectHash is enqueued asynchronously into the OTel BatchLogProcessor and serialized after the request returns — by then the backing array may already hold another request's headers, prefix-overwriting the project hash. Both extractDappIDFromFiberContext and extractDappIDFromGrpcHeader now call strings.Clone on the extracted value to detach it from the per-request pool.
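The aliasing hazard can be reproduced in isolation. The sketch below deliberately builds a zero-copy string over a byte slice (which is what makes the bug possible) and then reuses the slice; zeroCopyString and demo are hypothetical names, and the mutation intentionally breaks the unsafe.String contract to show the failure mode. It needs Go 1.20 or later for unsafe.String and unsafe.SliceData.

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

// zeroCopyString returns a string that shares b's backing array, similar
// in spirit to how a zero-copy header lookup hands out values.
func zeroCopyString(b []byte) string {
	return unsafe.String(unsafe.SliceData(b), len(b))
}

// demo simulates one request's header value being read, then the buffer
// being recycled for the next request. It returns what the aliased view
// reads after the overwrite and what the detached clone still holds.
func demo() (aliasedAfter, clonedAfter string) {
	buf := []byte("project-A") // stands in for the pooled header buffer

	aliased := zeroCopyString(buf)   // shares memory with buf
	cloned := strings.Clone(aliased) // owns a private copy

	copy(buf, "project-B") // buffer reused by a later request

	// Snapshot the aliased view while buf is still live.
	return strings.Clone(aliased), cloned
}

func main() {
	aliased, cloned := demo()
	fmt.Println("aliased view now reads:", aliased)
	fmt.Println("cloned value still reads:", cloned)
}
```

Anything that outlives the request, such as a value queued for asynchronous export, needs the cloned form.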

Migration

Clients sending the old dapp-id header will be attributed under DefaultDappID until they update to project-id. There is no header-name back-compat shim.


Scope 3: Reporter / ReportsAddressFlag removal

The off-host reporter (POST JSON to a configured URL on every block) has been disabled in production for some time. This drops the dead plumbing:

  • --reports-server-address flag and the ReportsAddressFlag constant
  • metrics.Reporter interface, consumer_reports_client.go, and its test
  • consumer_relayserver_client.go (the only Reporter implementation)
  • errorsForReport / errorsForConsumerSession plumbing through ConsumerSessionManager.blockProvider and ReportedProviders.ReportProvider — both signatures shrunk; in-tree call sites updated
  • ConsumerSessionManager.reporter field, RPCConsumerServer.reporter field, and the matching constructor parameters

Provider unresponsiveness is still surfaced through the existing prometheus counters and the on-chain reporting path — this only removes the off-host push side.


Billing semantics — what reaches the OTel pipeline

Every emitted relay_usage event carries cu from the spec's api.ComputeUnits (set in rpcconsumer_server.go / rpcsmartrouter_server.go immediately before the relay returns), so the downstream pipeline can bill per-request by summing cu grouped by (project, chain, api_method, provider, time_bucket). Coverage and policy notes below.

Coverage by interface × transport

| Listener | File | Emit point |
|---|---|---|
| REST | chainlib/rest.go | every HTTP request (lines 306, 376) |
| JSON-RPC over HTTP | chainlib/jsonRPC.go | every HTTP request (line 465) |
| TendermintRPC over HTTP (batch + non-batch) | chainlib/tendermintRPC.go | every HTTP request (lines 475, 547) |
| JSON-RPC / TendermintRPC over WebSocket: normal relay | chainlib/consumer_websocket_manager.go | every relay (added in this PR) |
| WebSocket: subscription START | same file | every *_subscribe call |
| WebSocket: subscription delivery (per streamed message) | same file | every message (added in this PR) |
| WebSocket: unsubscribe / unsubscribe_all | chainlib/consumer_ws_subscription_manager.go | not emitted (no CU at provider) |
| gRPC (any spec method) | chainlib/grpc.go | every call; grpcproxy.makeProxyFunc collapses streaming RPCs to one-shot unary semantics, so each gRPC call lands on this emit path regardless of the spec's stream annotation |
| UTXO chains (BTC/LTC/DOGE/BCH) | rides the JSON-RPC HTTP path | every HTTP request; isUTXOFamily only reshapes responses, not the emit |

Policy notes for the downstream pipeline

  • Cache hits. Events emit with cache_hit=true AND the full cu value. If the billing model excludes cached responses, filter on cache_hit=false in the collector / Athena query.

  • Failure attribution. Relays that fail before reaching the analytics-fill block in SendRelay / SendParsedRelay emit with cu=0, success=false. Relays that fail after — e.g., provider returned an error response — emit with the spec's cu and success=false. If the model is "bill anything that hit a provider", group on (success=true OR (success=false AND cu>0)); if it's "bill successes only", filter on success=true.

  • Per-subscription-message billing. Each delivered subscription event records one relay_usage with the same project / chain / api_method / provider / cu as the original *_subscribe, just with a fresh ts. A chatty subscription (e.g. eth_subscribe newPendingTransactions on Ethereum mainnet) produces one event per delivered message, each charged at the subscribe API's CU. If the billing model is "subscribe once, deliveries free", deduplicate downstream by (project, chain, api_method, provider, hour_bucket) and bill the first event only; the per-message emit gives that filter a population to work over rather than hiding subscription cost in a single record.

  • Pre-emit failures. Requests that fail at parse / rate-limit / auth before NewRelayAnalytics runs do NOT emit any relay_usage. They represent no CU consumption at the provider, so they're invisible to billing by design. If you need request-count metrics for unparseable traffic, that's a separate Prometheus counter rather than an OTel event.

  • success=false events still carry latency and provider attribution. Useful for SLO dashboards even if they're excluded from the billing aggregate.
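The cache-hit and failure-attribution policies above amount to a predicate applied before summing cu. The sketch below shows one way a downstream job might express them; the usage and policy types and the function names are hypothetical, and in practice this logic would more likely live in a collector filter or an Athena query.

```go
package main

import "fmt"

// usage is a trimmed, hypothetical view of a relay_usage event.
type usage struct {
	Project  string
	CU       uint64
	Success  bool
	CacheHit bool
}

// policy captures the two billing choices discussed in the notes.
type policy struct {
	SuccessOnly      bool // bill successes only
	ExcludeCacheHits bool // do not bill cached responses
}

// billable reports whether an event counts toward the bill.
func billable(e usage, p policy) bool {
	if p.ExcludeCacheHits && e.CacheHit {
		return false
	}
	if p.SuccessOnly {
		return e.Success
	}
	// "Bill anything that hit a provider": successes, plus failures that
	// still carry the spec's CU.
	return e.Success || e.CU > 0
}

// sumCU totals billable CU per project.
func sumCU(events []usage, p policy) map[string]uint64 {
	totals := map[string]uint64{}
	for _, e := range events {
		if billable(e, p) {
			totals[e.Project] += e.CU
		}
	}
	return totals
}

// sampleEvents returns made-up events covering the four cases.
func sampleEvents() []usage {
	return []usage{
		{Project: "proj-1", CU: 10, Success: true},                 // normal success
		{Project: "proj-1", CU: 10, Success: false},                // provider returned an error
		{Project: "proj-1", CU: 0, Success: false},                 // failed before reaching a provider
		{Project: "proj-1", CU: 10, Success: true, CacheHit: true}, // served from cache
	}
}

func main() {
	events := sampleEvents()
	fmt.Println(sumCU(events, policy{})["proj-1"])
	fmt.Println(sumCU(events, policy{SuccessOnly: true})["proj-1"])
	fmt.Println(sumCU(events, policy{SuccessOnly: true, ExcludeCacheHits: true})["proj-1"])
}
```

For the four sample events the totals are 30, 20, and 10 CU respectively, which makes the cost of each policy choice easy to compare.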


Test plan

  • go build ./... clean
  • go test ./protocol/metrics/ -count=1 passes
  • go test ./protocol/rpcconsumer/ -count=1 passes
  • go test ./protocol/rpcsmartrouter/ -count=1 passes
  • golangci-lint run --config .golangci.yml ./protocol/... clean
  • usage_sink_test.go and otel_usage_sink_test.go cover: NoopUsageSink zero-cost on both event types, RelayUsageEvent fidelity, defaults application, hostname error fallback, nil-safety on every method, Emit + EmitOptimizerQoS don't panic or block under unreachable endpoint, Sent counter advances for both
  • optimizer_selection_score_test.go updated to the new constructor signature
  • Consumer + smart-router lifecycle: usageSink.Close() deferred at construction so the BatchProcessor drains pending records on shutdown
  • gRPC dial-state loop preserves WithBlock semantics (only Ready is success; Shutdown fails fast; intermediate Connecting / TransientFailure keep waiting under the connect timeout)
  • sampleAndEmit skips nil reports from the optimizer (no zero-valued records leaking into the Prometheus scrape cache)

codecov Bot commented May 4, 2026

Codecov Report

❌ Patch coverage is 61.28205% with 151 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| protocol/rpcconsumer/rpcconsumer.go | 0.00% | 44 Missing ⚠️ |
| protocol/rpcsmartrouter/rpcsmartrouter.go | 0.00% | 43 Missing ⚠️ |
| protocol/rpcsmartrouter/upstream_grpc_pool.go | 0.00% | 22 Missing ⚠️ |
| protocol/chainlib/consumer_websocket_manager.go | 0.00% | 10 Missing ⚠️ |
| protocol/metrics/otel_usage_sink.go | 92.00% | 7 Missing and 3 partials ⚠️ |
| protocol/metrics/rpcconsumer_logs.go | 50.00% | 10 Missing ⚠️ |
| protocol/metrics/consumer_optimizer_qos_client.go | 88.88% | 7 Missing and 2 partials ⚠️ |
| protocol/chainlib/common.go | 60.00% | 2 Missing ⚠️ |
| protocol/provideroptimizer/provider_optimizer.go | 0.00% | 1 Missing ⚠️ |

| Flag | Coverage Δ |
|---|---|
| consensus | 8.96% <ø> (ø) |
| protocol | 35.22% <61.28%> (?) |

| Files with missing lines | Coverage Δ |
|---|---|
| protocol/common/cobra_common.go | 0.00% <ø> (ø) |
| protocol/lavasession/consumer_session_manager.go | 67.78% <100.00%> (ø) |
| protocol/lavasession/reported_providers.go | 86.66% <100.00%> (ø) |
| protocol/metrics/analytics.go | 54.54% <100.00%> (ø) |
| protocol/metrics/consumer_metrics_manager.go | 14.30% <ø> (ø) |
| protocol/metrics/hostname.go | 100.00% <100.00%> (ø) |
| protocol/metrics/provider_metrics_manager.go | 29.58% <ø> (ø) |
| protocol/metrics/usage_sink.go | 100.00% <100.00%> (ø) |
| protocol/rpcconsumer/rpcconsumer_server.go | 32.26% <ø> (ø) |
| protocol/provideroptimizer/provider_optimizer.go | 55.71% <0.00%> (ø) |
... and 8 more

... and 199 files with indirect coverage changes

@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch 2 times, most recently from 3bde514 to 286821a Compare May 4, 2026 10:59
github-actions Bot commented May 4, 2026

Test Results

0 tests  ±0   0 ✅ ±0   0s ⏱️ ±0s
0 suites ±0   0 💤 ±0 
7 files   +1   0 ❌ ±0 
2 errors

For more details on these parsing errors, see this check.

Results for commit ca8fdba. ± Comparison against base commit 564493a.

♻️ This comment has been updated with latest results.

@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch from 286821a to 4103941 Compare May 4, 2026 12:33
@nimrod-teich nimrod-teich changed the title from "refactor(metrics): raw, non-blocking, compressed relay-usage Kafka pipeline" to "refactor(metrics): OTel-based relay-usage emitter (replaces Kafka client)" May 4, 2026
@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch from 4103941 to 58203eb Compare May 4, 2026 12:44
@lavanet lavanet deleted a comment from qodo-code-review Bot May 4, 2026
@lavanet lavanet deleted a comment from qodo-code-review Bot May 4, 2026
@lavanet lavanet deleted a comment from qodo-code-review Bot May 4, 2026
@lavanet lavanet deleted a comment from baz-reviewer Bot May 4, 2026
@lavanet lavanet deleted a comment from qodo-code-review Bot May 4, 2026
@lavanet lavanet deleted a comment from baz-reviewer Bot May 4, 2026
@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch from 58203eb to 18ce2d6 Compare May 4, 2026 12:59
@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch from cb5fc9d to 521dac4 Compare May 5, 2026 13:01
@nimrod-teich nimrod-teich changed the title from "refactor(metrics): OTel-based relay-usage emitter (replaces Kafka client)" to "refactor(metrics): OTel-based relay-usage + optimizer-QoS emitter (replaces Kafka client)" May 5, 2026
@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch 4 times, most recently from 058bbf0 to cfad35c Compare May 11, 2026 10:16
@nimrod-teich nimrod-teich changed the title from "refactor(metrics): OTel-based relay-usage + optimizer-QoS emitter (replaces Kafka client)" to "refactor: OTel usage pipeline + project-id rename + reporter-flow removal" May 11, 2026

@avitenzer avitenzer left a comment

otel/log v0.4.0 and sdk/log v0.4.0 are old versions of the packages.

Comment thread protocol/chainlib/consumer_websocket_manager.go
Comment thread protocol/metrics/consumer_optimizer_qos_client_test.go
@nimrod-teich

otel/log v0.4.0 and sdk/log v0.4.0 are old versions of the packages.

@avitenzer I upgraded to the latest possible without touching the GO version.

…ent)

Replaces the custom Kafka producer with an OpenTelemetry log-based usage
emitter. Each rpcconsumer/rpcsmartrouter process emits one OTLP log record
per relay to the local OTel collector; the collector handles fan-out to
whatever backends the operator configures (Kafka, S3, ClickHouse, ...) via
exporter YAML — no code change to swap destinations.

Off by default. With --usage-otel-enabled=false (the default) the relay
path pays exactly one virtual call per relay (NoopUsageSink.Emit, which is
empty). No SDK setup, no goroutines, no batching, no resource cost.

Wire format / API
- New UsageEventSink interface (Emit/Stats/Close); implementations must be
  non-blocking on the relay path. Full buffers must drop, never block.
- NoopUsageSink: zero-cost default.
- OTelUsageSink: OTLP/HTTP exporter, BatchProcessor with configurable
  queue/batch/flush/timeout. Resource attributes: service.name +
  service.instance.id (default hostname-pid; overridable for fleets that
  want a chain or role suffix). Per-relay attributes carry project, chain,
  api_interface, api_method, cu, latency, success/cache/write/archive
  flags, hedge_count, provider, origin.

Removals (no back-compat — the Kafka pipeline was unused)
- consumer_kafka_client.go and its test
- All --relay-kafka-* flags
- The Kafka-specific fields on AnalyticsServerAddresses

New flags
- --usage-otel-enabled (default false) — master switch
- --usage-otel-endpoint (default "" → SDK reads OTEL_EXPORTER_OTLP_ENDPOINT
  or localhost:4318)
- --usage-otel-insecure (default true; expected target is a sidecar)
- --usage-otel-queue-size, --usage-otel-batch-size,
  --usage-otel-flush-interval, --usage-otel-export-timeout
- --usage-otel-service-name (default lava-rpcconsumer / lava-rpcsmartrouter)
- --usage-otel-service-instance-id (default hostname-pid)

Why service.instance.id = hostname-pid
- Multiple consumer processes per bare-metal host (one per chain) sharing
  one local collector is a supported deployment shape. hostname-pid makes
  each process individually identifiable downstream without conflating
  process identity with billing dimensions.
- Chain stays as a per-event attribute (chain="..."), so aggregation by
  chain × project × method is GROUP BY at query time.

Dependency / collateral
- Bumps OTel from v1.21 to v1.30 (logs API stable). Indirectly bumps grpc
  from 1.62 to 1.66; replaces the deprecated grpc.DialContext+WithBlock
  in upstream_grpc_pool.go with grpc.NewClient + Connect/WaitForStateChange
  to preserve the original "fast-fail on unreachable target" semantics.

Tests
- usage_sink_test.go: NoopUsageSink zero-cost, RelayUsageEvent fidelity.
- otel_usage_sink_test.go: defaults application, hostname fallback path,
  nil-safety on every method, behavior under unreachable endpoint
  (Emit doesn't panic or block; SDK queues; counters advance).
… sink

Same shape as the relay-usage emitter (off by default, non-blocking,
no aggregation in the producer). The QoS client becomes a pure
periodic sampler over the registered optimizers; downstream computes
rates / averages by GROUP BY at query time.

Producer-side changes
- ConsumerOptimizerQoSClient is now an OTel pass-through:
  - Drops every per-(chain, provider) counter and running sum:
    chainIdToProviderToRelaysCount, chainIdToProviderToNodeErrorsCount,
    chainIdToProviderToSelectionCount, chainIdToProviderToQoSScoreSum,
    chainIdToProviderToLastRNGValue, chainIdToTotalSelections.
  - Drops the calculate* helpers (NodeErrorRate, SelectionRate,
    AverageSelectionQoSScore, getLastRNGValue, getProviderChainSelectionCount,
    getProviderChainRelaysCount, getProviderChainNodeErrorsCount,
    getProviderChainMapCounterValue).
  - Drops the Set* mutator methods (SetRelaySentToProvider,
    SetNodeErrorToProvider, SetProviderSelected) and their dead
    callers in consumer_metrics_manager and rpcconsumer_logs.
  - UpdatePairingListStake stays as a side channel — it learns the
    per-chain provider-address SET, the current epoch, and the
    current pairing-list stake per provider (single-level map, no
    per-epoch history).
  - On each sampling tick, sampleAndEmit asks every optimizer for its
    current scores (via the existing CalculateQoSScoresForMetrics) and
    forwards one OTel log record per (chain, provider) to the usage
    sink. The latest batch is cached for the optional Prometheus-style
    scrape endpoint enabled by --optimizer-qos-listen.

Wire format (OptimizerQoSReportToSend)
- Drops the derived/aggregated fields: NodeErrorRate, SelectionRate,
  SelectionQoSScore, SelectionRNGValue, SelectionCount.
- Drops the legacy raw EWMA fields: sync_score, availability_score,
  latency_score, generic_score. They were the optimizer's internal
  score-store state; the WRS-normalized scores (selection_*) are what
  actually drive provider selection — that's the signal downstream
  analytics want. The raw EWMA is one indirection too many for queryers
  and bloated every emitted record by 4 floats.
- Adds provider_stake (json: provider_stake) populated from the
  per-(chain, provider) snapshot UpdatePairingListStake builds. Each
  optimizer-QoS sample carries the current pairing-list stake for the
  provider that report describes.
- Keeps envelope (timestamp, chain, provider, consumer hostname/pub
  address, geo, epoch, entry index) plus the WRS normalized scores
  (selection_*) and weighted contributions (*_contribution).

Sink interface
- UsageEventSink gains EmitOptimizerQoS(OptimizerQoSReportToSend).
- NoopUsageSink is a no-op for both event types.
- OTelUsageSink emits OTLP log records with body="optimizer_qos" and
  the QoS attributes flat at the top level (matches the Athena
  JsonSerDe table layout — separate bucket per event type, no `event`
  discriminator column).

Removed (no back-compat — pipeline switches to OTel)
- --optimizer-qos-server-address flag and OptimizerQosServerAddressFlag
- --optimizer-qos-push-interval flag and OptimizerQosServerPushInterval
  (push cadence is now controlled by the OTel BatchProcessor)
- AnalyticsServerAddresses.OptimizerQoSAddress on rpcconsumer and
  rpcsmartrouter
- The endpointAddress / interval parameters from
  NewConsumerOptimizerQoSClient (now takes the UsageEventSink directly)
- handleOptimizerQoS HTTP handler (was the receive end of the now-
  removed --optimizer-qos-server-address push; never wired into a
  route, flagged dead by the linter).

OptimizerQoSReport (the optimizer's output struct) is unchanged — its
SyncScore/AvailabilityScore/LatencyScore/GenericScore fields stay
populated but are simply not propagated to the wire envelope.

Construction trigger
- The QoS client is now created when either --optimizer-qos-listen=true
  (Prometheus scrape endpoint) or --usage-otel-enabled=true (OTel
  emission) is set. The two are independent toggles.

Tests
- usage_sink_test.go: NoopUsageSink.EmitOptimizerQoS zero-cost path.
- otel_usage_sink_test.go: nil-safety + non-blocking emission for the
  QoS path under an unreachable endpoint, counters advance for both
  Emit and EmitOptimizerQoS.
- optimizer_selection_score_test.go: updated to the new constructor
  signature.

Lint clean (gofmt, gofumpt, ineffassign, staticcheck), full test suite
green for metrics + rpcconsumer + rpcsmartrouter + lavasession +
integration.

The two removed pipelines (Kafka relay analytics, QoS push to
delta.lavanet.xyz) still had flag examples in the rpcconsumer /
rpcsmartrouter READMEs and a "Kafka" mention in the analytics.go doc
comment. Replaced them with the OTel-based flags
(--usage-otel-enabled and friends) so the docs match the binary.

Pure docs / comment change; no code touched.

The HTTP header / gRPC metadata key the consumer reads to identify
the requesting project was named dapp-id — leftover terminology from
when projects were called "dapps". The downstream wire format and
analytics already use "project" (RelayMetrics.ProjectHash, the
RelayUsageEvent json:"project" field, the Athena `project` column),
so the wire-level header name was the only place still saying
dapp-id. Rename everywhere we read it.

- HTTP: c.Get("dapp-id") -> c.Get("project-id")
  (protocol/chainlib/common.go: extractDappIDFromFiberContext)
- gRPC: metadataValues["dapp-id"] -> metadataValues["project-id"]
  (extractDappIDFromGrpcHeader)
- Fiber Locals key (passes the value from the HTTP upgrade context
  into the websocket handler) renamed for consistency:
  c.Locals("dapp-id", ...) -> c.Locals("project-id", ...)
- Tests updated.

This is a wire-level breaking change: clients and any reverse proxy
in front of the consumer must now send "project-id" instead of
"dapp-id". The bare-metal nginx gateway already extracts the project
hash from the URL path / subdomain but currently doesn't forward it
as a header at all (HTTP) or forwards it as the wrong header
(X-LAVA-ProjectHash on gRPC) — see Magma-Devs/ansible-internal for
the matching nginx config update.

Internal Go names (extractDappIDFromFiberContext, generateNewDappID,
RelayMetrics.ProjectHash, etc.) kept as-is to limit scope. The
default fallback string ("DefaultDappID") is a downstream-visible
constant and stays unchanged.
…r aliasing

fiber.Ctx.Get returns a string aliased to fasthttp's per-request header
buffer (zero-copy via unsafe). RelayMetrics.ProjectHash is enqueued
asynchronously into the OTel BatchLogProcessor and serialized only when
the batch flushes — by then the request has returned, fasthttp has
recycled the underlying buffer, and a subsequent request on the same
worker may have written shorter strings (Accept-Encoding, client IPs,
etc.) into the same backing array. The result is a partially-overwritten
project hash: short prefix from the new write, suffix bytes from the
original 32-char hash that the new write didn't reach.

Observed in testnet S3 output as a buffet of corrupted project values
all exactly 32 chars long, with the user's real hash as suffix:

  e6eaddb0d8063a696a1a37e716f09374    (real)
  gzipddb0d8063a696a1a37e716f09374    ("gzip" + tail[4..])
  172.68.40.693a696a1a37e716f09374    (CF edge IP + tail[12..])
  188.40.110.49a696a1a37e716f09374    (CF edge IP + tail[13..])

strings.Clone allocates a fresh backing array; the resulting string is
safe to retain past the handler return. 32 bytes per relay is trivial
allocator load (Go's concurrent GC is tuned for exactly this shape) so
no need for sync.Pool reuse or string interning.
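
The hazard and the fix can be reproduced without fasthttp. The sketch below is an illustration only: aliasString stands in for a zero-copy header read, and simulateReuse is a hypothetical helper that overwrites the buffer the way a later request would. It deliberately mutates memory behind a string to show the failure mode, which is exactly what the unsafe contract forbids in real code.

```go
package main

import (
	"fmt"
	"strings"
	"unsafe"
)

// aliasString mimics a zero-copy header read: the returned string shares
// buf's backing array instead of copying it. buf must be non-empty.
func aliasString(buf []byte) string {
	return unsafe.String(&buf[0], len(buf))
}

// simulateReuse reads a hash from a buffer twice (aliased and cloned),
// then lets a later, shorter write land in the same buffer.
func simulateReuse(hash, nextWrite string) (aliased, cloned string) {
	buf := []byte(hash)
	aliased = aliasString(buf)
	cloned = strings.Clone(aliased) // fresh backing array
	copy(buf, nextWrite)            // the "next request" reuses the buffer
	return aliased, cloned
}

func main() {
	aliased, cloned := simulateReuse("e6eaddb0d8063a696a1a37e716f09374", "gzip")
	fmt.Println("aliased:", aliased) // prefix overwritten, original tail kept
	fmt.Println("cloned: ", cloned)  // original hash intact
}
```

The aliased value ends up with the same shape as the corrupted rows above: the new prefix followed by the untouched tail of the original hash.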

Apply the same fix to the gRPC extraction path. gRPC metadata values
can alias the receive buffer depending on the transport implementation;
the cost of cloning is the same and the hazard is the same.

Drive-by audit candidates with the same hazard if the OTel pipeline
extends — Origin, ApiMethod, ProviderAddress on RelayUsageEvent are all
header- or request-derived. Not corrupted today only because their
contents aren't fixed-width hashes so partial overwrites are less
visible. Worth a follow-up sweep that clones at the point each crosses
into RelayMetrics.
…2286)

Stacked on top of refactor(metrics): OTel-based relay-usage emitter.

The reports-backend pipeline was wired through but never used in production:
the --reports-be-address flag defaulted to "" and NewConsumerReportsClient
returned nil for that case, making every AppendReport / AppendConflict
call a no-op. The plumbing still threaded a metrics.Reporter through
RPCConsumerServer, ConsumerSessionManager, and ReportedProviders, plus
a useless errorsForReport []error parameter on blockProvider /
ReportProvider that existed solely to populate the dropped report
payload. Strip it all out.

Removed
- protocol/metrics/consumer_reports_client.go and its test
- types: Reporter, ReportsRequest, ConflictRequest,
  NewReportsRequest, NewConflictRequest, ConsumerReportsClient
- AnalyticsServerAddresses.ReportsAddressFlag (rpcconsumer + rpcsmartrouter)
- the --reports-be-address flag (rpcconsumer + rpcsmartrouter)
- NewConsumerReportsClient construction sites
- RPCConsumerServer.reporter field, the matching ServeRPCRequests
  parameter, and the dead pass-through assignment
- ConsumerSessionManager.NewConsumerSessionManager reporter parameter
- ReportedProviders.reporter and ReportedProviders.chainId fields
- ReportedProviders.AppendReport method
- errorsForReport []error parameter on
  ConsumerSessionManager.blockProvider, ReportProvider,
  reportProviderLocked (the value was used only for the dropped
  report payload — the local copy in OnSessionFailure is also
  removed since it had no other readers)

Tests
- protocol/lavasession/reported_providers_test.go updated for the new
  signatures.
- All in-tree callers of NewConsumerSessionManager,
  ServeRPCRequests, and the reported-providers helpers updated to
  drop the now-removed parameters (rpcconsumer, rpcsmartrouter,
  chainlib/consumer_ws_subscription_manager_test, integration/protocol_test).

No production behavior change — the entire path was already a runtime
no-op given the empty default flag.
…er WS gap

Two billing-relevant gaps surfaced by reviewing OTel pipeline coverage
across all listeners.

(1) Origin and Success absent from OTel events
--------------------------------------------------
The Origin attribute on RelayUsageEvent was always empty: data.Origin
was set inside SendMetrics (the legacy in-memory aggregator), which
AddMetricFor* calls AFTER usageSink.Emit(NewRelayUsageEvent(data)).
The OTel event was constructed from a RelayMetrics whose Origin field
had not yet been populated, so every emitted record carried origin="".

Success had a parallel ordering bug specific to the smart router. On
the consumer path, ConsumerMetricsManager.SetRelayMetrics(data, err)
sets data.Success = err == nil before Emit. On the smart-router path,
SmartRouterMetricsManager.SetRelayMetrics is a no-op (the smart router
has its own prometheus surface), so data.Success was never set before
Emit and every smart-router OTel event recorded success=false
regardless of relay outcome.

Reorder both paths so data.Success and data.Origin are populated on
the RelayMetrics before Emit. Drop the now-redundant err parameter
from SendMetrics (Success is set upstream).
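
The reordering can be sketched as follows. This is a simplified illustration, not the actual AddMetricFor* code: the structs are trimmed stand-ins, and addMetric, captureSink, and errRelayFailed are hypothetical names used only here.

```go
package main

import (
	"errors"
	"fmt"
)

// errRelayFailed is a hypothetical error used to drive the failure case.
var errRelayFailed = errors.New("relay failed")

// RelayMetrics and RelayUsageEvent are trimmed stand-ins for the real types.
type RelayMetrics struct {
	ProjectHash string
	Origin      string
	Success     bool
}

type RelayUsageEvent struct {
	Project string
	Origin  string
	Success bool
}

// NewRelayUsageEvent copies whatever is on the metrics struct at call time,
// so fields set after this call never reach the event.
func NewRelayUsageEvent(m *RelayMetrics) RelayUsageEvent {
	return RelayUsageEvent{Project: m.ProjectHash, Origin: m.Origin, Success: m.Success}
}

type captureSink struct{ events []RelayUsageEvent }

func (s *captureSink) Emit(e RelayUsageEvent) { s.events = append(s.events, e) }

// addMetric populates Success and Origin first, then emits.
func addMetric(sink *captureSink, data *RelayMetrics, origin string, err error) {
	data.Success = err == nil
	data.Origin = origin
	sink.Emit(NewRelayUsageEvent(data))
	// Legacy in-memory aggregation would run here, after the emit.
}

func main() {
	sink := &captureSink{}
	addMetric(sink, &RelayMetrics{ProjectHash: "demo"}, "https://app.example", nil)
	addMetric(sink, &RelayMetrics{ProjectHash: "demo"}, "https://app.example", errRelayFailed)
	fmt.Printf("%+v\n", sink.events)
}
```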

Clone Origin where it enters from a buffer-aliased source:
- Websocket Locals storage in
  constructFiberCallbackWithHeaderAndParameterExtraction
  (chainlib/common.go) — c.Get returns a fasthttp-aliased string and
  the value lives in Locals past the request lifetime.
- gRPC metadata read in AddMetricForGrpc — metadata values can alias
  the receive buffer per the transport implementation; same hazard as
  the project-id fix in 6d5e4a9.

HTTP path uses strings.Join which always allocates, so no extra clone
needed. ApiMethod (api.Name from preloaded spec) and ProviderAddress
(session/pairing data) were flagged in 6d5e4a9 as audit candidates,
but neither is request-derived — no clone needed.

(2) Normal WS relays + subscription deliveries were not emitting
----------------------------------------------------------------
consumer_websocket_manager.go's normal-relay branch (the path taken by
every non-subscribe WS message — eth_call, eth_getBalance, ...) fell
into continue before reaching the single AddMetricForWebSocket call at
the bottom of the loop. CUs consumed by every normal WS relay were
silently dropped from the OTel analytics pipeline.

The subscription-delivery goroutine also did not emit per-message:
each streamed subscription event (eth_subscribe newHeads, etc.) is a
billed relay against the same project / chain / API / provider as the
initial subscribe, but only the eth_subscribe START was recorded.

Fix both:
- Normal relay branch: emit AddMetricForWebSocket(metricsData, err,
  websocketConn) before continue. Restructured the if/else so a single
  emit point covers both success and failure with the right err value.
- Subscription delivery: snapshot the populated RelayMetrics into a
  by-value local before launching the streaming goroutine, then emit
  one event per delivered message with a fresh timestamp. By-value
  snapshot is necessary because the start-of-subscription emit races
  with per-message emits on the same pointer (AddMetricForWebSocket
  mutates Success and Origin).
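
A rough sketch of the by-value snapshot pattern, assuming a trimmed RelayMetrics and a hypothetical streamDeliveries helper (the real logic lives in consumer_websocket_manager.go and is shaped differently):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// RelayMetrics is a trimmed stand-in for the real struct.
type RelayMetrics struct {
	ProjectHash string
	ApiMethod   string
	Success     bool
	Timestamp   time.Time
}

// streamDeliveries copies *start before spawning the goroutine, so later
// writes through the start pointer cannot race with per-message emits.
// It returns a WaitGroup that is done once msgs is closed and drained.
func streamDeliveries(start *RelayMetrics, msgs <-chan string, emit func(RelayMetrics)) *sync.WaitGroup {
	snapshot := *start // by-value copy taken on the caller's goroutine
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range msgs {
			ev := snapshot            // fresh copy per delivered message
			ev.Timestamp = time.Now() // each delivery gets its own timestamp
			emit(ev)
		}
	}()
	return &wg
}

func main() {
	start := &RelayMetrics{ProjectHash: "demo", ApiMethod: "eth_subscribe", Success: true}
	msgs := make(chan string, 3)
	for _, m := range []string{"head-1", "head-2", "head-3"} {
		msgs <- m
	}
	close(msgs)

	var emitted []RelayMetrics
	wg := streamDeliveries(start, msgs, func(m RelayMetrics) { emitted = append(emitted, m) })
	start.Success = false // mutation after launch does not affect the snapshot
	wg.Wait()
	fmt.Println(len(emitted), emitted[0].Success)
}
```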

Coverage now matches the PR's billing claim: every relay across HTTP,
WS (normal + subscribe + per-delivery), gRPC unary, TendermintRPC,
REST, and UTXO chains lands in the OTel pipeline.
…Float, WS Locals

Gaps surfaced while reviewing OTel pipeline coverage; nothing here was
covered before.

Relay-usage emit ordering (rpcconsumer_logs_test.go)
- captureSink: in-package fake that records both Emit and
  EmitOptimizerQoS calls for assertion.
- TestAddMetricForHttp_OTelEventPopulated: Success=true and Origin
  reach the OTel event. Uses a nil metrics manager (resolved to
  NoOpConsumerMetrics via SafeMetrics), which mirrors the smart-router
  code path where SetRelayMetrics is a no-op.
- TestAddMetricForHttp_FailurePropagatesSuccessFalse: error →
  Success=false.
- TestAddMetricForGrpc_OTelEventPopulated: gRPC metadata Origin reaches
  the OTel event; metadata.Pairs used so the wire-level key lowercasing
  matches what the gRPC transport delivers.

Optimizer-QoS sampler (consumer_optimizer_qos_client_test.go, new file)
- sanitizeFloat: NaN/+Inf/-Inf → 0; finite values preserved.
- TestSampleAndEmit_EmitsPerChainProvider: fakeOptimizer + stake
  snapshot via UpdatePairingListStake. Asserts one record per
  (chain, provider), provider_stake populated from the snapshot,
  envelope fields wired correctly, WRS scores and contributions
  propagated, GetReportsToSend caches the same batch.
- TestSampleAndEmit_SkipsNilReports: nil entries from the optimizer
  don't leak into the sink or the scrape cache.
- TestSampleAndEmit_NoStakeNoEmit: chain registered but
  UpdatePairingListStake not yet fired → no records.
- TestRegisterOptimizer_IgnoresDuplicate: second registration for the
  same chain is rejected; first wins.
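
For reference, the sanitizeFloat behavior the test above pins down (NaN and both infinities become 0, finite values pass through) can be sketched like this. It is written from the test description, so the actual implementation may differ in detail.

```go
package main

import (
	"fmt"
	"math"
)

// sanitizeFloat maps NaN, +Inf and -Inf to 0 and returns finite values
// unchanged, so every emitted score is a plain finite number.
func sanitizeFloat(v float64) float64 {
	if math.IsNaN(v) || math.IsInf(v, 0) {
		return 0
	}
	return v
}

func main() {
	for _, v := range []float64{math.NaN(), math.Inf(1), math.Inf(-1), 0.25} {
		fmt.Println(sanitizeFloat(v))
	}
}
```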

Websocket Origin path (chainlib/common_test.go)
- TestConstructFiberCallback_StashesOriginInLocals: real fiber server
  + gorilla websocket dial with Origin header; asserts the websocket
  handler reads the same value back from
  c.Locals(metrics.OriginHeaderKey).
- TestConstructFiberCallback_NoOriginWhenMetricsDisabled: same setup
  with isMetricEnabled=false → Locals absent.

Race-clean (go test -race) on metrics and chainlib for the new tests.
…ing-interval var

Four small follow-ups from the post-review polish pass; no behavior change
beyond what each item describes.

(1) QoS sampler: time.After → time.NewTicker
StartOptimizersQoSReportsCollecting used time.After inside a for-select,
allocating a fresh timer every tick (at the default 1s cadence, that's
~3600 timers/hr per running sampler) and never stopping it on ctx.Done.
Replace with a single time.NewTicker + defer ticker.Stop().
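
The ticker pattern looks roughly like this. runSampler and countTicks are hypothetical names for this sketch; the real loop is StartOptimizersQoSReportsCollecting.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runSampler calls sample on every tick until ctx is cancelled. One ticker
// is allocated for the lifetime of the loop and stopped on return.
func runSampler(ctx context.Context, interval time.Duration, sample func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sample()
		}
	}
}

// countTicks runs the sampler for runMs milliseconds and reports how many
// times sample fired. It exists only to exercise the loop.
func countTicks(intervalMs, runMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(runMs)*time.Millisecond)
	defer cancel()
	n := 0
	runSampler(ctx, time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	return n
}

func main() {
	fmt.Println("ticks:", countTicks(5, 100))
}
```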

(2) Drop OTel service-name asymmetry
applyOTelSinkDefaults had a baked-in default of "lava-rpcconsumer",
which is wrong-by-name when the sink is wired into the smart router.
The smart router worked around it with a wiring-layer fallback to
"lava-rpcsmartrouter" — dead code in practice because the cobra flag's
defValue already supplies the right name to both binaries.

Drop the package-level default and the smart-router fallback. The
consumer and smart router each rely on their own flag defValue
("lava-rpcconsumer" / "lava-rpcsmartrouter"); the constructor logs
the resolved name at startup so an empty one is operator-visible.

(3) Eliminate package-level OptimizerQosServerSamplingInterval var
The flag was bound via DurationVar(&metrics.OptimizerQosServerSamplingInterval, …),
making the metrics package hold mutable global state whose only consumers
are two rpcconsumer/rpcsmartrouter Start() functions. Replace with the
codebase's standard pattern: Flags().Duration(…) at registration, then
viper.GetDuration(…) at the use site, into a local variable shared
between the two StartXxx calls in each binary.

(4) gRPC streaming audit — confirmed no gap
chainlib/grpcproxy.makeProxyFunc implements
grpc.UnknownServiceHandler with a one-shot RecvMsg → callBack → SendMsg
pattern. Streaming gRPC RPCs are collapsed to unary semantics by the
proxy (first request → one response → end of stream), so every gRPC
call lands on chainlib/grpc.go's AddMetricForGrpc path regardless of
whether the spec marks the method as unary or streaming. No additional
emit needed.

Bundles a few aligned in-flight edits in the same files (HTTP→4318
comment updates, SinkStats simplification, nil-report skip refinement)
that match the PR body's design.
…nnect state-walk

Two in-flight refinements that belong to the PR scope but had been
sitting in the working tree without a home commit.

(1) SinkStats: drop Failed / Dropped fields, keep only Sent
The OTel SDK doesn't expose per-record send / fail / drop status from
its API surface — those counters live inside the SDK and surface
through its own diagnostics. Shadowing them in SinkStats was misleading
(producer-side Sent was the only counter we actually maintained).

Drop the unused fields. otel_usage_sink.go was already updated in
84c6a59 to construct SinkStats{Sent: …}, which works against either
the 3-field or 1-field struct; this commit just makes the struct match
its actual content.

(2) gRPC connect: replace single WaitForStateChange with a state-walk
upstream_grpc_pool.go.connect previously did:

    grpcConn.Connect()
    if !grpcConn.WaitForStateChange(connectCtx, connectivity.Idle) { … }

WaitForStateChange returns true on ANY state transition, including
Idle → TransientFailure, so the channel could end up "connected" while
actually failing. With WithBlock + WithReturnConnectionError gone in
the grpc 1.65 migration, we have to walk the state machine ourselves:

  - Ready → success.
  - Shutdown → fast-fail (channel can't recover from Shutdown).
  - Connecting / TransientFailure / Idle → keep waiting under the
    configured timeout.

This preserves the original "fast-fail on unreachable target, succeed
only at Ready" semantics that WithBlock provided.
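
A self-contained sketch of that state-walk. To keep it runnable without the grpc module, State, stateWatcher, and scriptedConn are local stand-ins: stateWatcher mirrors the GetState / WaitForStateChange pair on *grpc.ClientConn, and the State constants mirror the connectivity states by name only. waitUntilReady and walk are hypothetical helper names, not the code in upstream_grpc_pool.go.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// State is a local stand-in for gRPC's connectivity states.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
	Shutdown
)

// stateWatcher is the subset of connection behavior the walk relies on.
type stateWatcher interface {
	GetState() State
	WaitForStateChange(ctx context.Context, source State) bool
}

// waitUntilReady succeeds only at Ready, fails fast on Shutdown, and keeps
// waiting through every other state until ctx expires.
func waitUntilReady(ctx context.Context, conn stateWatcher) error {
	for {
		s := conn.GetState()
		switch s {
		case Ready:
			return nil
		case Shutdown:
			return errors.New("connection shut down")
		}
		if !conn.WaitForStateChange(ctx, s) {
			return ctx.Err() // timed out before reaching Ready
		}
	}
}

// scriptedConn is a fake that steps through a fixed list of states and then
// blocks until the context is done.
type scriptedConn struct {
	states []State
	i      int
}

func (c *scriptedConn) GetState() State { return c.states[c.i] }

func (c *scriptedConn) WaitForStateChange(ctx context.Context, _ State) bool {
	if c.i+1 < len(c.states) {
		c.i++
		return true
	}
	<-ctx.Done()
	return false
}

// walk runs waitUntilReady over a scripted state sequence with a 50ms timeout.
func walk(states ...State) error {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return waitUntilReady(ctx, &scriptedConn{states: states})
}

func main() {
	fmt.Println(walk(Idle, Connecting, TransientFailure, Connecting, Ready))
	fmt.Println(walk(Idle, Connecting, Shutdown))
	fmt.Println(walk(Idle, TransientFailure))
}
```

The key difference from the single WaitForStateChange call is the loop: a transition into TransientFailure is just another state to wait out, not a success.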
Per-message subscription emits were inheriting the originating
*_subscribe's method and spec CU (e.g. eth_subscribe at 1000 on
Ethereum), so SUM(cu) across the resulting events over-billed by Nx
on chatty subscriptions like eth_subscribe newPendingTransactions.

Override at the per-delivery emit site:
  ApiMethod    = SubscriptionDeliveryMethod  ("subscription_delivery")
  ComputeUnits = DefaultSubscriptionDeliveryCU (10)

Originating subscribe still carries the spec CU under its real method.
Downstream billing is plain SUM(cu) - no subscription-specific dedup
rule, no discriminator field; each event accurately states its cost.
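
A worked example of the billing arithmetic, using the constants named above. The usageEvent struct, the uint64 CU type, and the subscriptionEvents / sumCU helpers are assumptions made for this sketch.

```go
package main

import "fmt"

const (
	SubscriptionDeliveryMethod    = "subscription_delivery"
	DefaultSubscriptionDeliveryCU = 10
)

// usageEvent is a trimmed stand-in carrying only the billing-relevant fields.
type usageEvent struct {
	ApiMethod    string
	ComputeUnits uint64
}

// subscriptionEvents builds the events for one subscription: the originating
// call at its spec CU, then one flat-rate event per delivered message.
func subscriptionEvents(method string, specCU uint64, deliveries int) []usageEvent {
	events := []usageEvent{{ApiMethod: method, ComputeUnits: specCU}}
	for i := 0; i < deliveries; i++ {
		events = append(events, usageEvent{
			ApiMethod:    SubscriptionDeliveryMethod,
			ComputeUnits: DefaultSubscriptionDeliveryCU,
		})
	}
	return events
}

// sumCU is the plain SUM(cu) a downstream billing query would run.
func sumCU(events []usageEvent) uint64 {
	var total uint64
	for _, e := range events {
		total += e.ComputeUnits
	}
	return total
}

func main() {
	// One eth_subscribe at 1000 CU plus 5 deliveries at 10 CU each.
	fmt.Println(sumCU(subscriptionEvents("eth_subscribe", 1000, 5)))
}
```

With the override, one subscribe plus five deliveries sums to 1000 + 5 × 10 = 1050 CU; had each delivery inherited the spec CU, the same six events would have summed to 6000.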
…rom CalculateQoSScoresForMetrics

The implementation forwards to weightedSelector.CalculateProviderScores
which doesn't take cu or requestedBlock; the only in-tree caller was
constructing cu=10 and requestedBlock=LATEST_BLOCK that got passed in
the front door and straight out the trash. A signature that lies about
its dependencies misleads readers and invites pointless tuning attempts.

Drop from the interface, the implementation, the call site (and its
unused spectypes import), and the test fake. Zero behavior change.

The OpenTelemetry Go modules were on the v1.28.0 / v0.4.0 era (June 2024
release train). Bumped to
the highest pair compatible with the project's go 1.23.3 directive:

  go.opentelemetry.io/otel                                       v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/sdk                                   v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/trace                                 v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/log                                   v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/sdk/log                               v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp    v0.4.0   -> v0.14.0
  go.opentelemetry.io/otel/exporters/otlp/otlpmetric/*           v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/otlp/otlptrace/*            v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/stdout/*                    v1.28.0  -> v1.38.0
  go.opentelemetry.io/otel/exporters/prometheus                  v0.50.0  -> v0.60.0
  go.opentelemetry.io/contrib/exporters/autoexport               v0.53.0  -> v0.63.0

v1.42.0+ would require go 1.25.0 transitively; capped at v1.38.0 to
keep the project's go directive at 1.23.3.

The OTel prometheus exporter v0.60.0 needs newer prometheus/client_golang
APIs (NewConstNativeHistogram, model.UTF8Validation), so the in-repo
replace directives were bumped accordingly:

  github.com/prometheus/client_golang  v1.17.0  -> v1.22.0
  github.com/prometheus/common         v0.45.0  -> v0.63.0

API surface in protocol/metrics/otel_usage_sink.go and protocol/tracing/*
unchanged; full protocol test suite passes.
@nimrod-teich nimrod-teich force-pushed the refactor/usage-pipeline branch from 59d8487 to ca8fdba Compare May 20, 2026 13:16
@nimrod-teich nimrod-teich merged commit cfdd4b9 into main May 20, 2026
28 of 38 checks passed
@nimrod-teich nimrod-teich deleted the refactor/usage-pipeline branch May 20, 2026 13:52