Fix ill-combination of journal and gRPC #6175
Conversation
Pull Request Overview
This PR fixes an issue with updating trial state values when using gRPC by refining the logic in set_trial_state_values and related helper functions.
- Introduces a new error message constant for finished trials.
- Adds a synchronized check within a lock to ensure that trial updates occur only when valid.
- Refactors the return logic in set_trial_state_values and updates error handling in _trial_exists_and_updatable.
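The synchronized check described above can be sketched as a toy (all names here — `ToyStorage`, the string states, `set_trial_state` — are hypothetical illustrations, not Optuna's actual API): the point is that the validity check and the state update happen atomically under one lock, so two workers cannot both observe a waiting trial and claim it.

```python
import threading

UNUPDATABLE_MSG = "Trial#{trial_number} has already finished and can not be updated."


class UpdateFinishedTrialError(Exception):
    pass


class ToyStorage:
    """Toy sketch: the state check and the update happen atomically under one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._states = {}  # trial_id -> state string

    def set_trial_state(self, trial_id, new_state):
        with self._lock:
            current = self._states.get(trial_id, "WAITING")
            if current in ("COMPLETE", "PRUNED", "FAIL"):
                # Finished trials must never be updated again.
                raise UpdateFinishedTrialError(
                    UNUPDATABLE_MSG.format(trial_number=trial_id)
                )
            if current != "WAITING" and new_state == "RUNNING":
                # Another worker already claimed this trial.
                return False
            self._states[trial_id] = new_state
            return True


storage = ToyStorage()
print(storage.set_trial_state(0, "RUNNING"))  # True: first claim succeeds
print(storage.set_trial_state(0, "RUNNING"))  # False: trial is already running
storage.set_trial_state(0, "COMPLETE")
# Any further update of trial 0 now raises UpdateFinishedTrialError.
```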
Comments suppressed due to low confidence (1)
optuna/storages/journal/_storage.py:34
- [nitpick] Consider changing 'can not' to 'cannot' for improved readability and consistency in the error message.

```python
UNUPDATABLE_MSG = "Trial#{trial_number} has already finished and can not be updated."
```
tests/study_tests/test_study.py (outdated)

```python
num_enqueued = 30
# NOTE(nabenabe): Fewer threads in gRPC increases the probability of thread collision on the
# proxy side. See https://github.com/optuna/optuna/issues/6084
storage_kwargs = (
    {"thread_pool": ThreadPoolExecutor(2)} if storage_mode == "grpc_journal_file" else {}
)
with StorageSupplier(storage_mode, **storage_kwargs) as storage:
```
The master branch with this change yielded 80 failures out of 100 runs on Ubuntu 20.04.

```shell
$ (for _ in `seq 0 99`; do python -m pytest tests/study_tests/test_study.py::test_pop_waiting_trial_thread_safe[grpc_journal_file] | grep "1 failed"; done) | wc -l
>>> 80
```

Meanwhile, I got no failures with the changes in this PR.
tests/study_tests/test_study.py (outdated)

```diff
-num_enqueued = 10
 with StorageSupplier(storage_mode) as storage:
+num_enqueued = 30
```
This unit test also fails quite frequently on the master branch.
I tested the new unit tests on the master branch:

```shell
$ (for _ in `seq 0 99`; do python -m pytest tests/storages_tests/journal_tests/test_combination_with_grpc.py::test_pop_waiting_trial_multiprocess_safe | grep "1 failed"; done) | wc -l
>>> 45
$ (for _ in `seq 0 99`; do python -m pytest tests/storages_tests/journal_tests/test_combination_with_grpc.py::test_pop_waiting_trial_thread_safe | grep "1 failed"; done) | wc -l
>>> 8
```
This PR addresses the concern here:

It seems the thread problem in the unit test also went away:
```python
existing_trial = self._replay_result._trials.get(trial_id)
if existing_trial is not None and existing_trial.state != TrialState.WAITING:
    if existing_trial.state.is_finished():
        raise UpdateFinishedTrialError(
            UNUPDATABLE_MSG.format(trial_number=existing_trial.number)
        )
    return False
```
I have two questions.
- Can we use an `assert` statement to check that `existing_trial is not None`, so we can simplify the condition in the if statement?
- Since `existing_trial.state.is_finished()` implies that `existing_trial.state != TrialState.WAITING` is always true, can we reduce the nesting like this?

```python
existing_trial = self._replay_result._trials.get(trial_id)
assert existing_trial is not None, (
    "This must be True. Please file a bug report on GitHub if this line raises AssertionError."
)
if existing_trial.state.is_finished():
    raise UpdateFinishedTrialError(
        UNUPDATABLE_MSG.format(trial_number=existing_trial.number)
    )
if existing_trial.state != TrialState.WAITING:  # equivalent to `existing_trial.state == TrialState.RUNNING`
    return False
```
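As a quick sanity check of the implication in the second question (using a minimal stand-in enum, not Optuna's actual `TrialState` class): every state for which `is_finished()` is true differs from `WAITING`, so checking `is_finished()` first and then `!= WAITING` covers exactly the same cases as the nested version.

```python
from enum import Enum


class TrialState(Enum):
    # Stand-in mirroring the member names used in the snippet above.
    RUNNING = 0
    COMPLETE = 1
    PRUNED = 2
    FAIL = 3
    WAITING = 4

    def is_finished(self):
        return self in (TrialState.COMPLETE, TrialState.PRUNED, TrialState.FAIL)


# is_finished() implies state != WAITING, so the flattened version is equivalent.
for state in TrialState:
    if state.is_finished():
        assert state != TrialState.WAITING
print("implication holds for all states")
```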
I confirmed that this PR works nicely on a toy problem even with your change!
Let me confirm with a bigger task:)
I am gonna get back to you as soon as the experiment finishes!
I also checked with a Redis backend:

```shell
docker run -d --name redis -p 127.0.0.1:6379:6379 redis
```

```python
from concurrent.futures import ProcessPoolExecutor

import optuna
from optuna.storages import JournalStorage
from optuna.storages.journal import JournalFileBackend
from optuna.storages.journal import JournalRedisBackend

storage = JournalStorage(JournalRedisBackend(url="redis://127.0.0.1:6379", prefix="pr-6175"))


def objective(trial: optuna.Trial) -> float:
    x = trial.suggest_float("x", -10, 10)
    return x ** 2


if __name__ == "__main__":
    study = optuna.create_study(storage=storage)
    for i in range(1000):
        study.enqueue_trial({"x": -5.0 + float(i) / 100})
    with ProcessPoolExecutor(max_workers=20) as pool:
        for i in range(100):
            pool.submit(study.optimize, objective, n_trials=10)
    print(f"{len(study.trials)=}")
    print(f"{len({trial.number for trial in study.trials})=}")
    print(f"{len({trial._trial_id for trial in study.trials})=}")
```
c-bata left a comment:
Changes look good to me. This PR can be merged after:
- Confirming it works as expected with the larger task by @nabenabe0928
- Receiving approval from the second reviewer
```python
# return statement of trial_id == _replay_result.owned_trial_id. To eliminate false
# positives, we verify whether another process is already evaluating the trial with
# trial_id. If True, it means this query does not update the trial state.
existing_trial = self._replay_result._trials.get(trial_id)
```
Isn't it necessary to call `self._sync_with_backend()`? This line reads from `self._replay_result`.
Thanks, will do it:)
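The concern can be reproduced with a toy replay model (all names here are hypothetical illustrations, not the actual `JournalStorage` internals): a locally replayed snapshot goes stale unless it is synced with the shared log before being read.

```python
class ToyReplayStorage:
    """Toy: each instance replays a shared append-only log into a local dict."""

    def __init__(self, shared_log):
        self._log = shared_log   # backend: list of (trial_id, state) entries
        self._trials = {}        # locally replayed snapshot
        self._cursor = 0         # how many log entries have been replayed

    def _sync_with_backend(self):
        # Replay any entries appended to the log since the last sync.
        for trial_id, state in self._log[self._cursor:]:
            self._trials[trial_id] = state
        self._cursor = len(self._log)

    def get_state(self, trial_id, sync):
        if sync:
            self._sync_with_backend()
        return self._trials.get(trial_id)


log = []
writer, reader = ToyReplayStorage(log), ToyReplayStorage(log)
log.append((0, "RUNNING"))              # another process appends to the backend
print(writer.get_state(0, sync=True))   # RUNNING
print(reader.get_state(0, sync=False))  # None: the local snapshot is stale
print(reader.get_state(0, sync=True))   # RUNNING, once the snapshot is synced
```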
Co-authored-by: Gen <54583542+gen740@users.noreply.github.com>
Force-pushed from ab09c1e to 4733d00.
Motivation
This PR resolves the following issues:
- `JournalStorage` fails frequently in distributed optimization setups in combination with `GrpcProxyStorage` #6084
- `JournalFileStorage` behavior with `GrpcProxyStorage` in a multi-threading setup #6172

Description of the changes
`set_trial_state_values`