Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cba770f
feat: add TracingChannel support with argument sanitization
logaretm Mar 25, 2026
5a0dad3
test: add comprehensive unit tests for sanitizeArgs
logaretm Mar 25, 2026
205b9d8
refactor: use ? placeholder for redacted args
logaretm Mar 25, 2026
bcb51b1
refactor: move OTel command metrics to TracingChannel subscribers
logaretm Mar 25, 2026
83ed332
refactor: remove NoopCommandMetrics class
logaretm Mar 25, 2026
9554fcf
refactor: move all inline OTel metrics to diagnostics_channel events
logaretm Mar 25, 2026
01e6b4f
refactor: consolidate all OTel metric classes into channel subscribers
logaretm Mar 25, 2026
024767a
test: remove no-op channel publish tests
logaretm Mar 25, 2026
b67ee21
refactor: use CHANNELS map consistently and tracingChannel API for su…
logaretm Mar 25, 2026
c6403c6
chore: remove dead types and interfaces from opentelemetry module
logaretm Mar 26, 2026
aa6a92e
refactor: deduplicate diagnostics_channel loading
logaretm Mar 26, 2026
e0bd538
refactor: rename BatchTraceContext to BatchOperationContext
logaretm Mar 26, 2026
12bf22f
chore: remove unused CONNECTION_WAIT_START event
logaretm Mar 26, 2026
3a4c6d1
fix: restore wasReady guard on connection count decrement
logaretm Mar 26, 2026
3297619
feat: export all channel names and event types for APM consumers
logaretm Mar 26, 2026
9ae0cb8
refactor: convert pool wait time to TracingChannel
logaretm Mar 26, 2026
b39e0a4
fix: consistent serverPort typing across all event interfaces
logaretm Mar 26, 2026
08bc5bc
refactor: add getTracingChannel helper with auto-resolved context types
logaretm Mar 26, 2026
cbb8dd6
refactor: stop exporting dc, use getTracingChannel and getChannel ins…
logaretm Mar 26, 2026
ba3183f
refactor: unify trace functions into single generic trace()
logaretm Mar 26, 2026
0f3bd14
refactor: store unsubscriber closures instead of handler references
logaretm Mar 26, 2026
0a0e64d
chore: delete noop-metrics.ts
logaretm Mar 26, 2026
9940998
fix: deduplicate COMMAND_REPLY events and merge subscriber
logaretm Mar 26, 2026
38000e1
fix: reject tracing promise on pool acquire timeout
logaretm Mar 26, 2026
e13cd3e
chore: remove noop-meter.ts
logaretm Mar 26, 2026
19f8eaa
fix: sanitize COMMAND_REPLY args and catch pool trace promise
logaretm Mar 26, 2026
a0f2241
fix: reject pending wait traces on pool destroy
logaretm Mar 26, 2026
c6c6695
style: move const noop after imports in pool.ts
logaretm Mar 26, 2026
e677a61
fix: E2E test failures from refactor
logaretm Mar 26, 2026
96d23b4
fix: record connection wait time when client is immediately available
logaretm Mar 26, 2026
c7d824c
style: clean up comments, remove em dashes, be concise
logaretm Mar 26, 2026
da0139d
fix(perf): optimize channel acquisition
logaretm Mar 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions packages/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,23 @@ export { SetOptions, CLIENT_KILL_FILTERS, FAILOVER_MODES, CLUSTER_SLOT_STATES, C

export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache';
export { OpenTelemetry } from './lib/opentelemetry';

// Re-exports the tracing API from './lib/client/tracing': the CHANNELS value
// plus the event and trace-context types listed below. Every entry except
// CHANNELS carries the inline `type` modifier, so it is a type-only export.
export {
CHANNELS,
type ChannelEvents,
type CommandTraceContext,
type BatchCommandTraceContext,
type BatchOperationContext,
type ConnectTraceContext,
type ConnectionReadyEvent,
type ConnectionClosedEvent,
type ConnectionRelaxedTimeoutEvent,
type ConnectionHandoffEvent,
type ConnectionWaitContext,
type ClientErrorEvent,
type MaintenanceNotificationEvent,
type PubSubMessageEvent,
type CacheRequestEvent,
type CacheEvictionEvent,
type CommandReplyEvent,
} from './lib/client/tracing';
29 changes: 8 additions & 21 deletions packages/client/lib/client/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventEmitter } from 'stream';
import RedisClient from '.';
import { RedisArgument, ReplyUnion, TransformReply, TypeMapping } from '../RESP/types';
import { BasicCommandParser } from './parser';
import { OTelMetrics, CSC_RESULT, CSC_EVICTION_REASON } from '../opentelemetry';
import { publish, CHANNELS } from './tracing';

