feat: add extproc prometheus metrics #316
Conversation
mathetake
left a comment
thanks! Left comments mostly about the testability
this is why you need unit tests, at least ;) anyway, the coverage check is failing as expected
internal/extproc/processor.go
Outdated
| model, body, err := p.config.bodyParser(path, rawBody) | ||
| if err != nil { | ||
| // Add to metrics tracker | ||
| metrics.RequestsTotal.WithLabelValues("unknown", "unknown", "error").Inc() |
I see that we're setting some metrics when there are errors, but not on all of them. Is it intentional? If not, this looks a bit error-prone, and it's easy to miss setting the error metric when adding new conditionals to the code in the future...
WDYT about refactoring the metrics functionality out into a processor that could wrap this one? Something like an InstrumentedProcessor that wraps this one and emits the metrics when needed. This way it's easier to always emit the error metrics (when the inner processor returned an error) and not miss any.
Also, the backend and model are published in the headers, so they should be accessible to the wrapping processor.
I don't know if given the current code and execution flows it is possible, but if it is, it would probably be cleaner
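A decorator along these lines could look like the following minimal Go sketch. The type and method names here are illustrative, not the project's actual interfaces; the real processor methods take contexts and protobuf types:

```go
package main

import "fmt"

// Processor is a simplified stand-in for the extproc processor interface.
type Processor interface {
	ProcessRequestBody(body []byte) (string, error)
}

// ProcessorFunc adapts a plain function to the Processor interface.
type ProcessorFunc func([]byte) (string, error)

func (f ProcessorFunc) ProcessRequestBody(b []byte) (string, error) { return f(b) }

// requestRecorder abstracts the metrics sink so it can be mocked in tests.
type requestRecorder interface {
	IncRequests(backend, model, status string)
}

// memRecorder is a tiny in-memory recorder, useful for tests.
type memRecorder struct{ counts map[string]int }

func (r *memRecorder) IncRequests(backend, model, status string) {
	if r.counts == nil {
		r.counts = map[string]int{}
	}
	r.counts[backend+"/"+model+"/"+status]++
}

// InstrumentedProcessor wraps any Processor and records exactly one request
// metric per call, so individual error branches cannot forget to do it.
type InstrumentedProcessor struct {
	inner   Processor
	metrics requestRecorder
	backend string
	model   string
}

func (p *InstrumentedProcessor) ProcessRequestBody(body []byte) (string, error) {
	res, err := p.inner.ProcessRequestBody(body)
	status := "success"
	if err != nil {
		status = "error"
	}
	p.metrics.IncRequests(p.backend, p.model, status)
	return res, err
}

func main() {
	rec := &memRecorder{}
	p := &InstrumentedProcessor{
		inner:   ProcessorFunc(func([]byte) (string, error) { return "", fmt.Errorf("bad body") }),
		metrics: rec, backend: "openai", model: "unknown",
	}
	_, err := p.ProcessRequestBody(nil)
	fmt.Println(err, rec.counts["openai/unknown/error"])
}
```

The inner processor keeps zero knowledge of metrics; the wrapper is the single place that classifies success vs. error.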
The metrics are created in their own factory. When the extproc process starts, the metrics are there. If there is a misconstructed body, tracking it in the metrics is more helpful than dropping it silently or burying it in logs.
Agree, but this does not answer my question. Let me re-ask:
I see that we're setting some metrics when there are errors, but not on all of them. Is it intentional?
As an example, in the processor.go:102 you're incrementing the error counter, but then in lines 109, 115, 127 (it's an example, there are many others), you're not incrementing that error count.
Could you explain why? Under which circumstances should the error count be incremented and under which circumstances it should not? There needs to be a proper reasoning for that, also reflected in the comments, so that future changes to the processor can properly account for metrics.
You also did not answer the question about the decorator pattern instead of embedding everything in the current logic. Could you also give your thoughts on that? Using a decorator pattern would help a lot to not miss adding metrics in conditional cases (such as errors, etc.).
@nacx I see what you meant. Yes, all errors should be tracked. The codebase is rapidly changing to catch all these errors :D I'll make sure the metrics are in all the places you mentioned during the next rebase.
@mathetake PR updated with metrics builder and more test coverage. PTAL and I'll rebase. Thanks
| return nil, nil, nil, fmt.Errorf("failed to marshal body: %w", err) | ||
| } | ||
| setContentLength(headerMutation, mut.Body) | ||
| o.requestStart = time.Now() |
Move this line up front, as there are early returns; also, the request start should be counted from the request headers, not the body @mathetake ?
I don't see a requestHeader call here. But it can definitely move there once the API is implemented @mathetake
I agree with @yuzisun. The main issue is that there is a mix of lifecycles here:
- The request lifecycle is owned by the processor. That is the one that has the OnRequestHeaders, OnResponseBody, etc.
- This is "translator code", which basically transforms request/responses from/to the specific selected backend formats.
With this, I think there are a couple of options:
- If we want the translator code to be recording metrics, then some context about the initial request (such as the request creation time) should be passed to it when it's being instantiated.
- If we want to keep the translator focused on just translating (this is probably what I think would be cleaner), we should think about a way the translator could provide information about internal processing to the calling processor, so that the processor can emit the right metrics.
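The second option could be sketched like this, assuming the translator can surface token usage in its return value. All names here are illustrative, not the project's actual API:

```go
package main

import "fmt"

// LLMTokenUsage mirrors the token counts a translator extracts from a
// backend response.
type LLMTokenUsage struct {
	InputTokens, OutputTokens, TotalTokens uint32
}

// ResponseTranslator stays focused on translation: it reports what it
// observed and lets the caller decide what to record.
type ResponseTranslator interface {
	ResponseBody(raw []byte) (mutated []byte, usage LLMTokenUsage, err error)
}

type tokenRecorder interface {
	UpdateTokenMetrics(backend, model string, output, input, total uint32)
}

// memTokens is an in-memory recorder, handy for tests.
type memTokens struct {
	calls     int
	lastTotal uint32
}

func (m *memTokens) UpdateTokenMetrics(backend, model string, output, input, total uint32) {
	m.calls++
	m.lastTotal = total
}

// processor owns the request lifecycle and is the single place that emits
// token metrics, exactly once per response body.
type processor struct {
	backend, model string
	translator     ResponseTranslator
	metrics        tokenRecorder
}

func (p *processor) ProcessResponseBody(raw []byte) ([]byte, error) {
	body, usage, err := p.translator.ResponseBody(raw)
	if err != nil {
		return nil, err
	}
	p.metrics.UpdateTokenMetrics(p.backend, p.model, usage.OutputTokens, usage.InputTokens, usage.TotalTokens)
	return body, nil
}

type fakeTranslator struct{}

func (fakeTranslator) ResponseBody(raw []byte) ([]byte, LLMTokenUsage, error) {
	return raw, LLMTokenUsage{InputTokens: 3, OutputTokens: 7, TotalTokens: 10}, nil
}

func main() {
	m := &memTokens{}
	p := &processor{backend: "aws", model: "llama", translator: fakeTranslator{}, metrics: m}
	if _, err := p.ProcessResponseBody([]byte("{}")); err != nil {
		panic(err)
	}
	fmt.Println(m.calls, m.lastTotal)
}
```

With this split, the translator never touches the metrics sink, so double counting between translator and processor becomes structurally impossible.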
| // Since we are only interested in the time between tokens, and openai streaming is by chunk, | ||
| // we can calculate the time between tokens by the time between the last token and the current token, divided by the number of tokens. | ||
| // And in some cases, the number of tokens can be 0, so we need to check for that. | ||
| div := tokenUsage.OutputTokens |
need to confirm whether the token count reported is cumulative or just the output tokens in the chunk; it may differ between backends
@mathetake sure thing, back to work today, will rebase soon.
d24a773 to 7e9800c
| // ProcessRequestBody implements [Processor.ProcessRequestBody]. | ||
| func (c *chatCompletionProcessor) ProcessRequestBody(ctx context.Context, rawBody *extprocv3.HttpBody) (res *extprocv3.ProcessingResponse, err error) { | ||
| c.requestStart = time.Now() // Track the start time of the request |
Should we set the requestStart in the OnRequestHeaders method?
Sorry, I meant ProcessRequestHeaders
23d6403 to 7141702
**Commit Message** Add prometheus metrics to extproc to track data plane metrics. These metrics help understand request- or token-level usage on models and backends. Signed-off-by: Huamin Chen <hchen@redhat.com>
@mathetake @nacx @yuzisun at the moment, there are still open questions on the entrypoint for recording the start of the request headers. I don't see a request header entrypoint in the main branch yet, but the current code is flexible enough to move to new entrypoints when you have them.
Signed-off-by: Huamin Chen <hchen@redhat.com>
**Commit Message** After #325, the translator is no longer where the abstraction over :path is implemented; that moved to the choice of processor. As a result, the translator interface has become effectively tied to a specific endpoint, notably /v1/chat/completion. This renames the `translator.Translator` interface accordingly, not only to remove the unnecessary code path but also to facilitate the implementation of additional features like metrics. **Related Issues/PRs (if applicable)** Follow up on #325 and contribute to #316 --------- Signed-off-by: Takeshi Yoneda <t.y.mathetake@gmail.com>
| fs.StringVar(&flags.promAddr, | ||
| "promAddr", | ||
| ":9190", | ||
| "address for prometheus metrics, default is :9190", |
The flag library already includes the default when printing the flags. If we keep it here, there will be duplicate text.
| "address for prometheus metrics, default is :9190", | |
| "address for prometheus metrics", |
| go func() { | ||
| <-ctx.Done() | ||
| s.GracefulStop() | ||
| if err := promServer.Shutdown(ctx); err != nil { |
Probably it's better to use context.Background() here, as ctx is already terminated at this point.
| // cost is the cost of the request that is accumulated during the processing of the response. | ||
| costs translator.LLMTokenUsage | ||
| // for metrics | ||
| metrics *metrics.TokenMetrics |
Let's extract TokenMetrics as an interface and keep the implementation type private to the metrics package. Keep the NewTokenMetrics() constructor, but probably just return the interface in the signature. This will make things more flexible and easier to test by injecting convenient mocks tailored to the needs of each test.
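That could look like the following sketch. The method set is illustrative, inferred from the calls visible in this PR, and the real implementation's fields would be the Prometheus counters and histograms:

```go
package main

import "fmt"

// TokenMetrics is the exported surface; callers depend only on this.
type TokenMetrics interface {
	StartRequest(backend, model string)
	UpdateRequestMetrics(backend, model, status string)
	UpdateTokenMetrics(backend, model string, output, input, total uint32)
}

// tokenMetrics is unexported; only the interface escapes the package.
type tokenMetrics struct {
	requests int
}

// NewTokenMetrics returns the interface, keeping the implementation private
// and letting tests substitute mocks.
func NewTokenMetrics() TokenMetrics { return &tokenMetrics{} }

func (t *tokenMetrics) StartRequest(backend, model string)                  {}
func (t *tokenMetrics) UpdateRequestMetrics(backend, model, status string) { t.requests++ }
func (t *tokenMetrics) UpdateTokenMetrics(backend, model string, output, input, total uint32) {
}

func main() {
	var m TokenMetrics = NewTokenMetrics()
	m.UpdateRequestMetrics("openai", "gpt", "success")
	fmt.Println(m != nil)
}
```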
| c.modelName = "unknown" | ||
| model, body, err := parseOpenAIChatCompletionBody(rawBody) | ||
| if err != nil { | ||
| c.metrics.UpdateRequestMetrics(c.backendName, c.modelName, "error") |
Can you remove these error metrics from every conditional and instead just do it once on a deferred function at the beginning of the method, as suggested in the previous review?
Doing it in a deferred function ensures that we will not miss recording the error metric in the future if we add other error blocks and forget to copy the metric recording. It is overall less error-prone.
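The deferred pattern being suggested, sketched on a simplified processor (names and error branches are illustrative, not the actual project code):

```go
package main

import "fmt"

// countingMetrics is an in-memory stand-in for the metrics sink.
type countingMetrics struct{ errors int }

func (m *countingMetrics) UpdateRequestMetrics(backend, model, status string) {
	if status == "error" {
		m.errors++
	}
}

type chatProcessor struct {
	backendName, modelName string
	metrics                *countingMetrics
}

// ProcessRequestBody records the error metric once, in a single deferred
// function, instead of repeating the call in every error branch. New error
// returns added later are covered automatically.
func (c *chatProcessor) ProcessRequestBody(raw []byte) (err error) {
	defer func() {
		if err != nil {
			c.metrics.UpdateRequestMetrics(c.backendName, c.modelName, "error")
		}
	}()
	if len(raw) == 0 {
		return fmt.Errorf("empty body") // no metric call needed here
	}
	if raw[0] != '{' {
		return fmt.Errorf("not JSON") // ...or here
	}
	return nil
}

func main() {
	m := &countingMetrics{}
	p := &chatProcessor{backendName: "aws", modelName: "unknown", metrics: m}
	_ = p.ProcessRequestBody(nil)
	_ = p.ProcessRequestBody([]byte("x"))
	_ = p.ProcessRequestBody([]byte("{}"))
	fmt.Println(m.errors)
}
```

Note the named return value `err`: the deferred closure reads it after every return path, which is what makes the single recording point work.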
| } | ||
| // Track the backend and model name for metrics | ||
| c.metrics.StartRequest(c.backendName, c.modelName) |
This should probably be moved to the ProcessRequestHeaders method.
| selectedBackendHeaderKey: "x-ai-gateway-backend-key", | ||
| modelNameHeaderKey: "x-ai-gateway-model-key", | ||
| }, requestHeaders: headers, logger: slog.Default(), translator: mt} | ||
| }, requestHeaders: headers, logger: slog.Default(), translator: mt, metrics: metrics.NewTokenMetrics()} |
I think this test file should contain assertions to verify that the right metrics methods were called.
If you turn the metrics property into an interface as suggested above, you can use a mock implementation that just records the number of times each method is called, and after each individual test you can verify that the right methods have been called the right number of times.
This way the unit tests will check that the processor behaves as expected, but also that it calls the logic to record metrics, without missing any or recording more than once by mistake (this would catch missing metric recording in error blocks or duplicate metric recording in translator/processor, and make the code more robust and more future-change-proof).
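A minimal mock in that style, assuming the metrics dependency is an interface (the method set here is illustrative):

```go
package main

import "fmt"

// TokenMetrics is the interface the processor would depend on.
type TokenMetrics interface {
	StartRequest(backend, model string)
	UpdateRequestMetrics(backend, model, status string)
}

// mockMetrics records how many times each method was called so a test can
// assert the exact counts: no missed error metrics, no double recording.
type mockMetrics struct {
	startCalls, requestCalls int
}

func (m *mockMetrics) StartRequest(backend, model string)                  { m.startCalls++ }
func (m *mockMetrics) UpdateRequestMetrics(backend, model, status string) { m.requestCalls++ }

func main() {
	var m TokenMetrics = &mockMetrics{}
	m.StartRequest("openai", "gpt")
	m.UpdateRequestMetrics("openai", "gpt", "success")
	mm := m.(*mockMetrics)
	fmt.Println(mm.startCalls, mm.requestCalls)
}
```

In a test, the mock would be injected into the processor under test, and the assertions on the counters run after each scenario.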
| return nil, nil, nil, fmt.Errorf("failed to marshal body: %w", err) | ||
| } | ||
| setContentLength(headerMutation, mut.Body) | ||
| o.tokenMetrics.StartRequest(o.backendName, o.modelName) |
If we use the same token metrics instance here as the one used by the processor, then the request recording has already started and the translator shouldn't need to do this.
| backendName string | ||
| modelName string |
Could these fields be moved into the TokenMetrics implementation, with methods to set them?
I think it makes sense and keeps the processor state cleaner; plus these values will already be available to the methods called by the translator, so we won't need to have them in the translator either. I think that would be cleaner.
@mathetake WDYT?
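Sketched out, that encapsulation might look like this (illustrative names; an in-memory map stands in for the Prometheus counters):

```go
package main

import "fmt"

// tokenMetrics owns the backend/model labels itself, so the processor and
// translator no longer each carry copies of the same state.
type tokenMetrics struct {
	backend, model string
	tokenTotals    map[string]uint32
}

func newTokenMetrics() *tokenMetrics {
	return &tokenMetrics{tokenTotals: map[string]uint32{}}
}

func (t *tokenMetrics) SetBackend(b string) { t.backend = b }
func (t *tokenMetrics) SetModel(m string)   { t.model = m }

// UpdateTokenMetrics no longer needs the labels threaded through every call
// site; it reads them from its own state.
func (t *tokenMetrics) UpdateTokenMetrics(total uint32) {
	t.tokenTotals[t.backend+"/"+t.model] += total
}

func main() {
	m := newTokenMetrics()
	m.SetBackend("aws")
	m.SetModel("llama")
	m.UpdateTokenMetrics(10)
	fmt.Println(m.tokenTotals["aws/llama"])
}
```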
| requestStart time.Time | ||
| modelName string | ||
| backendName string | ||
| metrics *metrics.Metrics |
Are these actually used?
In any case, when you see the same properties/state being used in N different places (processor, translator, this type), it is a sign that things could be better encapsulated.
These fields should probably all be moved inside the TokenMetrics implementation, and the "tokenMetrics" instance should be the only thing that is shared and passed between the processor, translator, etc.
| } | ||
| headerMutation = &extprocv3.HeaderMutation{} | ||
| setContentLength(headerMutation, mut.Body) | ||
| o.tokenMetrics.UpdateTokenMetrics(o.backendName, o.modelName, tokenUsage.OutputTokens, tokenUsage.InputTokens, tokenUsage.TotalTokens) |
As mentioned in the previous review, these metrics are already recorded by the calling processor in its ProcessResponseBody method, once the translator returns.
It looks like the values will be incremented twice, which is not correct?
This is why it is important to have the tests I mention above, to make sure we're only recording metrics when needed, and not doing it N times by mistake.
This also applies to the other translators.
@mathetake ok, that'll be a major refactor. I am going to do it in a clean way then. Closing this PR and will submit a new one based on the updated interface.
No reason to close the PR though ...
Agree. No need to close the PR. You're doing an amazing job on this one, keeping up with all the reviews, etc. We can merge this and tackle that comment in a follow-up PR!
Commit Message
Add prometheus metrics to extproc to track data plane metrics. These metrics help understand request- or token-level usage on models and backends.
Related Issues/PRs (if applicable)
Special notes for reviewers (if applicable)
Below is an example output from extproc: