55
66package org .opensearch .dataprepper .plugins .source .otellogs ;
77
8- import static org .opensearch .dataprepper .armeria .authentication .ArmeriaHttpAuthenticationProvider .UNAUTHENTICATED_PLUGIN_NAME ;
9-
108import com .linecorp .armeria .common .SessionProtocol ;
119import com .linecorp .armeria .common .grpc .GrpcExceptionHandlerFunction ;
1210import com .linecorp .armeria .server .Server ;
1816import com .linecorp .armeria .server .throttling .ThrottlingService ;
1917
2018import org .opensearch .dataprepper .GrpcRequestExceptionHandler ;
21- import org .opensearch .dataprepper .armeria .authentication .ArmeriaHttpAuthenticationProvider ;
2219import org .opensearch .dataprepper .armeria .authentication .GrpcAuthenticationProvider ;
2320import org .opensearch .dataprepper .http .LogThrottlingRejectHandler ;
2421import org .opensearch .dataprepper .http .LogThrottlingStrategy ;
5653import java .util .ArrayList ;
5754import java .util .Collections ;
5855import java .util .List ;
59- import java .util .Map ;
60- import java .util .Optional ;
6156import java .util .concurrent .BlockingQueue ;
6257import java .util .concurrent .ExecutionException ;
6358import java .util .concurrent .ScheduledThreadPoolExecutor ;
@@ -135,7 +130,7 @@ public void start(Buffer<Record<Object>> buffer) {
135130
136131 private Server createServer (ServerBuilder serverBuilder , Buffer <Record <Object >> buffer ) {
137132 serverBuilder .disableServerHeader ();
138- if (oTelLogsSourceConfig .isSsl ()) {
133+ if (oTelLogsSourceConfig .isSsl () || oTelLogsSourceConfig . useAcmCertForSSL () ) {
139134 LOG .info ("Creating http source with SSL/TLS enabled." );
140135 final CertificateProvider certificateProvider = certificateProviderFactory .getCertificateProvider ();
141136 final Certificate certificate = certificateProvider .getCertificate ();
@@ -149,11 +144,8 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
149144 serverBuilder .http (oTelLogsSourceConfig .getPort ());
150145 }
151146
152- if (oTelLogsSourceConfig .getAuthentication () != null ) {
153- createHttpAuthentication ()
154- .flatMap (ArmeriaHttpAuthenticationProvider ::getAuthenticationDecorator )
155- .ifPresent (serverBuilder ::decorator );
156- }
147+ final GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider (pluginFactory );
148+ authProvider .getHttpAuthenticationService ().ifPresent (serverBuilder ::decorator );
157149
158150 serverBuilder .maxNumConnections (oTelLogsSourceConfig .getMaxConnectionCount ());
159151 serverBuilder .requestTimeout (Duration .ofMillis (oTelLogsSourceConfig .getRequestTimeoutInMillis ()));
@@ -165,13 +157,16 @@ private Server createServer(ServerBuilder serverBuilder, Buffer<Record<Object>>
165157 ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor (threadCount );
166158 serverBuilder .blockingTaskExecutor (executor , true );
167159
168- if (oTelLogsSourceConfig .hasHealthCheck ()) {
160+ if ((oTelLogsSourceConfig .enableUnframedRequests () || oTelLogsSourceConfig .getHttpPath () != null )
161+ && oTelLogsSourceConfig .hasHealthCheck ()) {
169162 LOG .info ("HTTP source health check is enabled" );
170163 serverBuilder .service (HEALTH_CHECK_PATH , HealthCheckService .builder ().longPolling (0 ).build ());
171164 }
172165
173- configureGrpcService (serverBuilder , buffer );
174- configureHttpService (serverBuilder , buffer , executor .getQueue ());
166+ configureGrpcService (serverBuilder , buffer , authProvider );
167+ if (oTelLogsSourceConfig .getHttpPath () != null ) {
168+ configureHttpService (serverBuilder , buffer , executor .getQueue ());
169+ }
175170
176171 return serverBuilder .build ();
177172 }
@@ -200,7 +195,8 @@ private void configureHttpService(ServerBuilder serverBuilder, Buffer<Record<Obj
200195 }
201196 }
202197
203- private void configureGrpcService (ServerBuilder serverBuilder , Buffer <Record <Object >> buffer ) {
198+ private void configureGrpcService (ServerBuilder serverBuilder , Buffer <Record <Object >> buffer ,
199+ GrpcAuthenticationProvider authProvider ) {
204200 LOG .info ("Configuring gRPC service" );
205201
206202 final GrpcServiceBuilder grpcServiceBuilder = GrpcService
@@ -215,7 +211,6 @@ private void configureGrpcService(ServerBuilder serverBuilder, Buffer<Record<Obj
215211 pluginMetrics ,
216212 null
217213 );
218- GrpcAuthenticationProvider authProvider = createGrpcAuthenticationProvider (pluginFactory );
219214
220215 final List <ServerInterceptor > interceptors = new ArrayList <>();
221216 if (authProvider .getAuthenticationInterceptor () != null ) {
@@ -296,21 +291,6 @@ private GrpcAuthenticationProvider createGrpcAuthenticationProvider(final Plugin
296291 return pluginFactory .loadPlugin (GrpcAuthenticationProvider .class , authenticationPluginSetting );
297292 }
298293
299- private Optional <ArmeriaHttpAuthenticationProvider > createHttpAuthentication () {
300- if (oTelLogsSourceConfig .getAuthentication () == null || oTelLogsSourceConfig .getAuthentication ().getPluginName ().equals (UNAUTHENTICATED_PLUGIN_NAME )) {
301- LOG .warn ("Creating otel_trace_source http service without authentication. This is not secure." );
302- LOG .warn ("In order to set up Http Basic authentication for the otel-trace-source, go here: https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-trace-source#authentication-configurations" );
303- return Optional .empty ();
304- } else {
305- return Optional .of (createGrpcAuthenticationProvider (oTelLogsSourceConfig .getAuthentication ()));
306- }
307- }
308-
309- private ArmeriaHttpAuthenticationProvider createGrpcAuthenticationProvider (final PluginModel authenticationConfiguration ) {
310- Map <String , Object > pluginSettings = authenticationConfiguration .getPluginSettings ();
311- return pluginFactory .loadPlugin (ArmeriaHttpAuthenticationProvider .class , new PluginSetting (authenticationConfiguration .getPluginName (), pluginSettings ));
312- }
313-
314294 private GrpcExceptionHandlerFunction createGrpExceptionHandler (OTelLogsSourceConfig config ) {
315295 RetryInfoConfig retryInfo = config .getRetryInfo () != null
316296 ? config .getRetryInfo ()
0 commit comments