Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions python/ray/dashboard/client/src/pages/metrics/Metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -329,20 +329,8 @@ const DATA_METRICS_CONFIG: MetricsSectionConfig[] = [
pathParams: "theme=light&panelId=37",
},
{
title: "(p50) Task Completion Time",
pathParams: "theme=light&panelId=40",
},
{
title: "(p75) Task Completion Time",
pathParams: "theme=light&panelId=41",
},
{
title: "(p99) Task Completion Time",
pathParams: "theme=light&panelId=44",
},
{
title: "(p100) Task Completion Time",
pathParams: "theme=light&panelId=45",
title: "Task Completion Time",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq, where is the pxx filter added?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We decided to ultimately remove it because of how, in general, ray data percentiles are incorrectly calculated. We will revisit it later, though, and if ur curious, the PR for that is https://github.com/iamjustinhsu/ray/pull/1/files#diff-0633fe346cc983e2e5518eba4b75f4067735898a304812be9b18d5652fc7e00dR134-R179

pathParams: "theme=light&panelId=38",
},
],
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,97 +460,41 @@
),
Panel(
id=38,
title="(p00) Task Completion Time",
description="Time spent running tasks to completion.",
title="Task Completion Time",
description="Time spent running tasks to completion w/ backpressure.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p00) Completion Time: {{dataset}}, {{operator}}",
expr="increase(ray_data_task_completion_time{{{global_filters}}}[5m]) / increase(ray_data_num_tasks_finished{{{global_filters}}}[5m])",
legend="Task Completion Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=39,
title="(p05) Task Completion Time",
description="Time spent running tasks to completion.",
title="Task Output Backpressure Time",
description="Time spent in output backpressure.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0.05, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the previous implementation of ray_data_task_completion_time_bucket, is my understanding correct that'd we'd record the latest task completion time when we send metrics to the StatsActor, rather than recording all task completion times?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, this part is a bug. I think alexey renamed task_completion_time to mean_task_completion_time, but he didn't update the dashboard.

the previous and current(I didn't change it in this PR) implementation currently sends a running mean metric

legend="(p05) Completion Time: {{dataset}}, {{operator}}",
expr="increase(ray_data_task_output_backpressure_time{{{global_filters}}}[5m]) / increase(ray_data_num_tasks_finished{{{global_filters}}}[5m])",
legend="Task Output Backpressure Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=40,
title="(p50) Task Completion Time",
description="Time spent running tasks to completion.",
title="Task Completion Time Without Backpressure",
description="Time spent running tasks to completion w/o backpressure.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0.50, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p50) Completion Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=41,
title="(p75) Task Completion Time",
description="Time spent running tasks to completion.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0.75, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p75) Completion Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=42,
title="(p90) Task Completion Time",
description="Time spent running tasks to completion.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0.9, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p90) Completion Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=44,
title="p(99) Task Completion Time",
description="Time spent running tasks to completion.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(0.99, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p99) Completion Time: {{dataset}}, {{operator}}",
),
],
fill=0,
stack=False,
),
Panel(
id=45,
title="p(100) Task Completion Time",
description="Time spent running tasks to completion.",
unit="seconds",
targets=[
Target(
expr="histogram_quantile(1, sum by (dataset, operator, le) (rate(ray_data_task_completion_time_bucket{{{global_filters}}}[5m])))",
legend="(p100) Completion Time: {{dataset}}, {{operator}}",
expr="increase(ray_data_task_completion_time_without_backpressure{{{global_filters}}}[5m]) / increase(ray_data_num_tasks_finished{{{global_filters}}}[5m])",
legend="Task Completion Time w/o Backpressure: {{dataset}}, {{operator}}",
),
],
fill=0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class RunningTaskInfo:
bytes_outputs: int
num_rows_produced: int
start_time: float
cum_block_gen_time: float


@dataclass
Expand Down Expand Up @@ -386,13 +387,15 @@ class OpRuntimeMetrics(metaclass=OpRuntimesMetricsMeta):
2500.0,
5000.0,
]

