Skip to content

Commit fefb704

Browse files
[2.11] Introduce header verifier step in netty pipeline (#10443)
Signed-off-by: Craig Perkins <cwperx@amazon.com> (cherry picked from commit 94257ad) Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 693f84b commit fefb704

6 files changed

Lines changed: 248 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1616
- Add unreferenced file cleanup count to merge stats ([#10204](https://github.com/opensearch-project/OpenSearch/pull/10204))
1717
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))
1818
- [Remote Store] Add support to restrict creation & deletion if system repository and mutation of immutable settings of system repository ([#9839](https://github.com/opensearch-project/OpenSearch/pull/9839))
19+
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
20+
1921
### Dependencies
2022
- Bump JNA version from 5.5 to 5.13 ([#9963](https://github.com/opensearch-project/OpenSearch/pull/9963))
2123
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.netty4;
10+
11+
import org.opensearch.OpenSearchNetty4IntegTestCase;
12+
import org.opensearch.core.common.transport.TransportAddress;
13+
import org.opensearch.http.HttpServerTransport;
14+
import org.opensearch.plugins.Plugin;
15+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
16+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
17+
import org.opensearch.transport.Netty4BlockingPlugin;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import io.netty.buffer.ByteBufUtil;
26+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
27+
import io.netty.handler.codec.http.FullHttpRequest;
28+
import io.netty.handler.codec.http.FullHttpResponse;
29+
import io.netty.handler.codec.http.HttpMethod;
30+
import io.netty.handler.codec.http.HttpVersion;
31+
import io.netty.util.ReferenceCounted;
32+
33+
import static org.hamcrest.CoreMatchers.containsString;
34+
import static org.hamcrest.CoreMatchers.equalTo;
35+
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
36+
37+
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
38+
public class Netty4HeaderVerifierIT extends OpenSearchNetty4IntegTestCase {
39+
40+
@Override
41+
protected boolean addMockHttpTransport() {
42+
return false; // enable http
43+
}
44+
45+
@Override
46+
protected Collection<Class<? extends Plugin>> nodePlugins() {
47+
return Collections.singletonList(Netty4BlockingPlugin.class);
48+
}
49+
50+
public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception {
51+
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
52+
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
53+
TransportAddress transportAddress = randomFrom(boundAddresses);
54+
55+
final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
56+
blockedRequest.headers().add("blockme", "Not Allowed");
57+
blockedRequest.headers().add(HOST, "localhost");
58+
59+
final List<FullHttpResponse> responses = new ArrayList<>();
60+
try (Netty4HttpClient nettyHttpClient = new Netty4HttpClient()) {
61+
try {
62+
FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest);
63+
responses.add(blockedResponse);
64+
String blockedResponseContent = new String(ByteBufUtil.getBytes(blockedResponse.content()), StandardCharsets.UTF_8);
65+
assertThat(blockedResponseContent, containsString("Hit header_verifier"));
66+
assertThat(blockedResponse.status().code(), equalTo(401));
67+
} finally {
68+
responses.forEach(ReferenceCounted::release);
69+
}
70+
}
71+
}
72+
73+
}
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import org.opensearch.common.network.NetworkService;
12+
import org.opensearch.common.settings.ClusterSettings;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.BigArrays;
15+
import org.opensearch.common.util.PageCacheRecycler;
16+
import org.opensearch.common.util.concurrent.ThreadContext;
17+
import org.opensearch.core.indices.breaker.CircuitBreakerService;
18+
import org.opensearch.core.rest.RestStatus;
19+
import org.opensearch.core.xcontent.NamedXContentRegistry;
20+
import org.opensearch.http.HttpServerTransport;
21+
import org.opensearch.http.netty4.Netty4HttpServerTransport;
22+
import org.opensearch.rest.BytesRestResponse;
23+
import org.opensearch.rest.RestChannel;
24+
import org.opensearch.rest.RestRequest;
25+
import org.opensearch.telemetry.tracing.Tracer;
26+
import org.opensearch.threadpool.ThreadPool;
27+
28+
import java.util.Collections;
29+
import java.util.Map;
30+
import java.util.function.Supplier;
31+
32+
import io.netty.channel.ChannelHandlerContext;
33+
import io.netty.channel.ChannelInboundHandlerAdapter;
34+
import io.netty.channel.SimpleChannelInboundHandler;
35+
import io.netty.handler.codec.http.DefaultHttpRequest;
36+
import io.netty.handler.codec.http.HttpMessage;
37+
import io.netty.util.AttributeKey;
38+
import io.netty.util.ReferenceCountUtil;
39+
40+
public class Netty4BlockingPlugin extends Netty4Plugin {
41+
42+
private static final AttributeKey<Boolean> SHOULD_BLOCK = AttributeKey.newInstance("should-block");
43+
44+
public class Netty4BlockingHttpServerTransport extends Netty4HttpServerTransport {
45+
46+
public Netty4BlockingHttpServerTransport(
47+
Settings settings,
48+
NetworkService networkService,
49+
BigArrays bigArrays,
50+
ThreadPool threadPool,
51+
NamedXContentRegistry xContentRegistry,
52+
Dispatcher dispatcher,
53+
ClusterSettings clusterSettings,
54+
SharedGroupFactory sharedGroupFactory,
55+
Tracer tracer
56+
) {
57+
super(
58+
settings,
59+
networkService,
60+
bigArrays,
61+
threadPool,
62+
xContentRegistry,
63+
dispatcher,
64+
clusterSettings,
65+
sharedGroupFactory,
66+
tracer
67+
);
68+
}
69+
70+
@Override
71+
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
72+
return new ExampleBlockingNetty4HeaderVerifier();
73+
}
74+
}
75+
76+
@Override
77+
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
78+
Settings settings,
79+
ThreadPool threadPool,
80+
BigArrays bigArrays,
81+
PageCacheRecycler pageCacheRecycler,
82+
CircuitBreakerService circuitBreakerService,
83+
NamedXContentRegistry xContentRegistry,
84+
NetworkService networkService,
85+
HttpServerTransport.Dispatcher dispatcher,
86+
ClusterSettings clusterSettings,
87+
Tracer tracer
88+
) {
89+
return Collections.singletonMap(
90+
NETTY_HTTP_TRANSPORT_NAME,
91+
() -> new Netty4BlockingHttpServerTransport(
92+
settings,
93+
networkService,
94+
bigArrays,
95+
threadPool,
96+
xContentRegistry,
97+
new BlockingDispatcher(dispatcher),
98+
clusterSettings,
99+
getSharedGroupFactory(settings),
100+
tracer
101+
)
102+
);
103+
}
104+
105+
/** POC for how an external header verifier would be implemented */
106+
public class ExampleBlockingNetty4HeaderVerifier extends SimpleChannelInboundHandler<DefaultHttpRequest> {
107+
108+
@Override
109+
public void channelRead0(ChannelHandlerContext ctx, DefaultHttpRequest msg) throws Exception {
110+
ReferenceCountUtil.retain(msg);
111+
if (isBlocked(msg)) {
112+
msg.headers().add("blocked", true);
113+
}
114+
ctx.fireChannelRead(msg);
115+
}
116+
117+
private boolean isBlocked(HttpMessage request) {
118+
final boolean shouldBlock = request.headers().contains("blockme");
119+
120+
return shouldBlock;
121+
}
122+
}
123+
124+
class BlockingDispatcher implements HttpServerTransport.Dispatcher {
125+
126+
private HttpServerTransport.Dispatcher originalDispatcher;
127+
128+
public BlockingDispatcher(final HttpServerTransport.Dispatcher originalDispatcher) {
129+
super();
130+
this.originalDispatcher = originalDispatcher;
131+
}
132+
133+
@Override
134+
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
135+
if (request.getHeaders().containsKey("blocked")) {
136+
channel.sendResponse(new BytesRestResponse(RestStatus.UNAUTHORIZED, "Hit header_verifier"));
137+
return;
138+
}
139+
originalDispatcher.dispatchRequest(request, channel, threadContext);
140+
141+
}
142+
143+
@Override
144+
public void dispatchBadRequest(RestChannel channel, ThreadContext threadContext, Throwable cause) {
145+
originalDispatcher.dispatchBadRequest(channel, threadContext, cause);
146+
}
147+
}
148+
}

modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,8 +317,10 @@ public ChannelHandler configureServerChannelHandler() {
317317
return new HttpChannelHandler(this, handlingSettings);
318318
}
319319

320-
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
321-
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-server-channel");
320+
public static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("opensearch-http-channel");
321+
protected static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance(
322+
"opensearch-http-server-channel"
323+
);
322324

323325
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
324326

@@ -351,7 +353,8 @@ protected void initChannel(Channel ch) throws Exception {
351353
);
352354
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
353355
ch.pipeline().addLast("decoder", decoder);
354-
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
356+
ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier());
357+
ch.pipeline().addLast("decoder_compress", transport.createDecompressor());
355358
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
356359
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
357360
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
@@ -393,4 +396,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
393396
}
394397
}
395398
}
399+
400+
/**
401+
* Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding
402+
*/
403+
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
404+
// pass-through
405+
return new ChannelInboundHandlerAdapter();
406+
}
407+
408+
/**
409+
* Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor.
410+
*
411+
* Used in instances to conditionally decompress depending on the outcome from header verification
412+
*/
413+
protected ChannelInboundHandlerAdapter createDecompressor() {
414+
return new HttpContentDecompressor();
415+
}
396416
}

modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
144144
);
145145
}
146146

147-
private SharedGroupFactory getSharedGroupFactory(Settings settings) {
147+
SharedGroupFactory getSharedGroupFactory(Settings settings) {
148148
SharedGroupFactory groupFactory = this.groupFactory.get();
149149
if (groupFactory != null) {
150150
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";

server/src/main/java/org/opensearch/rest/RestHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ default List<ReplacedRoute> replacedRoutes() {
108108
}
109109

110110
/**
111-
* Controls whether requests handled by this class are allowed to to access system indices by default.
111+
* Controls whether requests handled by this class are allowed to access system indices by default.
112112
* @return {@code true} if requests handled by this class should be allowed to access system indices.
113113
*/
114114
default boolean allowSystemIndexAccessByDefault() {

0 commit comments

Comments
 (0)