feat: add extproc prometheus metrics#316

Closed
rootfs wants to merge 2 commits into envoyproxy:main from rootfs:prom

Conversation

@rootfs
@rootfs rootfs commented Feb 10, 2025

Commit Message
Add prometheus metrics to extproc to track data plane metrics. These metrics help understand request-level and token-level usage across models and backends.

Related Issues/PRs (if applicable)

Special notes for reviewers (if applicable)

Below is an example output from extproc:

# curl localhost:9190/metrics  
# HELP aigateway_first_token_latency_seconds Time to receive first token in streaming responses
# TYPE aigateway_first_token_latency_seconds histogram
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.1"} 0
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.25"} 0
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.5"} 0
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="1"} 1
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="2.5"} 1
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="5"} 1
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="10"} 1
aigateway_first_token_latency_seconds_bucket{backend="ollama",model="phi4",le="+Inf"} 1
aigateway_first_token_latency_seconds_sum{backend="ollama",model="phi4"} 0.566842292
aigateway_first_token_latency_seconds_count{backend="ollama",model="phi4"} 1
# HELP aigateway_inter_token_latency_seconds Time between consecutive tokens in streaming responses
# TYPE aigateway_inter_token_latency_seconds histogram
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.1"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.25"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="0.5"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="1"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="2.5"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="5"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="10"} 7
aigateway_inter_token_latency_seconds_bucket{backend="ollama",model="phi4",le="+Inf"} 7
aigateway_inter_token_latency_seconds_sum{backend="ollama",model="phi4"} 4.837e-05
aigateway_inter_token_latency_seconds_count{backend="ollama",model="phi4"} 7
# HELP aigateway_model_tokens_total Total number of tokens processed by model and type
# TYPE aigateway_model_tokens_total counter
aigateway_model_tokens_total{backend="ollama",model="phi4",type="completion"} 1048
aigateway_model_tokens_total{backend="ollama",model="phi4",type="prompt"} 34
aigateway_model_tokens_total{backend="ollama",model="phi4",type="total"} 1082
# HELP aigateway_requests_total Total number of requests processed
# TYPE aigateway_requests_total counter
aigateway_requests_total{backend="ollama",model="phi4",status="success"} 2
# HELP aigateway_total_latency_seconds Time spent processing request
# TYPE aigateway_total_latency_seconds histogram
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="0.1"} 0
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="0.5"} 0
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="1"} 1
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="2.5"} 1
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="5"} 1
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="10"} 1
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="20"} 2
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="30"} 2
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="60"} 2
aigateway_total_latency_seconds_bucket{backend="ollama",model="phi4",status="success",le="+Inf"} 2
aigateway_total_latency_seconds_sum{backend="ollama",model="phi4",status="success"} 14.449215641
aigateway_total_latency_seconds_count{backend="ollama",model="phi4",status="success"} 2

@rootfs rootfs requested a review from a team as a code owner February 10, 2025 18:25
@rootfs rootfs changed the title feat: add aigateway prometheus metrics to track model, request, and token usage feat: add extproc prometheus metrics Feb 10, 2025
Member

@mathetake mathetake left a comment

thanks! Left comments mostly about the testability

@mathetake
Member

I have only tested on OpenAI compatible platforms. If anybody can test aws bedrock, that'll be great

this is why you need at least unit tests ;) anyway, the coverage check is failing as expected

model, body, err := p.config.bodyParser(path, rawBody)
if err != nil {
// Add to metrics tracker
metrics.RequestsTotal.WithLabelValues("unknown", "unknown", "error").Inc()
Member

I see that we're setting some metrics when there are errors, but not on all of them. Is that intentional? If not, this looks a bit error-prone: it is easy to miss setting the error metric when adding new conditionals to the code in the future...

WDYT about refactoring the metrics functionality out into a processor that could wrap this one? Something like an InstrumentedProcessor that wraps this one and emits the metrics when needed. This way it's easier to always emit the error metrics (when the inner processor returns an error) and not miss any.
Also, the backend and model are published in the headers, so they should be accessible to the wrapping processor.

I don't know if given the current code and execution flows it is possible, but if it is, it would probably be cleaner

Author

The metrics are created in their own factory; when the extproc process starts, the metrics are already there. If there is a malformed body, tracking it in the metrics is more helpful than dropping it silently or burying it in logs.

Member

@nacx nacx Feb 14, 2025

Agree, but this does not answer my question. Let me re-ask:

I see that we're setting some metrics when there are errors, but not on all of them. Is it intentional?

As an example, in processor.go:102 you're incrementing the error counter, but in lines 109, 115, and 127 (those are examples; there are many others) you're not incrementing that error count.
Could you explain why? Under which circumstances should the error count be incremented, and under which should it not? There needs to be proper reasoning for that, also reflected in the comments, so that future changes to the processor can properly account for metrics.

You also did not answer the question about the decorator pattern instead of embedding everything in the current logic. Could you also give your thoughts on that? Using a decorator pattern would help a lot to avoid missing metrics in conditional cases (such as errors, etc.).

Member

+1

Author

@nacx I see what you meant. Yes, all errors should be tracked. The codebase is changing rapidly, so these are easy to miss :D I'll make sure the metrics are in all the places you mentioned during the next rebase.

Member

This is not resolved

@rootfs
Author

rootfs commented Feb 14, 2025

@mathetake PR updated with metrics builder and more test coverage. PTAL and I'll rebase. Thanks

return nil, nil, nil, fmt.Errorf("failed to marshal body: %w", err)
}
setContentLength(headerMutation, mut.Body)
o.requestStart = time.Now()
Contributor