/**
* A snapshot of cache statistics.
Expand Down Expand Up @@ -558,33 +558,20 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
// If instanceof is "too slow", can add a "type" and then use an "as" cast to call proper getters.
if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1"
this.#statsCounter.recordHits(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.HIT,
client._clientId,
);
OTelMetrics.instance.clientSideCacheMetrics.recordNetworkBytesSaved(
cacheEntry.value,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'hit', clientId: client._clientId }));

return structuredClone(cacheEntry.value);
} else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b2
// This counts as a miss since the value hasn't been fully loaded yet.
this.#statsCounter.recordMisses(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.MISS,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'miss', clientId: client._clientId }));
reply = await cacheEntry.promise;
} else {
throw new Error("unknown cache entry type");
}
} else { // 3/3a
this.#statsCounter.recordMisses(1);
OTelMetrics.instance.clientSideCacheMetrics.recordCacheRequest(
CSC_RESULT.MISS,
client._clientId,
);
publish(CHANNELS.CACHE_REQUEST, () => ({ result: 'miss', clientId: client._clientId }));

const startTime = performance.now();
const promise = fn();
Expand Down Expand Up @@ -640,7 +627,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.clear(false);
// Record invalidations as server-initiated evictions
if (oldSize > 0) {
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.INVALIDATION, oldSize);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'invalidation', count: oldSize }));
}
this.emit("invalidate", key);

Expand All @@ -661,7 +648,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.#keyToCacheKeySetMap.delete(key.toString());
if (deletedCount > 0) {
// Record invalidations as server-initiated evictions
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.INVALIDATION, deletedCount);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'invalidation', count: deletedCount }));
}
}

Expand Down Expand Up @@ -692,7 +679,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.delete(cacheKey);
this.#statsCounter.recordEvictions(1);
// Entry failed validation - this is TTL expiry since invalidation marks are handled separately
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.TTL);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'ttl', count: 1 }));
this.emit("cache-evict", cacheKey);

return undefined;
Expand Down Expand Up @@ -731,7 +718,7 @@ export class BasicClientSideCache extends ClientSideCacheProvider {
this.deleteOldest();
this.#statsCounter.recordEvictions(1);
// Eviction due to cache capacity limit
OTelMetrics.instance.clientSideCacheMetrics.recordCacheEviction(CSC_EVICTION_REASON.FULL);
publish(CHANNELS.CACHE_EVICTION, () => ({ reason: 'full', count: 1 }));
}

this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry);
Expand Down
21 changes: 9 additions & 12 deletions packages/client/lib/client/enterprise-maintenance-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import { setTimeout } from "node:timers/promises";
import { RedisTcpSocketOptions } from "./socket";
import diagnostics_channel from "node:diagnostics_channel";
import { RedisArgument } from "../RESP/types";
import { OTelMetrics } from "../opentelemetry";
import { METRIC_ERROR_ORIGIN } from "../opentelemetry/types";
import { publish, CHANNELS } from "./tracing";

type RedisType = RedisClient<any, any, any, any, any>;

Expand Down Expand Up @@ -124,12 +123,12 @@ export default class EnterpriseMaintenanceManager {
errorHandler: (error: Error) => {
dbgMaintenance("handshake failed:", error);

OTelMetrics.instance.resiliencyMetrics.recordClientErrors({
publish(CHANNELS.ERROR, () => ({
error,
origin: METRIC_ERROR_ORIGIN.CLIENT,
origin: 'client',
internal: true,
clientId,
});
}));

if (options.maintNotifications === "enabled") {
throw error;
Expand Down Expand Up @@ -160,10 +159,10 @@ export default class EnterpriseMaintenanceManager {

const type = String(push[0]);

OTelMetrics.instance.resiliencyMetrics.recordMaintenanceNotifications(
type,
this.#client._clientId,
);
publish(CHANNELS.MAINTENANCE, () => ({
notification: type,
clientId: this.#client._clientId,
}));

emitDiagnostics({
type,
Expand Down Expand Up @@ -306,9 +305,7 @@ export default class EnterpriseMaintenanceManager {
dbgMaintenance("Resume writing");
this.#client._unpause();
this.#onMigrated();
OTelMetrics.instance.connectionBasicMetrics.recordConnectionHandoff(
this.#client._clientId,
);
publish(CHANNELS.CONNECTION_HANDOFF, () => ({ clientId: this.#client._clientId }));
};

#onMigrating = () => {
Expand Down
Loading