Skip to content

Commit c4b9f54

Browse files
authored
fix(diagnostics): flush OTel trace batches
Apply diagnostics.otel.flushIntervalMs to OpenTelemetry trace batching so short-lived Windows and QA runs do not lose late lifecycle/model spans. Also make the OTel QA smoke wait for required telemetry and print bounded failure diagnostics.
1 parent d569e41 commit c4b9f54

3 files changed

Lines changed: 95 additions & 3 deletions

File tree

extensions/diagnostics-otel/src/service.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const logShutdown = vi.hoisted(() => vi.fn().mockResolvedValue(undefined));
5353
const traceExporterCtor = vi.hoisted(() => vi.fn());
5454
const metricExporterCtor = vi.hoisted(() => vi.fn());
5555
const logExporterCtor = vi.hoisted(() => vi.fn());
56+
const spanProcessorCtor = vi.hoisted(() => vi.fn());
5657
const unhandledRejectionHandlerState = vi.hoisted(() => {
5758
let handlers: Array<(reason: unknown) => boolean> = [];
5859
return {
@@ -136,6 +137,9 @@ vi.mock("@opentelemetry/sdk-metrics", () => ({
136137
}));
137138

138139
vi.mock("@opentelemetry/sdk-trace-base", () => ({
140+
BatchSpanProcessor: function BatchSpanProcessor(exporter?: unknown, options?: unknown) {
141+
spanProcessorCtor(exporter, options);
142+
},
139143
ParentBasedSampler: function ParentBasedSampler() {},
140144
TraceIdRatioBasedSampler: function TraceIdRatioBasedSampler() {},
141145
}));
@@ -261,6 +265,10 @@ function firstExporterOptions(mock: { mock: { calls: unknown[][] } }): { url?: s
261265
return mockCallArg(mock, 0) as { url?: string };
262266
}
263267

268+
function firstSpanProcessorOptions(): { scheduledDelayMillis?: number } {
269+
return mockCallArg(spanProcessorCtor, 1) as { scheduledDelayMillis?: number };
270+
}
271+
264272
function firstSetSpanContext(): Record<string, unknown> {
265273
return mockCallArg(telemetryState.tracer.setSpanContext, 1) as Record<string, unknown>;
266274
}
@@ -390,6 +398,7 @@ describe("diagnostics-otel service", () => {
390398
traceExporterCtor.mockClear();
391399
metricExporterCtor.mockClear();
392400
logExporterCtor.mockClear();
401+
spanProcessorCtor.mockClear();
393402
unhandledRejectionHandlerState.reset();
394403
unhandledRejectionHandlerState.register.mockClear();
395404
delete process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;
@@ -1195,6 +1204,18 @@ describe("diagnostics-otel service", () => {
11951204
await service.stop?.(ctx);
11961205
});
11971206

1207+
test("applies flush interval to trace batching", async () => {
1208+
const service = createDiagnosticsOtelService();
1209+
const ctx = createTraceOnlyContext(OTEL_TEST_ENDPOINT);
1210+
ctx.config.diagnostics!.otel!.flushIntervalMs = 250;
1211+
1212+
await service.start(ctx);
1213+
1214+
expect(spanProcessorCtor).toHaveBeenCalledTimes(1);
1215+
expect(firstSpanProcessorOptions().scheduledDelayMillis).toBe(1000);
1216+
await service.stop?.(ctx);
1217+
});
1218+
11981219
test("uses signal-specific OTLP endpoints ahead of the shared endpoint", async () => {
11991220
const service = createDiagnosticsOtelService();
12001221
const ctx = createOtelContext(OTEL_TEST_ENDPOINT, {

extensions/diagnostics-otel/src/service.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import { resourceFromAttributes } from "@opentelemetry/resources";
1414
import { BatchLogRecordProcessor, LoggerProvider } from "@opentelemetry/sdk-logs";
1515
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";
1616
import { NodeSDK } from "@opentelemetry/sdk-node";
17-
import { ParentBasedSampler, TraceIdRatioBasedSampler } from "@opentelemetry/sdk-trace-base";
17+
import {
18+
BatchSpanProcessor,
19+
ParentBasedSampler,
20+
TraceIdRatioBasedSampler,
21+
} from "@opentelemetry/sdk-trace-base";
1822
import { ATTR_SERVICE_NAME } from "@opentelemetry/semantic-conventions";
1923
import {
2024
ATTR_GEN_AI_INPUT_MESSAGES,
@@ -1167,6 +1171,14 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
11671171
...(headers ? { headers } : {}),
11681172
})
11691173
: undefined;
1174+
const spanProcessors =
1175+
traceExporter && typeof otel.flushIntervalMs === "number"
1176+
? [
1177+
new BatchSpanProcessor(traceExporter, {
1178+
scheduledDelayMillis: Math.max(1000, otel.flushIntervalMs),
1179+
}),
1180+
]
1181+
: undefined;
11701182

11711183
const metricExporter = metricsEnabled
11721184
? new OTLPMetricExporter({
@@ -1186,7 +1198,7 @@ export function createDiagnosticsOtelService(): OpenClawPluginService {
11861198

11871199
sdk = new NodeSDK({
11881200
resource,
1189-
...(traceExporter ? { traceExporter } : {}),
1201+
...(spanProcessors ? { spanProcessors } : traceExporter ? { traceExporter } : {}),
11901202
...(metricReader ? { metricReader } : {}),
11911203
...(sampleRate !== undefined
11921204
? {

scripts/qa-otel-smoke.ts

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,47 @@ function isLatestGenAiModelCallSpan(span: CapturedSpan): boolean {
923923
);
924924
}
925925

926+
async function delay(ms: number): Promise<void> {
927+
await new Promise((resolve) => setTimeout(resolve, ms));
928+
}
929+
930+
function hasRequiredSmokeSignals(receiver: ReturnType<typeof startLocalOtlpReceiver>): boolean {
931+
const spanNames = new Set(receiver.capturedSpans.map((span) => span.name));
932+
const metricNames = new Set(receiver.capturedMetrics.map((metric) => metric.name));
933+
return (
934+
REQUIRED_SPAN_NAMES.every((name) => spanNames.has(name)) &&
935+
receiver.capturedSpans.some(isLatestGenAiModelCallSpan) &&
936+
REQUIRED_METRIC_NAMES.every((name) => metricNames.has(name)) &&
937+
receiver.capturedLogRecords.length > 0 &&
938+
["traces", "metrics", "logs"].every((signal) =>
939+
receiver.capturedRequests.some((request) => request.signal === signal)
940+
)
941+
);
942+
}
943+
944+
async function waitForExpectedTelemetry(
945+
receiver: ReturnType<typeof startLocalOtlpReceiver>,
946+
timeoutMs: number,
947+
): Promise<void> {
948+
const deadline = Date.now() + timeoutMs;
949+
while (Date.now() < deadline) {
950+
if (hasRequiredSmokeSignals(receiver)) {
951+
return;
952+
}
953+
await delay(250);
954+
}
955+
}
956+
957+
function formatBoundedList(values: readonly string[], maxItems: number): string {
958+
if (values.length === 0) {
959+
return "(none)";
960+
}
961+
const visible = values.slice(0, maxItems);
962+
const suffix =
963+
values.length > visible.length ? `, ... (${values.length - visible.length} more)` : "";
964+
return `${visible.join(", ")}${suffix}`;
965+
}
966+
926967
function assertSmoke(params: {
927968
childExitCode: number;
928969
disallowedBodyNeedles: string[];
@@ -1075,7 +1116,11 @@ async function main() {
10751116
child.stdout?.on("data", (chunk) => process.stdout.write(chunk));
10761117
child.stderr?.on("data", (chunk) => process.stderr.write(chunk));
10771118
childExitCode = await waitForChild(child);
1078-
await new Promise((resolve) => setTimeout(resolve, 3000));
1119+
if (childExitCode === 0) {
1120+
await waitForExpectedTelemetry(receiver, 15_000);
1121+
} else {
1122+
await delay(3000);
1123+
}
10791124
} finally {
10801125
try {
10811126
await collector?.close();
@@ -1136,6 +1181,20 @@ async function main() {
11361181
for (const failure of assertion.failures) {
11371182
process.stderr.write(`qa-otel-smoke: ${failure}\n`);
11381183
}
1184+
process.stderr.write(
1185+
`qa-otel-smoke: captured request counts traces=${assertion.signalRequestCounts.traces} ` +
1186+
`metrics=${assertion.signalRequestCounts.metrics} logs=${assertion.signalRequestCounts.logs}\n`,
1187+
);
1188+
process.stderr.write(
1189+
`qa-otel-smoke: captured decoded counts spans=${receiver.capturedSpans.length} ` +
1190+
`metrics=${receiver.capturedMetrics.length} logs=${receiver.capturedLogRecords.length}\n`,
1191+
);
1192+
process.stderr.write(
1193+
`qa-otel-smoke: captured span names: ${formatBoundedList(assertion.spanNames, 40)}\n`,
1194+
);
1195+
process.stderr.write(
1196+
`qa-otel-smoke: captured metric names: ${formatBoundedList(assertion.metricNames, 40)}\n`,
1197+
);
11391198
for (const [signal, contexts] of Object.entries(assertion.leakContexts)) {
11401199
for (const context of contexts ?? []) {
11411200
process.stderr.write(`qa-otel-smoke: ${signal} leak context: ${context}\n`);

0 commit comments

Comments
 (0)