[Online Scoring][7.0/x] Add find_completed_sessions method #19718
dbczumar merged 158 commits into mlflow:master
Conversation
Add CompletedSession dataclass for representing completed sessions eligible for online scoring. Add find_completed_sessions method to AbstractStore and SqlAlchemyStore for efficiently querying completed sessions within a time window. Signed-off-by: dbczumar <corey.zumar@databricks.com>
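As a rough sketch of what this commit introduces, the dataclass and abstract method might look like the following. Field and parameter names here are illustrative assumptions, not copied from the PR:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


# Illustrative sketch only; the actual fields in the PR may differ.
@dataclass
class CompletedSession:
    session_id: str
    experiment_id: str
    first_trace_timestamp_ms: int
    last_trace_timestamp_ms: int
    trace_count: int


class AbstractStore(ABC):
    @abstractmethod
    def find_completed_sessions(
        self,
        experiment_id: str,
        min_last_trace_timestamp_ms: int,
        max_last_trace_timestamp_ms: int,
        max_results: "int | None" = None,
    ) -> "list[CompletedSession]":
        """Return sessions whose last trace timestamp falls within the window."""
```

SqlAlchemyStore would then implement this method with a single aggregating SQL query rather than loading traces into Python.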
Jobs access SqlAlchemyStore directly, so no REST API is needed. The session scoring job can call find_completed_sessions() directly on the store. Signed-off-by: dbczumar <corey.zumar@databricks.com>
Add optional filter_string parameter to find_completed_sessions method that filters sessions based on their first trace. When provided, only sessions whose first trace (by timestamp) matches the filter will be returned. This enables session-level scorers to apply filters to determine which sessions to evaluate, with the filter semantics being that it applies to the first trace in each session. Signed-off-by: dbczumar <corey.zumar@databricks.com>
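The first-trace filter semantics described above can be modeled with a small pure-Python sketch. This models the behavior only, not the actual SQL implementation; the dict shape and key names are assumptions for illustration:

```python
# Model: a session passes the filter iff its earliest trace (by timestamp)
# satisfies the predicate; all later traces in the session are ignored.
def filter_sessions_by_first_trace(sessions, predicate):
    """sessions: dict mapping session_id -> list of trace dicts."""
    matched = []
    for session_id, traces in sessions.items():
        first_trace = min(traces, key=lambda t: t["timestamp_ms"])
        if predicate(first_trace):
            matched.append(session_id)
    return matched


sessions = {
    "a": [
        {"timestamp_ms": 100, "tags": {"env": "prod"}},
        {"timestamp_ms": 200, "tags": {"env": "dev"}},
    ],
    "b": [{"timestamp_ms": 150, "tags": {"env": "dev"}}],
}
# Only session "a" matches: its *first* trace is tagged env=prod, even
# though a later trace in the same session is tagged env=dev.
prod_sessions = filter_sessions_by_first_trace(
    sessions, lambda t: t["tags"].get("env") == "prod"
)
```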
Update the abstract method signature to include filter_string parameter that was added to SqlAlchemyStore implementation. Signed-off-by: dbczumar <corey.zumar@databricks.com>
- Extract filter application logic into shared _apply_trace_filter_clauses helper
- Use descriptive _SqlAlchemyStatement TypeVar bounded to Select and Query types
- Add proper type hints: ColumnElement for filters, Subquery for join subqueries
- Move exists import to top level (was redundantly imported in function)
- Eliminate ~50 lines of duplicated filter logic between search_traces and find_completed_sessions

Signed-off-by: dbczumar <corey.zumar@databricks.com>
- Extract 5 helper methods from 210-line find_completed_sessions
  - _build_candidate_sessions_subquery: sessions with traces in time window
  - _build_first_trace_filter_subquery: filter by first trace (returns Subquery | None)
  - _build_session_stats_subquery: aggregate first/last timestamps
  - _build_sessions_with_recent_traces_subquery: find incomplete sessions
  - _build_completed_sessions_query: build final query with ordering
- Main method reduced from ~210 to ~35 lines
- All helpers have proper type hints and single responsibility

Signed-off-by: dbczumar <corey.zumar@databricks.com>
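The core aggregation these helpers assemble can be approximated in plain SQL. A minimal SQLite sketch with a deliberately simplified traces table (the real MLflow schema and the SQLAlchemy-built query differ):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE traces (session_id TEXT, timestamp_ms INTEGER)")
conn.executemany(
    "INSERT INTO traces VALUES (?, ?)",
    [("a", 100), ("a", 300), ("b", 200), ("b", 500), ("c", 350)],
)
# A session is "completed" when its LAST trace lands in [min, max]:
# session b's last trace (500) is after the window, so it is excluded.
rows = conn.execute(
    """
    SELECT session_id,
           MIN(timestamp_ms) AS first_trace_timestamp_ms,
           MAX(timestamp_ms) AS last_trace_timestamp_ms
    FROM traces
    GROUP BY session_id
    HAVING MAX(timestamp_ms) BETWEEN :min_ts AND :max_ts
    ORDER BY last_trace_timestamp_ms ASC, session_id ASC
    """,
    {"min_ts": 250, "max_ts": 400},
).fetchall()
```

The ORDER BY clause mirrors the deterministic (last_trace_timestamp_ms ASC, session_id ASC) pagination ordering discussed later in the review.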
…aining coverage
- test_find_completed_sessions: 126 -> 73 lines
  * Use for loops for trace creation
  * Consolidate assertions
  * Remove redundant pagination test
- test_find_completed_sessions_with_filter_string: 124 -> 51 lines
  * Reduce from 4 sessions to 3
  * Combine tag and metadata filter tests more concisely
  * Use f-strings for trace IDs
- Fix variable shadowing: rename loop var from 'time' to 'timestamp'
- Remove single-line docstring per clint guidelines

Signed-off-by: dbczumar <corey.zumar@databricks.com>
…ameter
- Remove candidate_sessions and filtered_sessions distinction
- Single 'sessions' parameter - clearer and simpler
- Caller passes filtered_sessions if available, otherwise candidate_sessions
- Removes conditional logic from _build_session_stats_subquery
- Simplifies docstring - no need to explain filtering logic

Signed-off-by: dbczumar <corey.zumar@databricks.com>
Move find_completed_sessions above all helper methods for better readability. Public API method now comes first, followed by private helpers.

New order:
1. find_completed_sessions (public)
2. _build_candidate_sessions_subquery
3. _build_first_trace_filter_subquery
4. _build_session_stats_subquery
5. _build_sessions_with_recent_traces_subquery
6. _build_completed_sessions_query

Signed-off-by: dbczumar <corey.zumar@databricks.com>
dbczumar
left a comment
@AveshCSingh Thank you so much for the feedback! @smoorjani could you take a pass?
    min_last_trace_timestamp_ms: int,
) -> Subquery:
    """
    Build subquery for sessions with at least one trace in the time window.
Ah, great point. Fixed!
Sessions are ordered by (last_trace_timestamp_ms ASC, session_id ASC) for
deterministic pagination when timestamp ties occur.

This efficiently identifies completed sessions in a single query by:
1. Finding all sessions with last trace in [min_last_trace_timestamp_ms,
   max_last_trace_timestamp_ms]
2. Checking if each session has any traces after max_last_trace_timestamp_ms
Yeah, 2 and 3 are redundant - removed!
smoorjani
left a comment
Love it! This is amazing, left a few minor nits/questions
# Build filter conditions starting with attribute filters
filter_conditions = [*attribute_filters]

# Handle run_id filter if present
nit: noticed we lost the comments when copying over, can we add those back? I found them helpful
Ah, sorry about that. Restored!
    filter_string: str | None = None,
) -> list["CompletedSession"]:
    """
    Find completed sessions within a time window based on their last trace timestamp.
nit: might just be me, but didn't fully understand this until reading the code. Could we define a completed session here (and in the other docstrings)? for example,
A completed session is one where no traces are logged after the max_last_trace_timestamp_ms.
Absolutely! Done!
    session_id ASC).
    """
    with self.ManagedSessionMaker() as session:
        candidate_sessions = self._build_candidate_sessions_subquery(
thanks so much for this! this made reading so easy
    .subquery()
)

def _build_sessions_with_recent_traces_subquery(
nit: not an expert so not sure how much of an impact this has, but should we join against candidate sessions to filter out traces before the min, instead of scanning over the entire table?
great idea & very impactful. I've incorporated this :)
…ble scan

Filter by candidate sessions to only check sessions we've already identified, instead of scanning all traces in the experiment.

Signed-off-by: dbczumar <corey.zumar@databricks.com>
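The optimization this commit applies — restricting the recent-trace check to already-identified candidate sessions rather than scanning every trace — can be sketched in SQL. A simplified SQLite illustration (schema and query shape are assumptions; the PR builds the real version with SQLAlchemy subqueries):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE traces (session_id TEXT, timestamp_ms INTEGER)")
conn.executemany(
    "INSERT INTO traces VALUES (?, ?)",
    [("a", 100), ("a", 300), ("b", 200), ("b", 500), ("z", 900)],
)
max_ts = 400

# Before: check every trace in the experiment for recent activity (full scan).
full_scan = conn.execute(
    "SELECT DISTINCT session_id FROM traces WHERE timestamp_ms > ?"
    " ORDER BY session_id",
    (max_ts,),
).fetchall()

# After: only consider sessions already identified as candidates, letting the
# database skip unrelated sessions (like "z") entirely.
candidate_ids = ["a", "b"]  # stand-in for the candidate-sessions subquery
placeholders = ",".join("?" * len(candidate_ids))
restricted = conn.execute(
    f"SELECT DISTINCT session_id FROM traces"
    f" WHERE timestamp_ms > ? AND session_id IN ({placeholders})"
    " ORDER BY session_id",
    (max_ts, *candidate_ids),
).fetchall()
```

With an index on (session_id, timestamp_ms), the restricted form lets the planner seek only into candidate sessions instead of scanning the whole table.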
# Step 4: Find sessions with any trace > max timestamp (still ongoing)
# Example: Session B has trace at 500 > 400
sessions_with_recent_traces = self._build_sessions_with_recent_traces_subquery(
    session=session,
    experiment_id=experiment_id,
    max_last_trace_timestamp_ms=max_last_trace_timestamp_ms,
    sessions=(
        filtered_sessions if filtered_sessions is not None else candidate_sessions
    ),
)
Do we still need this when we're already filtering on the last trace timestamp calculated in step 3? Lines 3413 and 3414 should have the same effect?
Oof - yeah, you're right. Removed the redundant query!
    session=session,
    experiment_id=experiment_id,
    sessions=(
        filtered_sessions if filtered_sessions is not None else candidate_sessions
IMO we should return candidate_sessions in _build_first_trace_filter_subquery instead of None when filter_string is empty, so we don't need the if-else here
Yeah, great point. Fixed!
    Sessions with any traces after this time are excluded.
max_results: Maximum number of sessions to return. If None, returns all
    matching sessions.
filter_string: Optional search filter string to apply to the first trace
I'm a bit concerned that it's not obvious the filter string only applies to the first trace of the session 🤔 But if this is internal only then it's probably fine
Agreed, but hopefully the docstring is sufficient for now
serena-ruan
left a comment
LGTM once https://github.com/mlflow/mlflow/pull/19718/files#r2680738843 is resolved!
Instead of returning None from _build_first_trace_filter_subquery when filter_string is empty, return candidate_sessions directly. This eliminates the need for conditional checks in the calling code since the result is semantically correct (all candidates pass when there's no filter).

Changes:
- _build_first_trace_filter_subquery now returns Subquery instead of Subquery | None
- Return candidate_sessions when filter_string is empty
- Remove if-else checks that selected between filtered_sessions and candidate_sessions

Addresses: mlflow#19718 (comment)

Signed-off-by: dbczumar <corey.zumar@databricks.com>
The sessions_with_recent_traces subquery checked for sessions with any trace where timestamp_ms > max_last_trace_timestamp_ms. However, this check is redundant because sessions_with_stats already contains the MAX timestamp for each session. If the MAX timestamp is <= max_last_trace_timestamp_ms, then by definition no trace in that session can be greater than max_last_trace_timestamp_ms.

Removing this simplifies the query and improves performance by eliminating:
- One subquery construction
- One outer join operation
- One filter condition

All tests continue to pass, confirming the logic is correct.

Signed-off-by: dbczumar <corey.zumar@databricks.com>
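The equivalence argued in that commit message — if a session's MAX timestamp is within the bound, no individual trace can exceed it — can be sanity-checked on sample data (the sample sessions below are made up for illustration):

```python
MAX_TS = 400
traces_by_session = {
    "a": [100, 300],  # completed: max is 300 <= 400
    "b": [200, 500],  # ongoing: has a trace at 500 > 400
    "c": [350],       # completed
}

# Formulation 1: the session's MAX timestamp is within the bound
# (what sessions_with_stats already computes).
completed_by_max = {
    s for s, ts in traces_by_session.items() if max(ts) <= MAX_TS
}

# Formulation 2: no individual trace exceeds the bound
# (what the removed sessions_with_recent_traces subquery checked).
completed_by_no_recent = {
    s for s, ts in traces_by_session.items()
    if not any(t > MAX_TS for t in ts)
}

# The two formulations always select the same sessions, so the extra
# subquery, join, and filter were redundant.
```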
Related Issues/PRs
#xxx

What changes are proposed in this pull request?
Add find_completed_sessions utility method / SQL query
How is this PR tested?
Does this PR require documentation update?
Release Notes
Is this a user-facing change?
What component(s), interfaces, languages, and integrations does this PR affect?
Components
- area/tracking: Tracking Service, tracking client APIs, autologging
- area/models: MLmodel format, model serialization/deserialization, flavors
- area/model-registry: Model Registry service, APIs, and the fluent client calls for Model Registry
- area/scoring: MLflow Model server, model deployment tools, Spark UDFs
- area/evaluation: MLflow model evaluation features, evaluation metrics, and evaluation workflows
- area/gateway: MLflow AI Gateway client APIs, server, and third-party integrations
- area/prompts: MLflow prompt engineering features, prompt templates, and prompt management
- area/tracing: MLflow Tracing features, tracing APIs, and LLM tracing functionality
- area/projects: MLproject format, project running backends
- area/uiux: Front-end, user experience, plotting, JavaScript, JavaScript dev server
- area/build: Build and test infrastructure for MLflow
- area/docs: MLflow documentation pages

How should the PR be classified in the release notes? Choose one:
- rn/none - No description will be included. The PR will be mentioned only by the PR number in the "Small Bugfixes and Documentation Updates" section
- rn/breaking-change - The PR will be mentioned in the "Breaking Changes" section
- rn/feature - A new user-facing feature worth mentioning in the release notes
- rn/bug-fix - A user-facing bug fix worth mentioning in the release notes
- rn/documentation - A user-facing documentation change worth mentioning in the release notes

Should this PR be included in the next patch release?
"Yes" should be selected for bug fixes, documentation updates, and other small changes. "No" should be selected for new features and larger changes. If you're unsure about the release classification of this PR, leave this unchecked to let the maintainers decide.

What is a minor/patch release?
Bug fixes, doc updates and new features usually go into minor releases.
Bug fixes and doc updates usually go into patch releases.