Skip to content

Commit fae0223

Browse files
author
Hendrik Muhs
committed
add throttling to cat transform
1 parent 9ed0f1d commit fae0223

8 files changed

Lines changed: 40 additions & 24 deletions

File tree

docs/reference/cat/transforms.asciidoc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ Returns configuration and usage information about {transforms}.
2020
[[cat-transforms-api-prereqs]]
2121
==== {api-prereq-title}
2222

23-
* If the {es} {security-features} are enabled, you must have `monitor_transform`
24-
cluster privileges to use this API. The built-in `transform_user` role has these
25-
privileges. For more information, see <<security-privileges>> and
23+
* If the {es} {security-features} are enabled, you must have `monitor_transform`
24+
cluster privileges to use this API. The built-in `transform_user` role has these
25+
privileges. For more information, see <<security-privileges>> and
2626
<<built-in-roles>>.
2727

2828
//[[cat-transforms-api-desc]]
@@ -77,6 +77,10 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
7777
`documents_indexed`, `doci`:::
7878
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-indexed]
7979

80+
`docs_per_second`, `dps`:::
81+
(Default)
82+
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-docs-per-second]
83+
8084
`documents_processed`, `docp`:::
8185
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=docs-processed]
8286

@@ -139,7 +143,7 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=state-transform]
139143

140144
`transform_type`, `tt`:::
141145
(Default)
142-
Indicates the type of {transform}: `batch` or `continuous`.
146+
Indicates the type of {transform}: `batch` or `continuous`.
143147

144148
`trigger_count`, `tc`:::
145149
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=trigger-count]

