Ray Data: Refactor histogram metrics (#57851)
Conversation
Signed-off-by: Alan Guo <aguo@anyscale.com>
```python
task_completion_time: list[int] = metric_field(
    default_factory=list,
    description="Time spent running tasks to completion.",
task_completion_time_total: float = metric_field(
```
@alexeykudinkin, is it okay if we rename this to `task_completion_time_total`? I think this name is slightly better than calling the other metric `task_completion_time_histogram`.
There was a problem hiding this comment.
```diff
-task_completion_time_total: float = metric_field(
+task_completion_time_total_s: float = metric_field(
```
Code Review
This pull request refactors Ray Data's histogram metrics by introducing a RuntimeMetricsHistogram class. This is a positive change that encapsulates histogram logic and simplifies the OpRuntimeMetrics class. The goal of removing the lock in OpRuntimeMetrics by moving delta-tracking to the StatsActor is sound.
However, I've identified a critical issue: the removal of the lock appears to have reintroduced a race condition. The OpRuntimeMetrics object is accessed by both the operator thread and the _StatsManager's background thread without synchronization, which can lead to inconsistent or corrupt metrics data.
Additionally, I have a couple of medium-severity suggestions to improve code quality. One is to avoid monkey-patching the Histogram object from ray.util.metrics by managing state within the _StatsActor. The other is to remove a now-unused method from the new RuntimeMetricsHistogram class.
Overall, the direction of the refactor is good, but the thread-safety issue needs to be addressed before merging.
```python
for metric in self.get_metrics():
    if not self._is_map and metric.map_only:
        continue
    if skip_internal_metrics and metric.internal_only:
        continue
    value = getattr(self, metric.name)
    result.append((metric.name, value))
```
Removing the _histogram_thread_lock has introduced a race condition. The OpRuntimeMetrics object is updated by the operator's thread (e.g., via on_task_finished which calls observe) and read by the _StatsManager's background thread (which calls as_dict).
The observe method in RuntimeMetricsHistogram performs a non-atomic read-modify-write operation (+=) on _bucket_counts. The as_dict method reads the RuntimeMetricsHistogram object, which is then pickled for an RPC call. This pickling process reads _bucket_counts.
Without a lock, the _StatsManager thread could read a partially updated or inconsistent state of _bucket_counts while the operator thread is modifying it, leading to incorrect metrics. The original _histogram_thread_lock was there to prevent this exact race condition.
A possible solution is to add a lock within RuntimeMetricsHistogram and use it in observe and when creating a copy of the histogram data for serialization. For example, as_dict could call a new copy() method on RuntimeMetricsHistogram that creates a thread-safe copy of the object.
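A minimal sketch of this suggestion (the class and method names mirror the PR, but the internals shown here are assumptions, not the PR's actual code):

```python
import threading
from typing import List


class RuntimeMetricsHistogram:
    """Sketch of the suggested lock-based fix; fields are assumptions."""

    def __init__(self, boundaries: List[float]):
        self.boundaries = boundaries
        # One count per boundary, plus a final +Inf bucket.
        self._bucket_counts = [0] * (len(boundaries) + 1)
        self._lock = threading.Lock()

    def _find_bucket_index(self, value: float) -> int:
        for i, bound in enumerate(self.boundaries):
            if value <= bound:
                return i
        return len(self.boundaries)  # +Inf bucket

    def observe(self, value: float, num_observations: int = 1) -> None:
        # Guard the non-atomic read-modify-write on _bucket_counts.
        with self._lock:
            self._bucket_counts[self._find_bucket_index(value)] += num_observations

    def copy(self) -> "RuntimeMetricsHistogram":
        # Snapshot under the lock so a reader never sees a half-applied
        # update; as_dict() would pickle this copy instead of the live object.
        with self._lock:
            snapshot = RuntimeMetricsHistogram(self.boundaries)
            snapshot._bucket_counts = list(self._bucket_counts)
            return snapshot
```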
Although this comment is true, we chose to drop the lock to prioritize performance.
The consequence of reading a partially modified list of bucket counts is that the StatsActor may get partially outdated counts. But by the next loop, the counts should catch up.
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
/gemini review
Code Review
This pull request is a great refactoring of the histogram metrics in Ray Data. Abstracting the histogram logic into the RuntimeMetricsHistogram class and moving the delta tracking to the StatsActor significantly cleans up the code in OpRuntimeMetrics and correctly removes the need for a lock. The reintroduction of task_completion_time_total is also a good addition.
I have one main piece of feedback regarding a magic number in RuntimeMetricsHistogram.apply_to_metric that could lead to incorrect metric values for histograms with large boundary values. Please see the detailed comment.
```python
boundary_upper_bound = (
    self.boundaries[i]
    if i < len(self._bucket_counts) - 1
    else self.boundaries[-1] + 100
)
```
The use of + 100 to determine the upper bound for the +Inf bucket is not robust. While it might work for time-based histograms in seconds, it will produce misleading results for histograms with different scales, such as histogram_buckets_bytes where the last boundary is 4096 * GiB. Adding 100 bytes to this large number will result in a representative value for the bucket that is almost identical to the lower boundary, skewing the histogram.
A better approach would be to use a proportional value, similar to how the average is calculated in _calculate_average_value. Using a multiplier, for example * 2, would make the logic scale-independent and consistent with the average calculation. This would make the representative value for the last bucket 1.5 * self.boundaries[-1], which is more sensible.
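For illustration, here is the arithmetic behind that claim (a sketch; `representative_value` and the midpoint convention are my assumptions, not the PR's code):

```python
from typing import List


def representative_value(boundaries: List[float], i: int, num_buckets: int) -> float:
    """Midpoint of bucket i, using the proposed `* 2` rule for the +Inf bucket."""
    lower = boundaries[i - 1] if i > 0 else 0.0
    upper = boundaries[i] if i < num_buckets - 1 else boundaries[-1] * 2
    return (lower + upper) / 2


# With the `* 2` rule, the +Inf bucket's representative value is always
# 1.5x the last boundary, independent of the histogram's scale:
# (boundaries[-1] + 2 * boundaries[-1]) / 2 == 1.5 * boundaries[-1]
```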
```diff
 boundary_upper_bound = (
     self.boundaries[i]
     if i < len(self._bucket_counts) - 1
-    else self.boundaries[-1] + 100
+    else self.boundaries[-1] * 2
 )
```
Not an issue, since this is guaranteed to be the last bucket and there is no "upper limit" to the bucket (other than integer overflow).
```python
]


class RuntimeMetricsHistogram:
```
Why not just call it Histogram?
To differentiate it from `ray.util.metrics.Histogram`.
```python
        self._bucket_counts[self._find_bucket_index(value)] += num_observations
        self._memoized_avg = None

    def apply_to_metric(
```
```diff
-    def apply_to_metric(
+    def export_to(
```
```python
if getattr(metric, "last_applied_bucket_counts_for_tags", None) is None:
    metric.last_applied_bucket_counts_for_tags = {}
```
This is sketchy. Why not keep track in this class itself?
Just copy the whole array as the last snapshot.
We should not track it in this class because this class is pickled and sent over to the stats_actor.
I keep it tracked in the stats_actor. I could move this from the metric class to the stats_actor directly. I think that will definitely be less sketchy, but the logic will be less encapsulated and will leak into the overall stats_actor.
I actually think this is the better approach. The alternative I would think of would be something like:

```python
def export_to(
    self,
    metric: Histogram,
    tags: Dict[str, str],
    previous_bucket_counts_for_tags: List[float],
):
```
But then that would move a bunch of logic out into the stats actor, including:

- `last_applied_bucket_counts_for_tags` is a dictionary of tags -> counts. It has to be stored per metric, so in the stats actor it becomes `last_applied_bucket_counts_for_tags_for_metric`: a Dict of histogram_name -> tags -> bucket counts.
- The `tags_key` for the above dictionary is a normalized string built from the labels and values of the tags dictionary. This logic would also have to move up to the stats actor.
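A rough sketch of what that stats-actor-side bookkeeping could look like (names such as `_tags_key` and `HistogramDeltaTracker` are hypothetical, not from the PR):

```python
from typing import Dict, List


def _tags_key(tags: Dict[str, str]) -> str:
    # Normalized, order-independent key built from the tag labels and values.
    return ",".join(f"{k}={v}" for k, v in sorted(tags.items()))


class HistogramDeltaTracker:
    """Per-metric, per-tags snapshots of the last exported bucket counts."""

    def __init__(self) -> None:
        # histogram_name -> tags key -> last exported bucket counts.
        self._last: Dict[str, Dict[str, List[int]]] = {}

    def delta(
        self, metric_name: str, tags: Dict[str, str], bucket_counts: List[int]
    ) -> List[int]:
        per_metric = self._last.setdefault(metric_name, {})
        key = _tags_key(tags)
        prev = per_metric.get(key, [0] * len(bucket_counts))
        per_metric[key] = list(bucket_counts)
        # New observations since the previous export, per bucket.
        return [cur - old for cur, old in zip(bucket_counts, prev)]
```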
```python
if diff > 0:
    for _ in range(diff):
        metric.observe(bucket_value, tags)
```
Why do we need to do this hoopla instead of just wrapping prom's histogram in this class itself?
The class is instantiated in op_runtime_metrics and then passed to the stats actor through pickling.
I don't know if prom's histogram is picklable (and even if it is, how efficiently it can be passed through).
Prom's histogram metric contains the values for each unique set of tags where a sample has been observed. There would need to be some amount of refactoring to send it over only once instead of per operator.
Yeah, makes sense. I keep forgetting that we have convoluted setup
Signed-off-by: Alan Guo <aguo@anyscale.com>
Bug
The average_total_task_completion_time_s property references self.task_completion_time_s, but this field was renamed to self.task_completion_time_total_s. This causes an AttributeError when the property is accessed.
python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py, lines 594 to 600 (commit bd5f9dc)
iamjustinhsu left a comment
Nice! I think if Alexey has any follow-ups we can do them in a separate PR.
## Description

Before this PR, the metrics would follow this path:

1. `StreamingExecutor` collects metrics per operator.
2. `_StatsManager` creates a thread to export metrics.
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`.
4. The stats thread reads the metrics sent from 2).
5. The stats thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics come in 2 forms: iteration and execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold:

- Don't export stats very frequently.
- Don't export iteration and execution stats separately (have them sent in the same RPC call).

However, this creates a lot of complexity (handling idle threads, etc.) and also makes it harder to support histogram metrics, which need to copy an entire list of values. See #57851 for more details.

By removing the stats thread in 2), we reduce management complexity and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of RPC calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~

Now the new flow is:

1. `StreamingExecutor` collects metrics per operator.
2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since the last update, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last-updated timestamp. See * below for a caveat.

~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset's last-updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~

1. ~~Create a per-dataset `StatsManager`. Pros: no thread lock. Cons: much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc. the updates will be on that interval. Pros: easy to implement and stateless. Cons: breaks down for slower streaming executors.~~
3. I removed the lock by keeping the state in 2 areas: `BatchIterator` and `StreamingExecutor`.

I also verified that #55163 still solves the original issue.

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.
Refactors the Ray Data histogram metrics to accomplish a few things:
- Abstract away histogram details into a `RuntimeMetricsHistogram` class.
- Remove the need for a lock in the `OpRuntimeMetrics` class.

It does so primarily by moving the delta-tracking logic from
`OpRuntimeMetrics` to the `StatsActor`. The delta tracking is necessary
because the Prometheus Histogram API only accepts new observations as
input and does not allow directly setting histogram bucket values.
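The delta-tracking step described above can be sketched as follows (a simplified illustration; `apply_deltas` and the choice of representative observation values are assumptions, not the PR's exact code):

```python
from typing import Dict, List


def apply_deltas(
    metric, deltas: List[int], boundaries: List[float], tags: Dict[str, str]
) -> None:
    """Replay per-bucket count deltas as observe() calls, since the Prometheus
    Histogram API only accepts new observations and cannot set bucket values."""
    for i, diff in enumerate(deltas):
        if diff <= 0:
            continue
        # Pick a value that falls inside bucket i (the final delta maps to
        # the +Inf bucket, so anything past the last boundary works there).
        value = boundaries[i] if i < len(boundaries) else boundaries[-1] + 1
        for _ in range(diff):
            metric.observe(value, tags)
```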
## Related issues
## Additional information
Verified the metrics work:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```
---------
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
## Description

Before this PR, the metrics followed this path:

1. `StreamingExecutor` collects metrics per operator.
2. `_StatsManager` creates a thread to export metrics.
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`.
4. The stats thread reads the metrics sent in 3).
5. The stats thread sleeps 5-10 seconds before exporting metrics to `_StatsActor`. These metrics come in two forms: iteration and execution metrics.

I believe the purpose of the stats thread created in 2) was twofold:

- Don't export stats too frequently.
- Don't export iteration and execution stats separately (send them in the same RPC call).

However, this creates a lot of complexity (handling idle threads, etc.) and also makes it harder to support histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details.

By removing the stats thread in 2), we reduce management complexity and avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the number of RPC calls. I don't think this is a concern, because the updates to the `_StatsActor` were already asynchronous, and we can also tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~

Now the new flow is:

1. `StreamingExecutor` collects metrics per operator.
2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since the last update, we send metrics to `_StatsActor` through the `_StatsManager`, then update the last-updated timestamp. See * below for a caveat.

~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset's last-updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~

1. ~~Create a per-dataset `StatsManager`. Pros: no thread lock. Cons: many more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which makes this more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc., the updates land on that interval. Pros: easy to implement and stateless. Cons: breaks down for slower streaming executors.~~
3. I removed the lock by keeping the state in two places: the `BatchIterator` and the `StreamingExecutor`.

I also verified that ray-project#55163 still solves the original issue.

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
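The throttled-update flow above can be sketched roughly as follows. This is a minimal illustration, not the actual Ray Data implementation: `ThrottledStatsUpdater` and its method names are invented for this sketch, and the RPC to `_StatsActor` is replaced with a plain callback.

```python
import time

UPDATE_INTERVAL_S = 5.0  # default interval between _StatsActor updates (illustrative)


class ThrottledStatsUpdater:
    """Per-caller throttle: push metrics only if the interval has elapsed.

    Keeping the last-updated timestamp on the caller itself (the iterator or
    the executor) means there is no shared dict and no lock to contend on.
    """

    def __init__(self, interval_s: float = UPDATE_INTERVAL_S):
        self._interval_s = interval_s
        self._last_update_time = float("-inf")  # so the first call always sends

    def maybe_update(self, push_metrics) -> bool:
        now = time.monotonic()
        if now - self._last_update_time < self._interval_s:
            return False  # too soon since the last push; skip this round
        push_metrics()  # stand-in for the _StatsActor update RPC
        self._last_update_time = now
        return True


sent = []
updater = ThrottledStatsUpdater(interval_s=5.0)
first = updater.maybe_update(lambda: sent.append("rpc"))   # sends immediately
second = updater.maybe_update(lambda: sent.append("rpc"))  # skipped: within 5s
```

Because the timestamp lives on the caller, each concurrently running dataset throttles independently and no cross-dataset bookkeeping is needed.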
## Description

Reintroduce the old `task_completion_time` metric as `task_completion_time_total`.

Refactors the Ray Data histogram metrics to accomplish a few things:
- Abstract away histogram details into a `RuntimeMetricsHistogram` class.
- Remove the need for a lock in the `OpRuntimeMetrics` class.

It does so primarily by moving the delta-tracking logic from `OpRuntimeMetrics` to the `StatsActor`. The delta tracking is necessary because the Prometheus `Histogram` API only accepts new observations as input and does not allow setting histogram bucket values directly.
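The delta-tracking idea described above can be sketched like this. This is an illustrative sketch only: `HistogramDeltaTracker` and its method names are invented here and are not the actual `StatsActor` code, and the per-bucket counts are assumed to be non-cumulative observation counts per report.

```python
from typing import Dict, List


class HistogramDeltaTracker:
    """Remembers the last per-bucket observation counts seen for each metric
    key and converts a new snapshot into the number of *new* observations per
    bucket — which is what Histogram.observe() can then replay, since the
    Prometheus API offers no way to set bucket counts directly."""

    def __init__(self) -> None:
        self._last_counts: Dict[str, List[int]] = {}

    def delta(self, key: str, counts: List[int]) -> List[int]:
        prev = self._last_counts.get(key, [0] * len(counts))
        deltas = [cur - old for cur, old in zip(counts, prev)]
        self._last_counts[key] = list(counts)  # remember for the next report
        return deltas


tracker = HistogramDeltaTracker()
# First report: per-bucket counts [3, 6, 1] — everything is new.
d1 = tracker.delta("task_completion_time", [3, 6, 1])
# Later report: only the growth since the last report should be re-observed.
d2 = tracker.delta("task_completion_time", [3, 8, 4])
```

The consumer would then emit `delta[i]` new observations (e.g. at a representative value for bucket `i`) into the exported histogram, so the operator-side metrics object no longer needs to hold a lock over a growing list of raw values.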
## Related issues
## Additional information
Verified that the metrics work:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```
---------
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. 
`_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.
Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.
It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.
## Related issues
## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```
---------
Signed-off-by: Alan Guo <aguo@anyscale.com>
## Description Before this PR, the metrics would follow this path 1. `StreamingExecutor` collects metrics per operator 2. `_StatsManager` creates a thread to export metrics 3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`. 4. Stats Thread reads the metrics sent from 2) 5. Stats Thread sleeps every 5-10 seconds before exporting metrics to `_StatsActor`. These metrics can come in 2 forms: iteration and execution metrics. I believe the purpose of the stats thread created in 2) was 2-fold - Don't export stats very frequently - Don't export Iteration and Execution stats separately (have them sent in the same rpc call) However, this creates a lot of complexity (handling idle threads, etc...) and also makes it harder to perform histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details. By removing the stats thread in 2), we can reduce complexity of management, and also avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the # of rpc calls. I don't think this is a concern, because the async updates to the `_StatsActor` were happening previously, and we can also tweak the update interval. ~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~ Now the new flow is: 1. `StreamingExecutor` collects metrics per operator 2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since last updated, we send metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we update the last updated timestamp. See * below for caveat. ~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset last updated timestamp. 
`_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not update the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~ 1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons: Much more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which will make this more challenging.~~ 2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to implement and it's stateless. Cons: Breaks down for slower streaming executors.~~ 3. I can removed the lock by keeping the state in the 2 areas - BatchIterator - StreamingExecutor I also verified that ray-project#55163 still solves the original issue ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.
Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.
It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.
## Related issues
## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```
---------
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
## Description

Before this PR, the metrics would follow this path:

1. `StreamingExecutor` collects metrics per operator.
2. `_StatsManager` creates a thread to export metrics.
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs a copy and holds a `_stats_lock`.
4. The stats thread reads the metrics sent in 3).
5. The stats thread sleeps 5-10 seconds between exports to `_StatsActor`. These metrics come in two forms: iteration and execution metrics.

I believe the purpose of the stats thread created in 2) was two-fold:

- Don't export stats very frequently.
- Don't export iteration and execution stats separately (send them in the same RPC call).

However, this creates a lot of complexity (handling idle threads, etc.) and also makes it harder to support histogram metrics, which need to copy an entire list of values. See ray-project#57851 for more details.

By removing the stats thread in 2), we reduce management complexity and avoid wasteful copying of metrics. The downside is that iteration and execution metrics are now sent separately, increasing the number of RPC calls. I don't think this is a concern, because the updates to the `_StatsActor` were already asynchronous, and we can also tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the last timestamps of each dataset. See * below for more details.~~

Now the new flow is:

1. `StreamingExecutor` collects metrics per operator.
2. `StreamingExecutor` checks the last time `_StatsActor` was updated. If more than a default 5 seconds has passed since the last update, we send metrics to `_StatsActor` through the `_StatsManager`, then update the last-updated timestamp. See * below for a caveat.

~~\*[important] Ray Data supports running multiple datasets concurrently. Therefore, I must keep track of each dataset's last-updated timestamp. `_stats_lock` is used to update that dictionary[dataset, last_updated] safely on `register_dataset` and on `shutdown`. On update, we don't require the lock because it does not change the dictionary's size. If we want to remove the lock entirely, I can think of 2 workarounds.~~

1. ~~Create a per-dataset `StatsManager`. Pros: no thread lock. Cons: many more code changes. The iteration metrics go through a separate code path that is independent of the streaming executor, which makes this more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at 12:00, 12:05, etc., the updates land on that interval. Pros: easy to implement and stateless. Cons: breaks down for slower streaming executors.~~
3. I removed the lock by keeping the state in 2 areas: `BatchIterator` and `StreamingExecutor`.

I also verified that ray-project#55163 still solves the original issue.

## Related issues

## Additional information

---------
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
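The timestamp-based throttling in step 2 of the new flow can be sketched as follows. This is a simplified illustration, not the actual Ray implementation; the class and method names are hypothetical:

```python
import time


class ThrottledExporter:
    """Sends metrics at most once per interval, with per-instance state.

    Because each StreamingExecutor owns its own exporter, the
    last-export timestamp needs no lock.
    """

    def __init__(self, export_fn, interval_s: float = 5.0):
        self._export_fn = export_fn  # e.g. an async RPC to the stats actor
        self._interval_s = interval_s
        self._last_export_time = float("-inf")  # always export on first call

    def maybe_export(self, metrics) -> bool:
        """Export `metrics` if the interval has elapsed; report whether sent."""
        now = time.monotonic()
        if now - self._last_export_time < self._interval_s:
            return False
        self._export_fn(metrics)
        self._last_export_time = now
        return True


# Usage: the executor calls maybe_export on every scheduling loop
# iteration; most calls are cheap no-ops.
sent = []
exporter = ThrottledExporter(sent.append, interval_s=5.0)
exporter.maybe_export({"rows": 10})  # sent immediately
exporter.maybe_export({"rows": 20})  # skipped: interval not yet elapsed
```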
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.
Refactors the Ray Data histogram metrics to accomplish a few things:
- Abstracts away histogram details into a `RuntimeMetricsHistogram` class.
- Removes the need for a lock in the `OpRuntimeMetrics` class.

It does so primarily by moving the delta-tracking logic from
`OpRuntimeMetrics` to the `StatsActor`. Delta tracking is necessary
because the Prometheus `Histogram` API only accepts new observations as
input and does not allow setting histogram bucket values directly.
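Since a Prometheus-style histogram only supports recording observations, the receiving side must diff the incoming bucket counts against the last snapshot and replay only the growth. A minimal sketch of that delta tracking, assuming non-cumulative per-bucket counts and hypothetical names (not the actual `StatsActor` code):

```python
class HistogramDeltaTracker:
    """Converts absolute per-bucket counts into new observations.

    Prometheus histograms expose only observe(value); bucket counts
    cannot be set directly. So we remember the last-seen counts and
    emit one (bucket_bound, num_new) pair per bucket that grew.
    """

    def __init__(self, bucket_bounds):
        self._bucket_bounds = bucket_bounds  # upper bounds, e.g. [0.1, 0.25, ...]
        self._last_counts = [0] * len(bucket_bounds)

    def new_observations(self, counts):
        """Return (bucket_bound, num_new) pairs since the last call."""
        deltas = []
        for bound, prev, cur in zip(self._bucket_bounds, self._last_counts, counts):
            if cur > prev:
                deltas.append((bound, cur - prev))
        self._last_counts = list(counts)
        return deltas


tracker = HistogramDeltaTracker([0.1, 0.5, 1.0])
tracker.new_observations([0, 2, 3])  # first snapshot: every count is new
tracker.new_observations([1, 2, 5])  # only the buckets that grew are reported
```

In the real metric pipeline, each returned pair would be replayed into the underlying histogram by calling its observe method `num_new` times with a value that falls into that bucket.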
## Related issues
## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```
---------
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>