Skip to content

Commit fa8ccf0

Browse files
cwperkspeternied
authored andcommitted
[Backport 1.3] Allow customization of netty channel handles before and during decompression (#10261)
Signed-off-by: Peter Nied <petern@amazon.com>
1 parent 77adb21 commit fa8ccf0

5 files changed

Lines changed: 225 additions & 3 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
55
## [Unreleased 1.3.x]
66

77
### Added
8+
- Improve compressed request handling ([#10261](https://github.com/opensearch-project/OpenSearch/pull/10261))
9+
810
### Dependencies
911
- Bump asm from 9.5 to 9.6 ([#10302](https://github.com/opensearch-project/OpenSearch/pull/10302))
1012
- Bump netty from 4.1.97.Final to 4.1.99.Final ([#10303](https://github.com/opensearch-project/OpenSearch/pull/10303))
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.http.netty4;
10+
11+
import org.opensearch.OpenSearchNetty4IntegTestCase;
12+
import org.opensearch.core.common.transport.TransportAddress;
13+
import org.opensearch.http.HttpServerTransport;
14+
import org.opensearch.plugins.Plugin;
15+
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
16+
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
17+
import org.opensearch.transport.Netty4BlockingPlugin;
18+
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.List;
24+
25+
import io.netty.buffer.ByteBufUtil;
26+
import io.netty.handler.codec.http.DefaultFullHttpRequest;
27+
import io.netty.handler.codec.http.FullHttpRequest;
28+
import io.netty.handler.codec.http.FullHttpResponse;
29+
import io.netty.handler.codec.http.HttpMethod;
30+
import io.netty.handler.codec.http.HttpVersion;
31+
import io.netty.handler.codec.http2.HttpConversionUtil;
32+
import io.netty.util.ReferenceCounted;
33+
34+
import static org.hamcrest.CoreMatchers.containsString;
35+
import static org.hamcrest.CoreMatchers.equalTo;
36+
import static io.netty.handler.codec.http.HttpHeaderNames.HOST;
37+
38+
@ClusterScope(scope = Scope.TEST, supportsDedicatedMasters = false, numDataNodes = 1)
39+
public class Netty4HeaderVerifierIT extends OpenSearchNetty4IntegTestCase {
40+
41+
@Override
42+
protected boolean addMockHttpTransport() {
43+
return false; // enable http
44+
}
45+
46+
@Override
47+
protected Collection<Class<? extends Plugin>> nodePlugins() {
48+
return Collections.singletonList(Netty4BlockingPlugin.class);
49+
}
50+
51+
public void testThatNettyHttpServerRequestBlockedWithHeaderVerifier() throws Exception {
52+
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
53+
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
54+
TransportAddress transportAddress = randomFrom(boundAddresses);
55+
56+
final FullHttpRequest blockedRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
57+
blockedRequest.headers().add("blockme", "Not Allowed");
58+
blockedRequest.headers().add(HOST, "localhost");
59+
blockedRequest.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), "http");
60+
61+
final List<FullHttpResponse> responses = new ArrayList<>();
62+
try (Netty4HttpClient nettyHttpClient = Netty4HttpClient.http2()) {
63+
try {
64+
FullHttpResponse blockedResponse = nettyHttpClient.send(transportAddress.address(), blockedRequest);
65+
responses.add(blockedResponse);
66+
String blockedResponseContent = new String(ByteBufUtil.getBytes(blockedResponse.content()), StandardCharsets.UTF_8);
67+
assertThat(blockedResponseContent, containsString("Hit header_verifier"));
68+
assertThat(blockedResponse.status().code(), equalTo(401));
69+
} finally {
70+
responses.forEach(ReferenceCounted::release);
71+
}
72+
}
73+
}
74+
75+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import org.opensearch.common.network.NetworkService;
12+
import org.opensearch.common.settings.ClusterSettings;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.common.util.BigArrays;
15+
import org.opensearch.common.util.PageCacheRecycler;
16+
import org.opensearch.core.indices.breaker.CircuitBreakerService;
17+
import org.opensearch.core.xcontent.NamedXContentRegistry;
18+
import org.opensearch.http.HttpServerTransport;
19+
import org.opensearch.http.netty4.Netty4HttpServerTransport;
20+
import org.opensearch.telemetry.tracing.Tracer;
21+
import org.opensearch.threadpool.ThreadPool;
22+
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.Collections;
25+
import java.util.Map;
26+
import java.util.function.Supplier;
27+
28+
import io.netty.buffer.ByteBuf;
29+
import io.netty.buffer.Unpooled;
30+
import io.netty.channel.ChannelFutureListener;
31+
import io.netty.channel.ChannelHandlerContext;
32+
import io.netty.channel.ChannelInboundHandlerAdapter;
33+
import io.netty.channel.SimpleChannelInboundHandler;
34+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
35+
import io.netty.handler.codec.http.DefaultHttpRequest;
36+
import io.netty.handler.codec.http.FullHttpResponse;
37+
import io.netty.handler.codec.http.HttpRequest;
38+
import io.netty.handler.codec.http.HttpResponseStatus;
39+
import io.netty.util.ReferenceCountUtil;
40+
41+
public class Netty4BlockingPlugin extends Netty4ModulePlugin {
42+
43+
public class Netty4BlockingHttpServerTransport extends Netty4HttpServerTransport {
44+
45+
public Netty4BlockingHttpServerTransport(
46+
Settings settings,
47+
NetworkService networkService,
48+
BigArrays bigArrays,
49+
ThreadPool threadPool,
50+
NamedXContentRegistry xContentRegistry,
51+
Dispatcher dispatcher,
52+
ClusterSettings clusterSettings,
53+
SharedGroupFactory sharedGroupFactory,
54+
Tracer tracer
55+
) {
56+
super(
57+
settings,
58+
networkService,
59+
bigArrays,
60+
threadPool,
61+
xContentRegistry,
62+
dispatcher,
63+
clusterSettings,
64+
sharedGroupFactory,
65+
tracer
66+
);
67+
}
68+
69+
@Override
70+
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
71+
return new ExampleBlockingNetty4HeaderVerifier();
72+
}
73+
}
74+
75+
@Override
76+
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
77+
Settings settings,
78+
ThreadPool threadPool,
79+
BigArrays bigArrays,
80+
PageCacheRecycler pageCacheRecycler,
81+
CircuitBreakerService circuitBreakerService,
82+
NamedXContentRegistry xContentRegistry,
83+
NetworkService networkService,
84+
HttpServerTransport.Dispatcher dispatcher,
85+
ClusterSettings clusterSettings,
86+
Tracer tracer
87+
) {
88+
return Collections.singletonMap(
89+
NETTY_HTTP_TRANSPORT_NAME,
90+
() -> new Netty4BlockingHttpServerTransport(
91+
settings,
92+
networkService,
93+
bigArrays,
94+
threadPool,
95+
xContentRegistry,
96+
dispatcher,
97+
clusterSettings,
98+
getSharedGroupFactory(settings),
99+
tracer
100+
)
101+
);
102+
}
103+
104+
/** POC for how an external header verifier would be implemented */
105+
public class ExampleBlockingNetty4HeaderVerifier extends SimpleChannelInboundHandler<DefaultHttpRequest> {
106+
107+
@Override
108+
public void channelRead0(ChannelHandlerContext ctx, DefaultHttpRequest msg) throws Exception {
109+
ReferenceCountUtil.retain(msg);
110+
if (isBlocked(msg)) {
111+
ByteBuf buf = Unpooled.copiedBuffer("Hit header_verifier".getBytes(StandardCharsets.UTF_8));
112+
final FullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.UNAUTHORIZED, buf);
113+
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
114+
ReferenceCountUtil.release(msg);
115+
} else {
116+
// Lets the request pass to the next channel handler
117+
ctx.fireChannelRead(msg);
118+
}
119+
}
120+
121+
private boolean isBlocked(HttpRequest request) {
122+
final boolean shouldBlock = request.headers().contains("blockme");
123+
124+
return shouldBlock;
125+
}
126+
}
127+
}

modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public ChannelHandler configureServerChannelHandler() {
314314
return new HttpChannelHandler(this, handlingSettings);
315315
}
316316

317-
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
317+
public static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
318318
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel");
319319

320320
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
@@ -348,7 +348,8 @@ protected void initChannel(Channel ch) throws Exception {
348348
);
349349
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
350350
ch.pipeline().addLast("decoder", decoder);
351-
ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());
351+
ch.pipeline().addLast("header_verifier", transport.createHeaderVerifier());
352+
ch.pipeline().addLast("decoder_compress", transport.createDecompressor());
352353
ch.pipeline().addLast("encoder", new HttpResponseEncoder());
353354
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
354355
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
@@ -390,4 +391,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
390391
}
391392
}
392393
}
394+
395+
/**
396+
* Extension point that allows a NetworkPlugin to extend the netty pipeline and inspect headers after request decoding
397+
*/
398+
protected ChannelInboundHandlerAdapter createHeaderVerifier() {
399+
// pass-through
400+
return new ChannelInboundHandlerAdapter();
401+
}
402+
403+
/**
404+
* Extension point that allows a NetworkPlugin to override the default netty HttpContentDecompressor and supply a custom decompressor.
405+
*
406+
* Used in instances to conditionally decompress depending on the outcome from header verification
407+
*/
408+
protected ChannelInboundHandlerAdapter createDecompressor() {
409+
return new HttpContentDecompressor();
410+
}
393411
}

modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4Plugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public Map<String, Supplier<HttpServerTransport>> getHttpTransports(
139139
);
140140
}
141141

142-
private SharedGroupFactory getSharedGroupFactory(Settings settings) {
142+
SharedGroupFactory getSharedGroupFactory(Settings settings) {
143143
SharedGroupFactory groupFactory = this.groupFactory.get();
144144
if (groupFactory != null) {
145145
assert groupFactory.getSettings().equals(settings) : "Different settings than originally provided";

0 commit comments

Comments
 (0)