Skip to content

[Java] Support multiple workers in Java worker process#5505

Merged
raulchen merged 33 commits intoray-project:masterfrom
antgroup:java_worker_multi_thread
Sep 7, 2019
Merged

[Java] Support multiple workers in Java worker process#5505
raulchen merged 33 commits intoray-project:masterfrom
antgroup:java_worker_multi_thread

Conversation

@kfstorm
Copy link
Copy Markdown
Member

@kfstorm kfstorm commented Aug 22, 2019

Why are these changes needed?

JVM itself costs a significant amount of memory overhead. Using multi-processes means duplicating the overhead. Supporting multiple workers in a single Java worker process can mitigate it.

What do these changes do?

Adds the RayMultiWorkerNativeRuntime to wrap multiple RayNativeRuntime instances, and to act as a proxy to each instance.

Adds the Ray.asyncClosure methods to carry the runtime of current thread to the async Runnable or Callable. Users are required to wrap the Runnable or Callable with Ray.asyncClosure.

Adds a new option --java-num-workers-per-process for ray start.

Changes num_workers_per_process to num_workers_per_process_java in ray_config_def.h to make the config unrelated to Python workers.

NOTE: This is a straight forward solution by making everything related to worker runtime multi-instance. We need more detailed controls about which should be multi-instance and which should be shared. e.g. connections from worker to GCS should be shared to reduce the connection overhead. However, this PR will not address this. By default, a Java worker process will only start one worker unless the num workers per process config is specified, so it should be compatible with current behavior.

Related issue number

#2215

Linter

  • I've run scripts/format.sh to lint the changes in this PR.

