Skip to content

Commit cdd7d89

Browse files
oeyhgithub-actions[bot]
authored andcommitted
fix(otel_logs_source): fix NPE and regressions introduced by HTTP service support (#6572)
Signed-off-by: Hai Yan <oeyh@amazon.com> (cherry picked from commit 1bc5459)
1 parent db19fda commit cdd7d89

4 files changed

Lines changed: 27 additions & 36 deletions

File tree

data-prepper-plugins/otel-logs-source/src/main/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSource.java

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
package org.opensearch.dataprepper.plugins.source.otellogs;
77

8-
import static org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME;
9-
108
import com.linecorp.armeria.common.SessionProtocol;
119
import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
1210
import com.linecorp.armeria.server.Server;
@@ -18,7 +16,6 @@
1816
import com.linecorp.armeria.server.throttling.ThrottlingService;
1917

2018
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
21-
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
2219
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
2320
import org.opensearch.dataprepper.http.LogThrottlingRejectHandler;
2421
import org.opensearch.dataprepper.http.LogThrottlingStrategy;
@@ -56,8 +53,6 @@
5653
import java.util.ArrayList;
5754
import java.util.Collections;
5855
import java.util.List;
59-
import java.util.Map;
60-
import java.util.Optional;
6156
import java.util.concurrent.BlockingQueue;
6257
import java.util.concurrent.ExecutionException;
6358
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -135,7 +130,7 @@ public void start(Buffer<Record<Object>> buffer) {
135130

136131
private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
137132
serverBuilder.disableServerHeader();
138-
if (oTelLogsSourceConfig.isSsl()) {
133+
if (oTelLogsSourceConfig.isSsl() || oTelLogsSourceConfig.useAcmCertForSSL()) {
139134
LOG.info("Creating http source with SSL/TLS enabled.");
140135
final CertificateProvider certificateProvider = certificateProviderFactory.getCertificateProvider();
141136
final Certificate certificate = certificateProvider.getCertificate();
@@ -149,11 +144,8 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
149144
serverBuilder.http(oTelLogsSourceConfig.getPort());
150145
}
151146

152-
if (oTelLogsSourceConfig.getAuthentication() != null) {
153-
createHttpAuthentication()
154-
.flatMap(ArmeriaHttpAuthenticationProvider::getAuthenticationDecorator)
155-
.ifPresent(serverBuilder::decorator);
156-
}
147+
final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory);
148+
authProvider.getHttpAuthenticationService().ifPresent(serverBuilder::decorator);
157149

158150
serverBuilder.maxNumConnections(oTelLogsSourceConfig.getMaxConnectionCount());
159151
serverBuilder.requestTimeout(Duration.ofMillis(oTelLogsSourceConfig.getRequestTimeoutInMillis()));
@@ -165,13 +157,16 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
165157
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threadCount);
166158
serverBuilder.blockingTaskExecutor(executor, true);
167159

168-
if (oTelLogsSourceConfig.hasHealthCheck()) {
160+
if ((oTelLogsSourceConfig.enableUnframedRequests() || oTelLogsSourceConfig.getHttpPath() != null)
161+
&& oTelLogsSourceConfig.hasHealthCheck()) {
169162
LOG.info("HTTP source health check is enabled");
170163
serverBuilder.service(HEALTH_CHECK_PATH, HealthCheckService.builder().longPolling(0).build());
171164
}
172165

173-
configureGrpcService(serverBuilder, buffer);
174-
configureHttpService(serverBuilder, buffer, executor.getQueue());
166+
configureGrpcService(serverBuilder, buffer, authProvider);
167+
if (oTelLogsSourceConfig.getHttpPath() != null) {
168+
configureHttpService(serverBuilder, buffer, executor.getQueue());
169+
}
175170

176171
return serverBuilder.build();
177172
}
@@ -200,7 +195,8 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Obj
200195
}
201196
}
202197

203-
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer) {
198+
private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Object>> buffer,
199+
GrpcAuthenticationProvider authProvider) {
204200
LOG.info("Configuring gRPC service");
205201

206202
final GrpcServiceBuilder grpcServiceBuilder = GrpcService
@@ -215,7 +211,6 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
215211
pluginMetrics,
216212
null
217213
);
218-
GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider(pluginFactory);
219214

