[Java] Support multiple workers in Java worker process#5505
[Java] Support multiple workers in Java worker process#5505raulchen merged 33 commits intoray-project:masterfrom
Conversation
…e same worker process." This reverts commit 223c8c24538ed67a0c5831b7f8631fc4b10d3b2d.
| } else { | ||
| runtime = new RayNativeRuntime(rayConfig); | ||
| if (rayConfig.workerMode == WorkerType.DRIVER || rayConfig.numWorkersPerProcess == 1) { | ||
| runtime = new RayNativeRuntime(rayConfig); |
There was a problem hiding this comment.
Can we still use RayMultiWorkerNativeRuntime in this case?
There was a problem hiding this comment.
I've removed rayConfig.numWorkersPerProcess == 1.
java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java
Show resolved
Hide resolved
| LOGGER.info("Starting {} workers.", workerCount); | ||
|
|
||
| for (int i = 0; i < workerCount; i++) { | ||
| int finalI = i; |
There was a problem hiding this comment.
| int finalI = i; | |
| final int worker_index = i; |
| private final RayNativeRuntime[] runtimes; | ||
| private final ThreadLocal<RayNativeRuntime> currentThreadRuntime = new ThreadLocal<>(); | ||
|
|
||
| private CountDownLatch shutdownCountDownLatch = new CountDownLatch(1); |
java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java
Show resolved
Hide resolved
python/ray/scripts/scripts.py
Outdated
| required=False, | ||
| default=None, | ||
| type=int, | ||
| help="The number of workers per Java worker process.") |
There was a problem hiding this comment.
I think it's not necessary to add this option to ray start. ray start is already very complex. if one wants to config this option, they can use --internal-config for now. In the future, we should make C++ config easier to set.
There was a problem hiding this comment.
The help text of --internal-config says that it's only for debugging and development purposes.
src/ray/common/ray_config_def.h
Outdated
| /// Number of workers per process | ||
| RAY_CONFIG(int, num_workers_per_process, 1) | ||
| /// Number of workers per Java worker process | ||
| RAY_CONFIG(int, num_workers_per_process_java, 1) |
There was a problem hiding this comment.
Also add a num_workers_per_process_python and set it to 1?
src/ray/raylet/node_manager.h
Outdated
| /// The number of workers per process. | ||
| int num_workers_per_process; | ||
| /// The number of workers per process per language. | ||
| std::unordered_map<Language, int, std::hash<int>> num_workers_per_process_by_lang; |
There was a problem hiding this comment.
you can use the EnumUnorderedMap util class to get rid of the std::hash<int> part.
But I think it's also fine to just remove this filed from NodeManagerConfig and just let WorkerPool read the config from RayConfig::instance().
There was a problem hiding this comment.
Changed to read config in WorkerPool.
src/ray/common/constants.h
Outdated
|
|
||
| constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = "RAY_WORKER_OPTION_"; | ||
|
|
||
| constexpr char kWorkerNumWorkersPlaceholder[] = "RAY_WORKER_NUM_WORKERS"; |
There was a problem hiding this comment.
Can this just be RAY_WORKER_OPTION_0?
There was a problem hiding this comment.
It can't. This placeholder is meant to be language independent. The idea is to keep WorkerPool have no idea how the real argument is, but only need to replace the placeholder with a number.
On the other hand, the full java argument is -Dray.raylet.config.num_workers_per_process_java=RAY_WORKER_PLACEHOLDER_NUM_WORKERS. If you replace RAY_WORKER_PLACEHOLDER_NUM_WORKERS with RAY_WORKER_PLACEHOLDER_OPTION_0, then we have to specificly add the -Dray.raylet.config.num_workers_per_process_java= part in C++.
Plus, this argument has nothing to do with dynamic worker options. It's odd to occupy 1 token of dynamic worker options. And we need to make a convention that it would always occupy the first token.
| * @param runnable The runnable to wrap. | ||
| * @return The wrapped runnable. | ||
| */ | ||
| public static Runnable asyncClosure(Runnable runnable) { |
There was a problem hiding this comment.
This name sounds kind of ambiguous to me. But I don't have a good name either. What about just wrapRunnable and wrapCallable?
There was a problem hiding this comment.
Changed to wrapRunnable and wrapCallable.
38b1e07 to
4834567
Compare
zhijunfu
left a comment
There was a problem hiding this comment.
Thanks, LGTM. Just left a few comments.
src/ray/raylet/worker_pool.h
Outdated
| /// language. | ||
| WorkerPool(int num_worker_processes, int num_workers_per_process, | ||
| WorkerPool(int num_worker_processes, | ||
| const std::unordered_map<Language, int, std::hash<int>> &num_workers_per_process_by_lang, |
There was a problem hiding this comment.
nit: can use EnumUnorderedMap in src/ray/util/util.h
There was a problem hiding this comment.
Thanks. Managed to get rid of it.
| << "Number of workers per process of language " << Language_Name(entry.first) | ||
| << " must be positive."; | ||
| state.multiple_for_warning = | ||
| std::max(num_worker_processes, maximum_startup_concurrency) * |
There was a problem hiding this comment.
should this be max or min?
There was a problem hiding this comment.
This should be max. See line 49 of the original version.
java/runtime/src/main/java/org/ray/runtime/RayNativeRuntime.java
Outdated
Show resolved
Hide resolved
| LOGGER.info("Starting {} workers.", workerCount); | ||
|
|
||
| for (int i = 0; i < workerCount; i++) { | ||
| int finalI = i; |
There was a problem hiding this comment.
nit: maybe rename finalI ?
There was a problem hiding this comment.
something like threadIndex or workerIndex?
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(RayMultiWorkerNativeRuntime.class); | ||
|
|
||
| private final int workerCount; |
There was a problem hiding this comment.
workerCount -> numWorkers?
to follow the naming style of numWorkersPerProcess above.
| } | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to start worker.", e); | ||
| } |
There was a problem hiding this comment.
I think we should invoke Ray.shutdown() here.
There was a problem hiding this comment.
FYI, Ray.shutdown() will be called during JVM shutdown because a shutdown hook is set in Ray.init().
python/ray/node.py
Outdated
| include_java=self._ray_params.include_java, | ||
| java_worker_options=self._ray_params.java_worker_options, | ||
| java_num_workers_per_process=self._ray_params. | ||
| java_num_workers_per_process, |
There was a problem hiding this comment.
num_java_workers_per_process ?
python/ray/services.py
Outdated
| command += "-Dray.log-dir={} ".format(os.path.join(session_dir, "logs")) | ||
|
|
||
| command += ("-Dray.raylet.config.num_workers_per_process_java=" + | ||
| "RAY_WORKER_NUM_WORKERS ") |
There was a problem hiding this comment.
Use a prefix like JAVA_WORKER_OPTION_ or JAVA_WORKER_PLACEHOLDER_?
Otherwise it's hard to be read as a placeholder for other developers.(I read it for a while to understand)
There was a problem hiding this comment.
Updated both placeholders to RAY_WORKER_PLACEHOLDER_DYNAMIC_OPTION_ and RAY_WORKER_PLACEHOLDER_NUM_WORKERS.
This reverts commit fab5ae6.
|
Test PASSed. |
|
Test PASSed. |
|
Test PASSed. |
|
Test FAILed. |
raulchen
left a comment
There was a problem hiding this comment.
Thanks. LGTM. Just a few small comments.
java/runtime/src/main/java/org/ray/runtime/RayMultiWorkerNativeRuntime.java
Outdated
Show resolved
Hide resolved
| // Wait until the object is deleted, because the above free operation is async. | ||
| while (true) { | ||
| NativeRayObject result = ((AbstractRayRuntime) Ray.internal()).getObjectStore() | ||
| NativeRayObject result = TestUtils.getRuntime().getObjectStore() |
|
Test FAILed. |
Why are these changes needed?
JVM itself costs a significant amount of memory overhead. Using multi-processes means duplicating the overhead. Supporting multiple workers in a single Java worker process can mitigate it.
What do these changes do?
Adds the
RayMultiWorkerNativeRuntimeto wrap multipleRayNativeRuntimeinstances, and to act as a proxy to each instance.Adds the
Ray.asyncClosuremethods to carry the runtime of current thread to the asyncRunnableorCallable. Users are required to wrap theRunnableorCallablewithRay.asyncClosure.Adds a new option
--java-num-workers-per-processforray start.Changes
num_workers_per_processtonum_workers_per_process_javainray_config_def.hto make the config unrelated to Python workers.NOTE: This is a straight forward solution by making everything related to worker runtime multi-instance. We need more detailed controls about which should be multi-instance and which should be shared. e.g. connections from worker to GCS should be shared to reduce the connection overhead. However, this PR will not address this. By default, a Java worker process will only start one worker unless the num workers per process config is specified, so it should be compatible with current behavior.
Related issue number
#2215
Linter
scripts/format.shto lint the changes in this PR.