@kfstorm kfstorm changed the title Support multiple workers in Java worker process [Java] Support multiple workers in Java worker process Aug 22, 2019
} else {
runtime = new RayNativeRuntime(rayConfig);
if (rayConfig.workerMode == WorkerType.DRIVER || rayConfig.numWorkersPerProcess == 1) {
runtime = new RayNativeRuntime(rayConfig);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we still use RayMultiWorkerNativeRuntime in this case?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed rayConfig.numWorkersPerProcess == 1.

LOGGER.info("Starting {} workers.", workerCount);

for (int i = 0; i < workerCount; i++) {
int finalI = i;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
int finalI = i;
final int worker_index = i;

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

private final RayNativeRuntime[] runtimes;
private final ThreadLocal<RayNativeRuntime> currentThreadRuntime = new ThreadLocal<>();

private CountDownLatch shutdownCountDownLatch = new CountDownLatch(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

document these fields

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

required=False,
default=None,
type=int,
help="The number of workers per Java worker process.")
Copy link
Copy Markdown
Contributor

@raulchen raulchen Aug 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not necessary to add this option to ray start. ray start is already very complex. if one wants to config this option, they can use --internal-config for now. In the future, we should make C++ config easier to set.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The help text of --internal-config says that it's only for debugging and development purposes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/// Number of workers per process
RAY_CONFIG(int, num_workers_per_process, 1)
/// Number of workers per Java worker process
RAY_CONFIG(int, num_workers_per_process_java, 1)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add a num_workers_per_process_python and set it to 1?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/// The number of workers per process.
int num_workers_per_process;
/// The number of workers per process per language.
std::unordered_map<Language, int, std::hash<int>> num_workers_per_process_by_lang;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use the EnumUnorderedMap util class to get rid of the std::hash<int> part.
But I think it's also fine to just remove this filed from NodeManagerConfig and just let WorkerPool read the config from RayConfig::instance().

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to read config in WorkerPool.


constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = "RAY_WORKER_OPTION_";

constexpr char kWorkerNumWorkersPlaceholder[] = "RAY_WORKER_NUM_WORKERS";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this just be RAY_WORKER_OPTION_0?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't. This placeholder is meant to be language independent. The idea is to keep WorkerPool have no idea how the real argument is, but only need to replace the placeholder with a number.

On the other hand, the full java argument is -Dray.raylet.config.num_workers_per_process_java=RAY_WORKER_PLACEHOLDER_NUM_WORKERS. If you replace RAY_WORKER_PLACEHOLDER_NUM_WORKERS with RAY_WORKER_PLACEHOLDER_OPTION_0, then we have to specificly add the -Dray.raylet.config.num_workers_per_process_java= part in C++.

Plus, this argument has nothing to do with dynamic worker options. It's odd to occupy 1 token of dynamic worker options. And we need to make a convention that it would always occupy the first token.

* @param runnable The runnable to wrap.
* @return The wrapped runnable.
*/
public static Runnable asyncClosure(Runnable runnable) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This name sounds kind of ambiguous to me. But I don't have a good name either. What about just wrapRunnable and wrapCallable?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to wrapRunnable and wrapCallable.

@kfstorm kfstorm force-pushed the java_worker_multi_thread branch from 38b1e07 to 4834567 Compare August 22, 2019 08:06
Copy link
Copy Markdown
Contributor

@zhijunfu zhijunfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, LGTM. Just left a few comments.

/// language.
WorkerPool(int num_worker_processes, int num_workers_per_process,
WorkerPool(int num_worker_processes,
const std::unordered_map<Language, int, std::hash<int>> &num_workers_per_process_by_lang,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use EnumUnorderedMap in src/ray/util/util.h

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Managed to get rid of it.

<< "Number of workers per process of language " << Language_Name(entry.first)
<< " must be positive.";
state.multiple_for_warning =
std::max(num_worker_processes, maximum_startup_concurrency) *
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be max or min?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be max. See line 49 of the original version.

LOGGER.info("Starting {} workers.", workerCount);

for (int i = 0; i < workerCount; i++) {
int finalI = i;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe rename finalI ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like threadIndex or workerIndex?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16448/
Test PASSed.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16454/
Test PASSed.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16455/
Test PASSed.


private static final Logger LOGGER = LoggerFactory.getLogger(RayMultiWorkerNativeRuntime.class);

private final int workerCount;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

workerCount -> numWorkers?

to follow the naming style of numWorkersPerProcess above.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
} catch (Exception e) {
LOGGER.error("Failed to start worker.", e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should invoke Ray.shutdown() here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, Ray.shutdown() will be called during JVM shutdown because a shutdown hook is set in Ray.init().

include_java=self._ray_params.include_java,
java_worker_options=self._ray_params.java_worker_options,
java_num_workers_per_process=self._ray_params.
java_num_workers_per_process,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

num_java_workers_per_process ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this parameter.

command += "-Dray.log-dir={} ".format(os.path.join(session_dir, "logs"))

command += ("-Dray.raylet.config.num_workers_per_process_java=" +
"RAY_WORKER_NUM_WORKERS ")
Copy link
Copy Markdown
Contributor

@jovany-wang jovany-wang Aug 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a prefix like JAVA_WORKER_OPTION_ or JAVA_WORKER_PLACEHOLDER_?

Otherwise it's hard to be read as a placeholder for other developers.(I read it for a while to understand)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated both placeholders to RAY_WORKER_PLACEHOLDER_DYNAMIC_OPTION_ and RAY_WORKER_PLACEHOLDER_NUM_WORKERS.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16568/
Test PASSed.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16569/
Test PASSed.

@AmplabJenkins
Copy link
Copy Markdown

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16572/
Test PASSed.

@AmplabJenkins
Copy link
Copy Markdown

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16832/
Test FAILed.

Copy link
Copy Markdown
Contributor

@raulchen raulchen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. LGTM. Just a few small comments.

// Wait until the object is deleted, because the above free operation is async.
while (true) {
NativeRayObject result = ((AbstractRayRuntime) Ray.internal()).getObjectStore()
NativeRayObject result = TestUtils.getRuntime().getObjectStore()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice util function!

@AmplabJenkins
Copy link
Copy Markdown

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/16857/
Test FAILed.

@raulchen raulchen merged commit 732336f into ray-project:master Sep 7, 2019
@raulchen raulchen deleted the java_worker_multi_thread branch September 7, 2019 14:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants