Skip to content

[Data] Update the export API to remove the redundant data context and op args when refreshing metadata#58755

Merged
bveeramani merged 1 commit intoray-project:masterfrom
coqian:coqian/export-context
Nov 20, 2025
Merged

[Data] Update the export API to remove the redundant data context and op args when refreshing metadata#58755
bveeramani merged 1 commit intoray-project:masterfrom
coqian:coqian/export-context

Conversation

@coqian
Copy link
Copy Markdown
Contributor

@coqian coqian commented Nov 18, 2025

Description

We export dataset and operator metadata whenever there is a state change. However, the size of the export file can be very large because the metadata also includes the DataContext config and operator args, which does not change over time, and they will be written multiple times to the file. To reduce the file size and remove redundant info, we can only export DataContext and operator args when the dataset is registered, and avoid them in the later state updates.

Related issues

Related previous PRs: 55355, 53554

@coqian coqian requested a review from a team as a code owner November 18, 2025 21:57
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces an effective optimization to reduce the size of exported dataset metadata. By adding flags to control the export of DataContext and operator arguments, you've successfully removed redundant information from subsequent state updates, while ensuring the full metadata is exported upon initial registration. The changes are well-implemented across metadata_exporter.py and stats.py. I have one suggestion to improve the API consistency in the abstract base class.

@coqian
Copy link
Copy Markdown
Contributor Author

coqian commented Nov 18, 2025

Example updated export event after the change, where the data context and operator args are only exported once.

