
feat: implement diagnostic channels for observability#3195

Open
logaretm wants to merge 85 commits into redis:master from logaretm:feat/tracing-channel

Conversation


@logaretm logaretm commented Mar 11, 2026


Actual Diff is here: https://github.com/logaretm/node-redis/pull/1/changes
This PR is currently waiting on #3110 to be merged; once that lands, the diff will show the true size of the changes.


Re-implements the OTel metrics instrumentation on top of diagnostic and tracing channels, and sets up additional channels for tracing.

The key shift: core code now describes what is happening, not what to compute.

Before, every instrumented code path had to know about OTel: creating closures, capturing timestamps, building attribute maps, recording histograms. The code was telling the system how to measure itself. And every metric class needed a noop counterpart for when its metric group was disabled.

Now core code says "a command started," "a connection closed," "an error occurred." Subscribers like OTel, Sentry, a debug logger, anything can independently decide what to compute from those events.

The domain knowledge in #3110 (which metrics to emit, which semantic conventions to follow, how to classify errors) is preserved. It just moved from being inline in the hot path to being a subscriber that opts in when needed.

Noops are gone. When a metric group is disabled, no subscription is created. No subscribers means hasSubscribers is false, which means trace() and publish() skip the channel entirely: zero cost by design, not by maintaining parallel noop implementations. Both noop-metrics.ts and noop-meter.ts are deleted; when OTel is not initialized, no OTelMetrics instance is created, no instruments are registered, and no subscriptions exist.

Two functions for all observability:

  • trace() — wraps async operations via TracingChannel (commands, batches, connections, pool waits)
  • publish() — emits point events via dc.channel() (errors, cache hits/misses, pubsub, maintenance, connection lifecycle)

Both use factory callbacks plus hasSubscribers checks for zero overhead when no APM subscribes. The same holds for metrics: if the user never initializes the OpenTelemetry plugin, no channel subscriptions occur and, unlike before, no objects are allocated.
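As a sketch of the factory-callback pattern (helper and channel names here are illustrative, not the actual tracing.ts API):

```javascript
import dc from 'node:diagnostics_channel';

const channelCache = new Map();

function getChannel(name) {
  let ch = channelCache.get(name);
  if (!ch) {
    ch = dc.channel(name);
    channelCache.set(name, ch);
  }
  return ch;
}

// publish() takes a factory, not a payload: when nobody has subscribed,
// the payload object is never even allocated.
function publish(name, factory) {
  const ch = getChannel(name);
  if (!ch.hasSubscribers) return; // zero cost on the hot path
  ch.publish(factory());
}
```

With no subscribers the factory never runs; subscribing flips hasSubscribers, and the next publish builds and delivers the payload.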

Channels

TracingChannel (async lifecycle: start/end/asyncStart/asyncEnd/error)

This lays the groundwork for span-based tracing, but can also be consumed by metrics or logs.

| Channel | Wraps |
| --- | --- |
| node-redis:command | Individual command execution |
| node-redis:batch | MULTI/PIPELINE as a whole |
| node-redis:connect | Client connection |
| node-redis:connection:wait | Pool wait for an available client |
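A minimal sketch of how the trace() wrapper described earlier can gate on subscribers (hypothetical helper; the real trace() in tracing.ts may differ):

```javascript
import dc from 'node:diagnostics_channel';

function trace(name, contextFactory, fn) {
  const tc = dc.tracingChannel(name);
  // Gate on the start channel: with no subscribers, skip tracing entirely
  // and never build the context object.
  if (!tc.start.hasSubscribers) return fn();
  // tracePromise publishes start/end/asyncStart/asyncEnd (and error on
  // rejection) around the async operation.
  return tc.tracePromise(fn, contextFactory());
}
```

All five events for one operation share the single context object built by the factory, which is what lets subscribers correlate start and end.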

Single instance events

These events are what metrics typically consume; they are not suitable for traces but can also feed logs.

| Channel | Emits |
| --- | --- |
| node-redis:connection:ready | Connection established (includes create time) |
| node-redis:connection:closed | Connection closed (includes reason) |
| node-redis:connection:relaxed-timeout | Maintenance timeout relaxation |
| node-redis:connection:handoff | Connection handed off to a new node |
| node-redis:error | Client/cluster errors |
| node-redis:maintenance | Enterprise maintenance notifications |
| node-redis:pubsub | PubSub messages in/out |
| node-redis:cache:request | Client-side cache hit/miss |
| node-redis:cache:eviction | Cache eviction |
| node-redis:command:reply | Command reply (for stream lag, pubsub out) |

What this enables

Any APM can subscribe without monkey-patching or depending on OTel:

import dc from 'node:diagnostics_channel';
import { CHANNELS } from 'redis';

// Create spans
dc.tracingChannel(CHANNELS.TRACE_COMMAND).subscribe({
  start(ctx) { /* start span */ },
  asyncEnd(ctx) { /* end span */ },
  error(ctx) { /* record error, end span */ },
});

// Track errors
dc.channel(CHANNELS.ERROR).subscribe((ctx) => { /* record error metric */ });

Key Changes

  • Zero OTelMetrics imports in core code
  • Zero noop files — both noop-metrics.ts and noop-meter.ts deleted
  • Existing production code: +603 / -1,152 = -549 lines
  • New channel infrastructure (tracing.ts): +280 lines
  • Net production code: -269 lines
  • All channel names exported via CHANNELS const map
  • All event types exported for consumer typing
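A plausible shape for that map, sketched using only channel names that appear in this PR (the actual keys and full set of entries may differ):

```javascript
// Hypothetical sketch of the CHANNELS const map; the keys TRACE_COMMAND,
// ERROR, and CACHE_REQUEST appear elsewhere in this PR, and the string
// values come from the channel tables above.
const CHANNELS = Object.freeze({
  TRACE_COMMAND: 'node-redis:command',
  ERROR: 'node-redis:error',
  CACHE_REQUEST: 'node-redis:cache:request',
});
```

Exporting plain strings keeps consumers decoupled: an APM only needs the name to call dc.channel() or dc.tracingChannel().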

Setting Up OTEL Tracing

Almost Everything Is Already Done

  • Async context propagation: tracePromise (used by trace()) already manages AsyncLocalStorage context. The OTel context manager piggybacks on this. No context.with(trace.setSpan(...)) needed; spans created in start are automatically the active span for downstream operations.
  • Sanitized args: ctx.args is already sanitized by sanitizeArgs(), so db.query.text is just a simple join (e.g., SET mykey ?).
  • Error classification: getErrorInfo() already extracts errorType, category, and statusCode for span attributes.

Here is an example of tracing for command calls:

import { trace, SpanKind, SpanStatusCode } from "@opentelemetry/api";
import dc from "node:diagnostics_channel";
import { CHANNELS } from "redis";

const tracer = trace.getTracer("node-redis");

const commandTC = dc.tracingChannel(CHANNELS.TRACE_COMMAND);
commandTC.subscribe({
  start(ctx) {
    // TracingChannel passes the same ctx object to every event of an
    // operation, so the span can be stashed on it and retrieved later.
    ctx.span = tracer.startSpan(ctx.command, {
      kind: SpanKind.CLIENT,
      attributes: {
        "db.system.name": "redis",
        "db.operation.name": ctx.command,
        "db.query.text": ctx.args.join(" "),
        "db.namespace": String(ctx.database),
        "server.address": ctx.serverAddress,
        "server.port": ctx.serverPort,
      },
    });
  },
  asyncEnd(ctx) {
    ctx.span?.end();
  },
  error(ctx) {
    const span = ctx.span;
    if (span) {
      span.recordException(ctx.error);
      span.setStatus({
        code: SpanStatusCode.ERROR,
        message: ctx.error?.message,
      });
      span.end();
      // Clear the span so a following asyncEnd does not end it twice.
      ctx.span = undefined;
    }
  },
});

Note that the exact implementation may differ, since we may need to use bindStore on the AsyncLocalStorage instance inside OTel itself, but it should be straightforward.
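For illustration, here is how bindStore ties an AsyncLocalStorage to a tracing channel's start event (the channel name and store are hypothetical; OTel's own context manager would supply the real store):

```javascript
import dc from 'node:diagnostics_channel';
import { AsyncLocalStorage } from 'node:async_hooks';

const spanStore = new AsyncLocalStorage();
const tc = dc.tracingChannel('example:traced');

// bindStore enters the store with the transform's result for the duration
// of any operation traced on this channel.
tc.start.bindStore(spanStore, (ctx) => ({ span: `span-for-${ctx.command}` }));

// Anything running inside tracePromise now sees the bound value.
async function run() {
  return tc.tracePromise(
    async () => spanStore.getStore(),
    { command: 'GET' },
  );
}
```

Because the store is entered via the channel rather than by the instrumented code, core code stays free of any OTel-specific context plumbing.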


Note

Medium Risk
Touches core client hot paths (command execution, socket lifecycle, pooling, cluster redirections) to emit diagnostics/tracing events and register clients for metrics, which could affect performance or lifecycle edge-cases if instrumentation is mis-wired. Changes are additive and gated by subscriber checks, but span multiple critical runtime components.

Overview
Adds a diagnostics-based observability layer: new lib/client/tracing.ts exposes trace() (TracingChannel-wrapped async ops) and publish() (point events) with argument sanitization, and the client now emits these events around connect(), sendCommand(), MULTI/pipeline execution, pool wait time, pubsub messages, cache hits/misses/evictions, maintenance notifications, and connection lifecycle.

Introduces client identity/registry plumbing for metrics attribution: new identity.ts generates stable, bounded clientIds and roles (standalone/cluster/pool/etc), RedisSocket/PubSub/CommandsQueue now carry clientId, and clients register/unregister with a new ClientRegistry so OpenTelemetry metrics can observe pending requests, cache size, and connection state.

Exposes OpenTelemetry.init() from @redis/client and adds docs + an otel-metrics example; updates dependencies/lockfile for @opentelemetry/api/@opentelemetry/sdk-metrics, and adds tests covering tracing sanitization, identity, and client registry behavior.

Written by Cursor Bugbot for commit c7d824c.

@logaretm logaretm force-pushed the feat/tracing-channel branch from ce03e84 to 0a0e64d Compare March 26, 2026 02:22
@logaretm logaretm changed the title feat: Implement TracingChannel Proposal feat: implement TracingChannel-powered observability Mar 26, 2026
Remove duplicate COMMAND_REPLY publish from sendCommand — typed
commands already publish via _executeCommand. Also merge the two
separate COMMAND_REPLY subscribers (pubsub out + streaming) into
a single #subscribeCommandReply method.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@logaretm logaretm changed the title feat: implement TracingChannel-powered observability feat: implement Diagnostic Channels for Observability Mar 26, 2026
@logaretm logaretm changed the title feat: implement Diagnostic Channels for Observability feat: implement diagnostic channels for observability Mar 26, 2026
When the acquire timeout fires, rejectWait() is called so the
TracingChannel error channel fires — APMs see the timeout as an
error instead of an orphaned span that never closes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@logaretm logaretm force-pushed the feat/tracing-channel branch 2 times, most recently from 1d34951 to a80bfc4 Compare March 26, 2026 03:06
When OTel is not initialized or disabled, skip instrument registration
and subscriber creation entirely. No noop instruments needed — if
there are no subscribers, channels are never fired.

Removes 139-line noop-meter.ts and all noop fallback branches from
instrument creation helpers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@logaretm logaretm force-pushed the feat/tracing-channel branch from a80bfc4 to e13cd3e Compare March 26, 2026 03:07
- COMMAND_REPLY in _executeCommand now passes sanitizeArgs(parser.redisArgs)
  instead of raw args, preventing sensitive values from leaking to subscribers
- Pool connection wait trace promise gets .catch(noop) to prevent
  unhandled rejection if a tracing subscriber throws

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When pool.destroy() is called with tasks still queued, call
rejectWait() on each pending task so the TracingChannel error
channel fires and APM subscribers close their spans cleanly.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@logaretm logaretm force-pushed the feat/tracing-channel branch from e6fc460 to c6c6695 Compare March 26, 2026 05:14
logaretm and others added 3 commits March 26, 2026 01:32
- Tracing test: check isOpen before destroying already-closed client
- OTel E2E tests: rewrite 5 tests that called removed methods
  (resiliencyMetrics.recordClientErrors, streamMetrics.recordStreamLag)
  to publish via channels instead
- Connection closed metric: split subscriber for basic (connection count)
  and advanced (close reason) so tests enabling only one group see
  correct metrics

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The original code always recorded wait time, even for 0ms waits
when a client was available. The TracingChannel refactor only traced
the "no client, must wait" path. Add trace for immediate availability
so the metric is always emitted.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@logaretm logaretm force-pushed the feat/tracing-channel branch from 362e1f9 to c7d824c Compare March 26, 2026 05:43
@logaretm logaretm force-pushed the feat/tracing-channel branch 2 times, most recently from ce03e84 to da0139d Compare March 26, 2026 18:01
…or event

Remove redundant publish(CHANNELS.ERROR) from sendCommand — the
TracingChannel already emits error events for command failures.
Wire OTel resiliency subscriber to commandTC.error for command-level
error counts; CHANNELS.ERROR now only carries cluster/internal errors.

Extract subscribeTC() helper to reduce TracingChannel subscription
boilerplate and #recordError() to share error metric recording logic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.


fn,
resolveWait: resolveWait!,
rejectWait: rejectWait!,

Pool trace uses potentially uninitialized resolveWait/rejectWait when no acquire timeout

Medium Severity

When acquireTimeout is 0 (or negative), no setTimeout is created, so rejectWait is never called on a timeout. The resolveWait and rejectWait variables are assigned inside the inner Promise constructor within the trace() callback; a Promise executor runs synchronously, so they are normally initialized whether trace() short-circuits (calling fn() directly) or takes the tracing path via tracePromise. The real issue is that the task object stores resolveWait! and rejectWait! with non-null assertions: if the trace call itself throws synchronously before the executor runs, these remain uninitialized and task.resolveWait() in #returnClient would crash.
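The reasoning above hinges on the Promise executor running synchronously, which a two-line check confirms:

```javascript
// The executor passed to `new Promise` runs synchronously, so variables
// assigned inside it are initialized before the constructor returns
// (unless something throws before the executor is ever invoked).
let resolveWait;
let rejectWait;
new Promise((resolve, reject) => {
  resolveWait = resolve;
  rejectWait = reject;
});
console.log(typeof resolveWait); // "function"
```

So the non-null assertions are safe on both normal paths; only a synchronous throw before the executor runs leaves them undefined.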


{ regex: /^(LPUSH|MSET|PFA|PUBLISH|RPUSH|SADD|SET|SPUBLISH|XADD|ZADD)/i, args: 1 },
{ regex: /^(HSET|HMSET|LSET|LINSERT)/i, args: 2 },
{ regex: /^(ACL|BIT|B[LRZ]|CLIENT|CLUSTER|CONFIG|COMMAND|DECR|DEL|EVAL|EX|FUNCTION|GEO|GET|HINCR|HMGET|HSCAN|INCR|L[TRLM]|MEMORY|P[EFISTU]|RPOP|S[CDIMORSU]|XACK|X[CDGILPRT]|Z[CDILMPRS])/i, args: -1 },
];

Sanitization regex prefix-matches commands too broadly causing over-redaction

Medium Severity

The SERIALIZATION_SUBSETS regexes use prefix matching (e.g., ^SET, ^HSET) without end anchors, so commands like SETNX and SETRANGE also match the args: 1 rule for SET. SUBSCRIBE correctly matches the args: -1 rule via S[CDIMORSU], but so does SINTERCARD (via its SI prefix), which therefore gets args: -1 and exposes all of its arguments rather than being redacted conservatively.
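The prefix over-matching is easy to confirm in isolation (regex fragments copied from the hunk above):

```javascript
// Rules copied from the diff hunk above (prefix-anchored, no end anchor):
const setRule = /^(LPUSH|MSET|PFA|PUBLISH|RPUSH|SADD|SET|SPUBLISH|XADD|ZADD)/i;
const sCatchAll = /^S[CDIMORSU]/i; // one alternative of the args: -1 rule

console.log(setRule.test('SETNX'));        // true: caught by the SET prefix
console.log(setRule.test('SETRANGE'));     // true: same over-match
console.log(sCatchAll.test('SINTERCARD')); // true: "SI" matches S[CDIMORSU]
```

Anchoring each alternative on a word boundary, or matching against an explicit command list, would avoid both directions of mis-classification.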


}
} else { // 3/3a
this.#statsCounter.recordMisses(1);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'miss', clientId: client._clientId }));

Cache publish calls have wrong indentation level in conditional blocks

High Severity

The publish(CHANNELS.CACHE_REQUEST, ...) calls inside handleCache are indented at the wrong nesting level: at line 561 the cache-hit publish sits at the same level as the if (cacheEntry instanceof ClientSideCacheEntryValue) block rather than inside it, and similarly at lines 567 and 574. Checking the actual brace structure shows these calls do execute inside the correct if/else branches, so the logic is right and only the indentation is misleading. However, at line 561 the publish call runs after recordHits(1) but before return structuredClone(...), so if publish throws (a subscriber error), the cache-hit value is never returned.

