Skip to content

Commit e33d0ea

Browse files
authored
Add dynamic worker options for worker command. (#4970)
* Add fields for fbs * WIP * Fix complition errors * Add java part * FIx * Fix * Fix * Fix lint * Refine API * address comments and add test * Fix * Address comment. * Address comments. * Fix linting * Refine * Fix lint * WIP: address comment. * Fix java * Fix py * Refin * Fix * Fix * Fix linting * Fix lint * Address comments * WIP * Fix * Fix * minor refine * Fix lint * Fix raylet test. * Fix lint * Update src/ray/raylet/worker_pool.h Co-Authored-By: Hao Chen <chenh1024@gmail.com> * Update java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java Co-Authored-By: Hao Chen <chenh1024@gmail.com> * Address comments. * Address comments. * Fix test. * Update src/ray/raylet/worker_pool.h Co-Authored-By: Hao Chen <chenh1024@gmail.com> * Address comments. * Address comments. * Fix * Fix lint * Fix lint * Fix * Address comments. * Fix linting
1 parent 2e342ef commit e33d0ea

File tree

15 files changed

+290
-62
lines changed

15 files changed

+290
-62
lines changed

java/api/src/main/java/org/ray/api/options/ActorCreationOptions.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,14 @@ public class ActorCreationOptions extends BaseTaskOptions {
1313

1414
public final int maxReconstructions;
1515

16-
private ActorCreationOptions(Map<String, Double> resources, int maxReconstructions) {
16+
public final String jvmOptions;
17+
18+
private ActorCreationOptions(Map<String, Double> resources,
19+
int maxReconstructions,
20+
String jvmOptions) {
1721
super(resources);
1822
this.maxReconstructions = maxReconstructions;
23+
this.jvmOptions = jvmOptions;
1924
}
2025

2126
/**
@@ -25,6 +30,7 @@ public static class Builder {
2530

2631
private Map<String, Double> resources = new HashMap<>();
2732
private int maxReconstructions = NO_RECONSTRUCTION;
33+
private String jvmOptions = "";
2834

2935
public Builder setResources(Map<String, Double> resources) {
3036
this.resources = resources;
@@ -36,8 +42,13 @@ public Builder setMaxReconstructions(int maxReconstructions) {
3642
return this;
3743
}
3844

45+
public Builder setJvmOptions(String jvmOptions) {
46+
this.jvmOptions = jvmOptions;
47+
return this;
48+
}
49+
3950
public ActorCreationOptions createActorCreationOptions() {
40-
return new ActorCreationOptions(resources, maxReconstructions);
51+
return new ActorCreationOptions(resources, maxReconstructions, jvmOptions);
4152
}
4253
}
4354

java/runtime/src/main/java/org/ray/runtime/AbstractRayRuntime.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.ray.runtime.task.TaskLanguage;
3636
import org.ray.runtime.task.TaskSpec;
3737
import org.ray.runtime.util.IdUtil;
38+
import org.ray.runtime.util.StringUtil;
3839
import org.slf4j.Logger;
3940
import org.slf4j.LoggerFactory;
4041

@@ -363,8 +364,13 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
363364
}
364365

365366
int maxActorReconstruction = 0;
367+
List<String> dynamicWorkerOptions = ImmutableList.of();
366368
if (taskOptions instanceof ActorCreationOptions) {
367369
maxActorReconstruction = ((ActorCreationOptions) taskOptions).maxReconstructions;
370+
String jvmOptions = ((ActorCreationOptions) taskOptions).jvmOptions;
371+
if (!StringUtil.isNullOrEmpty(jvmOptions)) {
372+
dynamicWorkerOptions = ImmutableList.of(((ActorCreationOptions) taskOptions).jvmOptions);
373+
}
368374
}
369375

370376
TaskLanguage language;
@@ -393,7 +399,8 @@ private TaskSpec createTaskSpec(RayFunc func, PyFunctionDescriptor pyFunctionDes
393399
numReturns,
394400
resources,
395401
language,
396-
functionDescriptor
402+
functionDescriptor,
403+
dynamicWorkerOptions
397404
);
398405
}
399406

java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,16 @@ private static TaskSpec parseTaskSpecFromFlatbuffer(ByteBuffer bb) {
190190
JavaFunctionDescriptor functionDescriptor = new JavaFunctionDescriptor(
191191
info.functionDescriptor(0), info.functionDescriptor(1), info.functionDescriptor(2)
192192
);
193+
194+
// Deserialize dynamic worker options.
195+
List<String> dynamicWorkerOptions = new ArrayList<>();
196+
for (int i = 0; i < info.dynamicWorkerOptionsLength(); ++i) {
197+
dynamicWorkerOptions.add(info.dynamicWorkerOptions(i));
198+
}
199+
193200
return new TaskSpec(driverId, taskId, parentTaskId, parentCounter, actorCreationId,
194201
maxActorReconstructions, actorId, actorHandleId, actorCounter, newActorHandles,
195-
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor);
202+
args, numReturns, resources, TaskLanguage.JAVA, functionDescriptor, dynamicWorkerOptions);
196203
}
197204

198205
private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
@@ -275,6 +282,12 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
275282
functionDescriptorOffset = fbb.createVectorOfTables(functionDescriptorOffsets);
276283
}
277284

285+
int [] dynamicWorkerOptionsOffsets = new int[task.dynamicWorkerOptions.size()];
286+
for (int index = 0; index < task.dynamicWorkerOptions.size(); ++index) {
287+
dynamicWorkerOptionsOffsets[index] = fbb.createString(task.dynamicWorkerOptions.get(index));
288+
}
289+
int dynamicWorkerOptionsOffset = fbb.createVectorOfTables(dynamicWorkerOptionsOffsets);
290+
278291
int root = TaskInfo.createTaskInfo(
279292
fbb,
280293
driverIdOffset,
@@ -293,7 +306,8 @@ private static ByteBuffer convertTaskSpecToFlatbuffer(TaskSpec task) {
293306
requiredResourcesOffset,
294307
requiredPlacementResourcesOffset,
295308
language,
296-
functionDescriptorOffset);
309+
functionDescriptorOffset,
310+
dynamicWorkerOptionsOffset);
297311
fbb.finish(root);
298312
ByteBuffer buffer = fbb.dataBuffer();
299313

java/runtime/src/main/java/org/ray/runtime/runner/RunManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,9 @@ private String buildWorkerCommandRaylet() {
319319

320320
cmd.addAll(rayConfig.jvmParameters);
321321

322+
// jvm options
323+
cmd.add("RAY_WORKER_OPTION_0");
324+
322325
// Main class
323326
cmd.add(WORKER_CLASS);
324327
String command = Joiner.on(" ").join(cmd);

java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class TaskSpec {
6363
// Language of this task.
6464
public final TaskLanguage language;
6565

66+
public final List<String> dynamicWorkerOptions;
67+
6668
// Descriptor of the remote function.
6769
// Note, if task language is Java, the type is JavaFunctionDescriptor. If the task language
6870
// is Python, the type is PyFunctionDescriptor.
@@ -93,7 +95,8 @@ public TaskSpec(
9395
int numReturns,
9496
Map<String, Double> resources,
9597
TaskLanguage language,
96-
FunctionDescriptor functionDescriptor) {
98+
FunctionDescriptor functionDescriptor,
99+
List<String> dynamicWorkerOptions) {
97100
this.driverId = driverId;
98101
this.taskId = taskId;
99102
this.parentTaskId = parentTaskId;
@@ -106,6 +109,8 @@ public TaskSpec(
106109
this.newActorHandles = newActorHandles;
107110
this.args = args;
108111
this.numReturns = numReturns;
112+
this.dynamicWorkerOptions = dynamicWorkerOptions;
113+
109114
returnIds = new ObjectId[numReturns];
110115
for (int i = 0; i < numReturns; ++i) {
111116
returnIds[i] = IdUtil.computeReturnId(taskId, i + 1);
@@ -157,6 +162,7 @@ public String toString() {
157162
", resources=" + resources +
158163
", language=" + language +
159164
", functionDescriptor=" + functionDescriptor +
165+
", dynamicWorkerOptions=" + dynamicWorkerOptions +
160166
", executionDependencies=" + executionDependencies +
161167
'}';
162168
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.ray.api.test;
2+
3+
import org.ray.api.Ray;
4+
import org.ray.api.RayActor;
5+
import org.ray.api.RayObject;
6+
import org.ray.api.TestUtils;
7+
import org.ray.api.annotation.RayRemote;
8+
import org.ray.api.options.ActorCreationOptions;
9+
import org.testng.Assert;
10+
import org.testng.annotations.Test;
11+
12+
public class WorkerJvmOptionsTest extends BaseTest {
13+
14+
@RayRemote
15+
public static class Echo {
16+
String getOptions() {
17+
return System.getProperty("test.suffix");
18+
}
19+
}
20+
21+
@Test
22+
public void testJvmOptions() {
23+
TestUtils.skipTestUnderSingleProcess();
24+
ActorCreationOptions options = new ActorCreationOptions.Builder()
25+
.setJvmOptions("-Dtest.suffix=suffix")
26+
.createActorCreationOptions();
27+
RayActor<Echo> actor = Ray.createActor(Echo::new, options);
28+
RayObject<String> obj = Ray.call(Echo::getOptions, actor);
29+
Assert.assertEquals(obj.get(), "suffix");
30+
}
31+
}

python/ray/services.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1233,6 +1233,7 @@ def build_java_worker_command(
12331233
assert java_worker_options is not None
12341234

12351235
command = "java "
1236+
12361237
if redis_address is not None:
12371238
command += "-Dray.redis.address={} ".format(redis_address)
12381239

@@ -1253,6 +1254,8 @@ def build_java_worker_command(
12531254
# Put `java_worker_options` in the last, so it can overwrite the
12541255
# above options.
12551256
command += java_worker_options + " "
1257+
1258+
command += "RAY_WORKER_OPTION_0 "
12561259
command += "org.ray.runtime.runner.worker.DefaultWorker"
12571260

12581261
return command

src/ray/common/constants.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ constexpr char kObjectTablePrefix[] = "ObjectTable";
3636
/// Prefix for the task table keys in redis.
3737
constexpr char kTaskTablePrefix[] = "TaskTable";
3838

39+
constexpr char kWorkerDynamicOptionPlaceholderPrefix[] = "RAY_WORKER_OPTION_";
40+
3941
#endif // RAY_CONSTANTS_H_

src/ray/gcs/format/gcs.fbs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ table TaskInfo {
106106
// For a Python function, it should be: [module_name, class_name, function_name]
107107
// For a Java function, it should be: [class_name, method_name, type_descriptor]
108108
function_descriptor: [string];
109+
// The dynamic options used in the worker command when starting the worker process for
110+
// an actor creation task. If the list isn't empty, the options will be used to replace
111+
// the placeholder strings (`RAY_WORKER_OPTION_0`, `RAY_WORKER_OPTION_1`, etc) in the
112+
// worker command.
113+
dynamic_worker_options: [string];
109114
}
110115

111116
table ResourcePair {

src/ray/raylet/node_manager.cc

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
8383
initial_config_(config),
8484
local_available_resources_(config.resource_config),
8585
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
86-
config.maximum_startup_concurrency, config.worker_commands),
86+
config.maximum_startup_concurrency, gcs_client_,
87+
config.worker_commands),
8788
scheduling_policy_(local_queues_),
8889
reconstruction_policy_(
8990
io_service_,
@@ -1723,18 +1724,6 @@ bool NodeManager::AssignTask(const Task &task) {
17231724
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec);
17241725
if (worker == nullptr) {
17251726
// There are no workers that can execute this task.
1726-
if (!spec.IsActorTask()) {
1727-
// There are no more non-actor workers available to execute this task.
1728-
// Start a new worker.
1729-
worker_pool_.StartWorkerProcess(spec.GetLanguage());
1730-
// Push an error message to the user if the worker pool tells us that it is
1731-
// getting too big.
1732-
const std::string warning_message = worker_pool_.WarningAboutSize();
1733-
if (warning_message != "") {
1734-
RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver(
1735-
DriverID::Nil(), "worker_pool_large", warning_message, current_time_ms()));
1736-
}
1737-
}
17381727
// We couldn't assign this task, as no worker available.
17391728
return false;
17401729
}
@@ -2205,6 +2194,12 @@ void NodeManager::ForwardTask(
22052194
const auto &spec = task.GetTaskSpecification();
22062195
auto task_id = spec.TaskId();
22072196

2197+
if (worker_pool_.HasPendingWorkerForTask(spec.GetLanguage(), task_id)) {
2198+
// There is a worker being starting for this task,
2199+
// so we shouldn't forward this task to another node.
2200+
return;
2201+
}
2202+
22082203
// Get and serialize the task's unforwarded, uncommitted lineage.
22092204
Lineage uncommitted_lineage;
22102205
if (lineage_cache_.ContainsTask(task_id)) {

0 commit comments

Comments
 (0)