Move this line up front, as there are early returns. Also, shouldn't the request start be counted from the request headers rather than the body, @mathetake?

Author

I don't see a requestHeader call here, but this can definitely move there once that API is implemented @mathetake

Member

I agree with @yuzisun. The main issue is that there is a mix of lifecycles here:

  • The request lifecycle is owned by the processor. That is the one that has the OnRequestHeaders, OnResponseBody, etc.
  • This is "translator code", which basically transforms request/responses from/to the specific selected backend formats.

With this, I think there are a couple of options:

  • If we want the translator code to be recording metrics, then some context about the initial request (such as the request creation time) should be passed to it when it's being instantiated.
  • If we want to keep the translator focused on just translating (this is probably what I think would be cleaner), we should think about a way the translator could provide information about internal processing to the calling processor, so that the processor can emit the right metrics.

// Since we are only interested in the time between tokens, and openai streaming is by chunk,
// we can calculate the time between tokens by the time between the last token and the current token, divided by the number of tokens.
// And in some cases, the number of tokens can be 0, so we need to check for that.
div := tokenUsage.OutputTokens
Contributor

need to confirm whether the token count reported is cumulative or just the output tokens in the chunk; it may differ between backends

@mathetake
Member

@rootfs would it be possible to resolve the conflicts and address the existing comments? @nacx and I really would like to see this finish

@rootfs
Author

rootfs commented Feb 24, 2025

@mathetake sure thing, back to work today, will rebase soon.

@rootfs rootfs force-pushed the prom branch 2 times, most recently from d24a773 to 7e9800c on February 25, 2025 15:58

// ProcessRequestBody implements [Processor.ProcessRequestBody].
func (c *chatCompletionProcessor) ProcessRequestBody(ctx context.Context, rawBody *extprocv3.HttpBody) (res *extprocv3.ProcessingResponse, err error) {
c.requestStart = time.Now() // Track the start time of the request
Member

Should we set the requestStart in the OnRequestHeaders method?

Author

i don't see this method yet

Member

Sorry, I meant ProcessRequestHeaders

@rootfs rootfs force-pushed the prom branch 2 times, most recently from 23d6403 to 7141702 on February 25, 2025 20:58
**Commit Message**
Add prometheus metrics to extproc to track data plane metrics.
These metrics help understand requests or token level usage
on models and backends.

Signed-off-by: Huamin Chen <hchen@redhat.com>
@rootfs
Author

rootfs commented Feb 25, 2025

@mathetake @nacx @yuzisun at the moment there are still open questions about the entry point for recording the start of the request headers. I don't see a request-header entry point in the main branch yet, but the current code is flexible enough to move to new entry points once you have them.

Signed-off-by: Huamin Chen <hchen@redhat.com>
@mathetake
Member

mathetake commented Feb 25, 2025

  • I would strongly suggest not having a metrics context tied to the translator. The metric "sink" should be coupled with the kind of Processor, not the translator. To do so,
    1. Define ChatCompletionsMetrics and make it owned by ChatCompletionsProcessor.
    2. Pass it from the processor to the OpenAIChatCompletionTranslator methods as an argument, or pass it to NewChatCompletionOpenAIToOpenAITranslator and NewChatCompletionOpenAIToAWSBedrockTranslator. Do not initialize the metrics in the translator's constructor. (extproc: renames translator interface #425 will be merged in a sec)
  • Then, the request time initialization should start in the RequestHeaders method

mathetake added a commit that referenced this pull request Feb 25, 2025
**Commit Message**

After #325, the abstraction over :path is no longer implemented in the
translator; it moved to the choice of processor. As a result, the
translator interface has become effectively tied to a specific endpoint,
notably /v1/chat/completion. This renames the `translator.Translator`
interface accordingly, which not only removes the unnecessary code path
but also facilitates the implementation of additional features like metrics.

**Related Issues/PRs (if applicable)**

Follow up on #325 and contribute to #316

---------

Signed-off-by: Takeshi Yoneda <t.y.mathetake@gmail.com>
fs.StringVar(&flags.promAddr,
"promAddr",
":9190",
"address for prometheus metrics, default is :9190",
Member

The flag library already includes the default when printing the flags. If we keep it here, there will be duplicate text.

Suggested change
"address for prometheus metrics, default is :9190",
"address for prometheus metrics",

go func() {
<-ctx.Done()
s.GracefulStop()
if err := promServer.Shutdown(ctx); err != nil {
Member

Probably it's better to use context.Background() here, as ctx is already terminated at this point.

// cost is the cost of the request that is accumulated during the processing of the response.
costs translator.LLMTokenUsage
// for metrics
metrics *metrics.TokenMetrics
Member

Let's extract TokenMetrics as an interface and keep the implementation type private to the metrics package. Keep the NewTokenMetrics() constructor, but probably just return the interface in the signature. This will make it more flexible and easier to test by injecting mocks tailored to the needs of each test.

c.modelName = "unknown"
model, body, err := parseOpenAIChatCompletionBody(rawBody)
if err != nil {
c.metrics.UpdateRequestMetrics(c.backendName, c.modelName, "error")
Member

Can you remove these error metrics from every conditional and instead just do it once on a deferred function at the beginning of the method, as suggested in the previous review?
Doing it in a deferred function ensures that we will not miss recording the error metric in the future if we add other error blocks and forget to copy the metric recording. It is overall less error-prone.

}

// Track the backend and model name for metrics
c.metrics.StartRequest(c.backendName, c.modelName)
Member

This should be probably moved to the ProcessRequestHeaders method.

selectedBackendHeaderKey: "x-ai-gateway-backend-key",
modelNameHeaderKey: "x-ai-gateway-model-key",
}, requestHeaders: headers, logger: slog.Default(), translator: mt}
}, requestHeaders: headers, logger: slog.Default(), translator: mt, metrics: metrics.NewTokenMetrics()}
Member

