[train][checkpoint] Add checkpoint_upload_mode to ray.train.report #55637
Merged
justinvyu merged 19 commits into ray-project:master on Sep 11, 2025
Summary
Implement async checkpoint uploads in `ray.train.report(..., checkpoint_upload_mode)`, supporting `SYNC` (default), `ASYNC`, and `NO_UPLOAD`. Also add `delete_local_checkpoint_after_upload` to control temporary local directory cleanup.

Implementation Summary
This PR implements async checkpointing by:

- Adding `checkpoint_upload_mode` to `ray.train.report` with three options: `SYNC`, `ASYNC`, and `NO_UPLOAD`.
- Adding `num_reported_checkpoints` and `num_attempted_reported_checkpoints` counters on the `TrainContext`.
- Regardless of which `checkpoint_upload_mode` the different Ray Train workers are using, we want to upload checkpoints in the order they were `ray.train.report`ed. Therefore, each Ray Train worker waits for its turn (`num_reported_checkpoints == current_report_attempt_number - 1`) before adding its checkpoint to the result queue.
- Using a `ThreadPoolExecutor` to guard against adding too many checkpoint upload threads.
- Changing `run_train_fn` to wrap the `train_fn` in `train_fn_that_waits_for_threads`, because otherwise we could be in the following situation: 1) the train function exits with pending report threads and the worker status is finished; 2) the controller sees the finished status and shuts down the worker group; 3) `result.fit` does not return all the reported checkpoints/metrics.
- Some of this logic lives in the `ThreadRunner`, but "wait for threads" is a wrapper function: in the former case, that is the cleanest way for a nested thread to cause the entire worker to exit early, while in this case the target function is able to wait for the threads that it creates without complicating the `ThreadRunner` abstraction.

A few other notes:
- We add `Checkpoint`s (instead of `Checkpoint` `ObjectRef`s) to the result queue because, with the `ObjectRef` approach, the controller would have to create a Ray task that updates controller state. This "driver creates task that updates driver" pattern is unwieldy to implement.

API Changes
This PR's only API changes are the following two new arguments to `ray.train.report`:

- `checkpoint_upload_mode`: The upload mode to use: `SYNC` (default), `ASYNC`, or `NO_UPLOAD`.
- `delete_local_checkpoint_after_upload`: Whether to delete the local checkpoint after uploading it. Users generally won't need to set this, since each checkpoint upload mode has its own default for cleaning up `tempfile`-backed checkpoint directories (see the previous section for an explanation).

Here's a simple example of this API in action:
Testing
Async reporting is indeed faster, with the same loss, on the PyTorch Ray Train example: https://docs.ray.io/en/latest/train/getting-started-pytorch.html
- Sync mode: 3m3s
- Async mode: 2m57s, with only ~0.22s of blocking time when waiting for the last checkpoint upload