220215
final List<ServerInterceptor> interceptors = new ArrayList<>();
221216
if (authProvider.getAuthenticationInterceptor() != null) {
@@ -296,21 +291,6 @@ private GrpcAuthenticationProvider createGrpcAuthenticationProvider(final Plugin
296291
return pluginFactory.loadPlugin(GrpcAuthenticationProvider.class, authenticationPluginSetting);
297292
}
298293

299-
private Optional<ArmeriaHttpAuthenticationProvider> createHttpAuthentication() {
300-
if (oTelLogsSourceConfig.getAuthentication() == null || oTelLogsSourceConfig.getAuthentication().getPluginName().equals(UNAUTHENTICATED_PLUGIN_NAME)) {
301-
LOG.warn("Creating otel_trace_source http service without authentication. This is not secure.");
302-
LOG.warn("In order to set up Http Basic authentication for the otel-trace-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#authentication-configurations");
303-
return Optional.empty();
304-
} else {
305-
return Optional.of(createGrpcAuthenticationProvider(oTelLogsSourceConfig.getAuthentication()));
306-
}
307-
}
308-
309-
private ArmeriaHttpAuthenticationProvider createGrpcAuthenticationProvider(final PluginModel authenticationConfiguration) {
310-
Map<String, Object> pluginSettings = authenticationConfiguration.getPluginSettings();
311-
return pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, new PluginSetting(authenticationConfiguration.getPluginName(), pluginSettings));
312-
}
313-
314294
private GrpcExceptionHandlerFunction createGrpExceptionHandler(OTelLogsSourceConfig config) {
315295
RetryInfoConfig retryInfo = config.getRetryInfo() != null
316296
? config.getRetryInfo()

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceGrpcTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithAcmeSsl;
114114
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createConfigBuilderWithBasicAuth;
115115
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfig;
116+
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createDefaultConfigBuilder;
116117
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigFixture.createBuilderForConfigWithSsl;
117118
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_PASSWORD;
118119
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.BASIC_AUTH_USERNAME;
@@ -340,6 +341,18 @@ void testStartWithEmptyBuffer() {
340341
assertThrows(IllegalStateException.class, () -> source.start(null));
341342
}
342343

344+
@Test
345+
void start_withoutHttpPath_doesNotThrowNPE() {
346+
final OTelLogsSourceConfig config = createDefaultConfigBuilder()
347+
.httpPath(null)
348+
.path("/test-pipeline/v1/logs")
349+
.build();
350+
final OTelLogsSource source = new OTelLogsSource(config, pluginMetrics, pluginFactory,
351+
certificateProviderFactory, pipelineDescription);
352+
source.start(buffer);
353+
source.stop();
354+
}
355+
343356
@Test
344357
void testStartWithServerExecutionExceptionNoCause() throws ExecutionException, InterruptedException {
345358
// Prepare

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OTelLogsSourceHttpTest.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
import org.mockito.Mock;
7070
import org.mockito.junit.jupiter.MockitoExtension;
7171
import org.mockito.verification.VerificationMode;
72-
import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider;
7372
import org.opensearch.dataprepper.armeria.authentication.GrpcAuthenticationProvider;
7473
import org.opensearch.dataprepper.armeria.authentication.HttpBasicAuthenticationConfig;
7574
import org.opensearch.dataprepper.metrics.MetricsTestUtil;
@@ -81,7 +80,6 @@
8180
import org.opensearch.dataprepper.model.record.Record;
8281
import org.opensearch.dataprepper.model.types.ByteCount;
8382
import org.opensearch.dataprepper.plugins.GrpcBasicAuthenticationProvider;
84-
import org.opensearch.dataprepper.plugins.HttpBasicArmeriaHttpAuthenticationProvider;
8583
import org.opensearch.dataprepper.plugins.buffer.blockingbuffer.BlockingBuffer;
8684
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
8785
import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
@@ -272,8 +270,7 @@ void httpRequest_payloadIsCompressed_returns200() throws IOException {
272270
@MethodSource("getBasicAuthTestData")
273271
void httpRequest_withBasicAuth_returnsAppropriateResponse(String givenUsername, String givenPassword, HttpStatus expectedStatus, VerificationMode expectedBufferWrites) throws Exception {
274272
final HttpBasicAuthenticationConfig basicAuthConfig = new HttpBasicAuthenticationConfig(BASIC_AUTH_USERNAME, BASIC_AUTH_PASSWORD);
275-
final HttpBasicArmeriaHttpAuthenticationProvider authProvider = new HttpBasicArmeriaHttpAuthenticationProvider(basicAuthConfig);
276-
when(pluginFactory.loadPlugin(eq(ArmeriaHttpAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(authProvider);
273+
when(pluginFactory.loadPlugin(eq(GrpcAuthenticationProvider.class), any(PluginSetting.class))).thenReturn(new GrpcBasicAuthenticationProvider(basicAuthConfig));
277274
configureSource(createConfigBuilderWithBasicAuth().build());
278275
SOURCE.start(buffer);
279276

data-prepper-plugins/otel-logs-source/src/test/java/org/opensearch/dataprepper/plugins/source/otellogs/OtelLogsSource_RetryInfoTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.opensearch.dataprepper.plugins.source.otellogs.OtelLogsSourceConfigTestData.CONFIG_HTTP_PATH;
2222

2323
import java.time.Duration;
24+
import java.util.Optional;
2425

2526
import org.junit.jupiter.api.AfterEach;
2627
import org.junit.jupiter.api.BeforeEach;
@@ -79,7 +80,7 @@ class OtelLogsSource_RetryInfoTest {
7980

8081
@BeforeEach
8182
void beforeEach() throws Exception {
82-
lenient().when(authenticationProvider.getHttpAuthenticationService()).thenCallRealMethod();
83+
lenient().when(authenticationProvider.getHttpAuthenticationService()).thenReturn(Optional.empty());
8384
Mockito.lenient().doThrow(SizeOverflowException.class).when(buffer).writeAll(any(), anyInt());
8485

8586
when(oTelLogsSourceConfig.getPort()).thenReturn(DEFAULT_PORT);

0 commit comments

Comments
 (0)