x-pack/plugin/src/test/resources/rest-api-spec/test/transform/transforms_cat_apis.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ teardown:
5555
transform_id: "airline-transform-stats"
5656
- match:
5757
$body: |
58-
/^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
59-
(airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
58+
/^ #id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
59+
(airline\-transform\-stats \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data \s+ airline-data-by-airline \s+ \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/
6060
6161
---
6262
"Test cat transform stats with column selection":
@@ -95,8 +95,8 @@ teardown:
9595
v: true
9696
- match:
9797
$body: |
98-
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
99-
(airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ STOPPED \n)+ $/
98+
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
99+
(airline\-transform\-batch \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-batch \s+ \s+ description \s+ batch \s+ 1m \s+ 500 \s+ - \s+ STOPPED \n)+ $/
100100
- do:
101101
transform.delete_transform:
102102
transform_id: "airline-transform-batch"
@@ -131,8 +131,8 @@ teardown:
131131
v: true
132132
- match:
133133
$body: |
134-
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ state \n
135-
(airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ STOPPED \n)+ $/
134+
/^ id \s+ create_time \s+ version \s+ source_index \s+ dest_index \s+ pipeline \s+ description \s+ transform_type \s+ frequency \s+ max_page_search_size \s+ docs_per_second \s+ state \n
135+
(airline\-transform\-continuous \s+ [^\s]+ \s+ [^\s]+ \s+ airline-data,airline-data-other \s+ airline-data-by-airline-continuous \s+ \s+ description \s+ continuous \s+ 10s \s+ 500 \s+ - \s+ STOPPED \n)+ $/
136136
- do:
137137
transform.delete_transform:
138138
transform_id: "airline-transform-continuous"

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.common.settings.Settings;
2727
import org.elasticsearch.common.settings.SettingsFilter;
2828
import org.elasticsearch.common.settings.SettingsModule;
29+
import org.elasticsearch.common.unit.TimeValue;
2930
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3031
import org.elasticsearch.common.xcontent.NamedXContentRegistry.Entry;
3132
import org.elasticsearch.env.Environment;
@@ -128,6 +129,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
128129
private final SetOnce<TransformServices> transformServices = new SetOnce<>();
129130

130131
public static final int DEFAULT_FAILURE_RETRIES = 10;
132+
public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
133+
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
131134

132135
// How many times the transform task can retry on an non-critical failure
133136
public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestCatTransformAction.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@
3636

3737
public class RestCatTransformAction extends AbstractCatAction {
3838

39-
private static final Integer DEFAULT_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
40-
private static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
41-
4239
@Override
4340
public List<Route> routes() {
4441
return List.of(new Route(GET, "_cat/transforms"), new Route(GET, "_cat/transforms/{" + TransformField.TRANSFORM_ID + "}"));
@@ -111,6 +108,7 @@ private static Table getTableWithHeader() {
111108
.addCell("transform_type", TableColumnAttributeBuilder.builder("batch or continuous transform").setAliases("tt").build())
112109
.addCell("frequency", TableColumnAttributeBuilder.builder("frequency of transform").setAliases("f").build())
113110
.addCell("max_page_search_size", TableColumnAttributeBuilder.builder("max page search size").setAliases("mpsz").build())
111+
.addCell("docs_per_second", TableColumnAttributeBuilder.builder("docs per second").setAliases("dps").build())
114112
// Transform stats info
115113
.addCell(
116114
"state",
@@ -195,6 +193,12 @@ private Table buildTable(GetTransformAction.Response response, GetTransformStats
195193
transformIndexerStats = stats.getIndexerStats();
196194
}
197195

196+
Integer maxPageSearchSize = config.getSettings() == null || config.getSettings().getMaxPageSearchSize() == null
197+
? config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null
198+
? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE
199+
: config.getPivotConfig().getMaxPageSearchSize()
200+
: config.getSettings().getMaxPageSearchSize();
201+
198202
table.startRow()
199203
.addCell(config.getId())
200204
.addCell(config.getCreateTime())
@@ -204,9 +208,13 @@ private Table buildTable(GetTransformAction.Response response, GetTransformStats
204208
.addCell(config.getDestination().getPipeline())
205209
.addCell(config.getDescription())
206210
.addCell(config.getSyncConfig() == null ? "batch" : "continuous")
207-
.addCell(config.getFrequency() == null ? DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
208-
.addCell(config.getPivotConfig() == null || config.getPivotConfig().getMaxPageSearchSize() == null ?
209-
DEFAULT_MAX_PAGE_SEARCH_SIZE : config.getPivotConfig().getMaxPageSearchSize())
211+
.addCell(config.getFrequency() == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY : config.getFrequency())
212+
.addCell(maxPageSearchSize)
213+
.addCell(
214+
config.getSettings() == null || config.getSettings().getDocsPerSecond() == null
215+
? "-"
216+
: config.getSettings().getDocsPerSecond()
217+
)
210218
.addCell(stats == null ? null : stats.getState())
211219
.addCell(stats == null ? null : stats.getReason())
212220
.addCell(checkpointingInfo == null ? null : checkpointingInfo.getChangesLastDetectedAt())

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
3838
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
3939
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
40+
import org.elasticsearch.xpack.transform.Transform;
4041
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
4142
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
4243

@@ -49,7 +50,6 @@
4950
public class TransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener, TransformContext.Listener {
5051

5152
// Default interval the scheduler sends an event if the config does not specify a frequency
52-
private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
5353
private static final Logger logger = LogManager.getLogger(TransformTask.class);
5454
private static final IndexerState[] RUNNING_STATES = new IndexerState[] { IndexerState.STARTED, IndexerState.INDEXING };
5555
public static final String SCHEDULE_NAME = TransformField.TASK_NAME + "/schedule";
@@ -538,7 +538,7 @@ private String schedulerJobName() {
538538
private SchedulerEngine.Schedule next() {
539539
return (startTime, now) -> {
540540
TimeValue frequency = transform.getFrequency();
541-
return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis());
541+
return now + (frequency == null ? Transform.DEFAULT_TRANSFORM_FREQUENCY.getMillis() : frequency.getMillis());
542542
};
543543
}
544544

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/pivot/Pivot.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfig;
3737
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
3838
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
39+
import org.elasticsearch.xpack.transform.Transform;
3940

4041
import java.io.IOException;
4142
import java.util.Collection;
@@ -49,7 +50,6 @@
4950
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
5051

5152
public class Pivot {
52-
public static final int DEFAULT_INITIAL_PAGE_SIZE = 500;
5353
public static final int TEST_QUERY_PAGE_SIZE = 50;
5454

5555
private static final String COMPOSITE_AGGREGATION_NAME = "_transform";
@@ -122,14 +122,14 @@ public void deduceMappings(Client client, SourceConfig sourceConfig, final Actio
122122
* per page the page size is a multiplier for the costs of aggregating bucket.
123123
*
124124
* The user may set a maximum in the {@link PivotConfig#getMaxPageSearchSize()}, but if that is not provided,
125-
* the default {@link Pivot#DEFAULT_INITIAL_PAGE_SIZE} is used.
125+
* the default {@link Transform#DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE} is used.
126126
*
127127
* In future we might inspect the configuration and base the initial size on the aggregations used.
128128
*
129129
* @return the page size
130130
*/
131131
public int getInitialPageSize() {
132-
return config.getMaxPageSearchSize() == null ? DEFAULT_INITIAL_PAGE_SIZE : config.getMaxPageSearchSize();
132+
return config.getMaxPageSearchSize() == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : config.getMaxPageSearchSize();
133133
}
134134

135135
public SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@
3636
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
3737
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
3838
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
39+
import org.elasticsearch.xpack.transform.Transform;
3940
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
4041
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
4142
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
4243
import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigManager;
43-
import org.elasticsearch.xpack.transform.transforms.pivot.Pivot;
4444
import org.junit.After;
4545
import org.junit.Before;
4646

@@ -240,7 +240,7 @@ public void testPageSizeAdapt() throws Exception {
240240
new SettingsConfig(pageSize, null)
241241
);
242242
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
243-
final long initialPageSize = pageSize == null ? Pivot.DEFAULT_INITIAL_PAGE_SIZE : pageSize;
243+
final long initialPageSize = pageSize == null ? Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE : pageSize;
244244
Function<SearchRequest, SearchResponse> searchFunction = searchRequest -> {
245245
throw new SearchPhaseExecutionException(
246246
"query",

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.xpack.core.transform.transforms.pivot.AggregationConfig;
3434
import org.elasticsearch.xpack.core.transform.transforms.pivot.GroupConfigTests;
3535
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfig;
36+
import org.elasticsearch.xpack.transform.Transform;
3637
import org.elasticsearch.xpack.transform.transforms.pivot.Aggregations.AggregationType;
3738
import org.junit.After;
3839
import org.junit.Before;
@@ -108,7 +109,7 @@ public void testInitialPageSize() throws Exception {
108109
assertThat(pivot.getInitialPageSize(), equalTo(expectedPageSize));
109110

110111
pivot = new Pivot(new PivotConfig(GroupConfigTests.randomGroupConfig(), getValidAggregationConfig(), null));
111-
assertThat(pivot.getInitialPageSize(), equalTo(Pivot.DEFAULT_INITIAL_PAGE_SIZE));
112+
assertThat(pivot.getInitialPageSize(), equalTo(Transform.DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE));
112113

113114
assertWarnings("[max_page_search_size] is deprecated inside pivot please use settings instead");
114115
}

0 commit comments

Comments
 (0)