[Data] Add export api for ray dataset metadata #52227

Merged
bveeramani merged 21 commits into ray-project:master from omatthew98:mowen/persist-rd-metadata-for-dashboard
Apr 22, 2025

Conversation

@omatthew98
Contributor

omatthew98 commented Apr 10, 2025

Why are these changes needed?

Inspired by and based on the equivalent Ray Train work here.

We want to be able to persist the metadata for a dataset using the export API. Initially this includes the DAG topology, but more can be added in the future. The metadata is exported once, when the dataset is registered with the stats actor.
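As a rough illustration of the shape of the exported record, here is a hedged sketch of dataclasses mirroring the fields that appear in the example event payload below (`name`, `id`, `uuid`, `input_dependencies`, `sub_operators`, `dataset_id`, `job_id`, `start_time`). The class names are purely illustrative and are not the actual Ray API:

```python
from dataclasses import dataclass, field
from typing import List

# Hypothetical classes mirroring the exported metadata schema; field
# names come from the example JSON event, class names are made up.

@dataclass
class SubOperator:
    name: str
    id: str

@dataclass
class Operator:
    name: str
    id: str
    uuid: str
    input_dependencies: List[str] = field(default_factory=list)
    sub_operators: List[SubOperator] = field(default_factory=list)

@dataclass
class DatasetMetadata:
    operators: List[Operator]
    dataset_id: str
    job_id: str
    start_time: str
```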

Example

Test script:

import ray

# Build a multi-stage Dataset pipeline to produce a non-trivial DAG.
ds = ray.data.range(1000)
ds = ds.map(lambda x: {"id": x["id"], "squared": x["id"]*x["id"]})
ds = ds.filter(lambda x: x["id"] % 3 == 0)
ds = ds.random_shuffle(seed=42)
ds = ds.map(lambda x: {**x, "cubed": x["id"]**3})

# Iterate to trigger execution (and the one-time metadata export).
for _ in ds.iter_batches():
    pass

Verifying events are being exported:

❯ RAY_enable_export_api_write=True python test_topo.py
2025-04-10 14:41:53,395	INFO worker.py:1849 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/Users/mowen/code/ray/python/ray/data/dataset.py:1402: UserWarning: Use 'expr' instead of 'fn' when possible for performant filters.
  warnings.warn(
2025-04-10 14:41:54,380	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-04-10_14-41-51_402358_17728/logs/ray-data
2025-04-10 14:41:54,380	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle] -> TaskPoolMapOperator[Map(<lambda>)]
Running 0: 0.00 row [00:00, ? row/s]        2025-04-10 14:41:55,104	WARNING progress_bar.py:120 -- Truncating long operator name to 100 characters. To disable this behavior, set `ray.data.DataContext.get_current().DEFAULT_ENABLE_PROGRESS_BAR_NAME_TRUNCATION = False`.0%|                                                                             | 0.00/1.00 [00:00<?, ? row/s]
✔️  Dataset execution finished in 0.76 seconds: 100%|████████████████████████████████████████| 334/334 [00:00<00:00, 441 row/s]

- ReadRange->...->RandomShuffle: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store; 334 rows output: : 0.00 row- ReadRange->...->RandomShuffle: Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 0.0B object store; 334 rows output: 100%|█| 33  *- Shuffle Map: 100%|██████████████████████████████████████████████████████████████████████| 334/334 [00:00<00:00, 450 row/s]
  *- Shuffle Reduce: 100%|███████████████████████████████████████████████████████████████████| 334/334 [00:00<00:00, 449 row/s]
- Map(<lambda>): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 2.8KB object store: 100%|███| 334/334 [00:00<00:00, 449 row/s]
❯ cat /tmp/ray/session_2025-04-16_19-47-48_523587_36185/logs/export_events/event_EXPORT_DATA_METADATA.log
{"event_id": "dfb2EF5c345aE39d58", "timestamp": 1744858071, "source_type": "EXPORT_DATA_METADATA", "event_data": {"dag": {"operators": [{"name": "Input", "id": "Input_0", "uuid": "1356dab5-1a24-4afc-9bb5-f520b90fb76a", "input_dependencies": [], "sub_operators": []}, {"name": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle", "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1", "uuid": "39e11798-20e0-4b06-b7cb-345fc0b74f63", "input_dependencies": ["Input_0"], "sub_operators": [{"name": "Shuffle Map", "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_0"}, {"name": "Shuffle Reduce", "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_1"}]}, {"name": "Map(<lambda>)", "id": "Map(<lambda>)_2", "uuid": "46da676f-48a1-43f7-894e-93b8a08d1eb2", "input_dependencies": ["ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1"], "sub_operators": []}]}, "dataset_id": "dataset_4", "job_id": "01000000", "start_time": "1744858071"}}

Formatted export event JSON

{
  "event_id": "dfb2EF5c345aE39d58",
  "timestamp": 1744858071,
  "source_type": "EXPORT_DATA_METADATA",
  "event_data": {
    "dag": {
      "operators": [
        {
          "name": "Input",
          "id": "Input_0",
          "uuid": "1356dab5-1a24-4afc-9bb5-f520b90fb76a",
          "input_dependencies": [],
          "sub_operators": []
        },
        {
          "name": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle",
          "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1",
          "uuid": "39e11798-20e0-4b06-b7cb-345fc0b74f63",
          "input_dependencies": [
            "Input_0"
          ],
          "sub_operators": [
            {
              "name": "Shuffle Map",
              "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_0"
            },
            {
              "name": "Shuffle Reduce",
              "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_1"
            }
          ]
        },
        {
          "name": "Map(<lambda>)",
          "id": "Map(<lambda>)_2",
          "uuid": "46da676f-48a1-43f7-894e-93b8a08d1eb2",
          "input_dependencies": [
            "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1"
          ],
          "sub_operators": []
        }
      ]
    },
    "dataset_id": "dataset_4",
    "job_id": "01000000",
    "start_time": "1744858071"
  }
}

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(


Labels

community-backlog, go (add ONLY when ready to merge, run all tests)


7 participants