Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions test/external-modules/delayed-aggs/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

apply plugin: 'elasticsearch.esplugin'
apply plugin: 'elasticsearch.yaml-rest-test'

esplugin {
description 'A test module that allows to delay aggregations on shards with a configurable time'
classname 'org.elasticsearch.search.aggregations.DelayedShardAggregationPlugin'
}

restResources {
restApi {
includeCore '_common', 'indices', 'index', 'cluster', 'search'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class DelayedShardAggregationIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(DelayedShardAggregationPlugin.class);
}

public void testSimple() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index"));
float expectedMax = Float.MIN_VALUE;
List<IndexRequestBuilder> reqs = new ArrayList<>();
for (int i = 0; i < 5; i++) {
float rand = randomFloat();
expectedMax = Math.max(rand, expectedMax);
reqs.add(client().prepareIndex("index").setSource("number", rand));
}
indexRandom(true, reqs);
SearchResponse response = client().prepareSearch("index")
.addAggregation(
new DelayedShardAggregationBuilder("delay", TimeValue.timeValueMillis(10)).subAggregation(
new MaxAggregationBuilder("max").field("number")
)
)
.get();
Aggregations aggs = response.getAggregations();
assertThat(aggs.get("delay"), instanceOf(InternalFilter.class));
InternalFilter filter = aggs.get("delay");
InternalMax max = filter.getAggregations().get("max");
assertThat((float) max.getValue(), equalTo(expectedMax));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

public class DelayedShardAggregationBuilder extends AbstractAggregationBuilder<DelayedShardAggregationBuilder> {
public static final String NAME = "shard_delay";

private TimeValue delay;

public DelayedShardAggregationBuilder(String name, TimeValue delay) {
super(name);
this.delay = delay;
}

public DelayedShardAggregationBuilder(StreamInput in) throws IOException {
super(in);
this.delay = in.readTimeValue();
}

@Override
protected AggregationBuilder shallowCopy(AggregatorFactories.Builder factoriesBuilder, Map<String, Object> metaData) {
return new DelayedShardAggregationBuilder(name, delay);
}

@Override
public BucketCardinality bucketCardinality() {
return BucketCardinality.ONE;
}

@Override
public String getType() {
return NAME;
}

@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeTimeValue(delay);
}

@Override
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("value", delay.toString());
builder.endObject();
return builder;
}

static final ConstructingObjectParser<DelayedShardAggregationBuilder, String> PARSER = new ConstructingObjectParser<>(
NAME,
false,
(args, name) -> new DelayedShardAggregationBuilder(name, TimeValue.parseTimeValue((String) args[0], "value"))
);

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), new ParseField("value"));
}

@Override
@SuppressWarnings("unchecked")
protected AggregatorFactory doBuild(
QueryShardContext queryShardContext,
AggregatorFactory parent,
AggregatorFactories.Builder subfactoriesBuilder
) throws IOException {

// Disable the request cache
queryShardContext.nowInMillis();

final FilterAggregationBuilder filterAgg = new FilterAggregationBuilder(name, QueryBuilders.matchAllQuery()).subAggregations(
subfactoriesBuilder
);
final AggregatorFactory factory = filterAgg.build(queryShardContext, parent);
return new AggregatorFactory(name, queryShardContext, parent, subfactoriesBuilder, metadata) {
@Override
protected Aggregator createInternal(
SearchContext searchContext,
Aggregator parent,
CardinalityUpperBound cardinality,
Map<String, Object> metadata
) throws IOException {
long start = searchContext.getRelativeTimeInMillis();
long sleepTime = Math.min(delay.getMillis(), 100);
do {
if (searchContext.isCancelled()) {
break;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
throw new IOException(e);
}
} while (searchContext.getRelativeTimeInMillis() - start < delay.getMillis());
return factory.create(searchContext, parent, cardinality);
}
};
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
DelayedShardAggregationBuilder that = (DelayedShardAggregationBuilder) o;
return Objects.equals(delay, that.delay);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), delay);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import java.util.List;

import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilter;

import static java.util.Collections.singletonList;

/**
* Test plugin that allows to delay aggregations on shards with a configurable time
*/
public class DelayedShardAggregationPlugin extends Plugin implements SearchPlugin {
public DelayedShardAggregationPlugin() {}

@Override
public List<AggregationSpec> getAggregations() {
return singletonList(
new AggregationSpec(
DelayedShardAggregationBuilder.NAME,
DelayedShardAggregationBuilder::new,
DelayedShardAggregationBuilder.PARSER
).addResultReader(InternalFilter::new)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.TestGeoShapeFieldMapperPlugin;

import java.util.Arrays;
import java.util.Collection;

public class DelayedShardAggregationBuilderTests extends BaseAggregationTestCase<DelayedShardAggregationBuilder> {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Arrays.asList(DelayedShardAggregationPlugin.class, TestGeoShapeFieldMapperPlugin.class);
}

@Override
protected DelayedShardAggregationBuilder createTestAggregatorBuilder() {
return new DelayedShardAggregationBuilder(randomAlphaOfLength(10), TimeValue.timeValueMillis(100));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.aggregations;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;

public class DelayedShardAggregationClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public DelayedShardAggregationClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}

@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Integration tests for DelayAggregation components
#

---
"Delayed Aggs":
- do:
indices.create:
index: test

- do:
search:
index: test
body:
aggs:
delay:
shard_delay:
value: "200ms"


- match: { hits.total.value: 0 }
- match: { aggregations.delay.doc_count: 0 }
- gt: { took: 100 }