feat: add token usage tracking to api-proxy sidecar #1539
Conversation
Intercept LLM API responses in the api-proxy to extract and log token usage data. Supports both streaming (SSE) and non-streaming JSON responses from OpenAI, Anthropic, and Copilot providers.

- Add token-tracker.js module with SSE and JSON usage extraction
- Integrate trackTokenUsage into server.js proxy response pipeline
- Write JSONL logs to /var/log/api-proxy/token-usage.jsonl
- Update input_tokens_total and output_tokens_total metrics
- Add 32 unit tests covering all extraction and normalization paths

Closes #1536

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
✅ Coverage Check Passed — Overall Coverage
📁 Per-file Coverage Changes (1 file)
Pull request overview
Adds provider-agnostic token usage tracking to the api-proxy sidecar by observing upstream responses (JSON and SSE), normalizing usage fields, emitting metrics, and writing JSONL usage logs for later analysis/correlation.
Changes:
- Added `token-tracker.js` to extract/normalize token usage from OpenAI/Copilot and Anthropic responses (streaming + non-streaming) and emit logs/metrics.
- Integrated token tracking into the proxy response path and added graceful shutdown cleanup via `closeLogStream()`.
- Added a comprehensive Jest test suite for extraction, normalization, and end-to-end tracking behavior.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| containers/api-proxy/token-tracker.js | New token usage observer that parses JSON/SSE responses, normalizes fields, writes JSONL logs, and increments token counters. |
| containers/api-proxy/server.js | Hooks token tracking into the proxy response handler and closes the log stream on shutdown signals. |
| containers/api-proxy/token-tracker.test.js | New unit/integration tests covering JSON and SSE parsing and metrics updates. |
containers/api-proxy/server.js
Outdated
```js
const { generateRequestId, sanitizeForLog, logRequest } = require('./logging');
const metrics = require('./metrics');
const rateLimiter = require('./rate-limiter');
const { trackTokenUsage, closeLogStream } = require('./token-tracker');
```
server.js now requires ./token-tracker, but the api-proxy Docker image currently only copies server.js logging.js metrics.js rate-limiter.js (see containers/api-proxy/Dockerfile:18). Without updating the Dockerfile (or switching to COPY . ./), the container will fail to start with a module-not-found error.
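A minimal sketch of the fix, assuming the Dockerfile's copy step looks as described in the comment (the surrounding Dockerfile contents are not shown in this PR view):

```dockerfile
# containers/api-proxy/Dockerfile — extend the existing COPY so the new
# module ships with the image (exact line position assumed from the review)
COPY server.js logging.js metrics.js rate-limiter.js token-tracker.js ./
```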
Suggested change:
```diff
-const { trackTokenUsage, closeLogStream } = require('./token-tracker');
+let trackTokenUsage;
+let closeLogStream;
+try {
+  ({ trackTokenUsage, closeLogStream } = require('./token-tracker'));
+} catch (err) {
+  if (err && err.code === 'MODULE_NOT_FOUND') {
+    // Fallback no-op implementations when token-tracker is not available
+    trackTokenUsage = () => {};
+    closeLogStream = () => {};
+  } else {
+    throw err;
+  }
+}
```
containers/api-proxy/server.js
Outdated
```js
process.on('SIGTERM', () => {
  logRequest('info', 'shutdown', { message: 'Received SIGTERM, shutting down gracefully' });
  closeLogStream();
  process.exit(0);
});

process.on('SIGINT', () => {
  logRequest('info', 'shutdown', { message: 'Received SIGINT, shutting down gracefully' });
  closeLogStream();
```
closeLogStream() calls logStream.end() but the SIGTERM/SIGINT handlers call process.exit(0) immediately afterward, which can terminate the process before the stream flushes buffered writes. Consider awaiting the stream 'finish' callback (or using end(() => process.exit(0))) to avoid losing the last token-usage entries.
Suggested change:
```diff
-process.on('SIGTERM', () => {
+process.on('SIGTERM', async () => {
   logRequest('info', 'shutdown', { message: 'Received SIGTERM, shutting down gracefully' });
-  closeLogStream();
+  await closeLogStream();
   process.exit(0);
 });
-process.on('SIGINT', () => {
+process.on('SIGINT', async () => {
   logRequest('info', 'shutdown', { message: 'Received SIGINT, shutting down gracefully' });
-  closeLogStream();
+  await closeLogStream();
```
```js
 * proxyRes → PassThrough (accumulates chunks) → res (client)
 *                  ↓ on('end')
 *            parse usage → log to file + metrics
 *
 * For non-streaming responses: parse the buffered JSON body on 'end'.
 * For streaming (SSE) responses: scan each chunk for usage events as they
 * pass through, accumulate usage from message_start / message_delta / final
```
The file header comment describes an architecture using PassThrough (accumulates chunks), but the implementation only attaches data/end listeners and does not use a PassThrough. Updating the comment to match the actual approach will prevent confusion for future maintenance.
Suggested change:
```diff
- * proxyRes → PassThrough (accumulates chunks) → res (client)
- *                  ↓ on('end')
- *            parse usage → log to file + metrics
+ * proxyRes (LLM response) → res (client)
+ *   ├─ on('data'): buffer/inspect chunks for usage extraction
+ *   └─ on('end'): finalize parsing → log to file + metrics
  *
- * For non-streaming responses: parse the buffered JSON body on 'end'.
- * For streaming (SSE) responses: scan each chunk for usage events as they
- * pass through, accumulate usage from message_start / message_delta / final
+ * For non-streaming responses: buffer the JSON body (up to MAX_BUFFER_SIZE),
+ * then parse it on 'end' to extract usage fields.
+ * For streaming (SSE) responses: scan each chunk for usage events as they
+ * are received, accumulate usage from message_start / message_delta / final
```
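For the streaming path the header describes, accumulating usage across Anthropic-style SSE events could look like the following sketch. The event shapes (`message_start` carrying `message.usage`, `message_delta` carrying `usage`) are assumptions based on the comment, not code from this PR:

```javascript
// Sketch: scan one SSE chunk's text for usage-bearing events and merge
// them into an accumulator object. Later deltas overwrite earlier values.
function accumulateSseUsage(chunkText, acc) {
  for (const line of chunkText.split('\n')) {
    if (!line.startsWith('data: ')) continue;
    let event;
    try {
      event = JSON.parse(line.slice(6));
    } catch {
      continue; // ignore partial or non-JSON data lines
    }
    if (event.type === 'message_start' && event.message && event.message.usage) {
      Object.assign(acc, event.message.usage); // initial input_tokens
    } else if (event.type === 'message_delta' && event.usage) {
      Object.assign(acc, event.usage); // cumulative output_tokens
    }
  }
  return acc;
}
```

A real implementation would also need to handle events split across chunk boundaries, which this sketch ignores.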
```js
result.usage = {};
// Anthropic fields
if (typeof json.usage.input_tokens === 'number') {
  result.usage.input_tokens = json.usage.input_tokens;
}
if (typeof json.usage.output_tokens === 'number') {
  result.usage.output_tokens = json.usage.output_tokens;
}
if (typeof json.usage.cache_creation_input_tokens === 'number') {
  result.usage.cache_creation_input_tokens = json.usage.cache_creation_input_tokens;
}
if (typeof json.usage.cache_read_input_tokens === 'number') {
  result.usage.cache_read_input_tokens = json.usage.cache_read_input_tokens;
}
// OpenAI/Copilot fields
if (typeof json.usage.prompt_tokens === 'number') {
  result.usage.prompt_tokens = json.usage.prompt_tokens;
}
if (typeof json.usage.completion_tokens === 'number') {
  result.usage.completion_tokens = json.usage.completion_tokens;
}
if (typeof json.usage.total_tokens === 'number') {
  result.usage.total_tokens = json.usage.total_tokens;
}
```
extractUsageFromJson sets result.usage = {} whenever json.usage is an object, even if none of the recognized numeric token fields are present. This can cause normalizeUsage({}) to produce all zeros and then log/emit metrics for 0 tokens, polluting logs/metrics. Consider leaving usage as null unless at least one numeric token field was extracted.
Suggested change:
```diff
-result.usage = {};
+const usage = {};
+let hasField = false;
 // Anthropic fields
 if (typeof json.usage.input_tokens === 'number') {
-  result.usage.input_tokens = json.usage.input_tokens;
+  usage.input_tokens = json.usage.input_tokens;
+  hasField = true;
 }
 if (typeof json.usage.output_tokens === 'number') {
-  result.usage.output_tokens = json.usage.output_tokens;
+  usage.output_tokens = json.usage.output_tokens;
+  hasField = true;
 }
 if (typeof json.usage.cache_creation_input_tokens === 'number') {
-  result.usage.cache_creation_input_tokens = json.usage.cache_creation_input_tokens;
+  usage.cache_creation_input_tokens = json.usage.cache_creation_input_tokens;
+  hasField = true;
 }
 if (typeof json.usage.cache_read_input_tokens === 'number') {
-  result.usage.cache_read_input_tokens = json.usage.cache_read_input_tokens;
+  usage.cache_read_input_tokens = json.usage.cache_read_input_tokens;
+  hasField = true;
 }
 // OpenAI/Copilot fields
 if (typeof json.usage.prompt_tokens === 'number') {
-  result.usage.prompt_tokens = json.usage.prompt_tokens;
+  usage.prompt_tokens = json.usage.prompt_tokens;
+  hasField = true;
 }
 if (typeof json.usage.completion_tokens === 'number') {
-  result.usage.completion_tokens = json.usage.completion_tokens;
+  usage.completion_tokens = json.usage.completion_tokens;
+  hasField = true;
 }
 if (typeof json.usage.total_tokens === 'number') {
-  result.usage.total_tokens = json.usage.total_tokens;
+  usage.total_tokens = json.usage.total_tokens;
+  hasField = true;
 }
+if (hasField) {
+  result.usage = usage;
+}
```
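For reference, the normalization step the comment mentions could look like the sketch below, built from the provider field mapping given in the PR summary (Anthropic `input_tokens`/`output_tokens`, OpenAI/Copilot `prompt_tokens`/`completion_tokens`). The PR's actual `normalizeUsage` may differ:

```javascript
// Sketch: map provider-specific usage fields onto the normalized names,
// defaulting absent fields to 0.
function normalizeUsage(usage) {
  return {
    input_tokens: usage.input_tokens ?? usage.prompt_tokens ?? 0,
    output_tokens: usage.output_tokens ?? usage.completion_tokens ?? 0,
    cache_read_tokens: usage.cache_read_input_tokens ?? 0,
    cache_write_tokens: usage.cache_creation_input_tokens ?? 0,
  };
}
```

With this shape, `normalizeUsage({})` indeed yields all zeros, which is why guarding against empty usage objects upstream matters.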
```js
function writeTokenUsage(record) {
  const stream = getLogStream();
  if (stream) {
    stream.write(JSON.stringify(record) + '\n');
  }
}
```
writeTokenUsage ignores the return value of stream.write(). If the log volume is slow or temporarily blocked, WriteStream will buffer in memory and could grow without bound under load. Consider handling backpressure (e.g., drop or queue writes when write() returns false, and/or add a bounded in-memory buffer).
```js
function trackTokenUsage(proxyRes, opts) {
  const { requestId, provider, method, path: reqPath, targetHost, startTime, metrics: metricsRef } = opts;
  const streaming = isStreamingResponse(proxyRes.headers);

  // Accumulate response body for usage extraction
  const chunks = [];
  let totalBytes = 0;
  let overflow = false;

  // For streaming: accumulate usage across SSE events
  let streamingUsage = {};
```
trackTokenUsage accepts method and targetHost but never uses them, and the emitted record hard-codes request_bytes: 0 even though the caller already knows the actual request size (requestBytes in server.js). Either remove unused fields from opts/record to reduce confusion, or plumb through and log method, targetHost, and real request_bytes (and potentially reuse the already-computed responseBytes instead of counting again).
```js
describe('trackTokenUsage', () => {
  test('extracts usage from non-streaming JSON response', (done) => {
    const proxyRes = new EventEmitter();
    proxyRes.headers = { 'content-type': 'application/json' };
    proxyRes.statusCode = 200;

    const metricsRef = {
      increment: jest.fn(),
    };

    trackTokenUsage(proxyRes, {
      requestId: 'test-123',
      provider: 'openai',
      method: 'POST',
      path: '/v1/chat/completions',
      targetHost: 'api.openai.com',
      startTime: Date.now(),
      metrics: metricsRef,
    });
```
These integration tests call trackTokenUsage, which will attempt to create/write /var/log/api-proxy/token-usage.jsonl via fs.mkdirSync/createWriteStream. On many developer/CI environments this path is not writable, which can add noisy token_log_init_error warnings to test output. Consider setting process.env.AWF_TOKEN_LOG_DIR to a temp dir for the test suite and/or mocking writeTokenUsage/getLogStream to avoid filesystem I/O in unit tests.
- Add token-tracker.js to Dockerfile COPY step
- Add try/catch require guard for graceful fallback
- Fix header comment to match actual architecture
- Guard empty usage objects from producing 0-token log entries
- Handle write backpressure in JSONL log writer
- Make closeLogStream() async to flush before process.exit
- Remove unused method/targetHost opts; drop hardcoded request_bytes
- Redirect test log output to temp dir to avoid /var/log noise

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Smoke test results
Overall: PASS
Smoke Test Results ✅ GitHub MCP (#1534 feat: add smoke-services workflow for --allow-host-service-ports e2e testing, #1530 [WIP] Fix failing GitHub Actions workflow agent)
Overall: PASS
Chroot Version Comparison Results
Overall: ❌ Not all tests passed — Python and Node.js versions differ between host and chroot.
Smoke Test: GitHub Actions Services Connectivity ✅ All connectivity checks passed.
🏗️ Build Test Suite Results
Overall: 8/8 ecosystems passed — ✅ PASS
Summary
Adds token usage tracking to the api-proxy sidecar, enabling visibility into LLM API token consumption across all providers (OpenAI, Anthropic, Copilot).
Changes
New: `containers/api-proxy/token-tracker.js`
- Listens on `data`/`end` to buffer the response body and, on `end`, extract the `usage` field
- For streaming (SSE) responses, accumulates usage across events (`message_start`, `message_delta`, final chunk)
- Normalizes provider fields to `input_tokens`, `output_tokens`, `cache_read_tokens`, `cache_write_tokens`
- Appends JSONL records to `/var/log/api-proxy/token-usage.jsonl`
- Increments the `input_tokens_total` and `output_tokens_total` metrics counters
- Observes `proxyRes.pipe(res)`— purely observational, no Transform stream needed

Modified: `containers/api-proxy/server.js`
- Imports `trackTokenUsage` and `closeLogStream` from token-tracker
- Calls `trackTokenUsage(proxyRes, opts)` after `proxyRes.pipe(res)` in the proxy response handler
- Calls `closeLogStream()` in SIGTERM/SIGINT shutdown handlers

New: `containers/api-proxy/token-tracker.test.js`

Provider Token Field Mapping

| Normalized field | Anthropic | OpenAI/Copilot |
|---|---|---|
| input_tokens | `usage.input_tokens` | `usage.prompt_tokens` |
| output_tokens | `usage.output_tokens` | `usage.completion_tokens` |
| cache_read_tokens | `usage.cache_read_input_tokens` | (none) |
| cache_write_tokens | `usage.cache_creation_input_tokens` | (none) |

Testing
Closes #1536