mean_task_completion_time: float = metric_field(
task_completion_time: float = metric_field(
default=0,
description="Time spent running tasks to completion.",
metrics_group=MetricsGroup.TASKS,
metrics_type=MetricsType.Histogram,
metrics_args={"boundaries": histogram_buckets_s},
)
task_completion_time_without_backpressure: float = metric_field(
default=0,
description="Time spent running tasks to completion without backpressure.",
metrics_group=MetricsGroup.TASKS,
)

# === Actor-related metrics ===
Expand Down Expand Up @@ -721,9 +724,8 @@ def on_toggle_task_output_backpressure(self, in_backpressure):
self._task_output_backpressure_start_time = time.perf_counter()
elif self._task_output_backpressure_start_time != -1:
# backpressure stopping, stop timer
self.task_output_backpressure_time += (
time.perf_counter() - self._task_output_backpressure_start_time
)
delta = time.perf_counter() - self._task_output_backpressure_start_time
self.task_output_backpressure_time += delta
self._task_output_backpressure_start_time = -1

def on_output_taken(self, output: RefBundle):
Expand All @@ -746,6 +748,7 @@ def on_task_submitted(self, task_index: int, inputs: RefBundle):
bytes_outputs=0,
num_rows_produced=0,
start_time=time.perf_counter(),
cum_block_gen_time=0,
)

def on_task_output_generated(self, task_index: int, output: RefBundle):
Expand All @@ -771,6 +774,7 @@ def on_task_output_generated(self, task_index: int, output: RefBundle):
meta.exec_stats is not None and meta.exec_stats.wall_time_s is not None
)
self.block_generation_time += meta.exec_stats.wall_time_s
task_info.cum_block_gen_time += meta.exec_stats.wall_time_s
assert meta.num_rows is not None
trace_allocation(block_ref, "operator_output")
if meta.exec_stats.max_uss_bytes is not None:
Expand Down Expand Up @@ -802,8 +806,10 @@ def on_task_finished(self, task_index: int, exception: Optional[Exception]):
self.rows_outputs_of_finished_tasks += task_info.num_rows_produced

task_time_delta = time.perf_counter() - task_info.start_time
self._op_task_duration_stats.add_duration(task_time_delta)
self.mean_task_completion_time = self._op_task_duration_stats.mean()
self.task_completion_time += task_time_delta

assert task_info.cum_block_gen_time is not None
self.task_completion_time_without_backpressure += task_info.cum_block_gen_time
inputs = self._running_tasks[task_index].inputs
self.num_task_inputs_processed += len(inputs)
total_input_size = inputs.size_bytes()
Expand Down
21 changes: 16 additions & 5 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,11 @@ def gen_expected_metrics(
"'task_output_backpressure_time': "
f"{'N' if task_output_backpressure else 'Z'}"
),
("'mean_task_completion_time': " f"{'N' if task_backpressure else 'Z'}"),
("'task_completion_time': " f"{'N' if task_backpressure else 'Z'}"),
(
"'task_completion_time_without_backpressure': "
f"{'N' if task_backpressure else 'Z'}"
),
"'num_alive_actors': Z",
"'num_restarting_actors': Z",
"'num_pending_actors': Z",
Expand Down Expand Up @@ -168,7 +172,11 @@ def gen_expected_metrics(
"'task_output_backpressure_time': "
f"{'N' if task_output_backpressure else 'Z'}"
),
("'mean_task_completion_time': " f"{'N' if task_backpressure else 'Z'}"),
("'task_completion_time': " f"{'N' if task_backpressure else 'Z'}"),
(
"'task_completion_time_without_backpressure': "
f"{'N' if task_backpressure else 'Z'}"
),
"'num_alive_actors': Z",
"'num_restarting_actors': Z",
"'num_pending_actors': Z",
Expand Down Expand Up @@ -671,7 +679,8 @@ def test_dataset__repr__(ray_start_regular_shared, restore_data_context):
" block_generation_time: N,\n"
" task_submission_backpressure_time: N,\n"
" task_output_backpressure_time: Z,\n"
" mean_task_completion_time: N,\n"
" task_completion_time: N,\n"
" task_completion_time_without_backpressure: N,\n"
" num_alive_actors: Z,\n"
" num_restarting_actors: Z,\n"
" num_pending_actors: Z,\n"
Expand Down Expand Up @@ -798,7 +807,8 @@ def check_stats():
" block_generation_time: N,\n"
" task_submission_backpressure_time: N,\n"
" task_output_backpressure_time: Z,\n"
" mean_task_completion_time: N,\n"
" task_completion_time: N,\n"
" task_completion_time_without_backpressure: N,\n"
" num_alive_actors: Z,\n"
" num_restarting_actors: Z,\n"
" num_pending_actors: Z,\n"
Expand Down Expand Up @@ -880,7 +890,8 @@ def check_stats():
" block_generation_time: N,\n"
" task_submission_backpressure_time: N,\n"
" task_output_backpressure_time: Z,\n"
" mean_task_completion_time: N,\n"
" task_completion_time: N,\n"
" task_completion_time_without_backpressure: N,\n"
" num_alive_actors: Z,\n"
" num_restarting_actors: Z,\n"
" num_pending_actors: Z,\n"
Expand Down