[
  {
    "event_id": "FF7e0570CdB76ccF13",
    "timestamp": 1763503227,
    "source_type": "EXPORT_DATASET_METADATA",
    "event_data": {
      "topology": {
        "operators": [
          {
            "name": "Input",
            "id": "Input_0",
            "uuid": "5e558f8b-510e-48ef-be79-d2dcf6dac6a6",
            "args": {
              "Read[ReadRange]_0": {
                "_min_rows_per_bundled_input": "None",
                "_num_outputs": "8",
                "_concurrency": "None",
                "_ray_remote_args": {
                  "scheduling_strategy": "SPREAD"
                },
                "_detected_parallelism": "8",
                "_parallelism": "-1",
                "_name": "ReadRange",
                "_output_dependencies": [
                  "MapRows[Map(<lambda>)]",
                  "AbstractUDFMap[ReadRange->Map(<lambda>)(<lambda>)]"
                ],
                "_per_block_limit": "None",
                "_compute": "TaskPoolStrategy(size=None)",
                "_ray_remote_args_fn": "None",
                "_input_dependencies": [],
                "_datasource_or_legacy_reader": "<ray.data._internal.datasource.range_datasource.RangeDatasource object at 0x7fe253432fa0>",
                "_cached_output_metadata": {
                  "num_rows": "1000",
                  "exec_stats": "None",
                  "schema": "id: int64",
                  "size_bytes": "8000",
                  "input_files": []
                },
                "_datasource": "<ray.data._internal.datasource.range_datasource.RangeDatasource object at 0x7fe253432fa0>"
              }
            },
            "state": "PENDING",
            "input_dependencies": [],
            "sub_stages": [],
            "execution_start_time": 0,
            "execution_end_time": 0
          },
          {
            "name": "ReadRange->Map(<lambda>)",
            "id": "ReadRange->Map(<lambda>)_1",
            "uuid": "4173ff1b-97be-42ea-8195-a4e33f4f2315",
            "input_dependencies": [
              "Input_0"
            ],
            "args": {
              "MapRows[Map(<lambda>)]_1": {
                "_min_rows_per_bundled_input": "None",
                "_num_outputs": "None",
                "_fn_constructor_args": "None",
                "_fn_args": "None",
                "_ray_remote_args": {},
                "_compute": "TaskPoolStrategy(size=None)",
                "_name": "Map(<lambda>)",
                "_per_block_limit": "None",
                "_output_dependencies": [],
                "_ray_remote_args_fn": "None",
                "_input_dependencies": [
                  "Read[ReadRange]"
                ],
                "_fn_constructor_kwargs": "None",
                "_fn": "<function <lambda> at 0x7fe20c447160>",
                "_fn_kwargs": "None"
              },
              "Read[ReadRange]_0": {
                "_min_rows_per_bundled_input": "None",
                "_num_outputs": "8",
                "_concurrency": "None",
                "_ray_remote_args": {
                  "scheduling_strategy": "SPREAD"
                },
                "_detected_parallelism": "8",
                "_parallelism": "-1",
                "_name": "ReadRange",
                "_output_dependencies": [
                  "MapRows[Map(<lambda>)]",
                  "AbstractUDFMap[ReadRange->Map(<lambda>)(<lambda>)]"
                ],
                "_per_block_limit": "None",
                "_compute": "TaskPoolStrategy(size=None)",
                "_ray_remote_args_fn": "None",
                "_input_dependencies": [],
                "_datasource_or_legacy_reader": "<ray.data._internal.datasource.range_datasource.RangeDatasource object at 0x7fe253432fa0>",
                "_cached_output_metadata": {
                  "num_rows": "1000",
                  "exec_stats": "None",
                  "schema": "id: int64",
                  "size_bytes": "8000",
                  "input_files": []
                },
                "_datasource": "<ray.data._internal.datasource.range_datasource.RangeDatasource object at 0x7fe253432fa0>"
              }
            },
            "state": "PENDING",
            "sub_stages": [],
            "execution_start_time": 0,
            "execution_end_time": 0
          }
        ]
      },
      "dataset_id": "dataset_1_0",
      "job_id": "01000000",
      "start_time": 1763503227.0410855,
      "data_context": {
        "max_hash_shuffle_finalization_batch_size": "None",
        "max_hash_shuffle_aggregators": "None",
        "_enable_actor_pool_on_exit_hook": "False",
        "enable_pandas_block": "True",
        "write_file_retry_on_errors": [
          "AWS Error INTERNAL_FAILURE",
          "AWS Error NETWORK_CONNECTION",
          "AWS Error SLOW_DOWN",
          "AWS Error UNKNOWN (HTTP status 503)"
        ],
        "read_op_min_num_blocks": "200",
        "default_hash_shuffle_parallelism": "200",
        "large_args_threshold": "52428800",
        "hash_shuffle_aggregator_health_warning_interval_s": "30",
        "decoding_size_estimation": "True",
        "op_resource_reservation_enabled": "True",
        "pandas_block_ignore_metadata": "False",
        "pipeline_push_based_shuffle_reduce_tasks": "True",
        "max_tasks_in_flight_per_actor": "None",
        "scheduling_strategy_large_args": "DEFAULT",
        "use_polars": "False",
        "enable_dynamic_output_queue_size_backpressure": "False",
        "trace_allocations": "False",
        "use_push_based_shuffle": "False",
        "op_resource_reservation_ratio": "0.5",
        "enable_operator_progress_bars": "True",
        "issue_detectors_config": {
          "high_memory_detector_config": {
            "detection_time_interval_s": "30"
          },
          "hanging_detector_config": {
            "op_task_stats_std_factor": "10",
            "detection_time_interval_s": "30.0",
            "op_task_stats_min_count": "10"
          },
          "detectors": [
            "<class 'ray.data._internal.issue_detection.detectors.hanging_detector.HangingExecutionIssueDetector'...",
            "<class 'ray.data._internal.issue_detection.detectors.high_memory_detector.HighMemoryIssueDetector'>"
          ]
        },
        "scheduling_strategy": "SPREAD",
        "eager_free": "False",
        "s3_try_create_dir": "False",
        "actor_task_retry_on_errors": "False",
        "execution_options": "ExecutionOptions(resource_limits=ExecutionResources(cpu=inf, gpu=inf, object_store_memory=inf, memor...",
        "use_arrow_tensor_v2": "True",
        "dataset_logger_id": "dataset_1_0",
        "memory_usage_poll_interval_s": "1",
        "min_parallelism": "200",
        "downstream_capacity_backpressure_max_queued_bundles": "None",
        "enable_rich_progress_bars": "False",
        "raise_original_map_exception": "False",
        "hash_aggregate_operator_actor_num_cpus_override": "None",
        "autoscaling_config": {
          "actor_pool_util_upscaling_threshold": "2.0",
          "actor_pool_util_downscaling_threshold": "0.5"
        },
        "enable_tensor_extension_casting": "True",
        "join_operator_actor_num_cpus_override": "None",
        "wait_for_min_actors_s": "-1",
        "enable_progress_bars": "True",
        "use_polars_sort": "False",
        "enforce_schemas": "False",
        "hash_shuffle_operator_actor_num_cpus_override": "None",
        "streaming_read_buffer_size": "33554432",
        "min_hash_shuffle_aggregator_wait_time_in_s": "300",
        "enable_auto_log_stats": "False",
        "enable_per_node_metrics": "False",
        "enable_fallback_to_arrow_object_ext_type": "None",
        "retried_io_errors": [
          "AWS Error INTERNAL_FAILURE",
          "AWS Error NETWORK_CONNECTION",
          "AWS Error SLOW_DOWN",
          "AWS Error UNKNOWN (HTTP status 503)",
          "AWS Error SERVICE_UNAVAILABLE"
        ],
        "target_min_block_size": "1048576",
        "enable_progress_bar_name_truncation": "True",
        "use_ray_tqdm": "True",
        "override_object_store_memory_limit_fraction": "None",
        "log_internal_stack_trace_to_stdout": "False",
        "enable_get_object_locations_for_metrics": "False",
        "warn_on_driver_memory_usage_bytes": "2147483648",
        "downstream_capacity_backpressure_ratio": "None",
        "print_on_execution_start": "True",
        "actor_prefetcher_enabled": "False",
        "verbose_stats_logs": "False",
        "target_max_block_size": "134217728",
        "_shuffle_strategy": "hash_shuffle",
        "max_errored_blocks": "0"
      },
      "state": "PENDING",
      "operator_panels": [
        {
          "id": "11",
          "title": "Rows Output / Second"
        },
        {
          "id": "57",
          "title": "All logical resources utilization"
        },
        {
          "id": "56",
          "title": "Operator Combined Internal + External Input Queue Size (Blocks)"
        },
        {
          "id": "78",
          "title": "Task Completion Time Histogram (s)"
        },
        {
          "id": "79",
          "title": "Block Completion Time Histogram (s)"
        },
        {
          "id": "80",
          "title": "Block Size (Bytes) Histogram"
        },
        {
          "id": "81",
          "title": "Block Size (Rows) Histogram"
        }
      ],
      "execution_start_time": 0,
      "execution_end_time": 0
    }
  },
  {
    "event_id": "531BBdc3fd83bAAdfc",
    "timestamp": 1763503227,
    "source_type": "EXPORT_DATASET_METADATA",
    "event_data": {
      "topology": {
        "operators": [
          {
            "name": "Input",
            "id": "Input_0",
            "uuid": "5e558f8b-510e-48ef-be79-d2dcf6dac6a6",
            "args": {},
            "state": "PENDING",
            "input_dependencies": [],
            "sub_stages": [],
            "execution_start_time": 0,
            "execution_end_time": 0
          },
          {
            "name": "ReadRange->Map(<lambda>)",
            "id": "ReadRange->Map(<lambda>)_1",
            "uuid": "4173ff1b-97be-42ea-8195-a4e33f4f2315",
            "input_dependencies": [
              "Input_0"
            ],
            "args": {},
            "state": "PENDING",
            "sub_stages": [],
            "execution_start_time": 0,
            "execution_end_time": 0
          }
        ]
      },
      "dataset_id": "dataset_1_0",
      "job_id": "01000000",
      "start_time": 1763503227.0410855,
      "data_context": {},
      "execution_start_time": 1763503227.0946083,
      "state": "RUNNING",
      "operator_panels": [
        {
          "id": "11",
          "title": "Rows Output / Second"
        },
        {
          "id": "57",
          "title": "All logical resources utilization"
        },
        {
          "id": "56",
          "title": "Operator Combined Internal + External Input Queue Size (Blocks)"
        },
        {
          "id": "78",
          "title": "Task Completion Time Histogram (s)"
        },
        {
          "id": "79",
          "title": "Block Completion Time Histogram (s)"
        },
        {
          "id": "80",
          "title": "Block Size (Bytes) Histogram"
        },
        {
          "id": "81",
          "title": "Block Size (Rows) Histogram"
        }
      ],
      "execution_end_time": 0
    }
  },
  {
    "event_id": "BeE00D4B662d290D8B",
    "timestamp": 1763503227,
    "source_type": "EXPORT_DATASET_METADATA",
    "event_data": {
      "topology": {
        "operators": [
          {
            "name": "Input",
            "id": "Input_0",
            "uuid": "5e558f8b-510e-48ef-be79-d2dcf6dac6a6",
            "args": {},
            "execution_start_time": 1763503227.18142,
            "execution_end_time": 1763503227.18142,
            "state": "FINISHED",
            "input_dependencies": [],
            "sub_stages": []
          },
          {
            "name": "ReadRange->Map(<lambda>)",
            "id": "ReadRange->Map(<lambda>)_1",
            "uuid": "4173ff1b-97be-42ea-8195-a4e33f4f2315",
            "input_dependencies": [
              "Input_0"
            ],
            "args": {},
            "execution_start_time": 1763503227.18142,
            "state": "RUNNING",
            "sub_stages": [],
            "execution_end_time": 0
          }
        ]
      },
      "dataset_id": "dataset_1_0",
      "job_id": "01000000",
      "start_time": 1763503227.0410855,
      "data_context": {},
      "execution_start_time": 1763503227.0946083,
      "state": "RUNNING",
      "operator_panels": [
        {
          "id": "11",
          "title": "Rows Output / Second"
        },
        {
          "id": "57",
          "title": "All logical resources utilization"
        },
        {
          "id": "56",
          "title": "Operator Combined Internal + External Input Queue Size (Blocks)"
        },
        {
          "id": "78",
          "title": "Task Completion Time Histogram (s)"
        },
        {
          "id": "79",
          "title": "Block Completion Time Histogram (s)"
        },
        {
          "id": "80",
          "title": "Block Size (Bytes) Histogram"
        },
        {
          "id": "81",
          "title": "Block Size (Rows) Histogram"
        }
      ],
      "execution_end_time": 0
    }
  },
  {
    "event_id": "9cDDbdffa8c9822FAd",
    "timestamp": 1763503229,
    "source_type": "EXPORT_DATASET_METADATA",
    "event_data": {
      "topology": {
        "operators": [
          {
            "name": "Input",
            "id": "Input_0",
            "uuid": "5e558f8b-510e-48ef-be79-d2dcf6dac6a6",
            "args": {},
            "execution_start_time": 1763503227.18142,
            "execution_end_time": 1763503227.18142,
            "state": "FINISHED",
            "input_dependencies": [],
            "sub_stages": []
          },
          {
            "name": "ReadRange->Map(<lambda>)",
            "id": "ReadRange->Map(<lambda>)_1",
            "uuid": "4173ff1b-97be-42ea-8195-a4e33f4f2315",
            "input_dependencies": [
              "Input_0"
            ],
            "args": {},
            "execution_start_time": 1763503227.18142,
            "execution_end_time": 1763503229.326,
            "state": "FINISHED",
            "sub_stages": []
          }
        ]
      },
      "dataset_id": "dataset_1_0",
      "job_id": "01000000",
      "start_time": 1763503227.0410855,
      "data_context": {},
      "execution_start_time": 1763503227.0946083,
      "execution_end_time": 1763503229.326,
      "state": "FINISHED",
      "operator_panels": [
        {
          "id": "11",
          "title": "Rows Output / Second"
        },
        {
          "id": "57",
          "title": "All logical resources utilization"
        },
        {
          "id": "56",
          "title": "Operator Combined Internal + External Input Queue Size (Blocks)"
        },
        {
          "id": "78",
          "title": "Task Completion Time Histogram (s)"
        },
        {
          "id": "79",
          "title": "Block Completion Time Histogram (s)"
        },
        {
          "id": "80",
          "title": "Block Size (Bytes) Histogram"
        },
        {
          "id": "81",
          "title": "Block Size (Rows) Histogram"
        }
      ]
    }
  }
]

