Skip to content

Ray Data Refactor histogram metrics#57851

Merged
alanwguo merged 23 commits intoray-project:masterfrom
alanwguo:refactor-data-histograms
Nov 14, 2025
Merged

Ray Data Refactor histogram metrics#57851
alanwguo merged 23 commits intoray-project:masterfrom
alanwguo:refactor-data-histograms

Conversation

@alanwguo
Copy link
Copy Markdown
Contributor

@alanwguo alanwguo commented Oct 17, 2025

Description

Reintroduce the old task_completion_time metric as task_completion_time_total.

Refactors the ray data histogram metrics to accomplish a few things:

  • Abstract away histogram details into a class RuntimeMetricsHistogram
  • Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the OpRuntimeMetrics to the StatsActor. The delta tracking logic is necessary because the prometheus Histogram api only accepts new observations as input and does not allow directly setting histogram bucket values.

Related issues

Additional information

Verified metrics worked:

# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444

Signed-off-by: Alan Guo <aguo@anyscale.com>
@alanwguo alanwguo requested a review from a team as a code owner October 17, 2025 18:57
task_completion_time: list[int] = metric_field(
default_factory=list,
description="Time spent running tasks to completion.",
task_completion_time_total: float = metric_field(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexeykudinkin , is it okay if we rename this to task_completion_time_total? I think this name is slightly better than calling the other metric task_completion_time_histogram

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task_completion_time_total: float = metric_field(
task_completion_time_total_s: float = metric_field(

cursor[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors Ray Data's histogram metrics by introducing a RuntimeMetricsHistogram class. This is a positive change that encapsulates histogram logic and simplifies the OpRuntimeMetrics class. The goal of removing the lock in OpRuntimeMetrics by moving delta-tracking to the StatsActor is sound.

However, I've identified a critical issue: the removal of the lock appears to have reintroduced a race condition. The OpRuntimeMetrics object is accessed by both the operator thread and the _StatsManager's background thread without synchronization, which can lead to inconsistent or corrupt metrics data.

Additionally, I have a couple of medium-severity suggestions to improve code quality. One is to avoid monkey-patching the Histogram object from ray.util.metrics by managing state within the _StatsActor. The other is to remove a now-unused method from the new RuntimeMetricsHistogram class.

Overall, the direction of the refactor is good, but the thread-safety issue needs to be addressed before merging.

Comment on lines +538 to +544
for metric in self.get_metrics():
if not self._is_map and metric.map_only:
continue
if skip_internal_metrics and metric.internal_only:
continue
value = getattr(self, metric.name)
result.append((metric.name, value))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Removing the _histogram_thread_lock has introduced a race condition. The OpRuntimeMetrics object is updated by the operator's thread (e.g., via on_task_finished which calls observe) and read by the _StatsManager's background thread (which calls as_dict).

The observe method in RuntimeMetricsHistogram performs a non-atomic read-modify-write operation (+=) on _bucket_counts. The as_dict method reads the RuntimeMetricsHistogram object, which is then pickled for an RPC call. This pickling process reads _bucket_counts.

Without a lock, the _StatsManager thread could read a partially updated or inconsistent state of _bucket_counts while the operator thread is modifying it, leading to incorrect metrics. The original _histogram_thread_lock was there to prevent this exact race condition.

A possible solution is to add a lock within RuntimeMetricsHistogram and use it in observe and when creating a copy of the histogram data for serialization. For example, as_dict could call a new copy() method on RuntimeMetricsHistogram that creates a thread-safe copy of the object.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this comment is true, we choose to drop the lock to prioritize performance.

The consequence of reading a partially modified list of bucket counts is that the statsactor may get partially out-dated counts. But by the next loop, the counts should catch up.

@ray-gardener ray-gardener bot added data Ray Data-related issues tech-debt The issue that's due to tech debt labels Oct 17, 2025
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Alan Guo <aguo@anyscale.com>
cursor[bot]

This comment was marked as outdated.

@alanwguo alanwguo added the go add ONLY when ready to merge, run all tests label Oct 20, 2025
Signed-off-by: Alan Guo <aguo@anyscale.com>
@alexeykudinkin
Copy link
Copy Markdown
Contributor

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request is a great refactoring of the histogram metrics in Ray Data. Abstracting the histogram logic into the RuntimeMetricsHistogram class and moving the delta tracking to the StatsActor significantly cleans up the code in OpRuntimeMetrics and correctly removes the need for a lock. The reintroduction of task_completion_time_total is also a good addition.

I have one main piece of feedback regarding a magic number in RuntimeMetricsHistogram.apply_to_metric that could lead to incorrect metric values for histograms with large boundary values. Please see the detailed comment.

Comment on lines +127 to +131
boundary_upper_bound = (
self.boundaries[i]
if i < len(self._bucket_counts) - 1
else self.boundaries[-1] + 100
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The use of + 100 to determine the upper bound for the +Inf bucket is not robust. While it might work for time-based histograms in seconds, it will produce misleading results for histograms with different scales, such as histogram_buckets_bytes where the last boundary is 4096 * GiB. Adding 100 bytes to this large number will result in a representative value for the bucket that is almost identical to the lower boundary, skewing the histogram.

A better approach would be to use a proportional value, similar to how the average is calculated in _calculate_average_value. Using a multiplier, for example * 2, would make the logic scale-independent and consistent with the average calculation. This would make the representative value for the last bucket 1.5 * self.boundaries[-1], which is more sensible.

Suggested change
boundary_upper_bound = (
self.boundaries[i]
if i < len(self._bucket_counts) - 1
else self.boundaries[-1] + 100
)
boundary_upper_bound = (
self.boundaries[i]
if i < len(self._bucket_counts) - 1
else self.boundaries[-1] * 2
)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not an issue since we this is guaranteed to be the last bucket and there is no "upper limit" to the bucket (other than integer overflows).

task_completion_time: list[int] = metric_field(
default_factory=list,
description="Time spent running tasks to completion.",
task_completion_time_total: float = metric_field(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task_completion_time_total: float = metric_field(
task_completion_time_total_s: float = metric_field(

]


class RuntimeMetricsHistogram:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just call it Histogram?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to differentiate it from ray.util.metrics.Histogram.

self._bucket_counts[self._find_bucket_index(value)] += num_observations
self._memoized_avg = None

def apply_to_metric(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def apply_to_metric(
def export_to(

Comment on lines +116 to +117
if getattr(metric, "last_applied_bucket_counts_for_tags", None) is None:
metric.last_applied_bucket_counts_for_tags = {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sketchy. Why not keeping track in this class iself?

Just copy the whole array as last snapshot

Copy link
Copy Markdown
Contributor Author

@alanwguo alanwguo Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not track it in this class because this class is pickled and sent over to the stats_actor.

I keep it tracked in the stats_actor. I could move this from the metric class to the stats_actor directly. I think that will definitely be less sketchy, but the logic will be less encapsulated and leak into the overall stats_actor

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually think this is the better approach. The alternative I would think of is would be something like:

def export_to(
        self,
        metric: Histogram,
        tags: Dict[str, str],
        previous_bucket_counts_for_tags: List[float]
    ):

But then that would move a bunch of logic out into the stats actor including:
last_applied_bucket_counts_for_tags will be a dictionary of tags -> counts. This will have to be stored per metric so in stats actor it will be:
last_applied_bucket_counts_for_tags_for_metric: Dict histogram_name -> tags -> bucket counts

tags_key for the above dictionary is a normalized string based on the label and values of the tags dictionary. This logic will also have to move up to the stats actor.

Comment on lines +141 to +143
if diff > 0:
for _ in range(diff):
metric.observe(bucket_value, tags)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to do this hoopla instead of just wrapping prom's histogram in this class itself?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the class is initiated in op_runtime_metrics and then passed to stats actor through pickling.

I don't know if prom's histogram is picklable (and even if it is, how efficiently it can be passed through).

Prom's histogram metric contains the values for each unique set of tags where a sample has been observed. There will need to be some amount of refactoring to send it over only once instead of per operator.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, makes sense. I keep forgetting that we have convoluted setup

Signed-off-by: Alan Guo <aguo@anyscale.com>
Copy link
Copy Markdown
Contributor

@iamjustinhsu iamjustinhsu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for doing this Alan! Also make sure to rebase from master

Signed-off-by: Alan Guo <aguo@anyscale.com>
Copy link
Copy Markdown

@cursor cursor bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug

The average_total_task_completion_time_s property references self.task_completion_time_s, but this field was renamed to self.task_completion_time_total_s. This causes an AttributeError when the property is accessed.

python/ray/data/_internal/execution/interfaces/op_runtime_metrics.py#L594-L600

)
def average_total_task_completion_time_s(self) -> Optional[float]:
"""Average task's completion time in seconds (including throttling)"""
if self.num_tasks_finished == 0:
return None
else:
return self.task_completion_time_s / self.num_tasks_finished

Fix in Cursor Fix in Web


Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Copy link
Copy Markdown
Contributor

@iamjustinhsu iamjustinhsu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice! I think if alexey has any follow ups we can do in a separate PR

@alanwguo alanwguo merged commit eabece5 into ray-project:master Nov 14, 2025
6 checks passed
bveeramani pushed a commit that referenced this pull request Nov 14, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that #55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Nov 16, 2025
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues


## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Nov 16, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues

## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues

## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues


## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues

## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
## Description
Reintroduce the old task_completion_time metric as
`task_completion_time_total`.

Refactors the ray data histogram metrics to accomplish a few things:
- Abstract away histogram details into a class RuntimeMetricsHistogram
- Removes the need for a lock in the OpRuntimeMetrics class.

It does so primarily by moving the delta tracking logic from the
OpRuntimeMetrics to the StatsActor. The delta tracking logic is
necessary because the prometheus Histogram api only accepts new
observations as input and does not allow directly setting histogram
bucket values.

## Related issues

## Additional information
Verified metrics worked:
```
# HELP ray_data_task_completion_time Time spent per task running those tasks to completion.
# TYPE ray_data_task_completion_time histogram
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.1",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.25",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="0.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1.0",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2.5",operator="ReadRange->Map(identity_with_sleep)_1"} 0.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5.0",operator="ReadRange->Map(identity_with_sleep)_1"} 3.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="7.5",operator="ReadRange->Map(identity_with_sleep)_1"} 6.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="10.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="15.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="20.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="25.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="50.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="75.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="100.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="150.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="1000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="2500.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="5000.0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_bucket{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",le="+Inf",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_count{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 10.0
ray_data_task_completion_time_sum{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 65.0
# HELP ray_data_task_completion_time_total Time spent running tasks to completion. This is a sum of all tasks' completion times.
# TYPE ray_data_task_completion_time_total gauge
ray_data_task_completion_time_total{Component="core_worker",NodeAddress="127.0.0.1",SessionName="session_2025-10-17_12-04-00_414091_75603",Version="3.0.0.dev0",WorkerId="9fa17dcb3156c7bee37b4077bd4361f9ce7e96c06b5267ee9e67a308",dataset="dataset_2_0",operator="ReadRange->Map(identity_with_sleep)_1"} 62.872898580506444
```

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
## Description
Before this PR, the metrics would follow this path
1. `StreamingExecutor` collects metrics per operator
2. `_StatsManager` creates a thread to export metrics
3. `StreamingExecutor` sends metrics to `_StatsManager`, which performs
a copy and holds a `_stats_lock`.
4. Stats Thread reads the metrics sent from 2)
5. Stats Thread sleeps every 5-10 seconds before exporting metrics to
`_StatsActor`. These metrics can come in 2 forms: iteration and
execution metrics.

I believe the purpose of the stats thread created in 2) was 2-fold
- Don't export stats very frequently
- Don't export Iteration and Execution stats separately (have them sent
in the same rpc call)

However, this creates a lot of complexity (handling idle threads,
etc...) and also makes it harder to perform histogram metrics, which
need to copy an entire list of values. See
ray-project#57851 for more details.

By removing the stats thread in 2), we can reduce complexity of
management, and also avoid wasteful copying of metrics. The downside is
that iteration and execution metrics are now sent separately, increasing
the # of rpc calls. I don't think this is a concern, because the async
updates to the `_StatsActor` were happening previously, and we can also
tweak the update interval.

~~It's important to note that `_stats_lock` still lives on to update the
last timestamps of each dataset. See * below for more details.~~

Now the new flow is:
1. `StreamingExecutor` collects metrics per operator
2. `StreamingExecutor` checks the last time `_StatsActor` was updated.
If more than a default 5 seconds has passed since last updated, we send
metrics to `_StatsActor` through the `_StatsManager`. Afterwards, we
update the last updated timestamp. See * below for caveat.

~~\*[important] Ray Data supports running multiple datasets
concurrently. Therefore, I must keep track of each dataset last updated
timestamp. `_stats_lock` is used to update that dictionary[dataset,
last_updated] safely on `register_dataset` and on `shutdown`. On update,
we don't require the lock because it does not update the dictionary's
size. If we want to remove the lock entirely, I can think of 2
workarounds.~~
1. ~~Create a per dataset `StatsManager`. Pros: no thread lock. Cons:
Much more code changes. The iteration metrics go through a separate code
path that is independent of the streaming executor, which will make this
more challenging.~~
2. ~~Update on every unix_epoch_timestamp % interval == 0, so that at
12:00, 12:05, etc.. the updates will be on that interval. Pros: easy to
implement and it's stateless. Cons: Breaks down for slower streaming
executors.~~
3. I can removed the lock by keeping the state in the 2 areas
- BatchIterator
- StreamingExecutor

I also verified that ray-project#55163 still
solves the original issue
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests tech-debt The issue that's due to tech debt

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants