Register thread pool settings#18674
Register thread pool settings#18674jasontedor merged 19 commits intoelastic:masterfrom jasontedor:thread-pool-refactor
Conversation
This commit refactors the handling of thread pool settings so that the individual settings can be registered rather than registering the top level group. With this refactoring, individual plugins must now register their own settings for custom thread pools that they need, but a dedicated API is provided for this in the thread pool module. This commit also renames the prefix on the thread pool settings from "threadpool" to "thread_pool". This enables a hard break on the settings so that: - some of the settings can be given more sensible names (e.g., the max number of threads in a scaling thread pool is now named "max" instead of "size") - change the soft limit on the number of threads in the bulk and indexing thread pools to a hard limit - the settings names for custom plugins for thread pools can be prefixed (e.g., "xpack.watcher.thread_pool.size")
|
@jasontedor I think if we are breaking things here hard we should go and make them node level settings and prevent them from being updated. |
I think that having them as node-level settings makes the most sense, especially when considering heterogeneous clusters. The one downside to the settings not being dynamically updatable is that right now there is an "out" if an executor or backing queue is too small, it can be dynamically increased as solution to the problem. I'm happy to remove this though; do you think that should be done in this PR or a follow-up @s1monw? |
* master: Update reindex.asciidoc (#18687) Add more logging to reindex rethrottle [TEST] Increase timeout to wait on folder deletion in IndexWithShadowReplicasIT [TEST] Fix tests that rely on assumption that data dirs are removed after index deletion (#18681) Acknowledge index deletion requests based on standard cluster state acknowledgment (#18602) Register "cloud.node.auto_attributes" setting (#18678) Fix StoreRecoveryTests after 6.0.1 upgrade. Upgrade to Lucene 6.0.1. ingest: added `ignore_failure` option to all processors Throw IllegalStateException when handshake fails due to version or cluster mismatch (#18676) AggregatorBuilder and PipelineAggregatorBuilder do not need generics. #18368 Move PageCacheRecycler into BigArrays (#18666) Index Template: parse and validate mappings in template when template puts [TEST] Set BWC version to 5.0.0-SNAP since this is it's min compat version Make cluster health classes immutable and have them implement Writeable instead of Streamable Fix javadoc that stated a throws clause that didn't exist. Clarify the semantics of the BlobContainer interface Build: Rework eclipse settings copy so it does not get automatically cleaned
This commit removes the ability for thread pool settings to be adjusted dynamically. With this commit, thread pool settings are now node-level settings and can not be modified via the cluster settings API.
This commit removes some unused imports that are no longer needed after refactoring thread pool settings.
This commit modifies settings prefixes so that the name is not duplicated (e.g., "xpack.watcher.thread_pool.watcher" under the previous code).
This commit removes the now unnecessary retired executors field from ThreadPool after retiring executors are no longer a thing due to remove dynamic thread pool settings.
| Setting.Property.Dynamic, Setting.Property.NodeScope); | ||
| final String queueSizeKey = settingsKey(prefix, name, "queue_size"); | ||
| this.queueSizeSetting = | ||
| Setting.intSetting(queueSizeKey, queueSize, Setting.Property.Dynamic, Setting.Property.NodeScope); |
There was a problem hiding this comment.
Setting.Property.Dynamic, can go away
There was a problem hiding this comment.
That was a mistake, I removed them from the scaling executor settings but forget them here. I'm glad that you noticed. Removed in 76026f1.
This commit removes the volatile keyword from the executors field; this is no longer needed as the executors can no longer be updated.
| } | ||
|
|
||
| } | ||
|
|
There was a problem hiding this comment.
You get a newline, and you get a newline, and you get a newline, everyone gets a newline! 😄
Removed in 2f29247.
This commit removes some unnecessary trailing newlines from o/e/t/ScalingExecutorBuilder.java.
| * | ||
| */ | ||
| public class ThreadPool extends AbstractComponent implements Closeable { | ||
| public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { |
There was a problem hiding this comment.
do we really need to extend this class? I wanna get rid of it? We should set things up in a ctor and impl. Closeable IMO
There was a problem hiding this comment.
It's tricky. I want to construct the executors map once, making it effectively final (although it can not be actually final right now since we do not build it in the constructor). Given this desire, we can not build it in the constructor because we need to wait until all plugins have had a chance to register their custom thread pool settings. But the thread pool instance is needed earlier than this happens because it's needed for the MonitorService which is in turn needed for the NodeModule which must be built before we start processing plugins. The timeline is thus:
constructor runs < inject into MonitorService < inject into NodeModule < plugins process modules < start thread pool to create executors
I looked at options around this but I didn't like any of them, they were all super hacky and involved registering suppliers that used the injector.
There was a problem hiding this comment.
can we just get rid of custom threadpools instead?
There was a problem hiding this comment.
I think this patch could work just fine?
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 8a1df50..94a0ab1 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -98,6 +98,7 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskResultsService;
+import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.TransportService;
@@ -210,12 +211,12 @@ public class Node implements Closeable {
throw new IllegalStateException("Failed to created node environment", ex);
}
final NetworkService networkService = new NetworkService(settings);
- final ThreadPool threadPool = new ThreadPool(settings);
+ final List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders();
+ final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
boolean success = false;
try {
- final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
modules.add(new CircuitBreakerModule(settings));
@@ -223,6 +224,7 @@ public class Node implements Closeable {
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
+ final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool);
modules.add(new PluginsModule(pluginsService));
SettingsModule settingsModule = new SettingsModule(this.settings);
modules.add(settingsModule);
diff --git a/core/src/main/java/org/elasticsearch/plugins/Plugin.java b/core/src/main/java/org/elasticsearch/plugins/Plugin.java
index 1efc151..695a255 100644
--- a/core/src/main/java/org/elasticsearch/plugins/Plugin.java
+++ b/core/src/main/java/org/elasticsearch/plugins/Plugin.java
@@ -23,9 +23,12 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.threadpool.ExecutorBuilder;
+import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
/**
* An extension point allowing to plug in custom functionality.
@@ -80,4 +83,8 @@ public abstract class Plugin {
*/
@Deprecated
public final void onModule(IndexModule indexModule) {}
+
+ public List<ExecutorBuilder<?>> getExecutorBuilders() {
+ return Collections.emptyList();
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
index f373da6..bb22854 100644
--- a/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
+++ b/core/src/main/java/org/elasticsearch/plugins/PluginsService.java
@@ -40,6 +40,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexModule;
+import org.elasticsearch.threadpool.ExecutorBuilder;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
@@ -261,6 +262,14 @@ public class PluginsService extends AbstractComponent {
return modules;
}
+ public List<ExecutorBuilder<?>> getExecutorBuilders() {
+ ArrayList<ExecutorBuilder<?>> builders = new ArrayList<>();
+ for (Tuple<PluginInfo, Plugin> plugin : plugins) {
+ builders.addAll(plugin.v2().getExecutorBuilders());
+ }
+ return getExecutorBuilders();
+ }
+
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
List<Class<? extends LifecycleComponent>> services = new ArrayList<>();
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
diff --git a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
index 31f3f31..61e5141 100644
--- a/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
+++ b/core/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java
@@ -30,7 +30,7 @@ import java.util.List;
*
* @param <U> the underlying type of the executor settings
*/
-abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
+public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings> {
private final String name;
diff --git a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
index 0b564b2..1d641aa 100644
--- a/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
+++ b/core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java
@@ -151,7 +151,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
return Collections.unmodifiableCollection(builders.values());
}
- public ThreadPool(Settings settings) {
+ public ThreadPool(Settings settings, ExecutorBuilder<?>... customBuilders) {
super(settings);
final Map<String, ExecutorBuilder> builders = new HashMap<>();
@@ -175,7 +175,13 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
- this.builders = builders;
+ for (ExecutorBuilder<?> builder : customBuilders) {
+ if (builders.containsKey(builder.name())) {
+ throw new IllegalArgumentException("builder with name: " + builder.name() + " already exists");
+ }
+ builders.put(builder.name(), builder);
+ }
+ this.builders = Collections.unmodifiableMap(builders);
assert Node.NODE_NAME_SETTING.exists(settings);
threadContext = new ThreadContext(settings);
@@ -190,10 +196,6 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
this.estimatedTimeThread.start();
}
- void add(ExecutorBuilder builder) {
- builders.put(builder.name(), builder);
- }
-
@Override
protected void doStart() {
final Map<String, ExecutorHolder> executors = new HashMap<>();This commit removes the dynamic properties from fixed executor settings.
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| public class ScalingExecutorBuilder extends ExecutorBuilder<ScalingExecutorBuilder.ScalingExecutorSettings> { |
There was a problem hiding this comment.
these look awesome. Can we get some java docs?
| info.getQueueSize() == null ? "unbounded" : info.getQueueSize()); | ||
| } | ||
|
|
||
| private static SizeValue queueSizeValue(int queueSize) { |
There was a problem hiding this comment.
only used once so maybe a local var is easier ie. inline
This commit cleans up some simple issues for executor builders: - marks the implementations as final - restricts the visibilty of some methods - adds Javadocs - changes a couple of method names for clarity
This commit inlines the queue size utility method in the fixed executor builder as it was only used in one place.
This commit increases the hard size limit on bulk and indexing threads to one plus the bounded number of processors. This allows for an extra schedulable thread in case one of the others from the pool is blocked on I/O or a lock.
This commit refactors custom thread pool registration. Rather than processing the thread pool module in an onModule method on plugins, we simply ask the plugins for their custom thread pool registration. This simplifies custom thread pool registration and ultimately thread pool construction.
This commit reverts an inadvertent import order chance in AbstractClient.java.
* master: (22 commits) Fix recovery throttling to properly handle relocating non-primary shards (#18701) Fix merge stats rendering in RestIndicesAction (#18720) [TEST] mute RandomAllocationDeciderTests.testRandomDecisions Reworked docs for index-shrink API (#18705) Improve painless compile-time exceptions Adds UUIDs to snapshots Add test rethrottle test case for delete-by-query Do not start scheduled pings until transport start Adressing review comments Only filter intial recovery (post API) when shrinking an index (#18661) Add tests to check that toQuery() doesn't return null Removing handling of null lucene query where we catch this at parse time Handle empty query bodies at parse time and remove EmptyQueryBuilder Mute failing assertions in IndexWithShadowReplicasIT until fix Remove allow running as root Add upgrade-not-supported warning to alpha release notes remove unrecognized javadoc tag from matrix aggregation module set ValuesSourceConfig fields as private Adding MultiValuesSource support classes and documentation to matrix stats agg module New Matrix Stats Aggregation module ...
| } | ||
|
|
||
| synchronized void addSettingsUpdater(SettingUpdater<?> updater) { | ||
| public synchronized void addSettingsUpdater(SettingUpdater<?> updater) { |
There was a problem hiding this comment.
can we move this back to pkg private?
This commit reverts the method AbstractScopedSettings#addSettingsUpdater to being package-private. This method was previously made public for an earlier refactoring but that refactoring has been undone after making thread pool settings non-dynamic.
| @Deprecated | ||
| public final void onModule(IndexModule indexModule) {} | ||
|
|
||
| public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) { |
There was a problem hiding this comment.
can you put some javadocs on this?
|
I added some minors LGTM otehrwise - no need for another round of review. |
This commit renames two methods in ExecutorBuilder to add "get" prefixes to their method names.
This commit adds Javadocs for the extension point for plugins to register custom thread pools.
|
LGTM thanks @jasontedor |
This commit registers the estimated time interval setting, the interval at which the estimated time thread updates the estimated time.
This commit refactors the handling of thread pool settings so that the
individual settings can be registered rather than registering the top
level group. With this refactoring, individual plugins must now register
their own settings for custom thread pools that they need, but a
dedicated API is provided for this in the thread pool module. This
commit also renames the prefix on the thread pool settings from
"threadpool" to "thread_pool". This enables a hard break on the settings
so that:
number of threads in a scaling thread pool is now named "max" instead
of "size")
indexing thread pools to a hard limit
prefixed (e.g., "xpack.watcher.thread_pool.size")
Relates #18613, closes #9216