[train][v2] implement state export#50622
Conversation
Signed-off-by: Matthew Deng <matt@anyscale.com>
```proto
ExportTrainRun train_run = 9;
ExportTrainRunAttempt train_run_attempt = 10;
```
Is there a naming convention for this? Should I include Event or Data or both as a suffix?
No strong preference, but I think adding `EventData` would be most consistent. Ideally `ExportNodeData` and `ExportActorData` would also have the `EventData` suffix.
updated to have EventData suffix
Should these be in separate files or a single file?
I think let's do a single file. When I was testing this e2e, having a single file was useful because we can more easily ingest the train runs and train run attempts in order, and possibly start rendering train runs with their attempts progressively as we fetch.
When we have two separate files, I have to read through all the train run files first before starting to read train run attempts, because I want to make sure the train runs are ingested before trying to ingest attempts, so I can assign each attempt to its owner run immediately.
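The single-file ordering argument above can be sketched in a few lines. This is an illustrative sketch only: the event shapes and field names below are made up, not the actual export schema.

```python
# Runs and attempts arrive interleaved in one file, in write order.
# Because a run is always written before its attempts, each attempt can
# be attached to its owner run in a single pass, as soon as it is read.
runs = {}

events = [  # stand-in for lines read from a single event log file
    {"type": "run", "id": "run_1", "attempts": []},
    {"type": "attempt", "run_id": "run_1", "id": "attempt_1"},
    {"type": "run", "id": "run_2", "attempts": []},
    {"type": "attempt", "run_id": "run_2", "id": "attempt_1"},
]

for event in events:  # one pass, in file order
    if event["type"] == "run":
        runs[event["id"]] = event
    else:
        # The owner run is guaranteed to be ingested already.
        runs[event["run_id"]]["attempts"].append(event["id"])

print(runs["run_1"]["attempts"])  # ['attempt_1']
```

With two separate files, the same assignment would require buffering all attempts (or reading the run file to completion first).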
@alanwguo I think you're talking about whether the `ExportTrainRun` and `ExportTrainRunAttempt` data should get dumped to 1 vs. 2 files.
Dumping to a single file would require the `ExportTrainRun` schema to contain the whole list of `ExportTrainRunAttempt`s associated with it.
Option 1: Only export 1 event type (ExportTrainRun)
```
ExportTrainRun {
    ...
    attempts: List[ExportTrainRunAttempt]
}
```
With this schema, every update (smallest update could be the status changing from PENDING -> RUNNING) to an attempt results in exporting the entire ExportTrainRun data that contains ALL attempts so far. The size of this grows with num_attempts * num_workers, which should get to ~1000 attempts x workers for the large end of workloads that run with Ray Train (10+ attempts, 100+ workers).
Option 2: Keep the 2 exported events (ExportTrainRun, ExportTrainRunAttempt), but track attempt IDs in the ExportTrainRun schema.
```
ExportTrainRun {
    ...
    attempt_ids: List[str]
}
```
We'll need to do 2 exports for attempt creation (1 for exporting the run attempt + 1 for updating the parent ID list), but otherwise run attempt state changes only need to export the run attempt.
This should also solve the "assignment" problem since the train run already points to the right attempts.
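Option 2's export traffic can be sketched as follows. This is a hedged illustration of the flow described above, not the actual implementation; the `export` sink and dict-based events are made up.

```python
# Option 2: attempt creation costs 2 exports (new attempt + parent run's
# updated attempt_ids); later attempt status changes cost only 1.
exported = []  # stand-in for the export event sink


def export(event_type: str, payload: dict) -> None:
    exported.append((event_type, dict(payload)))


run = {"id": "run_1", "attempt_ids": []}

# Attempt creation: 2 exports.
attempt = {"id": "attempt_1", "run_id": "run_1", "status": "PENDING"}
export("ExportTrainRunAttempt", attempt)
run["attempt_ids"].append(attempt["id"])
export("ExportTrainRun", run)

# Attempt status change: 1 export, and only the small attempt payload.
attempt["status"] = "RUNNING"
export("ExportTrainRunAttempt", attempt)

print(len(exported))  # 3
```

Note how the run payload stays small regardless of worker count, unlike Option 1 where every update re-exports all attempts.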
I think the way it already works today is fine. It's already in one file but as two separate events:
- Train Run events have nothing about attempts
- but in the same file, a Train Run Attempt event will get emitted with the parent TrainRun id as a field
alanwguo
left a comment
Before merging, can we completely disable export in this PR (even if the env var is set to true)?
Only once we merge the full implementation (which includes aborting, v1 -> v2 conversion, etc.) should we enable export when the env var is set.
I want to make it so that when we set the env var to enable export, no matter what Ray version the user is using, the export is either fully working or completely disabled. I don't want there to be some Ray versions in a half-implemented state where we have to figure out what Ray version the user is on.
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
```python
log_path = os.path.join(
    ray._private.worker._global_node.get_session_dir_path(), "logs"
)
```
Use the `/tmp/ray/session_xxx/logs/train` subdirectory.
```python
def _to_proto_timestamp(time_ms: int) -> int:
    """Convert millisecond timestamp to ns."""
    return time_ms * 1_000_000
```
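As a quick sanity check of the ms -> ns conversion above (1 ms = 1,000,000 ns), using a hypothetical standalone helper mirroring `_to_proto_timestamp`:

```python
def to_proto_timestamp(time_ms: int) -> int:
    """Convert a millisecond timestamp to nanoseconds (1 ms = 1_000_000 ns)."""
    return time_ms * 1_000_000


# 1 second = 1_000 ms = 1_000_000_000 ns
print(to_proto_timestamp(1_000))  # 1000000000
```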
…xport Signed-off-by: Justin Yu <justinvyu@anyscale.com>
```python
actor_id=bytes.fromhex(worker.actor_id),
node_id=bytes.fromhex(worker.node_id),
```
I convert to bytes at the end when creating the export data rather than keeping it as bytes in the internal train state schema, since the public ray.get_runtime_context().get_job_id() returns it as a string, and also since the string is easier to work with if we want to log it internally.
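The string <-> bytes conversion round-trips losslessly, which is what makes it safe to keep the ID as a string internally. A minimal sketch (the hex ID below is made up for illustration):

```python
# Hypothetical hex actor ID, as returned by e.g.
# ray.get_runtime_context().get_actor_id() (a string).
actor_id_str = "f66d17bab9f17597a3e57954db0a0b7e"

# Convert to bytes only at the edge, when building the export proto.
actor_id_bytes = bytes.fromhex(actor_id_str)

# The conversion round-trips, so nothing is lost by logging the string form.
assert actor_id_bytes.hex() == actor_id_str
print(len(actor_id_bytes))  # 16
```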
```python
logger = get_export_event_logger(
    ExportEvent.SourceType.EXPORT_TRAIN_RUN,
    log_directory,
)
```
This first param determines the filename of the `.log` file that will be written to. Both `TRAIN_RUN` and `TRAIN_RUN_ATTEMPT` source types will be written to this file and will need to be parsed properly by the reader.
@nikitavemuri The event logger allows this but logs an error (see `ray/python/ray/_private/event/export_event_logger.py`, lines 72 to 77 at `de2b421`).
I think what we want is for the filename to be `event_train_state.log` (rather than arbitrarily picking one of the source type names) and multiple schemas being written to the file (their source type is already attached to each event).
Also, a follow-up item is to add a PID somewhere in the filename to prevent collisions if there are multiple Train controllers on the same node (e.g. usage with Ray Tune). Then, add wildcard logic on the reader side.
I think for that we would need `ExportEventLoggerAdapter` to take a different source enum that refers to the filename, rather than `ExportEvent.SourceType`. We could then only print this error if the mapping between `ExportEventLoggerAdapter.source` and `event.source_type` is not what we expect.
@matthewdeng We discussed offline and decided:
- The `get_export_event_logger` method should take in a different enum than `SourceType.EXPORT_TRAIN_RUN`. For example, it should take in `TRAIN_STATE` and write files to `event_TRAIN_STATE.log`.
- We should also have a map of `TRAIN_STATE: [EXPORT_TRAIN_RUN, EXPORT_TRAIN_RUN_ATTEMPT]` to indicate which source types can be written to the file. If you try to write a source type outside of this list, log this error. This is to prevent random data being written in random files.
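The decided-on design can be sketched as a small enum carrying both the file name and the allowed source types. This is a hypothetical sketch following the discussion; the names (`EventLogType`, `supports`, string source types) are illustrative, not the exact Ray implementation.

```python
from enum import Enum


class EventLogType(Enum):
    # (log file name stem, source types allowed in that file)
    TRAIN_STATE = ("TRAIN_STATE", {"EXPORT_TRAIN_RUN", "EXPORT_TRAIN_RUN_ATTEMPT"})

    @property
    def file_name(self) -> str:
        return f"event_{self.value[0]}.log"

    def supports(self, source_type: str) -> bool:
        """Whether this source type may be written to this log file."""
        return source_type in self.value[1]


log_type = EventLogType.TRAIN_STATE
print(log_type.file_name)  # event_TRAIN_STATE.log
print(log_type.supports("EXPORT_TRAIN_RUN"))  # True
print(log_type.supports("EXPORT_ACTOR"))  # False -> the logger would log an error
```

The writer would check `supports(...)` before emitting and log an error otherwise, which prevents random data being written to random files.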
Should the new source enum be taken in as input, or should the mapping just be handled within?
e.g.
`get_export_event_logger(TRAIN_STATE)`
vs.
`get_export_event_logger(EXPORT_TRAIN_RUN)` -> maps to `TRAIN_STATE` inside
@nikitavemuri suggested the first option. And the `TRAIN_STATE` enum can be a Python object and does not need to be included in any proto file.
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
nikitavemuri
left a comment
A few minor comments, but overall looks good!
```python
class EventLogType(Enum):
    TRAIN_STATE = (
        "train_state",
        {ExportTrainRunEventData, ExportTrainRunAttemptEventData},
    )
    SUBMISSION_JOB = ("submission_job", {ExportSubmissionJobEventData})
```
I'm not sure if anyone is using it, but it would probably be safer to keep the same name. The submission job events currently go to `event_EXPORT_SUBMISSION_JOB.log`, so `log_type_name` here should be `EXPORT_SUBMISSION_JOB`. The rest of this interface looks good!
Signed-off-by: Matthew Deng <matt@anyscale.com>
This PR adds Export API support for Ray Train state events.

## Key Changes
- Added new proto messages `ExportTrainRunEventData` and `ExportTrainRunAttemptEventData` to capture training state
- Created `EventLogType` enum to manage different types of export event logs
- Updated `TrainStateActor` to export Train state events when export is enabled
- Modified timestamp fields from milliseconds to nanoseconds (for both proto and python schema)
  - `start_time_ms` → `start_time_ns`
  - `end_time_ms` → `end_time_ns`

## Implementation Details
- Train run and attempt events are now written to the `event_EXPORT_TRAIN_STATE.log` log file when the export API is enabled
- Export can be enabled either globally or specifically for Train events using environment variables:
  - `RAY_enable_export_api_write=1` (all events)
  - `RAY_enable_export_api_write_config=EXPORT_TRAIN_RUN` (Train run events only)
  - `RAY_enable_export_api_write_config=EXPORT_TRAIN_RUN_ATTEMPT` (Train run attempt events only)

Based off of #47888. Follows the new schema added in #50515.

---------

Signed-off-by: Matthew Deng <matt@anyscale.com>
Signed-off-by: Alan Guo <aguo@anyscale.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Alan Guo <aguo@anyscale.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Abrar Sheikh <abrar@anyscale.com>
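Since both run and attempt events land in one `event_EXPORT_TRAIN_STATE.log` file, a consumer has to demultiplex by source type. A minimal reader sketch, assuming each line is a standalone JSON object with `source_type` and `event_data` fields (the sample events and field names are illustrative, not the exact Ray export format):

```python
import json
import tempfile


def read_events(path):
    """Yield (source_type, event_data) pairs, one per log line."""
    with open(path) as f:
        for line in f:
            event = json.loads(line)
            yield event["source_type"], event["event_data"]


# Write a tiny sample log with both source types mixed in one file.
sample = [
    {"source_type": "EXPORT_TRAIN_RUN", "event_data": {"id": "run_1"}},
    {"source_type": "EXPORT_TRAIN_RUN_ATTEMPT", "event_data": {"run_id": "run_1"}},
]
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as f:
    for event in sample:
        f.write(json.dumps(event) + "\n")
    path = f.name

parsed = list(read_events(path))
print(parsed[0][0])  # EXPORT_TRAIN_RUN
```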
This PR implements the export API for Ray Train V1 state. This builds on top of #50622, which implements the export API for Ray Train V2.

## Key Changes
- Added `export.py` with conversion functions between Train V1 state models and Train (V2) state export protobuf
- Updated `TrainRunInfo` and `TrainWorkerInfo` schemas with additional fields for compatibility:
  - Log file paths for controller and workers
    - Note that these point to the Ray worker stderr logs, rather than specific train logs.
  - Resource allocation information
- Made worker status a required field
  - Note that it is always set as ACTIVE for now.

Signed-off-by: Matthew Deng <matt@anyscale.com>
## Why are these changes needed?

_Inspired / based on the equivalent train work [here](#50622)._

We want to be able to persist the metadata for a dataset using the export API. Initially this will include the DAG topology, but in the future we can add more to this. It will be exported once upon registering the dataset to the stats actor.

## Example

Test script:

```python
import ray

ds = ray.data.range(1000)
ds = ds.map(lambda x: {"id": x["id"], "squared": x["id"]*x["id"]})
ds = ds.filter(lambda x: x["id"] % 3 == 0)
ds = ds.random_shuffle(seed=42)
ds = ds.map(lambda x: {**x, "cubed": x["id"]**3})

for _ in ds.iter_batches():
    pass
```

Verifying events are being exported:

```
❯ RAY_enable_export_api_write=True python test_topo.py
2025-04-10 14:41:53,395 INFO worker.py:1849 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
2025-04-10 14:41:54,380 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-04-10_14-41-51_402358_17728/logs/ray-data
2025-04-10 14:41:54,380 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> AllToAllOperator[ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle] -> TaskPoolMapOperator[Map(<lambda>)]
✔️ Dataset execution finished in 0.76 seconds

❯ cat /tmp/ray/session_2025-04-16_19-47-48_523587_36185/logs/export_events/event_EXPORT_DATA_METADATA.log
```

Formatted export event JSON:

```json
{
  "event_id": "dfb2EF5c345aE39d58",
  "timestamp": 1744858071,
  "source_type": "EXPORT_DATA_METADATA",
  "event_data": {
    "dag": {
      "operators": [
        {
          "name": "Input",
          "id": "Input_0",
          "uuid": "1356dab5-1a24-4afc-9bb5-f520b90fb76a",
          "input_dependencies": [],
          "sub_operators": []
        },
        {
          "name": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle",
          "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1",
          "uuid": "39e11798-20e0-4b06-b7cb-345fc0b74f63",
          "input_dependencies": ["Input_0"],
          "sub_operators": [
            {
              "name": "Shuffle Map",
              "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_0"
            },
            {
              "name": "Shuffle Reduce",
              "id": "ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1_sub_1"
            }
          ]
        },
        {
          "name": "Map(<lambda>)",
          "id": "Map(<lambda>)_2",
          "uuid": "46da676f-48a1-43f7-894e-93b8a08d1eb2",
          "input_dependencies": ["ReadRange->Map(<lambda>)->Filter(<lambda>)->RandomShuffle_1"],
          "sub_operators": []
        }
      ]
    },
    "dataset_id": "dataset_4",
    "job_id": "01000000",
    "start_time": "1744858071"
  }
}
```

---------

Signed-off-by: Matthew Owen <mowen@anyscale.com>