I think this test file should contain assertions to verify that the right metrics methods were called.

If you turn the metrics property into an interface as suggested above, you can use a mock implementation that just records the number of times each method is called, and after each individual test you can verify that the right methods have been called the right number of times.

This way the unit tests will check that the processor behaves as expected, but also that it calls the logic to record metrics and is not missing any, or calling it more than once by mistake (this would catch missing metric recording in error blocks or duplicate metric recording in translator/processor, and make the code more robust and more future-proof).

return nil, nil, nil, fmt.Errorf("failed to marshal body: %w", err)
}
setContentLength(headerMutation, mut.Body)
o.tokenMetrics.StartRequest(o.backendName, o.modelName)
Member

If we use the same token-metrics instance here as the one used by the processor, then the request recording has already started and the translator shouldn't need to do this.

Comment on lines +56 to +57
backendName string
modelName string
Member

Could these fields be moved to the TokenMetrics implementation, with methods to set them?
I think it makes sense and keeps the processor state cleaner; plus, these values will already be available to the methods called by the translator, so we won't need to have them in the translator either. I think that would be cleaner.
@mathetake WDYT?

Comment on lines +34 to +37
requestStart time.Time
modelName string
backendName string
metrics *metrics.Metrics
Member

Are these actually used?

In any case, when you see the same properties/state being used in N different places (processor, translator, this type), it is a sign that things could be better encapsulated.

These fields should probably all be moved inside the TokenMetrics implementation, and the tokenMetrics instance should be the only thing that is shared and passed between processor, translator, etc.

}
headerMutation = &extprocv3.HeaderMutation{}
setContentLength(headerMutation, mut.Body)
o.tokenMetrics.UpdateTokenMetrics(o.backendName, o.modelName, tokenUsage.OutputTokens, tokenUsage.InputTokens, tokenUsage.TotalTokens)
Member

As mentioned in the previous review, these metrics are already recorded by the calling processor in its ProcessResponseBody method, once the translator returns.

It looks like the values will be incremented twice, which is not correct.

This is why it is important to have the tests I mention above, to make sure we're only recording metrics when needed, and not doing it N times by mistake.

This also applies to the other translators.

@rootfs
Author

rootfs commented Feb 26, 2025

  • I would strongly suggest not having a metrics context tied to the translator. The metric "sink" should be coupled with the kind of Processor, not the translator. To do so,

    1. Define ChatCompletionsMetrics and make it owned by ChatCompletionsProcessor.
    2. Pass it from the processor to the OpenAIChatCompletionTranslator methods as an argument, or pass it to NewChatCompletionOpenAIToOpenAITranslator and NewChatCompletionOpenAIToAWSBedrockTranslator. Do not initialize the metrics in the translator's constructor. (extproc: renames translator interface #425 will be merged in a sec)
  • Then, the request time initialization should start in the RequestHeaders method

@mathetake ok, that'll be a major refactor. I am going to do it in a clean way then. Closing this PR and will submit a new one based on the updated interface.

@rootfs rootfs closed this Feb 26, 2025
@mathetake
Member

No reason to close the PR though ...

@nacx
Member

nacx commented Feb 26, 2025

Agree. No need to close the PR. You're doing an amazing job on this one, keeping up with all the reviews, etc. We can merge this and tackle that comment in a follow-up PR!