@coqian coqian force-pushed the coqian/export-context branch from b31cd87 to 22703d5 Compare November 18, 2025 22:10
Comment on lines +710 to +711
export_data_context=False,
export_op_args=False,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed here and down below you set the defaults to False. When do we use it to be True? Like, are we exporting just a top-level op_args and dataset context?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be True when the dataset is registered, which is the first time the dataset is exported.

Comment on lines +207 to +208
export_data_context: bool = True,
export_op_args: bool = True,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thoughts on renaming this to include_data_context, include_op_args. This feels more descriptive to me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated with the suggested names

… op args when refreshing metadata

Signed-off-by: cong.qian <cong.qian@anyscale.com>
@coqian coqian force-pushed the coqian/export-context branch from 22703d5 to 126107c Compare November 18, 2025 23:36
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Nov 19, 2025
@iamjustinhsu iamjustinhsu added the go add ONLY when ready to merge, run all tests label Nov 20, 2025
@bveeramani bveeramani enabled auto-merge (squash) November 20, 2025 20:38
@bveeramani bveeramani merged commit 9f3b174 into ray-project:master Nov 20, 2025
8 checks passed
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
… op args when refreshing metadata (ray-project#58755)

## Description
We export dataset and operator metadata whenever there is a state
change. However, the size of the export file can be very large because
the metadata also includes the DataContext config and operator args,
which does not change over time, and they will be written multiple times
to the file. To reduce the file size and remove redundant info, we can
only export DataContext and operator args when the dataset is
[registered](https://github.com/ray-project/ray/blob/d1cce8c9dc8411fad7cfbd619350bec6f19839a3/python/ray/data/_internal/stats.py#L621),
and avoid them in the later state updates.

## Related issues
Related previous PRs:
[55355](ray-project#55355),
[53554](ray-project#53554)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
… op args when refreshing metadata (ray-project#58755)

## Description
We export dataset and operator metadata whenever there is a state
change. However, the size of the export file can be very large because
the metadata also includes the DataContext config and operator args,
which does not change over time, and they will be written multiple times
to the file. To reduce the file size and remove redundant info, we can
only export DataContext and operator args when the dataset is
[registered](https://github.com/ray-project/ray/blob/d1cce8c9dc8411fad7cfbd619350bec6f19839a3/python/ray/data/_internal/stats.py#L621),
and avoid them in the later state updates.

## Related issues
Related previous PRs:
[55355](ray-project#55355),
[53554](ray-project#53554)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
… op args when refreshing metadata (ray-project#58755)

## Description
We export dataset and operator metadata whenever there is a state
change. However, the size of the export file can be very large because
the metadata also includes the DataContext config and operator args,
which does not change over time, and they will be written multiple times
to the file. To reduce the file size and remove redundant info, we can
only export DataContext and operator args when the dataset is
[registered](https://github.com/ray-project/ray/blob/d1cce8c9dc8411fad7cfbd619350bec6f19839a3/python/ray/data/_internal/stats.py#L621),
and avoid them in the later state updates.

## Related issues
Related previous PRs:
[55355](ray-project#55355),
[53554](ray-project#53554)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
… op args when refreshing metadata (ray-project#58755)

## Description
We export dataset and operator metadata whenever there is a state
change. However, the size of the export file can be very large because
the metadata also includes the DataContext config and operator args,
which does not change over time, and they will be written multiple times
to the file. To reduce the file size and remove redundant info, we can
only export DataContext and operator args when the dataset is
[registered](https://github.com/ray-project/ray/blob/d1cce8c9dc8411fad7cfbd619350bec6f19839a3/python/ray/data/_internal/stats.py#L621),
and avoid them in the later state updates.

## Related issues
Related previous PRs:
[55355](ray-project#55355),
[53554](ray-project#53554)

Signed-off-by: cong.qian <cong.qian@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants