Description
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 16.04): OSX 10.12 and Ubuntu 16.04 (can reproduce on both)
- Ray installed from (source or binary): binary
- Ray version: 0.7.0
- Python version: 3.5, 3.6, 3.7 (can reproduce on all)
- Exact command to reproduce: See the minimal reproduction snippet below.
Describe the problem
I am attempting to use a Ray Actor (functioning as a data reader/cache singleton) inside a function that is being optimized with Ray Tune. After the optimization process runs for about 10 minutes, it always fails with a Raylet connection closed exception and the optimization process quits.
e.g.
Exception: [RayletClient] Raylet connection closed.
2019-05-25 22:23:57,216 ERROR worker.py:1678 -- The node with client ID 13f2e72398ffd893090f7fd31bae0c1409755432 has been marked dead because the monitor has missed too many heartbeats from it.
The error appears to be related to the duration of the optimization task rather than the CPU load, as the minimal example below shows. Even with various changes to the task (running on OSX, running on Ubuntu, changing the Python version, setting reuse_actors to false/true, changing the number of CPUs in ray.init), the error still appears. Minimal code to reproduce the error is provided below. Is this a bug in the heartbeat generation for Ray Actors?
A functioning workaround was to remove the Ray Actor and instead use a plain function with a filesystem lock file to implement the singleton (sketched just below). Are Ray Actors supported as part of a Ray Tune optimization process?
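For reference, a minimal sketch of that workaround (hypothetical, not the exact code used; the lock path and the simulated 2 second read are placeholders matching the reproduction script below):

# Hypothetical sketch of the workaround: the singleton data source becomes a
# plain function serialized by a filesystem lock instead of a Ray Actor method.
import time
from lockfile import LockFile

def get_data_for_item(config):
    lock = LockFile("/tmp/test.lockb")  # shared lock path (placeholder)
    lock.acquire()
    try:
        time.sleep(2)  # simulate the slow data read
        return config.get('identifier', "") + "whatever"
    finally:
        lock.release()

# Inside MyTrainableClass._train the actor call is then replaced with:
#     val = get_data_for_item(self.config)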
Source code / logs
Minimal source code to reproduce the error (it takes about 10 minutes to hit the error):
import os
import pickle
import time

import numpy as np
from lockfile import LockFile

import ray
from ray.tune import Trainable, run, Experiment, sample_from
from ray.tune.schedulers import HyperBandScheduler, AsyncHyperBandScheduler


@ray.remote(resources={'datasource': 1})
class DataService(object):
    """Actor acting as a shared data reader/cache singleton."""

    def __init__(self):
        pass

    def get_data_for_item(self, config):
        # Serialize access with a filesystem lock to simulate a slow,
        # single-consumer data source.
        lock = LockFile("/tmp/test.lockb")
        print(lock.path, 'trying to acquire.')
        lock.acquire()
        print(lock.path, 'is locked.')
        time.sleep(2)
        thing = config.get('identifier', "") + "whatever"
        lock.release()
        print(lock.path, 'is released.')
        return thing


class MyTrainableClass(Trainable):
    def _setup(self, config):
        self.config = config
        self.timestep = 0

    def _train(self):
        self.timestep += 1
        print("what is params", self.config)
        # Fetch data from the actor handle passed in through the config.
        val = ray.get(
            self.config['remote_service'].get_data_for_item.remote(self.config))
        time.sleep(20)
        self.result = val
        print("result", self.config['identifier'], val)
        return {'objective': len(val), 'done': True}

    def _save(self, checkpoint_dir):
        path = os.path.join(checkpoint_dir, "checkpoint", str(self.timestep))
        if not os.path.exists(os.path.dirname(path)):
            os.makedirs(os.path.dirname(path))
        with open(path, "wb") as f:
            pickle.dump(self.result, f)
        return path

    def _restore(self, checkpoint_path):
        with open(checkpoint_path, 'rb') as f:
            self.config = pickle.load(f)
        self.timestep = 1


ray.shutdown()
ray.init(num_cpus=7, resources={"datasource": 1},
         object_store_memory=200000000, log_to_driver=True)
remote_h = DataService.remote()
print("remote_h", remote_h)

hyperband = AsyncHyperBandScheduler(
    time_attr="training_iteration",
    reward_attr="objective",
    max_t=100)

exp = Experiment(
    name="asynchyperband_class_testx",
    run=MyTrainableClass,
    num_samples=150,
    stop={"training_iteration": 150},
    config={
        'identifier': ray.tune.sample_from(
            lambda x: np.random.choice(['good', 'better', 'best', 'longest'])),
        'useless': ray.tune.sample_from(
            lambda x: np.random.choice(
                [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16])),
        # 'window': ray.tune.sample_from(
        #     lambda x: np.random.choice([40, 50, 60, 70, 80, 90, 100, 110])),
        # 'window2': ray.tune.sample_from(
        #     lambda x: np.random.choice([40, 50, 60, 70, 80, 90, 100, 110])),
        # 'thresh': ray.tune.sample_from(
        #     lambda x: np.random.choice(list(range(20, 70, 2)))),
        # 'thresh2': ray.tune.sample_from(
        #     lambda x: np.random.choice(list(range(20, 60, 2)))),
        'remote_service': remote_h,
    })

trials = run(exp, scheduler=hyperband, verbose=1, reuse_actors=True)
del remote_h
ray.shutdown()
Observed Exception (OSX 10.12, Python 3.6.0, Ray 0.7.0 from pip):
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-4-38f977638a63> in <module>()
24 }
25 )
---> 26 trials = run(exp,scheduler=hyperband,verbose=1, reuse_actors=True)
27 del remote_h
28 ray.shutdown()
/Users/wgroves/venv/tensorflow/lib/python3.6/site-packages/ray/tune/tune.py in run(run_or_experiment, name, stop, config, resources_per_trial, num_samples, local_dir, upload_dir, trial_name_creator, loggers, sync_function, checkpoint_freq, checkpoint_at_end, export_formats, max_failures, restore, search_alg, scheduler, with_server, server_port, verbose, resume, queue_trials, reuse_actors, trial_executor, raise_on_failed_trial)
245 last_debug = 0
246 while not runner.is_finished():
--> 247 runner.step()
248 if time.time() - last_debug > DEBUG_PRINT_INTERVAL:
249 if verbose:
/Users/wgroves/venv/tensorflow/lib/python3.6/site-packages/ray/tune/trial_runner.py in step(self)
274 self.trial_executor.start_trial(next_trial)
275 elif self.trial_executor.get_running_trials():
--> 276 self._process_events() # blocking
277 else:
278 for trial in self._trials:
/Users/wgroves/venv/tensorflow/lib/python3.6/site-packages/ray/tune/trial_runner.py in _process_events(self)
438
439 def _process_events(self):
--> 440 trial = self.trial_executor.get_next_available_trial() # blocking
441 with warn_if_slow("process_trial"):
442 self._process_trial(trial)
/Users/wgroves/venv/tensorflow/lib/python3.6/site-packages/ray/tune/ray_trial_executor.py in get_next_available_trial(self)
290 # See https://github.com/ray-project/ray/issues/4211 for details.
291 start = time.time()
--> 292 [result_id], _ = ray.wait(shuffled_results)
293 wait_time = time.time() - start
294 if wait_time > NONTRIVIAL_WAIT_TIME_THRESHOLD_S:
/Users/wgroves/venv/tensorflow/lib/python3.6/site-packages/ray/worker.py in wait(object_ids, num_returns, timeout)
2322 timeout_milliseconds,
2323 False,
-> 2324 worker.current_task_id,
2325 )
2326 return ready_ids, remaining_ids
python/ray/_raylet.pyx in ray._raylet.RayletClient.wait()
python/ray/_raylet.pyx in ray._raylet.check_status()
Exception: [RayletClient] Raylet connection closed.
2019-05-25 22:23:57,216 ERROR worker.py:1678 -- The node with client ID 13f2e72398ffd893090f7fd31bae0c1409755432 has been marked dead because the monitor has missed too many heartbeats from it.
raylet_monitor.err file:
WARNING: Logging before InitGoogleLogging() is written to STDERR
W0525 22:23:57.210012 3839128512 monitor.cc:49] Client timed out: 13f2e72398ffd893090f7fd31bae0c1409755432
raylet.err file:
WARNING: Logging before InitGoogleLogging() is written to STDERR
W0525 22:13:38.203748 3839128512 stats.h:50] Failed to create the Prometheus exporter. This doesn't affect anything except stats. Caused by: null context when constructing CivetServer. Possible problem binding to port.
W0525 22:14:23.657073 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:25.661929 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:27.668455 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:29.674670 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:31.684083 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:33.697531 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:47.302045 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:49.309584 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:51.318904 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:53.325412 3839128512 node_manager.cc:1726] Resources oversubscribed: {}
W0525 22:14:56.674257 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:14:58.680490 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:15:10.775121 3839128512 node_manager.cc:1726] Resources oversubscribed: {CPU,0.000000}
W0525 22:15:12.781167 3839128512 node_manager.cc:1726] Resources oversubscribed: {}
...
W0525 22:22:57.761237 3839128512 node_manager.cc:1726] Resources oversubscribed: {}
W0525 22:23:21.603353 3839128512 node_manager.cc:1726] Resources oversubscribed: {}
F0525 22:23:25.221491 3839128512 scheduling_resources.cc:171] Check failed: resource_capacity_.count(resource_label) == 1 Attempt to acquire unknown resource: CPU
*** Check failure stack trace: ***
*** Aborted at 1558837405 (unix time) try "date -d @1558837405" if you are using GNU date ***
PC: @ 0x0 (unknown)
*** SIGABRT (@0x7fffdbf5ed42) received by PID 85294 (TID 0x7fffe4d473c0) stack trace: ***
@ 0x7fffdc03fb3a _sigtramp
@ 0x10ac0f551 (unknown)
@ 0x7fffdbec4420 abort
@ 0x105e1d899 google::logging_fail()
@ 0x105e1c5d6 google::LogMessage::SendToLog()
@ 0x105e1ccc5 google::LogMessage::Flush()
@ 0x105e1cda2 google::LogMessage::~LogMessage()
@ 0x105e16835 ray::RayLog::~RayLog()
@ 0x105d42a8c ray::raylet::ResourceSet::SubtractResourcesStrict()
@ 0x105d1368b ray::raylet::NodeManager::HandleTaskUnblocked()
@ 0x105d10748 ray::raylet::NodeManager::ProcessClientMessage()
@ 0x105d3331c std::__1::__function::__func<>::operator()()
@ 0x105e081fc ray::ClientConnection<>::ProcessMessage()
@ 0x105e101b8 boost::asio::detail::reactive_socket_recv_op<>::do_complete()
@ 0x105cecff3 boost::asio::detail::scheduler::do_run_one()
@ 0x105ceca42 boost::asio::detail::scheduler::run()
@ 0x105cdf561 main
@ 0x7fffdbe30235 start
@ 0x10 (unknown)